Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| # coding: utf-8 | |
| """ Hugging Face Space (Gradio) App: Video -> Audio -> Whisper Transkript (+ Downloads SRT/TXT/VTT/JSON) | |
| Tab 1: Transkription | |
| - Video per URL (yt-dlp) oder Upload | |
| - Audio-Extraktion via ffmpeg | |
| - Transkription mit Whisper (lokal) | |
| - Downloads: SRT, VTT, TXT, JSON | |
| Tab 2: Netzwerk / DNS Diagnose | |
| - Testet DNS-Auflösung für mehrere Hosts | |
| - Testet HTTP-Requests auf Basis-URLs | |
| - Zeigt Version/Verfügbarkeit von yt-dlp und ffmpeg | |
| Hinweis: Verwende diese App nur für eigene oder freigegebene Inhalte. """ | |
| import os | |
| import subprocess | |
| import tempfile | |
| import json | |
| from pathlib import Path | |
| from datetime import timedelta | |
| import socket | |
| import urllib.request | |
| from urllib.parse import urlparse | |
| import gradio as gr | |
| try: | |
| import whisper | |
| except ImportError: | |
| whisper = None | |
| try: | |
| from dns import resolver as dns_resolver | |
| except ImportError: | |
| dns_resolver = None | |
| # --------------------------------------------------------------------------- | |
| # Helper: Shell | |
| # --------------------------------------------------------------------------- | |
def run_capture(cmd):
    """Execute *cmd* and hand back its standard output as text.

    Args:
        cmd: Argument list given to ``subprocess.run`` (no shell is involved).

    Returns:
        The captured stdout of the process.

    Raises:
        RuntimeError: If the process exits with a non-zero status. The message
            contains the space-joined command followed by the last 2000
            characters of stderr.
    """
    proc = subprocess.run(cmd, capture_output=True, text=True)
    if proc.returncode == 0:
        return proc.stdout
    # Keep only the tail of stderr so a huge log does not flood the message.
    err_tail = (proc.stderr or "")[-2000:]
    raise RuntimeError(f"Command failed: {' '.join(cmd)} {err_tail}")
| # --------------------------------------------------------------------------- | |
| # NEUE FUNKTION: DNS-Auflösung via dnspython | |
| # --------------------------------------------------------------------------- | |
def resolve_hostname_with_dns_python(hostname):
    """Look up the IPv4 address of *hostname*, preferring public DNS servers.

    When dnspython is available, an ``A`` query is sent through a resolver
    configured with the name servers 8.8.8.8 and 1.1.1.1. If dnspython is
    missing, or the query raises, the system resolver
    (``socket.gethostbyname``) is used instead.

    Args:
        hostname: Host name to resolve.

    Returns:
        The address as a string, or ``None`` if the dnspython query returned
        an empty answer without raising.

    Raises:
        Exception: Whatever ``socket.gethostbyname`` raises when the system
            resolver fallback fails as well.
    """
    if not dns_resolver:
        # dnspython is not installed: the system resolver is the only option.
        print("Warning: dnspython not found. Falling back to system DNS.")
        return socket.gethostbyname(hostname)

    try:
        public_resolver = dns_resolver.Resolver()
        public_resolver.nameservers = ['8.8.8.8', '1.1.1.1']  # Google & Cloudflare DNS
        records = public_resolver.resolve(hostname, 'A')
        if records:
            return records[0].to_text()
    except Exception as e:
        print(f"DNS resolution with dnspython failed for {hostname}: {e}")
        # Last resort: ask the system resolver; its error propagates unchanged.
        return socket.gethostbyname(hostname)
    return None
