#!/usr/bin/env python3
"""
Model Download Script for MiloMusic
Downloads required model files from Hugging Face Hub at runtime to avoid storage limits.
"""

import os
import shutil
import subprocess
import sys
from pathlib import Path

# Checkout location of the xcodec repository, relative to the working directory.
_XCODEC_DIR = Path("YuEGP/inference/xcodec_mini_infer")
_XCODEC_REPO_URL = "https://huggingface.co/m-a-p/xcodec_mini_infer"

# The repository is cloned with LFS smudging disabled, so LFS-tracked weights
# first appear as tiny pointer stubs. Anything below this size is treated as
# "not really downloaded".
_MIN_WEIGHT_BYTES = 1024

# LFS-tracked files that are fetched individually with wget after the clone.
_LFS_FILES = [
    {
        "path": "decoders/decoder_131000.pth",
        "url": "https://huggingface.co/m-a-p/xcodec_mini_infer/resolve/main/decoders/decoder_131000.pth",
        "description": "Vocal decoder model (70MB)",
    },
    {
        "path": "decoders/decoder_151000.pth",
        "url": "https://huggingface.co/m-a-p/xcodec_mini_infer/resolve/main/decoders/decoder_151000.pth",
        "description": "Instrumental decoder model (70MB)",
    },
    {
        "path": "final_ckpt/ckpt_00360000.pth",
        "url": "https://huggingface.co/m-a-p/xcodec_mini_infer/resolve/main/final_ckpt/ckpt_00360000.pth",
        "description": "Main checkpoint (1.3GB)",
    },
    {
        "path": "semantic_ckpts/hf_1_325000/pytorch_model.bin",
        "url": "https://huggingface.co/m-a-p/xcodec_mini_infer/resolve/main/semantic_ckpts/hf_1_325000/pytorch_model.bin",
        "description": "Semantic model (361MB)",
    },
]


def _is_complete_weight_file(path):
    """Return True if *path* is a regular file of at least _MIN_WEIGHT_BYTES."""
    return path.is_file() and path.stat().st_size >= _MIN_WEIGHT_BYTES


def _remove_if_present(path):
    """Delete *path*, ignoring the case where it does not exist."""
    try:
        path.unlink()
    except FileNotFoundError:
        pass


def _download_lfs_file(file_info, repo_dir):
    """Fetch one entry of _LFS_FILES into *repo_dir* with wget.

    The data is written to a sibling ``.part`` file and only moved over the
    final path once wget succeeded and the size check passed, so a failed or
    timed-out transfer can never leave a truncated file at the final path.

    Returns True on success, False otherwise (errors are printed, not raised).
    """
    target = repo_dir / file_info["path"]
    partial = target.with_name(target.name + ".part")
    print(f"šŸ“„ Downloading {file_info['description']}...")
    try:
        target.parent.mkdir(parents=True, exist_ok=True)
        subprocess.run(
            ["wget", "-O", str(partial), file_info["url"]],
            check=True,
            capture_output=True,
            text=True,
            timeout=1800,  # 30min timeout for large files
        )
        if not _is_complete_weight_file(partial):
            print(f"āš ļø {file_info['path']} download appears incomplete")
            return False
        os.replace(partial, target)
        size_mb = target.stat().st_size // (1024 * 1024)
        print(f"āœ… Successfully downloaded {file_info['path']} ({size_mb}MB)")
        return True
    except subprocess.CalledProcessError as e:
        print(f"āŒ Failed to download {file_info['path']}: {e}")
        if e.stderr:
            print(f"Error details: {e.stderr[-500:]}...")  # Last 500 chars
    except subprocess.TimeoutExpired:
        print(f"āŒ Download timeout for {file_info['path']} (large file)")
    except Exception as e:
        # Best effort: one failed file must not abort the remaining downloads.
        print(f"āŒ Unexpected error downloading {file_info['path']}: {e}")
    finally:
        # No-op after a successful os.replace; cleans up after any failure.
        _remove_if_present(partial)
    return False


