Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| # coding: utf-8 | |
| """ Hugging Face Space (Gradio) App: Video -> Audio -> Whisper Transkript (+ Downloads SRT/TXT/VTT/JSON) | |
| ENDGÜLTIGE LÖSUNG: Verwendet die präzise yt-dlp Option --force-ipv4, um DNS-Probleme zu umgehen. | |
| """ | |
import json
import os
import shutil
import socket
import subprocess
import sys
import tempfile
import urllib.request
from datetime import timedelta
from pathlib import Path
from urllib.parse import urlparse

import gradio as gr
| try: | |
| import whisper | |
| except ImportError: | |
| whisper = None | |
| try: | |
| from dns import resolver as dns_resolver | |
| except ImportError: | |
| dns_resolver = None | |
# ---------------------------------------------------------------------------
# Definitive invocation method: run yt-dlp as a module to avoid PATH conflicts
# ---------------------------------------------------------------------------
# Command prefix "<current interpreter> -m yt_dlp": the yt_dlp package that is
# importable by this interpreter is used, not whatever executable is on PATH.
YT_DLP_COMMAND = [sys.executable, "-m", "yt_dlp"]
# Bare executable name used as argv[0] when spawning ffmpeg.
FFMPEG_PATH = "ffmpeg"
| # --------------------------------------------------------------------------- | |
| # Helper: Shell | |
| # --------------------------------------------------------------------------- | |
def run_capture(cmd):
    """Execute *cmd* and hand back everything it wrote to stdout as text.

    Args:
        cmd: Argument vector passed to ``subprocess.run`` (no shell involved).

    Returns:
        The decoded standard output of the process.

    Raises:
        RuntimeError: If the process exits with a non-zero status. The message
            contains the command line and the last 2000 characters of stderr.
    """
    proc = subprocess.run(cmd, capture_output=True, text=True)
    if proc.returncode == 0:
        return proc.stdout

    # Only the end of stderr is kept so the message stays readable in the UI.
    err_tail = (proc.stderr or "")[-2000:]
    rendered_cmd = " ".join(str(part) for part in cmd)
    raise RuntimeError(f"Command failed: {rendered_cmd} {err_tail}")
| # --------------------------------------------------------------------------- | |
| # DNS-Helfer | |
| # --------------------------------------------------------------------------- | |
def resolve_hostname_with_dns_python(hostname):
    """Look up an IPv4 address for *hostname*, preferring public DNS servers.

    When dnspython is available, the lookup goes to 8.8.8.8 / 1.1.1.1 and asks
    for an ``A`` record. If dnspython is missing, or that lookup fails for any
    reason, the system resolver (``socket.gethostbyname``) is used instead.

    Args:
        hostname: Host name to resolve.

    Returns:
        The first resolved address as a string, or ``None`` if the public-DNS
        answer is empty.

    Raises:
        Whatever ``socket.gethostbyname`` raises when the fallback lookup
        fails as well.
    """
    # dnspython is optional: without it only the system resolver is available.
    if not dns_resolver:
        return socket.gethostbyname(hostname)

    try:
        public_dns = dns_resolver.Resolver()
        public_dns.nameservers = ['8.8.8.8', '1.1.1.1']  # Google & Cloudflare DNS
        # Ask explicitly for the A record (IPv4), matching --force-ipv4.
        a_records = public_dns.resolve(hostname, 'A')
        if not a_records:
            return None
        return a_records[0].to_text()
    except Exception:
        # Last resort when the external DNS query fails: the system resolver.
        return socket.gethostbyname(hostname)
| # --------------------------------------------------------------------------- | |
| # Download & Audio Extraktion | |
| # --------------------------------------------------------------------------- | |
def download_video_with_ytdlp(url, out_dir, cookies_path=None, format_selector=None):
    """Download *url* into *out_dir* with yt-dlp and return the resulting file path.

    Args:
        url: Video URL handed to yt-dlp.
        out_dir: Directory that receives the download; the file is named after
            the yt-dlp output template ``%(title)s.%(ext)s``.
        cookies_path: Optional path of a cookies file passed via ``--cookies``.
        format_selector: Optional yt-dlp format expression passed via ``-f``.

    Returns:
        Path (as ``str``) of the most recently modified regular file in
        *out_dir* after the download.

    Raises:
        RuntimeError: If yt-dlp exits with a non-zero status (from
            ``run_capture``).
        FileNotFoundError: If *out_dir* contains no file after the download.
    """
    out_template = str(Path(out_dir) / "%(title)s.%(ext)s")
    cmd = YT_DLP_COMMAND + ["-o", out_template]

    try:
        hostname = urlparse(url).hostname
        if hostname:
            ip_address = resolve_hostname_with_dns_python(hostname)
            if ip_address:
                print(f"Resolved {hostname} to IPv4 {ip_address}. Using --force-ipv4.")
                # --force-ipv4 is a bare switch without a value. Appending the
                # address after it would make yt-dlp treat the IP as a second
                # URL to download, so the address is only logged here.
                cmd.append("--force-ipv4")
    except Exception as e:
        # Pre-resolution is best effort; the download is still attempted.
        print(f"Custom DNS resolution failed, proceeding without it. Error: {e}")

    if format_selector:
        cmd += ["-f", format_selector]
    if cookies_path:
        cmd += ["--cookies", str(cookies_path)]

    # "--" ends option parsing, so a user-supplied URL that starts with "-"
    # cannot be interpreted as a yt-dlp option.
    cmd += ["--", url]

    print(f"Running command: {' '.join(cmd)}")
    run_capture(cmd)

    # Pick the newest regular file; directories are not valid results.
    candidates = [p for p in Path(out_dir).glob("*") if p.is_file()]
    newest = max(candidates, key=lambda p: p.stat().st_mtime, default=None)
    if newest is None:
        raise FileNotFoundError("Download fehlgeschlagen — keine Datei gefunden.")
    return str(newest)