| # --------------------------------------------------------------------------- | |
| # MODIFIZIERTE FUNKTION: Download & Audio | |
| # --------------------------------------------------------------------------- | |
def download_video_with_ytdlp(url, out_dir, cookies_path=None, format_selector=None):
    """Download a video with yt-dlp into *out_dir* and return the file path.

    Before the download, the URL's host name is resolved through
    ``resolve_hostname_with_dns_python`` and the result is logged. This is a
    diagnostic only: the address is not forwarded to yt-dlp, because yt-dlp
    (unlike curl) has no ``--resolve`` option and rejects the unknown flag.

    Args:
        url: Video URL handed to yt-dlp.
        out_dir: Directory that receives the download.
        cookies_path: Optional path of a cookies file (``--cookies``).
        format_selector: Optional yt-dlp format expression (``-f``).

    Returns:
        Path (as ``str``) of the most recently modified regular file in
        *out_dir* after yt-dlp has finished.

    Raises:
        RuntimeError: If yt-dlp fails. DNS-related failures get a dedicated
            message, with the original error attached as ``__cause__``.
        FileNotFoundError: If yt-dlp succeeded but *out_dir* holds no file.
    """
    out_template = str(Path(out_dir) / "%(title)s.%(ext)s")
    cmd = ["yt-dlp", "-o", out_template]

    # Diagnostic pre-check; a failure here must never block the download.
    try:
        hostname = urlparse(url).hostname
        if hostname:
            print(f"Resolving hostname: {hostname}")
            ip_address = resolve_hostname_with_dns_python(hostname)
            if ip_address:
                print(f"Resolved {hostname} to {ip_address}.")
    except Exception as e:
        print(f"Could not perform custom DNS resolution, proceeding without it. Error: {e}")

    if format_selector:
        cmd += ["-f", format_selector]
    if cookies_path:
        cmd += ["--cookies", str(cookies_path)]
    # "--" ends option parsing, so a URL that starts with "-" cannot be
    # interpreted as a yt-dlp option.
    cmd += ["--", url]
    print(f"Running command: {' '.join(cmd)}")

    try:
        run_capture(cmd)
    except RuntimeError as e:
        msg = str(e)
        if "Failed to resolve" in msg or "Name or service not known" in msg:
            raise RuntimeError(
                "DNS/Internet-Problem: Der Host konnte nicht aufgelöst werden. "
                "Selbst die DNS-Umgehung ist fehlgeschlagen. Möglicherweise blockiert eine Firewall auch die IP-Adressen."
            ) from e
        raise

    # Only regular files qualify; the newest one is taken as the download.
    candidates = [p for p in Path(out_dir).iterdir() if p.is_file()]
    newest = max(candidates, key=lambda p: p.stat().st_mtime, default=None)
    if newest is None:
        raise FileNotFoundError("Download fehlgeschlagen — keine Datei gefunden.")
    return str(newest)
def extract_audio_ffmpeg(video_path, out_wav):
    """Extract the audio track of *video_path* into a WAV file via ffmpeg.

    The ffmpeg call drops the video stream (``-vn``), downmixes to one
    channel (``-ac 1``), resamples to 16000 Hz (``-ar 16000``) and overwrites
    an existing output file (``-y``).

    Args:
        video_path: Path of the input media file.
        out_wav: Path of the WAV file to write.

    Returns:
        *out_wav*, unchanged, for convenient chaining.

    Raises:
        RuntimeError: If ffmpeg exits with a non-zero status. The message
            carries the exit code and the last 2000 characters of ffmpeg's
            stderr so the cause is visible to the caller.
    """
    cmd = ["ffmpeg", "-y", "-i", video_path, "-vn", "-ac", "1", "-ar", "16000", "-f", "wav", out_wav]
    # stderr is captured (not discarded) so a failure can be explained;
    # undecodable bytes are replaced instead of raising UnicodeDecodeError.
    result = subprocess.run(
        cmd,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.PIPE,
        text=True,
        errors="replace",
    )
    if result.returncode != 0:
        tail = (result.stderr or "")[-2000:]
        raise RuntimeError(f"ffmpeg failed (exit code {result.returncode}): {tail}")
    return out_wav
# The remaining sections (from the time/format helpers to the end of the file)
# are unchanged from the previous revision and are included for completeness.
| # --------------------------------------------------------------------------- | |
| # Zeit- und Format-Helfer | |
| # --------------------------------------------------------------------------- | |
def seconds_to_timestamp(s):
    """Format a time offset in seconds as an SRT timestamp ``HH:MM:SS,mmm``.

    Args:
        s: Offset in seconds (int or float). Negative values are clamped to 0.

    Returns:
        The timestamp string with a comma before the milliseconds.
    """
    # Round once on the total millisecond count so that a value such as
    # 1.9996 carries into the seconds instead of producing ",1000".
    total_ms = max(0, int(round(s * 1000)))
    total_seconds, ms = divmod(total_ms, 1000)
    total_minutes, seconds = divmod(total_seconds, 60)
    hours, minutes = divmod(total_minutes, 60)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d},{ms:03d}"
