Spaces:
Runtime error
Runtime error
| import os.path | |
| import re | |
| from typing import Optional, Iterator, List, Tuple, Union, Literal | |
| from hbutils.system import urlsplit | |
| from requests.auth import HTTPBasicAuth | |
| from .web import NoURL, WebDataSource | |
| from ..config.meta import __TITLE__, __VERSION__ | |
| from ..utils import get_requests_session, srequest | |
| _DanbooruSiteTyping = Literal['konachan', 'yandere', 'danbooru', 'safebooru', 'lolibooru'] | |
| class DanbooruLikeSource(WebDataSource): | |
| def __init__(self, tags: List[str], min_size: Optional[int] = 800, download_silent: bool = True, | |
| username: Optional[str] = None, api_key: Optional[str] = None, | |
| site_name: Optional[str] = 'danbooru', site_url: Optional[str] = 'https://danbooru.donmai.us/', | |
| group_name: Optional[str] = None): | |
| WebDataSource.__init__(self, group_name or site_name, None, download_silent) | |
| self.session = get_requests_session(headers={ | |
| "User-Agent": f"{__TITLE__}/{__VERSION__}", | |
| 'Content-Type': 'application/json; charset=utf-8', | |
| }) | |
| self.auth = HTTPBasicAuth(username, api_key) if username and api_key else None | |
| self.site_name, self.site_url = site_name, site_url | |
| self.tags = tags | |
| self.min_size = min_size | |
| def _get_data_from_raw(self, raw): | |
| return raw | |
| def _select_url(self, data): | |
| if self.min_size is not None and "media_asset" in data and "variants" in data["media_asset"]: | |
| variants = data["media_asset"]["variants"] | |
| width, height, url = None, None, None | |
| for item in variants: | |
| if 'width' in item and 'height' in item and \ | |
| item['width'] >= self.min_size and item['height'] >= self.min_size: | |
| if url is None or item['width'] < width: | |
| width, height, url = item['width'], item['height'], item['url'] | |
| if url is not None: | |
| return url | |
| if 'file_url' not in data: | |
| raise NoURL | |
| return data['file_url'] | |
| def _get_tags(self, data): | |
| return re.split(r'\s+', data["tag_string"]) | |
| def _iter_data(self) -> Iterator[Tuple[Union[str, int], str, dict]]: | |
| page = 1 | |
| while True: | |
| resp = srequest(self.session, 'GET', f'{self.site_url}/posts.json', params={ | |
| "format": "json", | |
| "limit": "100", | |
| "page": str(page), | |
| "tags": ' '.join(self.tags), | |
| }, auth=self.auth) | |
| resp.raise_for_status() | |
| page_items = self._get_data_from_raw(resp.json()) | |
| if not page_items: | |
| break | |
| for data in page_items: | |
| try: | |
| url = self._select_url(data) | |
| except NoURL: | |
| continue | |
| _, ext_name = os.path.splitext(urlsplit(url).filename) | |
| filename = f'{self.group_name}_{data["id"]}{ext_name}' | |
| meta = { | |
| self.site_name: data, | |
| 'group_id': f'{self.group_name}_{data["id"]}', | |
| 'filename': filename, | |
| 'tags': {key: 1.0 for key in self._get_tags(data)} | |
| } | |
| yield data['id'], url, meta | |
| page += 1 | |
| class DanbooruSource(DanbooruLikeSource): | |
| def __init__(self, tags: List[str], | |
| min_size: Optional[int] = 800, download_silent: bool = True, | |
| username: Optional[str] = None, api_key: Optional[str] = None, | |
| group_name: Optional[str] = None): | |
| DanbooruLikeSource.__init__(self, tags, min_size, download_silent, username, api_key, | |
| 'danbooru', 'https://danbooru.donmai.us/', group_name) | |
| class SafebooruSource(DanbooruLikeSource): | |
| def __init__(self, tags: List[str], | |
| min_size: Optional[int] = 800, download_silent: bool = True, | |
| username: Optional[str] = None, api_key: Optional[str] = None, | |
| group_name: Optional[str] = None): | |
| DanbooruLikeSource.__init__(self, tags, min_size, download_silent, username, api_key, | |
| 'safebooru', 'https://safebooru.donmai.us', group_name) | |
| class ATFBooruSource(DanbooruLikeSource): | |
| def __init__(self, tags: List[str], | |
| min_size: Optional[int] = 800, download_silent: bool = True, | |
| username: Optional[str] = None, api_key: Optional[str] = None, | |
| group_name: Optional[str] = None): | |
| DanbooruLikeSource.__init__(self, tags, min_size, download_silent, username, api_key, | |
| 'danbooru', 'https://booru.allthefallen.moe', group_name) | |
| class E621LikeSource(DanbooruLikeSource): | |
| def __init__(self, tags: List[str], | |
| min_size: Optional[int] = 800, download_silent: bool = True, | |
| username: Optional[str] = None, api_key: Optional[str] = None, | |
| site_name: Optional[str] = 'e621', site_url: Optional[str] = 'https://e621.net/', | |
| group_name: Optional[str] = None): | |
| DanbooruLikeSource.__init__(self, tags, min_size, download_silent, username, api_key, | |
| site_name, site_url, group_name or site_name) | |
| def _get_data_from_raw(self, raw): | |
| return raw['posts'] | |
| def _select_url(self, data): | |
| urls = [] | |
| urls.append((data['file']['url'], data['file']['width'], data['file']['height'])) | |
| urls.append((data['preview']['url'], data['preview']['width'], data['preview']['height'])) | |
| if 'sample' in data and data['sample']['has']: | |
| urls.append((data['sample']['url'], data['sample']['width'], data['sample']['height'])) | |
| if self.min_size is not None: | |
| f_url, f_width, f_height = None, None, None | |
| for url, width, height in urls: | |
| if width >= self.min_size and height >= self.min_size: | |
| if f_url is None or width < f_width: | |
| f_url, f_width, f_height = url, width, height | |
| if f_url is not None: | |
| return f_url | |
| return urls[0][0] | |
| def _get_tags(self, data): | |
| tags = [] | |
| for value in data['tags'].values(): | |
| tags.extend(value) | |
| return tags | |
| class E621Source(E621LikeSource): | |
| def __init__(self, tags: List[str], | |
| min_size: Optional[int] = 800, download_silent: bool = True, | |
| username: Optional[str] = None, api_key: Optional[str] = None, | |
| group_name: Optional[str] = 'e621'): | |
| E621LikeSource.__init__(self, tags, min_size, download_silent, username, api_key, | |
| 'e621', 'https://e621.net/', group_name) | |
| class E926Source(E621LikeSource): | |
| def __init__(self, tags: List[str], | |
| min_size: Optional[int] = 800, download_silent: bool = True, | |
| username: Optional[str] = None, api_key: Optional[str] = None, | |
| group_name: Optional[str] = 'e926'): | |
| E621LikeSource.__init__(self, tags, min_size, download_silent, username, api_key, | |
| 'e926', 'https://e926.net/', group_name) | |