def extract_audio_ffmpeg(video_path, out_wav):
    """Write the audio of *video_path* to *out_wav* as 16 kHz mono WAV.

    Args:
        video_path: Input media file.
        out_wav: Destination path; an existing file is overwritten (``-y``).

    Returns:
        *out_wav*, unchanged.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
            ffmpeg's own output is discarded.
    """
    ffmpeg_args = [FFMPEG_PATH, "-y", "-i", video_path]
    # Drop the video stream, downmix to one channel, resample to 16 kHz.
    ffmpeg_args += ["-vn", "-ac", "1", "-ar", "16000", "-f", "wav", out_wav]
    subprocess.run(
        ffmpeg_args,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        check=True,
    )
    return out_wav
| # --------------------------------------------------------------------------- | |
| # Zeit- und Untertitel-Formatierer | |
| # --------------------------------------------------------------------------- | |
def seconds_to_timestamp(s):
    """Format a time offset in seconds as an SRT timestamp ``HH:MM:SS,mmm``.

    The value is rounded to whole milliseconds *before* it is split into
    fields, so a fraction that rounds up carries into the seconds (1.9996 ->
    ``00:00:02,000``) instead of producing a four-digit millisecond field.

    Args:
        s: Offset in seconds (int or float). Negative values are clamped to 0.

    Returns:
        The formatted timestamp string.
    """
    total_ms = max(0, int(round(float(s) * 1000)))
    total_seconds, ms = divmod(total_ms, 1000)
    total_minutes, sec = divmod(total_seconds, 60)
    h, m = divmod(total_minutes, 60)
    return f"{h:02d}:{m:02d}:{sec:02d},{ms:03d}"
def format_timestamp_vtt(s):
    """Format a time offset in seconds as a WebVTT timestamp ``HH:MM:SS.mmm``.

    The value is rounded to whole milliseconds *before* it is split into
    fields, so a fraction that rounds up carries into the seconds (1.9996 ->
    ``00:00:02.000``) instead of producing a four-digit millisecond field.

    Args:
        s: Offset in seconds (int or float). Negative values are clamped to 0.

    Returns:
        The formatted timestamp string.
    """
    total_ms = max(0, int(round(float(s) * 1000)))
    total_seconds, ms = divmod(total_ms, 1000)
    total_minutes, sec = divmod(total_seconds, 60)
    h, m = divmod(total_minutes, 60)
    return f"{h:02d}:{m:02d}:{sec:02d}.{ms:03d}"
def segments_to_srt(segments):
    """Render *segments* as SubRip (SRT) text.

    Args:
        segments: Iterable of mappings with ``start``, ``end`` and ``text``
            keys; cues are numbered from 1 in iteration order.

    Returns:
        The cues separated by blank lines, followed by a trailing blank line.
    """
    cues = []
    for index, seg in enumerate(segments, start=1):
        begin = seconds_to_timestamp(seg['start'])
        finish = seconds_to_timestamp(seg['end'])
        caption = seg['text'].strip()
        cues.append(f"{index}\n{begin} --> {finish}\n{caption}")
    return "\n\n".join(cues) + "\n\n"
def segments_to_vtt(segments):
    """Render *segments* as WebVTT text.

    Args:
        segments: Iterable of mappings with ``start``, ``end`` and ``text``
            keys.

    Returns:
        The ``WEBVTT`` header followed by one cue per segment, blocks joined
        by blank lines (no trailing newline after the last cue).
    """
    blocks = ["WEBVTT\n"]
    for seg in segments:
        begin = format_timestamp_vtt(seg['start'])
        finish = format_timestamp_vtt(seg['end'])
        caption = seg['text'].strip()
        blocks.append(f"{begin} --> {finish}\n{caption}")
    return "\n\n".join(blocks)