def format_timestamp_vtt(s):
    """Format a time offset in seconds as a WebVTT timestamp ``HH:MM:SS.mmm``.

    Args:
        s: Offset in seconds (int or float). Negative values are clamped to 0.

    Returns:
        The timestamp string with a dot before the milliseconds.
    """
    # Round once on the total millisecond count so that a value such as
    # 1.9996 carries into the seconds instead of producing ".1000".
    total_ms = max(0, int(round(s * 1000)))
    total_seconds, ms = divmod(total_ms, 1000)
    total_minutes, seconds = divmod(total_seconds, 60)
    hours, minutes = divmod(total_minutes, 60)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d}.{ms:03d}"
def segments_to_srt(segments):
    """Render *segments* as SubRip (SRT) text.

    Each segment is a mapping with the keys ``'start'``, ``'end'`` and
    ``'text'``. A cue consists of its 1-based index, a timing line built with
    ``seconds_to_timestamp`` and the stripped text. Cues are separated by a
    blank line and the result ends with two newline characters.
    """
    def render_cue(index, segment):
        begin = seconds_to_timestamp(segment['start'])
        finish = seconds_to_timestamp(segment['end'])
        return f"{index}\n{begin} --> {finish}\n{segment['text'].strip()}"

    cues = [render_cue(number, seg) for number, seg in enumerate(segments, start=1)]
    return "\n\n".join(cues) + "\n\n"
def segments_to_vtt(segments):
    """Render *segments* as WebVTT text.

    The output starts with the ``WEBVTT`` header line. Every segment (a
    mapping with ``'start'``, ``'end'`` and ``'text'``) becomes a timing line
    built with ``format_timestamp_vtt`` followed by the stripped text. All
    parts are joined with a blank line; no trailing newline is appended.
    """
    chunks = ["WEBVTT\n"]
    for seg in segments:
        begin = format_timestamp_vtt(seg['start'])
        finish = format_timestamp_vtt(seg['end'])
        body = seg['text'].strip()
        chunks.append(f"{begin} --> {finish}\n{body}")
    separator = "\n\n"
    return separator.join(chunks)
def segments_to_txt(segments):
    """Render *segments* as plain text, one ``[timestamp] text`` line each.

    The timestamp is the segment's ``'start'`` formatted by
    ``seconds_to_timestamp``; the text is stripped of surrounding whitespace.
    Lines are joined with a single newline.
    """
    rendered = []
    for seg in segments:
        stamp = seconds_to_timestamp(seg['start'])
        rendered.append(f"[{stamp}] {seg['text'].strip()}")
    return "\n".join(rendered)
def segments_to_json(segments, language=None, metadata=None):
    """Serialize *segments* and accompanying information to a JSON string.

    Args:
        segments: Segment list stored under the ``"segments"`` key.
        language: Value stored under the ``"language"`` key.
        metadata: Optional extra data; added under ``"metadata"`` only when
            truthy.

    Returns:
        JSON text indented by two spaces, with non-ASCII characters kept
        verbatim.
    """
    payload = {"language": language, "segments": segments}
    if metadata:
        payload.update(metadata=metadata)
    return json.dumps(payload, indent=2, ensure_ascii=False)