def download_xcodec_models():
    """Download xcodec_mini_infer using git clone (no LFS) + wget for large files.

    Any existing checkout is deleted first. The repository is then cloned with
    ``GIT_LFS_SKIP_SMUDGE=1`` and every entry of ``_LFS_FILES`` is fetched with
    wget. The process working directory is left untouched.

    Returns:
        True if both decoder checkpoints are present and at least
        ``_MIN_WEIGHT_BYTES`` large afterwards, False otherwise. Failures of
        the other LFS files are reported but do not affect the return value.
    """
    # Absolute path so nothing below depends on the working directory.
    xcodec_base = _XCODEC_DIR.resolve()

    print("šŸ“„ Downloading xcodec_mini_infer using git clone + wget strategy...")

    try:
        # Remove existing directory if it exists
        if xcodec_base.exists():
            print("šŸ—‘ļø Removing existing xcodec_mini_infer directory...")
            shutil.rmtree(xcodec_base)

        # Ensure parent directory exists
        os.makedirs(xcodec_base.parent, exist_ok=True)

        # Step 1: Clone repository structure without LFS files
        print("šŸ”„ Step 1: Cloning repository structure (no LFS)...")

        # Set environment variable to skip LFS files during clone
        env = os.environ.copy()
        env["GIT_LFS_SKIP_SMUDGE"] = "1"

        subprocess.run(
            ["git", "clone", _XCODEC_REPO_URL, "xcodec_mini_infer"],
            check=True,
            capture_output=True,
            text=True,
            timeout=300,
            env=env,
            cwd=str(xcodec_base.parent),
        )

        print("āœ… Repository structure downloaded successfully")

        # Step 2: Download critical LFS files using wget
        print("šŸ”„ Step 2: Downloading critical LFS files with wget...")
        success_count = sum(
            1 for file_info in _LFS_FILES
            if _download_lfs_file(file_info, xcodec_base)
        )

        # Debug: Print current working directory and file locations
        print(f"šŸ” Current working directory: {os.getcwd()}")
        print(f"šŸ” Expected xcodec_base (absolute): {xcodec_base}")
        print(f"šŸ” xcodec_base exists: {xcodec_base.exists()}")

        # Additional debug: check if decoders directory exists
        decoders_dir = xcodec_base / "decoders"
        print(f"šŸ” Decoders directory: {decoders_dir}")
        print(f"šŸ” Decoders directory exists: {decoders_dir.exists()}")

        if decoders_dir.exists():
            all_decoder_files = list(decoders_dir.iterdir())
            print(f"šŸ” All files in decoders directory: {[f.name for f in all_decoder_files]}")
            print(f"šŸ” File sizes: {[(f.name, f.stat().st_size if f.is_file() else 'dir') for f in all_decoder_files]}")

        # Verify critical decoder files (minimum requirement)
        decoder_files = [
            decoders_dir / "decoder_131000.pth",
            decoders_dir / "decoder_151000.pth",
        ]

        missing_decoders = []
        for decoder_file in decoder_files:
            print(f"šŸ” Checking: {decoder_file}")
            exists = decoder_file.exists()
            print(f"šŸ” File exists: {exists}")

            if not exists:
                missing_decoders.append(decoder_file.name)
                print(f"āš ļø {decoder_file.name} does not exist")
                continue

            size = decoder_file.stat().st_size
            print(f"šŸ” File size: {size} bytes ({size // (1024*1024)} MB)")
            if size < _MIN_WEIGHT_BYTES:
                missing_decoders.append(decoder_file.name)
                print(f"āš ļø {decoder_file.name} is too small (likely an LFS pointer)")

        if missing_decoders:
            print(f"āŒ Critical decoder files missing or incomplete: {missing_decoders}")
            print("Vocoder functionality will not work without decoder files.")
            return False

        print(f"āœ… Successfully downloaded {success_count}/{len(_LFS_FILES)} LFS files")
        print("āœ… All critical decoder files verified present and complete")
        return True

    except subprocess.CalledProcessError as e:
        print(f"āŒ Git clone failed: {e}")
        if e.stdout:
            print(f"stdout: {e.stdout}")
        if e.stderr:
            print(f"stderr: {e.stderr}")
        return False
    except subprocess.TimeoutExpired:
        print("āŒ Git clone timed out")
        return False
    except Exception as e:
        print(f"āŒ Unexpected error: {e}")
        return False


def _missing_critical_files(xcodec_base):
    """Return the critical files under *xcodec_base* that are absent or stubs.

    ``.pth`` weights must also reach ``_MIN_WEIGHT_BYTES``; an LFS pointer stub
    left by an earlier failed run would otherwise count as "present".
    """
    critical_files = [
        # Vocoder stage files
        xcodec_base / "decoders" / "decoder_131000.pth",
        xcodec_base / "decoders" / "decoder_151000.pth",
        xcodec_base / "decoders" / "config.yaml",

        # Recons stage files (critical for audio decoding)
        xcodec_base / "final_ckpt" / "ckpt_00360000.pth",
        xcodec_base / "final_ckpt" / "config.yaml",

        # Python modules
        xcodec_base / "models" / "soundstream_hubert_new.py",
    ]

    def is_available(path):
        if path.suffix == ".pth":
            return _is_complete_weight_file(path)
        return path.exists()

    return [f for f in critical_files if not is_available(f)]


def ensure_model_availability():
    """
    Ensure all required models are available locally.
    Download them if they don't exist.

    Returns:
        True if every critical file is present (and every ``.pth`` weight is
        at least ``_MIN_WEIGHT_BYTES`` large), either already or after a
        download; False if files are still missing after the download attempt.
    """
    missing_files = _missing_critical_files(_XCODEC_DIR)
    if not missing_files:
        print("āœ… All critical model files are already present")
        return True

    print(f"āš ļø Missing critical model files: {[f.name for f in missing_files]}")
    print("šŸš€ Starting model download with git clone...")

    if download_xcodec_models():
        # The downloader only guarantees the decoders, so re-check everything
        # before claiming success.
        missing_files = _missing_critical_files(_XCODEC_DIR)
        if not missing_files:
            print("āœ… Model download completed successfully!")
            return True
        print(f"āš ļø Missing critical model files: {[f.name for f in missing_files]}")

    print("āŒ Model download failed. Vocoder functionality will not work.")
    return False


if __name__ == "__main__":
    # Run model download when script is executed directly
    print("šŸŽµ MiloMusic Model Download Script")
    print("=" * 50)

    success = ensure_model_availability()

    if success:
        print("\nšŸŽ‰ Setup complete! MiloMusic is ready to generate music.")
    else:
        print("\nāš ļø Setup completed with warnings. Check the logs above.")
        sys.exit(1)