# (Hugging Face Spaces page artifact — Space status: Sleeping)
| #!/usr/bin/env python3 | |
| """ | |
| Model Download Script for MiloMusic | |
| Downloads required model files from Hugging Face Hub at runtime to avoid storage limits. | |
| """ | |
| import os | |
| import subprocess | |
| import shutil | |
| from pathlib import Path | |
# Critical LFS-tracked files that GIT_LFS_SKIP_SMUDGE leaves as pointer stubs;
# each is fetched directly from the Hugging Face "resolve" endpoint instead.
_XCODEC_LFS_FILES = [
    {
        "path": "decoders/decoder_131000.pth",
        "url": "https://huggingface.co/m-a-p/xcodec_mini_infer/resolve/main/decoders/decoder_131000.pth",
        "description": "Vocal decoder model (70MB)"
    },
    {
        "path": "decoders/decoder_151000.pth",
        "url": "https://huggingface.co/m-a-p/xcodec_mini_infer/resolve/main/decoders/decoder_151000.pth",
        "description": "Instrumental decoder model (70MB)"
    },
    {
        "path": "final_ckpt/ckpt_00360000.pth",
        "url": "https://huggingface.co/m-a-p/xcodec_mini_infer/resolve/main/final_ckpt/ckpt_00360000.pth",
        "description": "Main checkpoint (1.3GB)"
    },
    {
        "path": "semantic_ckpts/hf_1_325000/pytorch_model.bin",
        "url": "https://huggingface.co/m-a-p/xcodec_mini_infer/resolve/main/semantic_ckpts/hf_1_325000/pytorch_model.bin",
        "description": "Semantic model (361MB)"
    },
]


def _download_lfs_files(xcodec_base):
    """Fetch each LFS weight file with wget into *xcodec_base*.

    Args:
        xcodec_base: Absolute ``Path`` of the cloned xcodec_mini_infer repo.

    Returns:
        int: Number of files that downloaded and passed the size sanity check.
    """
    success_count = 0
    for file_info in _XCODEC_LFS_FILES:
        try:
            file_path = xcodec_base / file_info["path"]
            print(f"π₯ Downloading {file_info['description']}...")
            # Ensure the target subdirectory exists before wget writes into it.
            os.makedirs(file_path.parent, exist_ok=True)
            subprocess.run([
                "wget", "-O", str(file_path),
                file_info["url"]
            ], check=True, capture_output=True, text=True, timeout=1800)  # 30min timeout for large files
            # Anything <= 1 KiB is almost certainly an LFS pointer stub or an
            # HTML error page, not real model weights.
            if file_path.exists() and file_path.stat().st_size > 1024:
                print(f"β Successfully downloaded {file_info['path']} ({file_path.stat().st_size // (1024*1024)}MB)")
                success_count += 1
            else:
                print(f"β οΈ {file_info['path']} download appears incomplete")
        except subprocess.CalledProcessError as e:
            print(f"β Failed to download {file_info['path']}: {e}")
            if e.stderr:
                print(f"Error details: {e.stderr[-500:]}...")  # Last 500 chars
        except subprocess.TimeoutExpired:
            print(f"β Download timeout for {file_info['path']} (large file)")
        except Exception as e:
            print(f"β Unexpected error downloading {file_info['path']}: {e}")
    return success_count


def _verify_decoders(xcodec_base):
    """Verify the two critical decoder weights exist and exceed 1 KiB.

    Args:
        xcodec_base: Absolute ``Path`` of the cloned xcodec_mini_infer repo.

    Returns:
        list[str]: Names of decoder files that are missing or pointer-sized.
    """
    decoder_files = [
        xcodec_base / "decoders" / "decoder_131000.pth",
        xcodec_base / "decoders" / "decoder_151000.pth",
    ]
    missing_decoders = []
    for decoder_file in decoder_files:
        print(f"π Checking: {decoder_file}")
        exists = decoder_file.exists()
        print(f"π File exists: {exists}")
        if exists:
            size = decoder_file.stat().st_size
            print(f"π File size: {size} bytes ({size // (1024*1024)} MB)")
            if size < 1024:
                missing_decoders.append(decoder_file.name)
                print(f"β οΈ {decoder_file.name} is too small (likely an LFS pointer)")
        else:
            missing_decoders.append(decoder_file.name)
            print(f"β οΈ {decoder_file.name} does not exist")
    return missing_decoders