| # --------------------------------------------------------------------------- | |
| # Kern-Pipeline: Transkription | |
| # --------------------------------------------------------------------------- | |
def transcribe_pipeline(file_obj, url, model_size, keep_video=False, cookies_file=None, format_selector=None):
    """Run the full pipeline: obtain video, extract audio, transcribe, export.

    Args:
        file_obj: Uploaded file; either a path string or an object whose
            ``name`` attribute holds the path. Used only when *url* is empty.
        url: Video URL to download with yt-dlp; takes precedence over
            *file_obj*. Surrounding whitespace is ignored.
        model_size: Whisper model name passed to ``whisper.load_model``.
        keep_video: Keep the downloaded video in the working directory.
            Uploaded files are never deleted.
        cookies_file: Optional cookies file path forwarded to yt-dlp.
        format_selector: Optional yt-dlp format expression.

    Returns:
        A 6-tuple ``(text, srt_path, vtt_path, txt_path, json_path, meta)``.
        On failure ``text`` is an error message and the other five entries
        are ``None``; exceptions are reported this way, not raised.
    """
    import shutil  # local import: used only for cleanup on failure

    if whisper is None:
        return "Fehler: whisper ist nicht installiert.", None, None, None, None, None

    # Validate the inputs before creating a working directory so that an
    # empty request does not leave an orphaned temp directory behind.
    url = (url or "").strip()
    if not url and not file_obj:
        return "Kein Video angegeben.", None, None, None, None, None

    tmpdir = tempfile.mkdtemp(prefix="whisper_space_")
    try:
        if url:
            video_path = download_video_with_ytdlp(
                url, tmpdir, cookies_path=cookies_file, format_selector=format_selector
            )
        else:
            # Accept both a plain path and a file wrapper exposing `.name`.
            video_path = str(getattr(file_obj, "name", file_obj))

        audio_wav = str(Path(tmpdir) / "audio.wav")
        extract_audio_ffmpeg(video_path, audio_wav)

        model = whisper.load_model(model_size)
        result = model.transcribe(audio_wav, verbose=False)
        segments = result.get("segments", [])
        language = result.get("language", "unknown")

        exports = {
            "srt": segments_to_srt(segments),
            "vtt": segments_to_vtt(segments),
            "txt": segments_to_txt(segments),
            "json": segments_to_json(segments, language, {"model": model_size}),
        }
        base = Path(video_path).stem
        files = {}
        for ext, content in exports.items():
            p = Path(tmpdir) / f"{base}.{ext}"
            p.write_text(content, encoding="utf-8")
            files[ext] = str(p)

        # Best-effort removal of intermediates: the WAV is no longer needed,
        # and a downloaded video is dropped unless the caller wants to keep it.
        stale = [audio_wav]
        if url and not keep_video:
            stale.append(video_path)
        for path in stale:
            try:
                os.remove(path)
            except OSError:
                pass

        meta = f"Model: {model_size}, Sprache: {language}"
        return exports["txt"], files["srt"], files["vtt"], files["txt"], files["json"], meta
    except Exception as e:
        # Nothing in the working directory is returned on failure, so remove
        # it; on success it must stay because the result paths point into it.
        shutil.rmtree(tmpdir, ignore_errors=True)
        return f"Fehler: {e}", None, None, None, None, None
| # --------------------------------------------------------------------------- | |
| # Netzwerk / DNS Diagnose | |
| # --------------------------------------------------------------------------- | |
def dns_internet_diag():
    """Collect a plain-text network diagnosis report.

    The report has these sections, in order: host lookups through the system
    resolver, the same lookups through ``resolve_hostname_with_dns_python``
    (only when dnspython is available), HTTP GET requests with a 5 second
    timeout, the yt-dlp version, and the first line of ``ffmpeg -version``.
    Every probe catches its own exception and records it as an error line.

    Returns:
        All report lines joined with newlines.
    """
    hosts = ("huggingface.co", "www.google.com", "www.instagram.com", "youtube.com")
    report = ["=== DNS-Auflösung (System) ==="]

    def probe_hosts(lookup):
        # Resolve every host with *lookup* and record success or the error.
        for name in hosts:
            try:
                address = lookup(name)
                report.append(f"{name} -> {address} (OK)")
            except Exception as err:
                report.append(f"{name} -> ERROR: {err}")

    probe_hosts(socket.gethostbyname)
    if dns_resolver:
        report.append("\n\n=== DNS-Auflösung (via dnspython mit 8.8.8.8) ===")
        probe_hosts(resolve_hostname_with_dns_python)

    report.append("\n\n=== HTTP-Requests (GET) ===")
    for target in ("https://huggingface.co", "https://www.google.com"):
        try:
            with urllib.request.urlopen(target, timeout=5) as resp:
                # Prefer the `status` attribute; fall back to getcode().
                code = getattr(resp, "status", None) or resp.getcode()
                report.append(f"{target} -> OK (Status {code})")
        except Exception as err:
            report.append(f"{target} -> ERROR: {err}")

    report.append("\n\n=== yt-dlp ===")
    try:
        version_text = run_capture(["yt-dlp", "--version"])
        report.append(f"yt-dlp Version: {version_text.strip()}")
    except Exception as err:
        report.append(f"yt-dlp Fehler: {err}")

    report.append("\n\n=== ffmpeg ===")
    try:
        banner = run_capture(["ffmpeg", "-version"])
        if banner:
            report.append(banner.splitlines()[0])
        else:
            report.append("(keine Ausgabe)")
    except Exception as err:
        report.append(f"ffmpeg Fehler: {err}")

    return "\n".join(report)