def segments_to_txt(segments):
    """Render *segments* as plain text, one ``[HH:MM:SS,mmm] text`` line each.

    Args:
        segments: Iterable of mappings with ``start`` and ``text`` keys.

    Returns:
        The lines joined with newlines (empty string for no segments).
    """
    rendered = []
    for seg in segments:
        stamp = seconds_to_timestamp(seg['start'])
        rendered.append(f"[{stamp}] {seg['text'].strip()}")
    return "\n".join(rendered)
def segments_to_json(segments, lang=None, meta=None):
    """Serialize the transcription result as pretty-printed JSON.

    Args:
        segments: Segment list stored under the ``segments`` key.
        lang: Language value stored under the ``language`` key.
        meta: Optional mapping stored under ``metadata``; omitted when falsy.

    Returns:
        A JSON string indented by two spaces; non-ASCII characters are kept
        as-is rather than escaped.
    """
    payload = {"language": lang, "segments": segments}
    if meta:
        payload["metadata"] = meta
    return json.dumps(payload, indent=2, ensure_ascii=False)
| # --------------------------------------------------------------------------- | |
| # Kern-Pipeline: Transkription | |
| # --------------------------------------------------------------------------- | |
def transcribe_pipeline(file_obj, url, model_size, keep_video, cookies_file, format_selector):
    """Turn a video (URL or upload) into a Whisper transcript plus export files.

    Steps: obtain the video (yt-dlp download when *url* is given, otherwise
    the uploaded file), extract a WAV track with ffmpeg, transcribe it with
    Whisper and write SRT/VTT/TXT/JSON files into a fresh temporary directory.

    Args:
        file_obj: Uploaded file; either an object with a ``name`` attribute
            holding its path or the path itself. Ignored when *url* is set.
        url: Video URL; surrounding whitespace is ignored.
        model_size: Whisper model name passed to ``whisper.load_model``.
        keep_video: If false, a video downloaded from *url* is deleted after
            transcription. Uploaded files are never deleted.
        cookies_file: Optional cookies file path for yt-dlp.
        format_selector: Optional yt-dlp format expression.

    Returns:
        A 6-tuple ``(text, srt_path, vtt_path, txt_path, json_path, meta)``.
        On failure the first element is an error message and the remaining
        five are ``None``; no exception propagates to the caller.
    """
    no_outputs = (None,) * 5

    if whisper is None:
        return ("Fehler: whisper ist nicht installiert.", *no_outputs)

    # Validate the inputs before creating the working directory, so nothing
    # has to be cleaned up for the "no input" case.
    url = (url or "").strip()
    if not url and not file_obj:
        return ("Kein Video angegeben.", *no_outputs)

    tmpdir = tempfile.mkdtemp(prefix="whisper_space_")
    try:
        if url:
            video_path = download_video_with_ytdlp(url, tmpdir, cookies_file, format_selector)
        else:
            # Accept both a file wrapper exposing .name and a plain path.
            video_path = str(getattr(file_obj, "name", file_obj))

        audio_wav = str(Path(tmpdir) / "audio.wav")
        extract_audio_ffmpeg(video_path, audio_wav)

        model = whisper.load_model(model_size)
        result = model.transcribe(audio_wav, verbose=False)
        segs = result.get("segments", [])
        lang = result.get("language", "unknown")

        # The WAV is only an intermediate artifact; drop it once transcribed.
        try:
            os.remove(audio_wav)
        except OSError:
            pass

        txt = segments_to_txt(segs)
        exports = {
            "srt": segments_to_srt(segs),
            "vtt": segments_to_vtt(segs),
            "txt": txt,
            "json": segments_to_json(segs, lang, {"model": model_size}),
        }

        base = Path(video_path).stem
        files = {}
        for ext, content in exports.items():
            p = Path(tmpdir) / f"{base}.{ext}"
            p.write_text(content, encoding="utf-8")
            files[ext] = str(p)

        if not keep_video and url:
            # Best effort: a leftover download must not fail the request.
            try:
                os.remove(video_path)
            except OSError:
                pass

        meta = f"Model: {model_size}, Sprache: {lang}"
        return txt, files["srt"], files["vtt"], files["txt"], files["json"], meta
    except Exception as e:
        # Nothing in the working directory is handed out on failure, so remove
        # it instead of leaking downloads and audio on every failed request.
        shutil.rmtree(tmpdir, ignore_errors=True)
        return (f"Fehler: {e}", *no_outputs)