def download_xcodec_models():
    """Download xcodec_mini_infer using git clone (no LFS) + wget for large files.

    Strategy:
      1. ``git clone`` with ``GIT_LFS_SKIP_SMUDGE=1`` to fetch the repository
         structure (configs, python modules) without the multi-GB LFS blobs.
      2. ``wget`` each critical weight file directly from the Hugging Face
         ``resolve`` endpoint (see ``_XCODEC_LFS_FILES``).

    Returns:
        bool: True when the clone succeeded and both critical decoder files
        are present and larger than 1 KiB (real weights, not LFS pointer
        stubs); False on any fatal failure.
    """
    # Absolute path so every later check is immune to working-directory state.
    xcodec_base = Path("YuEGP/inference/xcodec_mini_infer").resolve()
    print("π₯ Downloading xcodec_mini_infer using git clone + wget strategy...")
    try:
        # Start from a clean slate to avoid git refusing a non-empty target.
        if xcodec_base.exists():
            print("ποΈ Removing existing xcodec_mini_infer directory...")
            shutil.rmtree(xcodec_base)
        os.makedirs(xcodec_base.parent, exist_ok=True)

        # Step 1: clone repository structure without LFS payloads.
        print("π Step 1: Cloning repository structure (no LFS)...")
        env = os.environ.copy()
        env["GIT_LFS_SKIP_SMUDGE"] = "1"  # leave LFS files as tiny pointer stubs
        # NOTE: pass cwd= instead of mutating the process-wide working
        # directory with os.chdir() — safer for any other code/threads and
        # removes the need for a restore-cwd finally block.
        subprocess.run([
            "git", "clone",
            "https://huggingface.co/m-a-p/xcodec_mini_infer",
            "xcodec_mini_infer"
        ], check=True, capture_output=True, text=True, timeout=300,
            env=env, cwd=xcodec_base.parent)
        print("β Repository structure downloaded successfully")

        # Step 2: fetch the large weight files individually.
        print("π Step 2: Downloading critical LFS files with wget...")
        success_count = _download_lfs_files(xcodec_base)

        # Debug: print current working directory and file locations.
        print(f"π Current working directory: {os.getcwd()}")
        print(f"π Expected xcodec_base (absolute): {xcodec_base}")
        print(f"π xcodec_base exists: {xcodec_base.exists()}")
        decoders_dir = xcodec_base / "decoders"
        print(f"π Decoders directory: {decoders_dir}")
        print(f"π Decoders directory exists: {decoders_dir.exists()}")
        if decoders_dir.exists():
            all_decoder_files = list(decoders_dir.iterdir())
            print(f"π All files in decoders directory: {[f.name for f in all_decoder_files]}")
            print(f"π File sizes: {[(f.name, f.stat().st_size if f.is_file() else 'dir') for f in all_decoder_files]}")

        # The decoders are the minimum requirement for vocoder output.
        missing_decoders = _verify_decoders(xcodec_base)
        if missing_decoders:
            print(f"β Critical decoder files missing or incomplete: {missing_decoders}")
            print("Vocoder functionality will not work without decoder files.")
            return False
        print(f"β Successfully downloaded {success_count}/{len(_XCODEC_LFS_FILES)} LFS files")
        print("β All critical decoder files verified present and complete")
        return True
    except subprocess.CalledProcessError as e:
        print(f"β Git clone failed: {e}")
        if e.stdout:
            print(f"stdout: {e.stdout}")
        if e.stderr:
            print(f"stderr: {e.stderr}")
        return False
    except subprocess.TimeoutExpired:
        print("β Git clone timed out")
        return False
    except Exception as e:
        print(f"β Unexpected error: {e}")
        return False
def ensure_model_availability():
    """Ensure all required models are available locally, downloading on demand.

    Checks every critical xcodec file (vocoder decoders, recons checkpoint,
    configs, python modules) and triggers ``download_xcodec_models`` if any
    is absent.

    Returns:
        bool: True if all files are present (or were fetched successfully),
        False if the download attempt failed.
    """
    xcodec_base = Path("YuEGP/inference/xcodec_mini_infer")
    # Files required by both the recons and vocoder stages.
    critical_files = [
        # Vocoder stage files
        xcodec_base / "decoders" / "decoder_131000.pth",
        xcodec_base / "decoders" / "decoder_151000.pth",
        xcodec_base / "decoders" / "config.yaml",
        # Recons stage files (critical for audio decoding)
        xcodec_base / "final_ckpt" / "ckpt_00360000.pth",
        xcodec_base / "final_ckpt" / "config.yaml",
        # Python modules
        xcodec_base / "models" / "soundstream_hubert_new.py",
    ]
    missing_files = [path for path in critical_files if not path.exists()]
    if not missing_files:
        print("β All critical model files are already present")
        return True

    print(f"β οΈ Missing critical model files: {[f.name for f in missing_files]}")
    print("π Starting model download with git clone...")
    if not download_xcodec_models():
        print("β Model download failed. Vocoder functionality will not work.")
        return False
    print("β Model download completed successfully!")
    return True
if __name__ == "__main__":
    # Entry point: verify (and if necessary download) all model files, then
    # signal the overall outcome through the process exit code so startup
    # scripts / CI can react to failures.
    print("π΅ MiloMusic Model Download Script")
    print("=" * 50)
    success = ensure_model_availability()
    if success:
        print("\nπ Setup complete! MiloMusic is ready to generate music.")
    else:
        print("\nβ οΈ Setup completed with warnings. Check the logs above.")
        # raise SystemExit rather than calling the site-provided exit()
        # builtin, which is not guaranteed to exist (e.g. under `python -S`
        # or in frozen/embedded interpreters).
        raise SystemExit(1)