Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| # -*- coding: utf-8 -*- | |
| """ | |
| Hugging Face Space (Gradio) App: Video -> Audio -> Whisper Transkript (+ Downloads SRT/TXT/VTT/JSON) | |
| Rechtlicher Hinweis: | |
| - Verwende diese App nur für eigene Inhalte oder Inhalte, für die du explizit die Erlaubnis hast. | |
| - Respektiere Urheberrecht und die Terms of Service der jeweiligen Plattformen. | |
| Benötigt: | |
| - ffmpeg (systemweit) | |
| - Python-Pakete siehe requirements.txt | |
| """ | |
| import os | |
| import subprocess | |
| import tempfile | |
| import json | |
| from pathlib import Path | |
| from datetime import timedelta | |
| import gradio as gr | |
| # Versuch, whisper zu importieren (installiert via requirements.txt as git+repo) | |
| try: | |
| import whisper | |
| except Exception as e: | |
| whisper = None | |
| # Hilfsfunktionen ---------------------------------------------------------- | |
def run(cmd, hide_output=False):
    """Execute ``cmd`` as a subprocess and raise if it exits non-zero.

    ``cmd`` is handed to ``subprocess.run`` unchanged. When ``hide_output``
    is true, the child's stdout and stderr are both discarded; otherwise
    they are inherited from this process.
    """
    # Only redirect the streams when the caller asked for silence.
    stream_kwargs = (
        {"stdout": subprocess.DEVNULL, "stderr": subprocess.DEVNULL}
        if hide_output
        else {}
    )
    subprocess.run(cmd, check=True, **stream_kwargs)
def download_video_with_ytdlp(url: str, out_dir: str) -> str:
    """Download ``url`` with yt-dlp into ``out_dir`` and return the file path.

    yt-dlp is invoked with the ``best`` format selector and an output
    template of ``<title>.<ext>`` inside ``out_dir``. Because the final file
    name is chosen by yt-dlp, the result is located afterwards by picking the
    most recently modified regular file in ``out_dir``.

    Args:
        url: Address of the video to download (user-supplied).
        out_dir: Directory that receives the download.

    Returns:
        Path of the most recently modified file in ``out_dir``, as a string.

    Raises:
        FileNotFoundError: If ``out_dir`` contains no file after the download.
        Any error raised by ``run`` when yt-dlp fails is propagated unchanged.
    """
    out_template = str(Path(out_dir) / "%(title)s.%(ext)s")
    # "--" ends option parsing: a user-supplied value that begins with "-"
    # is then always treated as a URL and can never be read as a yt-dlp
    # option such as --exec.
    cmd = ["yt-dlp", "-f", "best", "-o", out_template, "--", url]
    run(cmd)
    # Only regular files can be the downloaded video; skip directories.
    candidates = [p for p in Path(out_dir).glob("*") if p.is_file()]
    if not candidates:
        raise FileNotFoundError("Download erfolglos — keine Datei gefunden.")
    newest = max(candidates, key=lambda p: p.stat().st_mtime)
    return str(newest)
def extract_audio_ffmpeg(video_path: str, out_wav: str):
    """Convert the audio of ``video_path`` to a mono 16 kHz WAV for Whisper.

    ffmpeg is run with its output hidden and is told to overwrite
    ``out_wav`` if it already exists. Returns ``out_wav``.
    """
    source_args = ["-i", video_path, "-vn"]  # read input, drop the video stream
    audio_args = ["-ac", "1", "-ar", "16000", "-f", "wav"]  # mono, 16 kHz, WAV
    run(["ffmpeg", "-y", *source_args, *audio_args, out_wav], hide_output=True)
    return out_wav
def seconds_to_timestamp(s: float, always_ms: bool = True) -> str:
    """Format an offset in seconds as an SRT timestamp ``HH:MM:SS,mmm``.

    The value is rounded once to whole milliseconds and then split into
    fields, so a fraction that rounds up carries into the seconds (1.9996 s
    becomes ``00:00:02,000`` rather than an invalid ``00:00:01,1000``).

    Args:
        s: Offset in seconds. Negative values are clamped to zero.
        always_ms: Accepted for backward compatibility; it has no effect and
            milliseconds are always emitted.

    Returns:
        The timestamp with zero-padded fields and a comma before the
        three-digit millisecond part.
    """
    # Round the whole value, not just the fractional part, so the
    # millisecond field can never reach 1000.
    total_ms = max(0, int(round(float(s) * 1000)))
    total_seconds, ms = divmod(total_ms, 1000)
    total_minutes, seconds = divmod(total_seconds, 60)
    hours, minutes = divmod(total_minutes, 60)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d},{ms:03d}"
def format_timestamp_vtt(s: float) -> str:
    """Format an offset in seconds as a WebVTT timestamp ``HH:MM:SS.mmm``.

    The value is rounded once to whole milliseconds before being split into
    fields, so a fraction that rounds up carries into the seconds instead of
    producing a four-digit millisecond field.

    Args:
        s: Offset in seconds. Negative values are clamped to zero.

    Returns:
        The timestamp with zero-padded fields and a dot before the
        three-digit millisecond part.
    """
    # Work in integer milliseconds so every field stays within its range.
    total_ms = max(0, int(round(float(s) * 1000)))
    total_seconds, ms = divmod(total_ms, 1000)
    total_minutes, seconds = divmod(total_seconds, 60)
    hours, minutes = divmod(total_minutes, 60)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d}.{ms:03d}"
