Spaces:
Runtime error
Runtime error
| import glob | |
| import io | |
| import json | |
| import logging | |
| import os | |
| import re | |
| import textwrap | |
| from typing import Union, Optional, List | |
| import markdown2 | |
| import numpy as np | |
| from PIL import Image | |
| from hbutils.string import plural_word | |
| from hbutils.system import TemporaryDirectory | |
| from imgutils.data import load_image | |
| from imgutils.detect import detect_faces | |
| from imgutils.metrics import ccip_extract_feature, ccip_batch_differences, ccip_default_threshold | |
| from imgutils.validate import anime_rating_score | |
| from pycivitai import civitai_find_online | |
| from pycivitai.client import find_version_id_by_hash | |
| from tqdm.auto import tqdm | |
| from waifuc.source import LocalSource | |
| from .export import draw_with_repo | |
| from ..dataset import load_dataset_for_character | |
| from ..publish.civitai import _tag_decode, try_find_title, try_get_title_from_repo | |
| from ..utils import srequest, get_hf_fs, load_tags_from_directory | |
def _read_sample_info(info_file: str) -> dict:
    """
    Parse a sample's ``*_info.txt`` file into a dict.

    Every non-blank line is split at its first ``:``; the stripped left part
    becomes the key and the stripped right part the value.

    :param info_file: Path of the text file to read (decoded as UTF-8).
    :return: Mapping of field name to field text.
    :raises ValueError: If a non-blank line contains no ``:``.
    """
    info = {}
    with open(info_file, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if line:
                info_name, info_text = line.split(':', maxsplit=1)
                info[info_name.strip()] = info_text.strip()
    return info


def _build_sample_meta(info: dict, png_parameters: Optional[str]) -> dict:
    """
    Build the generation-metadata dict that is attached to one uploaded image.

    :param info: Fields parsed from the sample's info file.
    :param png_parameters: The ``parameters`` text chunk of the PNG, if any.
        When it contains a ``Model hash: ...`` entry, hash-related fields are added.
    :return: The metadata dict.
    """
    meta = {
        'cfgScale': int(round(float(info.get('Guidance Scale')))),
        'negativePrompt': info.get('Neg Prompt'),
        'prompt': info.get('Prompt'),
        'sampler': info.get('Sample Method', "Euler a"),
        'seed': int(info.get('Seed')),
        'steps': int(info.get('Infer Steps')),
        'Size': f"{info['Width']}x{info['Height']}",
    }
    if info.get('Clip Skip'):
        meta['clipSkip'] = int(info['Clip Skip'])
    if info.get('Model'):
        meta['Model'] = info['Model']

    if png_parameters:
        find_hash = re.findall(r'Model hash:\s*([a-zA-Z\d]+)', png_parameters, re.IGNORECASE)
        if find_hash:
            model_hash = find_hash[0].lower()
            meta['hashes'] = {"model": model_hash}
            meta["resources"] = [
                {
                    "hash": model_hash,
                    # 'Model' is optional in the info file (see above), so do not
                    # index it directly here.
                    "name": info.get('Model'),
                    "type": "model"
                }
            ]
            meta["Model hash"] = model_hash

    return meta


def publish_samples_to_civitai(images_dir, model: Union[int, str], model_version: Optional[str] = None,
                               model_creator='narugo1992', safe_only: bool = False,
                               extra_tags: Optional[List[str]] = None, post_title: Optional[str] = None,
                               session_repo: str = 'narugo/civitai_session_p1'):
    """
    Upload the ``*.png`` samples of a directory as one civitai post and publish it.

    Each ``<name>.png`` must be accompanied by ``<name>_info.txt`` in the same
    directory. Images in which no face is detected are skipped. The remaining
    images are ordered (ascending sort of the ranking tuple built below) before
    being uploaded.

    :param images_dir: Directory holding the sample images and their info files.
    :param model: Model identifier passed to ``civitai_find_online``.
    :param model_version: Model version passed to ``civitai_find_online``.
    :param model_creator: Creator name passed to ``civitai_find_online``.
    :param safe_only: When true, the rating scores take part in the ordering
        (highest rounded ``safe`` score first); otherwise they are ignored.
    :param extra_tags: Tags appended to the resource's own tags; ``None`` means none.
    :param post_title: Title of the post; defaults to
        ``"<model name> - <version name> Review"``.
    :param session_repo: Repository passed to ``get_civitai_session``.
    :return: List of ``(local_img_file, img_filename, meta)`` tuples in upload order.
    """
    resource = civitai_find_online(model, model_version, creator=model_creator)
    model_version_id = resource.version_id
    post_title = post_title or f"{resource.model_name} - {resource.version_name} Review"

    ranked = []
    for img_file in glob.glob(os.path.join(images_dir, '*.png')):
        img_filename = os.path.basename(img_file)
        img_name = os.path.splitext(img_filename)[0]
        local_img_file = os.path.join(images_dir, img_filename)
        local_info_file = os.path.join(images_dir, f'{img_name}_info.txt')

        info = _read_sample_info(local_info_file)

        # Close the image handle deterministically; only header data is needed.
        with Image.open(local_img_file) as pil_img:
            png_parameters = pil_img.info.get('parameters')
            width, height = pil_img.size
        meta = _build_sample_meta(info, png_parameters)

        # Both spellings of the key are accepted ('Safe For Word' / 'Safe For Work').
        nsfw = (info.get('Safe For Word', info.get('Safe For Work')) or '').lower() != 'yes'

        # Check for a face first so no rating is computed for images that get dropped.
        faces = detect_faces(local_img_file)
        if not faces:
            continue
        (x0, y0, x1, y1), _, _ = faces[0]
        face_area = abs((x1 - x0) * (y1 - y0))
        face_ratio = int(round(face_area * 1.0 / (width * height) * 50))

        rating_score = anime_rating_score(local_img_file)
        safe_v = int(round(rating_score['safe'] * 10))
        safe_r15 = int(round(rating_score['r15'] * 10))
        safe_r18 = int(round(rating_score['r18'] * 10))

        ranked.append((
            (-safe_v, -safe_r15, -safe_r18) if safe_only else (0,),
            -face_ratio,  # larger face share first
            1 if nsfw else 0,  # safe-for-work first
            0 if img_name.startswith('pattern_') else 1,  # 'pattern_' samples first
            img_name,
            (local_img_file, img_filename, meta)
        ))

    # Sort on the ranking fields only, so the payload (which holds a dict) is never compared.
    images = [item[-1] for item in sorted(ranked, key=lambda item: item[:-1])]

    from ..publish.civitai import civitai_upload_images, get_civitai_session, parse_publish_at

    def _custom_pc_func(mvid):
        return {
            "json": {
                "modelVersionId": mvid,
                "title": post_title,
                "tag": None,
                "authed": True,
            },
            "meta": {
                "values": {
                    "tag": ["undefined"]
                }
            }
        }

    session = get_civitai_session(session_repo)
    post_id = civitai_upload_images(
        model_version_id, images,
        # extra_tags defaults to None, which cannot be unpacked directly.
        tags=[*resource.tags, *(extra_tags or [])],
        model_id=resource.model_id,
        pc_func=_custom_pc_func,
        session=session,
    )

    logging.info(f'Publishing post {post_id!r} ...')
    resp = srequest(
        session, 'POST', 'https://civitai.com/api/trpc/post.update',
        json={
            "json": {
                "id": post_id,
                "publishedAt": parse_publish_at('now'),
                "authed": True,
            },
            "meta": {
                "values": {
                    "publishedAt": ["Date"]
                }
            }
        },
        headers={'Referer': f'https://civitai.com/models/{resource.model_id}/wizard?step=4'},
    )
    resp.raise_for_status()

    return images
def civitai_review(model: Union[int, str], model_version: Optional[str] = None,
                   model_creator='narugo1992', rating: int = 5, description_md: Optional[str] = None,
                   session_repo: str = 'narugo/civitai_session_p1'):
    """
    Create or update the session user's review of a civitai model version.

    The existing review is looked up first; when none is found (HTTP 404, or
    the lookup request failed with ``AssertionError``) a new one is created.
    The rating is then written, and, if given, the description.

    :param model: Model identifier passed to ``civitai_find_online``.
    :param model_version: Model version passed to ``civitai_find_online``.
    :param model_creator: Creator name passed to ``civitai_find_online``.
    :param rating: Rating value sent with the review.
    :param description_md: Optional Markdown text; it is dedented, rendered to
        HTML with ``markdown2`` and stored as the review details.
    :param session_repo: Repository passed to ``get_civitai_session``.
    :raises Exception: Whatever ``raise_for_status`` raises on a failed request.
    """
    resource = civitai_find_online(model, model_version, creator=model_creator)

    from ..publish.civitai import get_civitai_session
    session = get_civitai_session(session_repo)
    wizard_headers = {'Referer': f'https://civitai.com/models/{resource.model_id}/wizard?step=4'}

    logging.info(f'Try find exist review of model version #{resource.version_id} ...')
    try:
        # Workaround for the HTTP 500 this endpoint returned (2023-09-14).
        # Presumably srequest reports that failure as AssertionError; it is
        # treated the same as "no review exists yet".
        resp = srequest(
            session, 'GET', 'https://civitai.com/api/trpc/resourceReview.getUserResourceReview',
            params={'input': json.dumps({"json": {"modelVersionId": resource.version_id, "authed": True}})},
            headers={
                'Referer': f'https://civitai.com/posts/create?modelId={resource.model_id}&'
                           f'modelVersionId={resource.version_id}&'
                           f'returnUrl=/models/{resource.model_id}?'
                           f'modelVersionId={resource.version_id}&reviewing=true'
            },
            raise_for_status=False
        )
    except AssertionError:
        resp = None

    if resp is None or resp.status_code == 404:
        logging.info(f'Creating review for #{resource.version_id} ...')
        resp = srequest(
            session, 'POST', 'https://civitai.com/api/trpc/resourceReview.create',
            json={
                "json": {
                    "modelVersionId": resource.version_id,
                    "modelId": resource.model_id,
                    "rating": rating,
                    "authed": True,
                }
            },
            headers=wizard_headers
        )
        resp.raise_for_status()
    else:
        # Any non-404 failure of the lookup is surfaced here.
        resp.raise_for_status()

    # Either the lookup response or the create response carries the review id.
    review_id = resp.json()['result']['data']['json']['id']

    logging.info(f'Updating review #{review_id}\'s rating ...')
    resp = srequest(
        session, 'POST', 'https://civitai.com/api/trpc/resourceReview.update',
        json={
            "json": {
                "id": review_id,
                "rating": rating,
                "details": None,
                "authed": True,
            },
            "meta": {"values": {"details": ["undefined"]}}
        },
        headers=wizard_headers
    )
    resp.raise_for_status()

    if description_md:
        logging.info(f'Updating review #{review_id}\'s description ...')
        resp = srequest(
            session, 'POST', 'https://civitai.com/api/trpc/resourceReview.update',
            json={
                "json": {
                    "id": review_id,
                    "details": markdown2.markdown(textwrap.dedent(description_md)),
                    'rating': None,
                    "authed": True,
                },
                "meta": {"values": {"rating": ["undefined"]}}
            },
            headers=wizard_headers
        )
        resp.raise_for_status()
# Base models that civitai_auto_review draws samples with when the caller does
# not pass `base_models`. Commented-out entries are currently disabled and are
# kept only for reference.
_BASE_MODEL_LIST = [
    'AIARTCHAN/anidosmixV2',
    # 'stablediffusionapi/anything-v5',
    # 'Lykon/DreamShaper',
    'Meina/Unreal_V4.1',
    'digiplay/majicMIX_realistic_v6',
    'jzli/XXMix_9realistic-v4',
    'stablediffusionapi/abyssorangemix2nsfw',
    'AIARTCHAN/expmixLine_v2',
    # 'Yntec/CuteYuki2',
    'stablediffusionapi/counterfeit-v30',
    'stablediffusionapi/flat-2d-animerge',
    'redstonehero/cetusmix_v4',
    # 'KBlueLeaf/kohaku-v4-rev1.2',
    # 'stablediffusionapi/night-sky-yozora-sty',
    'Meina/MeinaHentai_V4',
    # 'Meina/MeinaPastel_V6',
]
def _ccip_review_stats(feats, ds_feats) -> dict:
    """
    Summarise how close a group of generated images is to the dataset images.

    The CCIP difference matrix is computed over both groups and cut down to
    the generated-vs-dataset part (rows: ``feats``, columns: ``ds_feats``).
    Per generated image, the "score" is the share of dataset images whose
    difference is within ``ccip_default_threshold()`` and the "loss" is the
    mean difference.

    :return: Dict with ``mean_score``, ``median_score``, ``mean_loss`` and
        ``median_loss`` as plain Python numbers.
    """
    count = len(feats)
    diffs = ccip_batch_differences([*feats, *ds_feats])[:count, count:]
    scores = (diffs <= ccip_default_threshold()).mean(axis=1)
    losses = diffs.mean(axis=1)
    return {
        'mean_score': scores.mean().item(),
        'median_score': np.median(scores).item(),
        'mean_loss': losses.mean().item(),
        'median_loss': np.median(losses).item(),
    }


def _render_review_report(model_results, overall: dict) -> str:
    """Render the per-model and overall statistics as the Markdown review text."""
    with io.StringIO() as ds:
        print('Tested on the following models:', file=ds)
        print('', file=ds)

        all_total_images = 0
        for mr in model_results:
            if mr['model_homepage']:
                mx = f'[{mr["model_name"]}]({mr["model_homepage"]})'
            else:
                mx = mr['model_name']

            all_total_images += mr['images']
            print(
                f'When using model {mx}, {plural_word(mr["images"], "image")} in total, '
                f'recognition score (mean/median): {mr["mean_score"]:.3f}/{mr["median_score"]:.3f}, '
                f'character image loss (mean/median): {mr["mean_loss"]:.4f}/{mr["median_loss"]:.4f}.',
                file=ds
            )
            print('', file=ds)

        print(
            f'Overall, {plural_word(all_total_images, "image")} in total, '
            f'recognition score (mean/median): {overall["mean_score"]:.3f}/{overall["median_score"]:.3f}, '
            f'character image loss (mean/median): {overall["mean_loss"]:.4f}/{overall["median_loss"]:.4f}.',
            file=ds
        )
        print('', file=ds)

        return ds.getvalue()


def civitai_auto_review(repository: str, model: Optional[Union[int, str]] = None,
                        model_version: Optional[str] = None,
                        model_creator='narugo1992', step: Optional[int] = None,
                        base_models: Optional[List[str]] = None,
                        rating: Optional[int] = 5, description_md: Optional[str] = None,
                        session_repo: str = 'narugo/civitai_session_p1'):
    """
    Generate samples with several base models, post them, and review the model.

    For every base model, samples are drawn with ``draw_with_repo`` into a
    temporary directory and published through ``publish_samples_to_civitai``.
    The published images are compared against the character dataset with CCIP
    features. Unless ``rating`` is ``None``, a review is then submitted via
    ``civitai_review``; its text is ``description_md`` if given, otherwise a
    generated report of the CCIP statistics.

    :param repository: Model repository, named ``<owner>/<char_words>_<game>``;
        the last ``_``-separated word is used as the game name, the rest as
        the character name.
    :param model: Model name or id on civitai; looked up from the character and
        repository when omitted, falling back to the repository's base name.
    :param model_version: Model version passed on to the civitai helpers.
    :param model_creator: Creator name passed on to the civitai helpers.
    :param step: Passed to ``draw_with_repo``.
    :param base_models: Base models to draw with; defaults to ``_BASE_MODEL_LIST``.
    :param rating: Rating for the review; ``None`` skips the review entirely.
    :param description_md: Review text overriding the generated report.
    :param session_repo: Repository passed on to the civitai helpers.
    """
    repo_name = repository.split('/')[-1]
    name_words = repo_name.split('_')
    game_name = name_words[-1]
    char_name = ' '.join(name_words[:-1])
    model = model or try_find_title(char_name, game_name) or \
        try_get_title_from_repo(repository) or repo_name
    logging.info(f'Model name on civitai: {model!r}')

    from ..publish.export import KNOWN_MODEL_HASHES

    hf_fs = get_hf_fs()
    model_info = json.loads(hf_fs.read_text(f'{repository}/meta.json'))
    dataset_info = model_info['dataset']

    # load dataset
    # NOTE(review): the 'type' field of the dataset info is forwarded as `size`;
    # confirm this is what load_dataset_for_character expects.
    ds_size = (384, 512) if not dataset_info or not dataset_info.get('type') else dataset_info['type']
    with load_dataset_for_character(repository, size=ds_size) as (_, ds_dir):
        core_tags, _ = load_tags_from_directory(ds_dir)

        all_tags = [
            game_name, f"{game_name} {char_name}", char_name,
            'female', 'girl', 'character', 'fully-automated', 'random prompt', 'random seed',
            *map(_tag_decode, core_tags.keys()),
        ]
        ds_source = LocalSource(ds_dir)
        # Materialised with list() so tqdm knows the total; features must be
        # extracted while the dataset directory is still available.
        ds_feats = [
            ccip_extract_feature(item.image)
            for item in tqdm(list(ds_source), desc='Extract Dataset Feature')
        ]

    all_feats = []
    model_results = []
    for base_model in (base_models or _BASE_MODEL_LIST):
        logging.info(f'Reviewing with {base_model!r} ...')

        with TemporaryDirectory() as td:
            if KNOWN_MODEL_HASHES.get(base_model):
                bm_id, bm_version_id, _ = find_version_id_by_hash(KNOWN_MODEL_HASHES[base_model])
                resource = civitai_find_online(bm_id, bm_version_id)
                m_name = f'{resource.model_name} - {resource.version_name}'
                m_url = f'https://civitai.com/models/{resource.model_id}?modelVersionId={resource.version_id}'
            else:
                m_name = base_model
                m_url = None

            draw_with_repo(repository, td, step=step, pretrained_model=base_model)
            images = publish_samples_to_civitai(
                td, model, model_version,
                model_creator=model_creator,
                extra_tags=all_tags,
                post_title=f"AI Review (Base Model: {m_name})",
                session_repo=session_repo
            )

            # The image files live inside `td`, so features are extracted
            # before the temporary directory is removed.
            gp_feats = [
                ccip_extract_feature(local_imgfile)
                for local_imgfile, _, _ in tqdm(images, desc='Extract Images Feature')
            ]
            all_feats.extend(gp_feats)

            ret = {
                'model_name': m_name,
                'model_homepage': m_url,
                'images': len(images),
                **_ccip_review_stats(gp_feats, ds_feats),
            }
            logging.info(f'Result of model: {ret!r}')
            model_results.append(ret)

    if rating is not None:
        logging.info('Making review ...')
        if not description_md:
            # Only build the report when the caller did not supply a text.
            overall = _ccip_review_stats(all_feats, ds_feats)
            description_md = _render_review_report(model_results, overall)

        try:
            civitai_review(model, model_version, model_creator, rating, description_md, session_repo)
        except Exception:
            # Dump the text so it is not lost when the submission fails.
            print('This is the description md:')
            print(description_md)
            raise