| # --------------------------------------------------------------------------- | |
| # Netzwerk-Diagnose-Tab | |
| # --------------------------------------------------------------------------- | |
def dns_internet_diag():
    """Build a plain-text diagnostics report for the diagnostics tab.

    The report lists the running Python executable, the version reported by
    the yt-dlp module invocation, and the DNS lookup result for a fixed set
    of hosts. Failures are written into the report instead of being raised.

    Returns:
        The report as one newline-joined string.
    """
    report = [
        "=== Python & Version Info ===",
        f"Python Executable: {sys.executable}",
    ]

    try:
        # Run yt-dlp as a module to check the version actually installed for
        # this interpreter.
        version_cmd = YT_DLP_COMMAND + ["--version"]
        reported_version = run_capture(version_cmd).strip()
        shown_cmd = " ".join(version_cmd)
        report.append(f"Version via '{shown_cmd}': {reported_version}")
    except Exception as e:
        report.append(f"Fehler bei der Prüfung der yt-dlp Modul-Version: {e}")

    report.append("\n\n=== DNS-Auflösung (via dnspython mit 8.8.8.8) ===")
    for host in ("huggingface.co", "www.instagram.com", "youtube.com"):
        try:
            entry = f"{host} -> {resolve_hostname_with_dns_python(host)} (OK)"
        except Exception as e:
            entry = f"{host} -> ERROR: {e}"
        report.append(entry)

    return "\n".join(report)
| # --------------------------------------------------------------------------- | |
| # Gradio UI | |
| # --------------------------------------------------------------------------- | |
# Gradio UI definition: two tabs (transcription and network diagnostics).
# NOTE(review): the nesting below was reconstructed from a flattened source in
# which all indentation was lost — confirm the intended layout.
with gr.Blocks() as demo:
    gr.Markdown("# Video → Whisper Transkript (SRT/TXT/VTT/JSON)")
    with gr.Tab("Transkription"):
        with gr.Row():
            # Input controls plus the status line.
            with gr.Column():
                url_in=gr.Textbox(label="Video URL",placeholder="https://...")
                file_in=gr.File(label="Oder Videodatei hochladen")
                cookies_in=gr.File(label="Cookies.txt (optional, für yt-dlp)")
                fmt_in=gr.Textbox(label="Format (optional, yt-dlp -f)",placeholder="z.B. bestvideo+bestaudio/best")
                model_sel=gr.Radio(["tiny","base","small","medium","large"],value="small",label="Whisper-Modell")
                keep_chk=gr.Checkbox(label="Video behalten (bei URL-Download)",value=False)
                btn=gr.Button("Transkribieren")
                status=gr.Textbox(label="Status / Meta",interactive=False)
            # Transcript text and the four download slots.
            with gr.Column():
                transcript=gr.Textbox(label="Transkript",lines=20)
                srt_dl=gr.File(label="SRT")
                vtt_dl=gr.File(label="VTT")
                txt_dl=gr.File(label="TXT")
                json_dl=gr.File(label="JSON")
        def run_transcribe_wrapper(f, u, m, k, c, fmt):
            """Adapt the UI values to transcribe_pipeline and shape its result for the outputs.

            Arguments arrive in the order of the input list given to btn.click:
            file, URL, model, keep-video flag, cookies file, format string.
            """
            # The pipeline takes a cookies path, so unwrap the upload if present.
            cookies_path = c.name if c else None
            # An empty format string is passed on as None.
            display, srt, vtt, txt, jsn, meta = transcribe_pipeline(f, u, m, k, cookies_path, (fmt or None))
            # Each download slot is shown only when a file path was produced.
            return (
                display,
                gr.update(value=srt, visible=bool(srt)),
                gr.update(value=vtt, visible=bool(vtt)),
                gr.update(value=txt, visible=bool(txt)),
                gr.update(value=jsn, visible=bool(jsn)),
                meta,
            )
        # Input and output lists must match the wrapper's parameter and return order.
        btn.click(
            run_transcribe_wrapper,
            [file_in, url_in, model_sel, keep_chk, cookies_in, fmt_in],
            [transcript, srt_dl, vtt_dl, txt_dl, json_dl, status]
        )
    with gr.Tab("Netzwerk / DNS Diagnose"):
        gr.Markdown("""Prüft die Version von yt-dlp, wie sie von Python als Modul ausgeführt wird, und testet die DNS-Auflösung.""")
        diag_btn=gr.Button("Diagnose starten")
        diag_out=gr.Textbox(label="Diagnose-Ausgabe",lines=25)
        # The diagnostics function takes no inputs and fills the text box.
        diag_btn.click(dns_internet_diag, inputs=[], outputs=[diag_out])
if __name__ == "__main__":
    # Listen on all interfaces; the port comes from $PORT, defaulting to 7860.
    listen_port = int(os.environ.get("PORT", 7860))
    demo.launch(server_port=listen_port, server_name="0.0.0.0")