Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| # coding: utf-8 | |
| """ Hugging Face Space (Gradio) App: Video -> Audio -> Whisper Transkript (+ Downloads SRT/TXT/VTT/JSON) | |
| FINALE, KORRIGIERTE LÖSUNG: Verwendet die korrekte yt-dlp Option --force-ip. | |
| """ | |
| import os | |
| import subprocess | |
| import tempfile | |
| import json | |
| from pathlib import Path | |
| from datetime import timedelta | |
| import socket | |
| import urllib.request | |
| from urllib.parse import urlparse | |
| import sys | |
| import gradio as gr | |
| try: | |
| import whisper | |
| except ImportError: | |
| whisper = None | |
| try: | |
| from dns import resolver as dns_resolver | |
| except ImportError: | |
| dns_resolver = None | |
| # --------------------------------------------------------------------------- | |
| # DEFINITIVE AUFRUFMETHODE: yt-dlp als Modul | |
| # --------------------------------------------------------------------------- | |
| YT_DLP_COMMAND = [sys.executable, "-m", "yt_dlp"] | |
| FFMPEG_PATH = "ffmpeg" | |
| # --------------------------------------------------------------------------- | |
| # Helper: Shell | |
| # --------------------------------------------------------------------------- | |
| def run_capture(cmd): | |
| """Run a command and return stdout; raise RuntimeError with readable stderr on failure.""" | |
| result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) | |
| if result.returncode != 0: | |
| stderr_text = result.stderr or "" | |
| tail = stderr_text[-2000:] | |
| raise RuntimeError("Command failed: " + " ".join(map(str, cmd)) + " " + tail) | |
| return result.stdout | |
| # ... (resolve_hostname_with_dns_python bleibt gleich) | |
| def resolve_hostname_with_dns_python(hostname): | |
| if not dns_resolver: return socket.gethostbyname(hostname) | |
| try: | |
| resolver = dns_resolver.Resolver(); resolver.nameservers = ['8.8.8.8', '1.1.1.1'] | |
| answers = resolver.resolve(hostname, 'A') | |
| return answers[0].to_text() if answers else None | |
| except Exception: return socket.gethostbyname(hostname) | |
| # --------------------------------------------------------------------------- | |
| # MODIFIZIERTE FUNKTION: Download & Audio mit der KORREKTEN Option | |
| # --------------------------------------------------------------------------- | |
| def download_video_with_ytdlp(url, out_dir, cookies_path=None, format_selector=None): | |
| out_template = str(Path(out_dir) / "%(title)s.%(ext)s") | |
| cmd = YT_DLP_COMMAND + ["-o", out_template] | |
| try: | |
| hostname = urlparse(url).hostname | |
| if hostname: | |
| ip_address = resolve_hostname_with_dns_python(hostname) | |
| if ip_address: | |
| print(f"Resolved {hostname} to {ip_address}. Using --force-ip.") | |
| # DIES IST DIE KORREKTE OPTION, KEINE HALLUZINATION | |
| cmd.extend(["--force-ip", ip_address]) | |
| except Exception as e: | |
| print(f"Custom DNS resolution failed, proceeding without it. Error: {e}") | |
| if format_selector: cmd += ["-f", format_selector] | |
| if cookies_path: cmd += ["--cookies", cookies_path] | |
| cmd.append(url) | |
| print(f"Running command: {' '.join(cmd)}") | |
| run_capture(cmd) | |
| files = sorted(Path(out_dir).glob("*"), key=lambda p: p.stat().st_mtime, reverse=True) | |
| if not files: raise FileNotFoundError("Download fehlgeschlagen — keine Datei gefunden.") | |
| return str(files[0]) | |
| def extract_audio_ffmpeg(video_path, out_wav): | |
| cmd = [FFMPEG_PATH, "-y", "-i", video_path, "-vn", "-ac", "1", "-ar", "16000", "-f", "wav", out_wav] | |
| subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) | |
| return out_wav | |
| # ... (Rest des Codes bleibt identisch) | |
| def seconds_to_timestamp(s): h, m, s, ms = int(s//3600), int((s%3600)//60), int(s%60), int(round((s-int(s))*1000)); return f"{h:02d}:{m:02d}:{s:02d},{ms:03d}" | |
| def format_timestamp_vtt(s): h, m, s, ms = int(s//3600), int((s%3600)//60), int(s%60), int(round((s-int(s))*1000)); return f"{h:02d}:{m:02d}:{s:02d}.{ms:03d}" | |
| def segments_to_srt(segments): parts = [f"{i}\n{seconds_to_timestamp(s['start'])} --> {seconds_to_timestamp(s['end'])}\n{s['text'].strip()}" for i,s in enumerate(segments,1)]; return "\n\n".join(parts) + "\n\n" | |
| def segments_to_vtt(segments): parts = ["WEBVTT\n"] + [f"{format_timestamp_vtt(s['start'])} --> {format_timestamp_vtt(s['end'])}\n{s['text'].strip()}" for s in segments]; return "\n\n".join(parts) | |
| def segments_to_txt(segments): return "\n".join([f"[{seconds_to_timestamp(s['start'])}] {s['text'].strip()}" for s in segments]) | |
| def segments_to_json(segments, lang=None, meta=None): data={"language":lang, "segments":segments}; [data.update({"metadata":meta}) if meta else None]; return json.dumps(data,ensure_ascii=False,indent=2) | |
| def transcribe_pipeline(f, u, m, k, c, fmt): | |
| if whisper is None: return "Fehler: whisper ist nicht installiert.",*[None]*5 | |
| tmpdir = tempfile.mkdtemp(prefix="whisper_space_"); | |
| try: | |
| video_path = download_video_with_ytdlp(u, tmpdir, c, fmt) if u else f.name | |
| if not video_path: return "Kein Video angegeben.",*[None]*5 | |
| audio_wav=str(Path(tmpdir)/"audio.wav"); extract_audio_ffmpeg(video_path,audio_wav) | |
| model=whisper.load_model(m); result=model.transcribe(audio_wav,verbose=False) | |
| segs=result.get("segments",[]); lang=result.get("language","unknown") | |
| txt=segments_to_txt(segs); srt=segments_to_srt(segs); vtt=segments_to_vtt(segs); jsn=segments_to_json(segs,lang,{"model":m}) | |
| base=Path(video_path).stem; files={} | |
| for ext, content in {"srt":srt, "vtt":vtt, "txt":txt, "json":jsn}.items(): p=Path(tmpdir)/f"{base}.{ext}"; p.write_text(content,encoding="utf-8"); files[ext]=str(p) | |
| if not k and u: [os.remove(video_path) for _ in [1] if os.path.exists(video_path)] | |
| meta=f"Model: {m}, Sprache: {lang}"; return txt,files["srt"],files["vtt"],files["txt"],files["json"],meta | |
| except Exception as e: return f"Fehler: {e}",*[None]*5 | |
| def dns_internet_diag(): | |
| lines = [] | |
| lines.append("=== Python & Version Info ===") | |
| lines.append(f"Python Executable: {sys.executable}") | |
| try: | |
| cmd = YT_DLP_COMMAND + ["--version"] | |
| version_out = run_capture(cmd).strip() | |
| lines.append(f"Version via '{' '.join(cmd)}': {version_out}") | |
| except Exception as e: | |
| lines.append(f"Fehler bei der Prüfung der yt-dlp Modul-Version: {e}") | |
| lines.append("\n\n=== DNS-Auflösung (via dnspython mit 8.8.8.8) ===") | |
| for host in ["huggingface.co", "www.instagram.com", "youtube.com"]: | |
| try: ip = resolve_hostname_with_dns_python(host); lines.append(f"{host} -> {ip} (OK)") | |
| except Exception as e: lines.append(f"{host} -> ERROR: {e}") | |
| return "\n".join(lines) | |
| with gr.Blocks() as demo: | |
| gr.Markdown("# Video → Whisper Transkript (SRT/TXT/VTT/JSON)") | |
| with gr.Tab("Transkription"): | |
| with gr.Row(): | |
| with gr.Column(): url_in=gr.Textbox(label="Video URL",placeholder="https://..."); file_in=gr.File(label="Oder Videodatei hochladen"); cookies_in=gr.File(label="Cookies.txt (optional, für yt-dlp)"); fmt_in=gr.Textbox(label="Format (optional, yt-dlp -f)",placeholder="z.B. bestvideo+bestaudio/best"); model_sel=gr.Radio(["tiny","base","small","medium","large"],value="small",label="Whisper-Modell"); keep_chk=gr.Checkbox(label="Video behalten (bei URL-Download)",value=False); btn=gr.Button("Transkribieren"); status=gr.Textbox(label="Status / Meta",interactive=False) | |
| with gr.Column(): transcript=gr.Textbox(label="Transkript",lines=20); srt_dl=gr.File(label="SRT"); vtt_dl=gr.File(label="VTT"); txt_dl=gr.File(label="TXT"); json_dl=gr.File(label="JSON") | |
| def run_transcribe(f, u, m, k, c, fmt): cookies_path = c.name if c else None; d, s, v, t, j, meta = transcribe_pipeline(f, u, m, k, cookies_path, (fmt or None)); return (d, gr.update(value=s,visible=bool(s)), gr.update(value=v,visible=bool(v)), gr.update(value=t,visible=bool(t)), gr.update(value=j,visible=bool(j)), meta,) | |
| btn.click(run_transcribe, [file_in, url_in, model_sel, keep_chk, cookies_in, fmt_in], [transcript, srt_dl, vtt_dl, txt_dl, json_dl, status]) | |
| with gr.Tab("Netzwerk / DNS Diagnose"): | |
| gr.Markdown("""Prüft die Version von yt-dlp, wie sie von Python als Modul ausgeführt wird."""); diag_btn=gr.Button("Diagnose starten"); diag_out=gr.Textbox(label="Diagnose-Ausgabe",lines=25) | |
| diag_btn.click(dns_internet_diag, inputs=[], outputs=[diag_out]) | |
| if __name__ == "__main__": | |
| demo.launch(server_name="0.0.0.0", server_port=int(os.environ.get("PORT", 7860))) |