import datetime
import glob
import json
import os.path
import zipfile
from typing import Union, Tuple, List, Optional

import pandas as pd
from ditk import logging
from gchar.games import get_character
from gchar.games.base import Character
from hbutils.string import plural_word
from hbutils.system import TemporaryDirectory
from huggingface_hub import CommitOperationAdd, hf_hub_url
from waifuc.action import NoMonochromeAction, FilterSimilarAction, \
    TaggingAction, PersonSplitAction, FaceCountAction, CCIPAction, ModeConvertAction, ClassFilterAction, \
    FileOrderAction, RatingFilterAction, BaseAction, RandomFilenameAction, PaddingAlignAction, ThreeStageSplitAction, \
    AlignMinSizeAction, MinSizeFilterAction, FilterAction
from waifuc.action.filter import MinAreaFilterAction
from waifuc.export import SaveExporter, TextualInversionExporter
from waifuc.model import ImageItem
from waifuc.source import GcharAutoSource, BaseDataSource, LocalSource
from waifuc.utils import task_ctx

from ..utils import number_to_tag, get_ch_name, get_alphabet_name, get_hf_client, download_file, get_hf_fs


def get_source(source) -> BaseDataSource:
    if isinstance(source, (str, Character)):
        source = GcharAutoSource(source, main_sources_count=5)
    elif isinstance(source, BaseDataSource):
        pass
    else:
        raise TypeError(f'Unknown source type - {source!r}.')

    return source


def get_main_source(source, no_r18: bool = False, bg_color: str = 'white',
                    no_monochrome_check: bool = False,
                    drop_multi: bool = True, skip: bool = False) -> BaseDataSource:
    source: BaseDataSource = get_source(source)
    if not skip:
        actions = [ModeConvertAction('RGB', bg_color)]
        if not no_monochrome_check:
            actions.append(NoMonochromeAction())  # no monochrome, greyscale or sketch images
        actions.append(ClassFilterAction(['illustration', 'bangumi']))  # no comic or 3d images
        if no_r18:
            actions.append(RatingFilterAction(['safe', 'r15']))

        actions.append(FilterSimilarAction('all'))  # filter out duplicated images
        if drop_multi:
            actions.append(FaceCountAction(count=1, level='n'))  # drop images with 0 or >1 faces
        actions.extend([
            PersonSplitAction(level='n'),  # crop out each person
            FaceCountAction(count=1, level='n'),
            FileOrderAction(),  # rename files in order
            # CCIPAction(min_val_count=15),  # CCIP, filter out characters you may not want in the dataset
            FilterSimilarAction('all'),  # filter out duplicated images
            MinSizeFilterAction(320),
            TaggingAction(force=True, character_threshold=1.01),
        ])
        actions.append(RandomFilenameAction(ext='.png'))
    else:
        actions = []

    return source.attach(*actions)
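
# A minimal usage sketch (illustrative only, never executed on import): the character
# name 'amiya' and the output directory are assumptions, and the slice just keeps the
# example small. It only uses APIs already exercised below (slicing, task_ctx, SaveExporter).
#
#     src = get_main_source('amiya', no_r18=True, drop_multi=True)
#     with task_ctx('demo'):
#         src[:20].export(SaveExporter('/tmp/amiya_demo'))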


def actions_parse(actions: Union[int, Tuple[int, int], List[BaseAction]], bg_color: str = 'white'):
    if isinstance(actions, list):
        return actions
    elif isinstance(actions, tuple):
        width, height = actions
        return [PaddingAlignAction((width, height), bg_color)]
    elif isinstance(actions, int):
        return [AlignMinSizeAction(actions)]
    else:
        raise TypeError(f'Unknown post action type - {actions!r}.')
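
# The three accepted forms, matching how they are used in _DEFAULT_RESOLUTIONS below
# (values are illustrative):
#
#     actions_parse((384, 512))   # tuple -> pad-align every image to exactly 384x512
#     actions_parse(640)          # int   -> shrink so the shorter side does not exceed 640
#     actions_parse([MinAreaFilterAction(512), AlignMinSizeAction(640)])  # list -> used as-is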


class CustomMinSizeAction(FilterAction):
    """Filter that allows a smaller minimum size for eye crops than for other images."""

    def __init__(self, main_size: int = 280, min_eye_size: int = 180):
        self.main_size = main_size
        self.min_eye_size = min_eye_size

    def check(self, item: ImageItem) -> bool:
        min_size = min(item.image.width, item.image.height)
        if 'crop' in item.meta and item.meta['crop']['type'] == 'eye':
            return min_size >= self.min_eye_size
        else:
            return min_size >= self.main_size
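
# For example (values illustrative): with the defaults above, an eye crop
# (item.meta['crop']['type'] == 'eye') passes at 200x300 pixels, while any other
# image needs a shorter side of at least 280 pixels.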


_SOURCES = {
    'native': [
        TaggingAction(force=False, character_threshold=1.01),
    ],
    'stage3': [
        ThreeStageSplitAction(split_person=False),
        FilterSimilarAction(),
        MinSizeFilterAction(280),
        TaggingAction(force=False, character_threshold=1.01),
    ],
    'stage3-eyes': [
        ThreeStageSplitAction(split_person=False, split_eyes=True),
        FilterSimilarAction(),
        CustomMinSizeAction(280, 180),
        TaggingAction(force=False, character_threshold=1.01),
    ],
}

_DEFAULT_RESOLUTIONS = {
    'raw': ('native', [], 'Raw data with meta information.'),
    'raw-stage3': ('stage3', [], '3-stage cropped raw data with meta information.'),
    'raw-stage3-eyes': ('stage3-eyes', [], '3-stage cropped (with eye-focus) raw data with meta information.'),
    '384x512': ('native', (384, 512), '384x512 aligned dataset.'),
    # '512x512': ('native', (512, 512), '512x512 aligned dataset.'),
    '512x704': ('native', (512, 704), '512x704 aligned dataset.'),
    # '640x640': ('native', (640, 640), '640x640 aligned dataset.'),
    '640x880': ('native', (640, 880), '640x880 aligned dataset.'),
    'stage3-640': ('stage3', 640, '3-stage cropped dataset with the shorter side not exceeding 640 pixels.'),
    'stage3-800': ('stage3', 800, '3-stage cropped dataset with the shorter side not exceeding 800 pixels.'),
    'stage3-p512-640': ('stage3', [MinAreaFilterAction(512), AlignMinSizeAction(640)],
                        '3-stage cropped dataset with the area not less than 512x512 pixels.'),
    # 'stage3-1200': ('stage3', 1200, '3-stage cropped dataset with the shorter side not exceeding 1200 pixels.'),
    'stage3-eyes-640': ('stage3-eyes', 640, '3-stage cropped (with eye-focus) dataset '
                                            'with the shorter side not exceeding 640 pixels.'),
    'stage3-eyes-800': ('stage3-eyes', 800, '3-stage cropped (with eye-focus) dataset '
                                            'with the shorter side not exceeding 800 pixels.'),
}

DATASET_PVERSION = 'v1.4'
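
# Each _DEFAULT_RESOLUTIONS entry maps a dataset flavour to
# (pipeline key from _SOURCES, post-processing spec understood by actions_parse, description).
# A hypothetical extra entry would look like this (key and size are illustrative,
# not part of the shipped configuration):
#
#     _DEFAULT_RESOLUTIONS['768x1024'] = ('native', (768, 1024), '768x1024 aligned dataset.')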


def crawl_dataset_to_huggingface(
        source: Union[str, Character, BaseDataSource], repository: Optional[str] = None,
        name: Optional[str] = None, limit: Optional[int] = 10000, min_images: int = 450,
        no_r18: bool = False, bg_color: str = 'white', drop_multi: bool = True, skip_preprocess: bool = False,
        no_monochrome_check: bool = False,
        repo_type: str = 'dataset', revision: str = 'main', path_in_repo: str = '.', private: bool = False,
):
    if isinstance(source, (str, Character)):
        if isinstance(source, str):
            source = get_character(source)
        name = f'{source.enname} ({source.__official_name__})'
        if not repository:
            repository = f'AppleHarem/{get_ch_name(source)}'
    else:
        if name is None:
            raise ValueError('Name must be specified when source is not a str or Character.')
        if not repository:
            repository = f'AppleHarem/{get_alphabet_name(name)}'

    hf_fs = get_hf_fs()
    if hf_fs.exists(f'datasets/{repository}/.gitattributes'):
        logging.warning(f'{repository} already exists, skipped.')
        return

    origin_source = get_main_source(source, no_r18, bg_color, no_monochrome_check, drop_multi, skip_preprocess)
    with TemporaryDirectory() as td:
        # save the original (unprocessed) images
        origin_dir = os.path.join(td, 'origin')
        os.makedirs(origin_dir, exist_ok=True)
        if limit is not None:
            origin_source = origin_source[:limit]
        with task_ctx('origin'):
            origin_source.export(SaveExporter(origin_dir))

        img_count = len(glob.glob(os.path.join(origin_dir, '*.png')))
        if img_count < min_images:
            logging.warning(f'Only {plural_word(img_count, "image")} found for {name}, which is too few; '
                            f'skipping post-processing and uploading.')
            return

        source_dir = os.path.join(td, 'source')
        os.makedirs(source_dir, exist_ok=True)
        for sname, actions in _SOURCES.items():
            with task_ctx(f'source/{sname}'):
                LocalSource(origin_dir).attach(*actions).export(SaveExporter(os.path.join(source_dir, sname)))

        processed_dir = os.path.join(td, 'processed')
        os.makedirs(processed_dir, exist_ok=True)
        archive_dir = os.path.join(td, 'archives')
        os.makedirs(archive_dir, exist_ok=True)

        files_to_upload: List[Tuple[str, str]] = []

        resolutions = _DEFAULT_RESOLUTIONS
        columns = ['Name', 'Images', 'Download', 'Description']
        rows = []
        for rname, (sname, actions, description) in resolutions.items():
            actions = actions_parse(actions, bg_color)
            ox = LocalSource(os.path.join(source_dir, sname))
            current_processed_dir = os.path.join(processed_dir, rname)
            with task_ctx(f'archive/{rname}'):
                if not rname.startswith('raw'):  # raw is preserved for exporting json data
                    ox.attach(*actions).export(TextualInversionExporter(current_processed_dir))
                else:
                    ox.attach(*actions).export(SaveExporter(current_processed_dir))
            current_img_cnt = len(glob.glob(os.path.join(current_processed_dir, '*.png')))

            zip_file = os.path.join(archive_dir, f'dataset-{rname}.zip')
            with zipfile.ZipFile(zip_file, mode='w') as zf:
                for directory, _, files in os.walk(current_processed_dir):
                    for file in files:
                        file_path = os.path.join(directory, file)
                        rel_file_path = os.path.relpath(file_path, current_processed_dir)
                        zf.write(
                            file_path,
                            '/'.join(rel_file_path.split(os.sep)),
                        )

            rows.append((
                rname,
                current_img_cnt,
                f'[Download]({os.path.basename(zip_file)})',
                description,
            ))
            files_to_upload.append((zip_file, os.path.basename(zip_file)))

        meta_file = os.path.join(td, 'meta.json')
        with open(meta_file, 'w', encoding='utf-8') as mf:
            json.dump({
                'name': name,
                'version': DATASET_PVERSION,
            }, mf, indent=4, sort_keys=True, ensure_ascii=False)
        files_to_upload.append((meta_file, 'meta.json'))

        readme_file = os.path.join(td, 'README.md')
        with open(readme_file, 'w', encoding='utf-8') as rf:
            print(f'---', file=rf)
            print(f'license: mit', file=rf)
            print(f'task_categories:', file=rf)
            print(f'- text-to-image', file=rf)
            print(f'tags:', file=rf)
            print(f'- art', file=rf)
            print(f'- not-for-all-audiences', file=rf)
            print(f'size_categories:', file=rf)
            print(f'- {number_to_tag(img_count)}', file=rf)
            print(f'---', file=rf)
            print(f'', file=rf)
            print(f'# Dataset of {name}', file=rf)
            print(f'', file=rf)
            print(f'This is the dataset of {name}, '
                  f'containing {plural_word(img_count, "image")} and their tags.', file=rf)
            print(f'', file=rf)
            print(f'Images are crawled from many sites (e.g. danbooru, pixiv, zerochan ...); '
                  f'the auto-crawling system is powered by the [DeepGHS Team](https://github.com/deepghs) '
                  f'([huggingface organization](https://huggingface.co/deepghs)).', file=rf)
            print(f'The WebUI containing the crawlers and related tools is '
                  f'[LittleAppleWebUI](https://github.com/LittleApple-fp16/LittleAppleWebUI).', file=rf)
            print(f'', file=rf)

            df = pd.DataFrame(columns=columns, data=rows)
            print(df.to_markdown(index=False), file=rf)
            print('', file=rf)
        files_to_upload.append((readme_file, 'README.md'))

        hf_client = get_hf_client()
        hf_fs = get_hf_fs()
        logging.info(f'Initializing repository {repository!r} ...')
        if not hf_fs.exists(f'datasets/{repository}/.gitattributes'):
            hf_client.create_repo(repo_id=repository, repo_type=repo_type, exist_ok=True, private=private)

        current_time = datetime.datetime.now().astimezone().strftime('%Y-%m-%d %H:%M:%S %Z')
        commit_message = f'Publish character {name}, on {current_time}'
        logging.info(f'Publishing character {name!r} to repository {repository!r} ...')
        hf_client.create_commit(
            repository,
            [
                CommitOperationAdd(
                    path_in_repo=f'{path_in_repo}/{filename}',
                    path_or_fileobj=local_file,
                ) for local_file, filename in files_to_upload
            ],
            commit_message=commit_message,
            repo_type=repo_type,
            revision=revision,
            run_as_future=False,
        )
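
# A minimal usage sketch (illustrative: the character name and repository id are
# assumptions, and an authenticated Hugging Face token is required for the upload):
#
#     crawl_dataset_to_huggingface(
#         'surtr',                                   # gchar character name or a BaseDataSource
#         repository='AppleHarem/surtr_arknights',
#         limit=2000, min_images=200, no_r18=True,
#     )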


def remake_dataset_to_huggingface(
        repository: Optional[str] = None, limit: Optional[int] = 200, min_images: int = 10,
        no_r18: bool = False, bg_color: str = 'white', drop_multi: bool = True,
        repo_type: str = 'dataset', revision: str = 'main', path_in_repo: str = '.',
):
    hf_fs = get_hf_fs()
    with TemporaryDirectory() as td:
        zip_file = os.path.join(td, 'dataset-raw.zip')
        download_file(hf_hub_url(repository, 'dataset-raw.zip', repo_type='dataset'), zip_file)

        source_dir = os.path.join(td, 'source')
        os.makedirs(source_dir, exist_ok=True)
        with zipfile.ZipFile(zip_file, 'r') as zf:
            zf.extractall(source_dir)
        source = LocalSource(source_dir)

        name = None
        if hf_fs.exists(f'datasets/{repository}/meta.json'):
            meta_json = json.loads(hf_fs.read_text(f'datasets/{repository}/meta.json'))
            if 'name' in meta_json:
                name = meta_json['name']
        name = name or repository.split('/')[-1]

        # pass the trailing options as keywords so they cannot shift into
        # no_monochrome_check when the signature of crawl_dataset_to_huggingface changes
        return crawl_dataset_to_huggingface(
            source, repository, name,
            limit, min_images, no_r18, bg_color, drop_multi,
            skip_preprocess=True,
            repo_type=repo_type, revision=revision, path_in_repo=path_in_repo,
        )
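
# A minimal usage sketch (the repository id is illustrative): re-download the raw
# archive of an existing dataset repository and rebuild all processed flavours from it.
#
#     remake_dataset_to_huggingface('AppleHarem/amiya_arknights', min_images=10)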