| # --------------------------------------------------------------------------- | |
| # Gradio UI mit zwei Tabs | |
| # --------------------------------------------------------------------------- | |
# Build the two-tab Gradio interface; `demo` is the app object launched at the
# bottom of the file.
# NOTE(review): the nesting below was reconstructed from a listing that lost
# its indentation — confirm the row/column layout against the running app.
with gr.Blocks() as demo:
    gr.Markdown("# Video → Whisper Transkript (SRT/TXT/VTT/JSON)")
    # Tab 1: transcription inputs, trigger button and result widgets.
    with gr.Tab("Transkription"):
        with gr.Row():
            with gr.Column():
                url_in = gr.Textbox(label="Video URL", placeholder="https://...")
                file_in = gr.File(label="Oder Videodatei hochladen")
                cookies_in = gr.File(label="Cookies.txt (optional, für yt-dlp)")
                fmt_in = gr.Textbox(label="Format (optional, yt-dlp -f)", placeholder="z.B. bestvideo+bestaudio/best")
                model_sel = gr.Radio(["tiny", "base", "small", "medium", "large"], value="small", label="Whisper-Modell")
                keep_chk = gr.Checkbox(label="Video behalten (bei URL-Download)", value=False)
                btn = gr.Button("Transkribieren")
                status = gr.Textbox(label="Status / Meta", interactive=False)
            with gr.Column():
                transcript = gr.Textbox(label="Transkript", lines=20)
                srt_dl = gr.File(label="SRT")
                vtt_dl = gr.File(label="VTT")
                txt_dl = gr.File(label="TXT")
                json_dl = gr.File(label="JSON")

        def run_transcribe(f, u, m, k, c, fmt):
            """Adapt the widget values to ``transcribe_pipeline`` and back.

            Parameters follow the input list of ``btn.click``: uploaded file,
            URL, model name, keep-video flag, cookies file, format string.
            Returns one value per output widget, in the order of the output
            list of ``btn.click``.
            """
            # The cookies widget yields an object with a `.name` path, or None.
            cookies_path = c.name if c else None
            display, srtf, vttf, txtf, jsonf, meta = transcribe_pipeline(
                f, u, m, k, cookies_file=cookies_path, format_selector=(fmt or None)
            )
            # A download widget is shown only when its file path is set.
            return (
                display,
                gr.update(value=srtf, visible=bool(srtf)),
                gr.update(value=vttf, visible=bool(vttf)),
                gr.update(value=txtf, visible=bool(txtf)),
                gr.update(value=jsonf, visible=bool(jsonf)),
                meta,
            )

        # Input order must match the parameter order of run_transcribe.
        btn.click(
            run_transcribe,
            [file_in, url_in, model_sel, keep_chk, cookies_in, fmt_in],
            [transcript, srt_dl, vtt_dl, txt_dl, json_dl, status],
        )
    # Tab 2: runs dns_internet_diag on demand and shows its text report.
    with gr.Tab("Netzwerk / DNS Diagnose"):
        gr.Markdown(
            """Führt Tests für den System-DNS und einen externen DNS (via dnspython) durch.
Wenn der System-DNS fehlschlägt, der externe aber funktioniert, ist die DNS-Umgehung aktiv.
Wenn beides fehlschlägt, blockiert eine Firewall wahrscheinlich auch die IP-Adressen."""
        )
        diag_btn = gr.Button("Diagnose starten")
        diag_out = gr.Textbox(label="Diagnose-Ausgabe", lines=25)
        diag_btn.click(dns_internet_diag, inputs=[], outputs=[diag_out])
if __name__ == "__main__":
    # Listen on all interfaces; the port comes from the PORT environment
    # variable and defaults to 7860 when it is not set.
    listen_port = int(os.environ.get("PORT", 7860))
    demo.launch(server_name="0.0.0.0", server_port=listen_port)