def segments_to_srt(segments):
    """Render Whisper segments as the text of an SRT subtitle file.

    Each segment becomes a cue numbered from 1, with a ``start --> end``
    timing line followed by the stripped segment text. Cues are separated by
    a blank line.
    """
    def render_cue(index, seg):
        # Read the keys in start/end/text order.
        begin = seconds_to_timestamp(seg['start'])
        finish = seconds_to_timestamp(seg['end'])
        caption = seg['text'].strip()
        return f"{index}\n{begin} --> {finish}\n{caption}\n"

    return "\n".join(
        render_cue(number, seg) for number, seg in enumerate(segments, start=1)
    )
def segments_to_vtt(segments):
    """Render Whisper segments as the text of a WebVTT file.

    The output starts with the ``WEBVTT`` header, followed by one unnumbered
    cue per segment: a ``start --> end`` timing line and the stripped text.
    """
    def render_cue(seg):
        begin = format_timestamp_vtt(seg['start'])
        finish = format_timestamp_vtt(seg['end'])
        caption = seg['text'].strip()
        return f"{begin} --> {finish}\n{caption}\n"

    # The header block comes first; the join inserts the blank separator lines.
    return "\n".join(["WEBVTT\n", *(render_cue(seg) for seg in segments)])
def segments_to_txt(segments):
    """Render Whisper segments as plain text, one line per segment.

    Every line has the form ``[HH:MM:SS,mmm] text``, using the segment's
    start time and its stripped text.
    """
    def render_line(seg):
        stamp = seconds_to_timestamp(seg['start'])
        caption = seg['text'].strip()
        return f"[{stamp}] {caption}"

    return "\n".join(render_line(seg) for seg in segments)
def segments_to_json(segments, language=None, metadata=None):
    """Serialize segments, language and optional metadata to a JSON string.

    The document has the keys ``language`` and ``segments``; a ``metadata``
    key is appended only when ``metadata`` is truthy. Non-ASCII characters
    are written as-is and the output is indented by two spaces.
    """
    # An empty or missing metadata value contributes no key at all.
    optional_part = {"metadata": metadata} if metadata else {}
    payload = {"language": language, "segments": segments, **optional_part}
    return json.dumps(payload, ensure_ascii=False, indent=2)
| # Haupt-Workflow ---------------------------------------------------------- | |
| def transcribe_pipeline(file_obj, url, model_size, keep_video=False): | |
| """ | |
| file_obj: uploaded file (temp path) or None | |
| url: optional URL to download via yt-dlp | |
| model_size: whisper model size | |
| """ | |
| if whisper is None: | |
| return "Fehler: lokales whisper nicht verfügbar. Stelle sicher, dass das Repo installiert ist.", None, None, None, None, None | |
| tmpdir = tempfile.mkdtemp(prefix="whisper_space_") | |
| try: | |
| # 1) Get video path either from uploaded file or by downloading URL | |
| if url: | |
| video_path = download_video_with_ytdlp(url, tmpdir) | |
| elif file_obj: | |
| # file_obj is a tuple (name, file-like) or a path depending on Gradio version. | |
| # Gradio typically supplies a filesystem path. | |
| if isinstance(file_obj, str) and os.path.exists(file_obj): | |
| video_path = file_obj | |
| else: | |
| # try to write content to temp file | |
| uploaded_path = Path(tmpdir) / Path(getattr(file_obj, "name", "upload")).name | |
| with open(uploaded_path, "wb") as f: | |
| # file_obj may be a SpooledTemporaryFile or similar with .read() | |
| f.write(file_obj.read()) | |
| video_path = str(uploaded_path) | |
| else: | |
| return "Kein Video angegeben (weder Datei noch URL).", None, None, None, None, None | |
| # 2) Extract audio | |
| audio_wav = str(Path(tmpdir) / "audio.wav") | |
| extract_audio_ffmpeg(video_path, audio_wav) | |
| # 3) Load whisper model and transcribe | |
| model = whisper.load_model(model_size) | |
| # transcribe: get segments to generate SRT/VTT etc. | |
| result = model.transcribe(audio_wav, verbose=False) | |
| segments = result.get("segments", []) | |
| language = result.get("language", None) | |
| # 4) Create output strings | |
| srt_text = segments_to_srt(segments) | |
| vtt_text = segments_to_vtt(segments) | |
| txt_text = segments_to_txt(segments) | |
| json_text = segments_to_json(segments, language=language, metadata={"model": model_size}) | |
| # 5) Save files to tmpdir for download via Gradio | |
| out_files = {} | |
| base_name = Path(video_path).stem | |
| files_map = { | |
| f"{base_name}.srt": srt_text, | |
| f"{base_name}.vtt": vtt_text, | |
| f"{base_name}.txt": txt_text, | |
| f"{base_name}.json": json_text | |
| } | |
| for fname, content in files_map.items(): | |
| path = Path(tmpdir) / fname | |
| path.write_text(content, encoding="utf-8") | |
| out_files[fname] = str(path) | |
| # 6) prepare display text with timestamps for UI (simple combined view) | |
| display_lines = [] | |
| for seg in segments: | |
| start = seconds_to_timestamp(seg['start']) | |
| display_lines.append(f"[{start}] {seg['text'].strip()}") | |
| display_text = "\n".join(display_lines) | |
| # Optionally remove video to save space | |
| if not keep_video and url: | |
| try: | |
| os.remove(video_path) | |
| except Exception: | |
| pass | |
| return display_text, out_files[f"{base_name}.srt"], out_files[f"{base_name}.vtt"], out_files[f"{base_name}.txt"], out_files[f"{base_name}.json"], f"Model: {model_size}, Language: {language}" | |
| except Exception as e: | |
| return f"Fehler während Verarbeitung: {e}", None, None, None, None, None | |
| finally: | |
| # Do not delete tmpdir immediately if the user wants to download |