neuralworm's picture
Create app.py
a9d392f verified
raw
history blame
7.64 kB
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Hugging Face Space (Gradio) App: Video -> Audio -> Whisper Transkript (+ Downloads SRT/TXT/VTT/JSON)
Rechtlicher Hinweis:
- Verwende diese App nur für eigene Inhalte oder Inhalte, für die du explizit die Erlaubnis hast.
- Respektiere Urheberrecht und die Terms of Service der jeweiligen Plattformen.
Benötigt:
- ffmpeg (systemweit)
- Python-Pakete siehe requirements.txt
"""
import os
import subprocess
import tempfile
import json
from pathlib import Path
from datetime import timedelta
import gradio as gr
# Optional dependency: try to import whisper (installed via requirements.txt
# as a git+repo package). If the import fails for any reason, the module-level
# name ``whisper`` is bound to None so later code can detect its absence.
try:
    import whisper
except Exception:
    whisper = None
# Hilfsfunktionen ----------------------------------------------------------
def run(cmd, hide_output=False):
    """Execute *cmd* with ``subprocess.run`` and raise if it fails.

    Args:
        cmd: Command handed unchanged to ``subprocess.run``.
        hide_output: When true, the child's stdout and stderr are redirected
            to ``subprocess.DEVNULL``; otherwise they are inherited.

    Raises:
        subprocess.CalledProcessError: If the command exits with a non-zero
            status (because ``check=True`` is used).
    """
    # ``None`` is subprocess.run's default and means "inherit from parent".
    sink = subprocess.DEVNULL if hide_output else None
    subprocess.run(cmd, check=True, stdout=sink, stderr=sink)
def download_video_with_ytdlp(url: str, out_dir: str) -> str:
    """Download *url* with yt-dlp into *out_dir* and return the file path.

    Args:
        url: Address of the video to fetch. Passed after a ``--`` separator
            so a value starting with ``-`` cannot be parsed as a yt-dlp
            option.
        out_dir: Directory that receives the download; the file is named
            from the ``%(title)s.%(ext)s`` output template.

    Returns:
        Path (as ``str``) of the downloaded file. Files that did not exist
        in *out_dir* before the download are preferred; if none are new, the
        most recently modified regular file in *out_dir* is returned.

    Raises:
        FileNotFoundError: If *out_dir* contains no regular file afterwards.
    """
    target_dir = Path(out_dir)
    # Snapshot the directory first so the download can be told apart from
    # anything that was already there.
    before = set(target_dir.iterdir()) if target_dir.is_dir() else set()

    out_template = str(target_dir / "%(title)s.%(ext)s")
    cmd = [
        "yt-dlp",
        "-f", "best",
        # Keep the file's mtime at download time instead of the server's
        # Last-Modified value, so the mtime fallback below stays meaningful.
        "--no-mtime",
        "-o", out_template,
        # End of options: everything after this is treated as a URL.
        "--",
        url,
    ]
    run(cmd)

    after = list(target_dir.iterdir()) if target_dir.is_dir() else []
    files = [p for p in after if p.is_file()]
    fresh = [p for p in files if p not in before]
    candidates = fresh or files
    if not candidates:
        raise FileNotFoundError("Download erfolglos — keine Datei gefunden.")
    return str(max(candidates, key=lambda p: p.stat().st_mtime))
def extract_audio_ffmpeg(video_path: str, out_wav: str):
    """Write the audio track of *video_path* to *out_wav* using ffmpeg.

    The ffmpeg invocation drops the video stream (``-vn``), requests one
    audio channel (``-ac 1``) at a 16000 Hz sample rate (``-ar 16000``) in
    WAV format, and overwrites an existing output file (``-y``). ffmpeg's
    own output is suppressed.

    Args:
        video_path: Input media file.
        out_wav: Destination path of the WAV file.

    Returns:
        *out_wav*, unchanged, for convenient chaining.
    """
    source_args = ["-i", video_path]
    audio_args = ["-vn", "-ac", "1", "-ar", "16000", "-f", "wav"]
    run(["ffmpeg", "-y", *source_args, *audio_args, out_wav], hide_output=True)
    return out_wav
def seconds_to_timestamp(s: float, always_ms: bool = True) -> str:
    """Format a time offset in seconds as an SRT-style ``HH:MM:SS,mmm`` string.

    Args:
        s: Offset in seconds. Negative values are clamped to zero.
        always_ms: Accepted for backward compatibility; the body does not
            read it and milliseconds are always emitted.

    Returns:
        The timestamp with a comma before the three-digit millisecond field.
    """
    # Round once on the whole value and carry into the larger fields.
    # Rounding only the fractional part could yield 1000 ms (e.g. 1.9996 s)
    # and produce a malformed four-digit millisecond field.
    total_ms = max(0, int(round(float(s) * 1000)))
    total_seconds, ms = divmod(total_ms, 1000)
    total_minutes, seconds = divmod(total_seconds, 60)
    hours, minutes = divmod(total_minutes, 60)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d},{ms:03d}"
def format_timestamp_vtt(s: float) -> str:
    """Format a time offset in seconds as a WebVTT ``HH:MM:SS.mmm`` string.

    Args:
        s: Offset in seconds. Negative values are clamped to zero.

    Returns:
        The timestamp with a dot before the three-digit millisecond field.
    """
    # Round the whole value to milliseconds first so a fraction such as
    # 0.9996 carries into the seconds field instead of rendering as ".1000".
    total_ms = max(0, int(round(float(s) * 1000)))
    total_seconds, ms = divmod(total_ms, 1000)
    total_minutes, seconds = divmod(total_seconds, 60)
    hours, minutes = divmod(total_minutes, 60)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d}.{ms:03d}"
def segments_to_srt(segments):
    """Render segments as SRT text.

    Each segment is a mapping with ``'start'``, ``'end'`` and ``'text'``
    keys. Cues are numbered from 1, the text is stripped of surrounding
    whitespace, and cues are separated by a blank line.
    """
    cues = [
        f"{index}\n"
        f"{seconds_to_timestamp(segment['start'])} --> "
        f"{seconds_to_timestamp(segment['end'])}\n"
        f"{segment['text'].strip()}\n"
        for index, segment in enumerate(segments, start=1)
    ]
    return "\n".join(cues)
def segments_to_vtt(segments):
    """Render segments as WebVTT text.

    The output starts with the ``WEBVTT`` header followed by a blank line,
    then one cue per segment (``'start'``, ``'end'`` and stripped ``'text'``),
    with cues separated by a blank line.
    """
    header = "WEBVTT\n"
    cues = (
        f"{format_timestamp_vtt(segment['start'])} --> "
        f"{format_timestamp_vtt(segment['end'])}\n"
        f"{segment['text'].strip()}\n"
        for segment in segments
    )
    return "\n".join([header, *cues])
def segments_to_txt(segments):
    """Render segments as plain text, one ``[timestamp] text`` line each.

    The timestamp is the segment's ``'start'`` formatted by
    ``seconds_to_timestamp``; the text is stripped of surrounding whitespace.
    """
    return "\n".join(
        f"[{seconds_to_timestamp(segment['start'])}] {segment['text'].strip()}"
        for segment in segments
    )
def segments_to_json(segments, language=None, metadata=None):
    """Serialize segments, language and optional metadata to a JSON string.

    Args:
        segments: Value stored under the ``"segments"`` key.
        language: Value stored under the ``"language"`` key.
        metadata: Stored under ``"metadata"`` only when truthy.

    Returns:
        JSON text indented by two spaces, with non-ASCII characters kept
        as-is rather than escaped.
    """
    extra = {"metadata": metadata} if metadata else {}
    payload = {"language": language, "segments": segments, **extra}
    return json.dumps(payload, ensure_ascii=False, indent=2)
# Haupt-Workflow ----------------------------------------------------------
def transcribe_pipeline(file_obj, url, model_size, keep_video=False):
"""
file_obj: uploaded file (temp path) or None
url: optional URL to download via yt-dlp
model_size: whisper model size
"""
if whisper is None:
return "Fehler: lokales whisper nicht verfügbar. Stelle sicher, dass das Repo installiert ist.", None, None, None, None, None
tmpdir = tempfile.mkdtemp(prefix="whisper_space_")
try:
# 1) Get video path either from uploaded file or by downloading URL
if url:
video_path = download_video_with_ytdlp(url, tmpdir)
elif file_obj:
# file_obj is a tuple (name, file-like) or a path depending on Gradio version.
# Gradio typically supplies a filesystem path.
if isinstance(file_obj, str) and os.path.exists(file_obj):
video_path = file_obj
else:
# try to write content to temp file
uploaded_path = Path(tmpdir) / Path(getattr(file_obj, "name", "upload")).name
with open(uploaded_path, "wb") as f:
# file_obj may be a SpooledTemporaryFile or similar with .read()
f.write(file_obj.read())
video_path = str(uploaded_path)
else:
return "Kein Video angegeben (weder Datei noch URL).", None, None, None, None, None
# 2) Extract audio
audio_wav = str(Path(tmpdir) / "audio.wav")
extract_audio_ffmpeg(video_path, audio_wav)
# 3) Load whisper model and transcribe
model = whisper.load_model(model_size)
# transcribe: get segments to generate SRT/VTT etc.
result = model.transcribe(audio_wav, verbose=False)
segments = result.get("segments", [])
language = result.get("language", None)
# 4) Create output strings
srt_text = segments_to_srt(segments)
vtt_text = segments_to_vtt(segments)
txt_text = segments_to_txt(segments)
json_text = segments_to_json(segments, language=language, metadata={"model": model_size})
# 5) Save files to tmpdir for download via Gradio
out_files = {}
base_name = Path(video_path).stem
files_map = {
f"{base_name}.srt": srt_text,
f"{base_name}.vtt": vtt_text,
f"{base_name}.txt": txt_text,
f"{base_name}.json": json_text
}
for fname, content in files_map.items():
path = Path(tmpdir) / fname
path.write_text(content, encoding="utf-8")
out_files[fname] = str(path)
# 6) prepare display text with timestamps for UI (simple combined view)
display_lines = []
for seg in segments:
start = seconds_to_timestamp(seg['start'])
display_lines.append(f"[{start}] {seg['text'].strip()}")
display_text = "\n".join(display_lines)
# Optionally remove video to save space
if not keep_video and url:
try:
os.remove(video_path)
except Exception:
pass
return display_text, out_files[f"{base_name}.srt"], out_files[f"{base_name}.vtt"], out_files[f"{base_name}.txt"], out_files[f"{base_name}.json"], f"Model: {model_size}, Language: {language}"
except Exception as e:
return f"Fehler während Verarbeitung: {e}", None, None, None, None, None
finally:
# Do not delete tmpdir immediately if the user wants to download