File size: 4,437 Bytes
3eba160 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
"""Utility helpers for Kaloscope-related model management."""
from __future__ import annotations
import os
from typing import Any, Dict, Iterable, Tuple
def _iter_repo_files(repo_info: Any) -> Iterable[Any]:
"""Yield file metadata objects from a repository info response."""
siblings = getattr(repo_info, "siblings", None)
if not siblings:
return []
return siblings
def auto_register_kaloscope_variants(models: Dict[str, Dict[str, Any]]) -> None:
"""Discover Kaloscope checkpoints in subfolders and append them to the model list."""
try:
from huggingface_hub import HfApi
except ImportError:
print("huggingface_hub is not available; skipping Kaloscope auto-discovery.")
return
repo_id = "heathcliff01/Kaloscope"
api = HfApi()
try:
repo_info = api.model_info(repo_id=repo_id)
except Exception as exc:
print(f"Failed to query Hugging Face repo info: {exc}")
return
existing_entries: set[Tuple[str, str, str]] = {
(
config.get("repo_id", ""),
config.get("subfolder", ""),
config.get("filename", ""),
)
for config in models.values()
if config.get("repo_id") and config.get("filename")
}
discovered: list[Tuple[str, Dict[str, Any], Tuple[str, str, str]]] = []
for sibling in _iter_repo_files(repo_info):
path = getattr(sibling, "rfilename", None) or getattr(sibling, "path", None)
if not path or not path.endswith(".pth"):
continue
subfolder = os.path.dirname(path).strip("/")
filename = os.path.basename(path)
key = (repo_id, subfolder, filename)
if key in existing_entries:
continue
# Use repo_id/folder format if in subfolder, otherwise repo_id/filename
repo_name = repo_id.split("/")[-1]
if subfolder:
display_name = f"{repo_name}/{subfolder.split('/')[-1]}"
else:
display_name = f"{repo_name}/{os.path.splitext(filename)[0]}"
config: Dict[str, Any] = {
"type": "pytorch",
"path": "",
"repo_id": repo_id,
"filename": filename,
"arch": "lsnet_xl_artist",
}
if subfolder:
config["subfolder"] = subfolder
discovered.append((display_name, config, key))
# Sort alphabetically by display name
discovered.sort(key=lambda item: item[0])
for display_name, config, key in discovered:
candidate_name = display_name
suffix = 2
while candidate_name in models:
candidate_name = f"{display_name} #{suffix}"
suffix += 1
models[candidate_name] = config
existing_entries.add(key)
print(f"Auto-registered: {candidate_name}")
def validate_models(models: Dict[str, Dict[str, Any]]):
"""
Validate models at startup and remove invalid entries.
Returns list of valid model names.
"""
valid_models = {}
for model_name, config in models.items():
model_path = config.get("path") or ""
# If local path exists, it's valid
if model_path and os.path.exists(model_path):
valid_models[model_name] = config
continue
# Check if it can be downloaded from HF
if "repo_id" in config and "filename" in config:
try:
from huggingface_hub import file_exists
# Build kwargs for checking file existence
check_kwargs = {
"repo_id": config["repo_id"],
"filename": config["filename"],
"repo_type": "model",
}
if config.get("subfolder"):
check_kwargs["filename"] = f"{config['subfolder']}/{config['filename']}"
if config.get("revision"):
check_kwargs["revision"] = config["revision"]
# Check if file exists on HF
if file_exists(**check_kwargs):
valid_models[model_name] = config
else:
print(f"Skipping {model_name}: file not found on Hugging Face")
except Exception as e:
print(f"Skipping {model_name}: validation error - {e}")
else:
print(f"Skipping {model_name}: no valid path or repo configuration")
return valid_models
|