File size: 7,635 Bytes
a9d392f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Hugging Face Space (Gradio) App: Video -> Audio -> Whisper Transkript (+ Downloads SRT/TXT/VTT/JSON)

Rechtlicher Hinweis:
- Verwende diese App nur für eigene Inhalte oder Inhalte, für die du explizit die Erlaubnis hast.
- Respektiere Urheberrecht und die Terms of Service der jeweiligen Plattformen.

Benötigt:
- ffmpeg (systemweit)
- Python-Pakete siehe requirements.txt
"""
import os
import subprocess
import tempfile
import json
from pathlib import Path
from datetime import timedelta

import gradio as gr

# Try to import whisper (installed via requirements.txt from its git repository).
try:
    import whisper
except Exception as e:
    whisper = None

# Hilfsfunktionen ----------------------------------------------------------

def run(cmd, hide_output=False):
    """Execute *cmd* through ``subprocess.run`` with ``check=True``.

    Args:
        cmd: Command passed unchanged to ``subprocess.run``.
        hide_output: When true, the child's stdout and stderr are
            redirected to ``subprocess.DEVNULL``; otherwise no redirection
            is requested.

    Raises:
        subprocess.CalledProcessError: If the command exits non-zero.
    """
    redirect = {}
    if hide_output:
        # Silence both streams of the child process.
        redirect = {"stdout": subprocess.DEVNULL, "stderr": subprocess.DEVNULL}
    subprocess.run(cmd, check=True, **redirect)

def download_video_with_ytdlp(url: str, out_dir: str) -> str:
    """Download a video with ``yt-dlp`` into *out_dir* and return its path.

    Args:
        url: Address handed to ``yt-dlp``.
        out_dir: Directory that receives the download; it is scanned
            afterwards to locate the resulting file.

    Returns:
        Path (as ``str``) of the most recently modified regular file in
        *out_dir*.

    Raises:
        FileNotFoundError: If *out_dir* contains no file after the download.
    """
    target_dir = Path(out_dir)
    out_template = str(target_dir / "%(title)s.%(ext)s")
    # "--" ends option parsing, so a user-supplied URL that starts with "-"
    # is always treated as a URL and never as a yt-dlp command-line option.
    cmd = ["yt-dlp", "-f", "best", "-o", out_template, "--", url]
    run(cmd)
    # The final file name comes from the title template, so find the result
    # by modification time. Only regular files qualify (no directories).
    candidates = [p for p in target_dir.glob("*") if p.is_file()]
    if not candidates:
        raise FileNotFoundError("Download erfolglos — keine Datei gefunden.")
    newest = max(candidates, key=lambda p: p.stat().st_mtime)
    return str(newest)

def extract_audio_ffmpeg(video_path: str, out_wav: str):
    """Extract a mono 16 kHz WAV file from *video_path* using ffmpeg.

    The ffmpeg invocation overwrites an existing *out_wav* (``-y``) and
    drops the video stream (``-vn``); its output is hidden.

    Args:
        video_path: Input media file given to ffmpeg via ``-i``.
        out_wav: Destination path of the WAV file.

    Returns:
        *out_wav*, unchanged.
    """
    source_args = ["-i", video_path]
    # No video, one channel, 16000 Hz sample rate, WAV container.
    audio_args = ["-vn", "-ac", "1", "-ar", "16000", "-f", "wav"]
    run(["ffmpeg", "-y", *source_args, *audio_args, out_wav], hide_output=True)
    return out_wav

def seconds_to_timestamp(s: float, always_ms: bool = True) -> str:
    """Convert seconds to an SRT-style timestamp ``HH:MM:SS,mmm``.

    Args:
        s: Time offset in seconds. Negative values are clamped to zero.
        always_ms: Accepted for backward compatibility; it currently has no
            effect and the millisecond field is always emitted.

    Returns:
        The formatted timestamp, rounded to the nearest millisecond.
    """
    # Round once on the total millisecond count so that a fraction such as
    # 0.9996 carries into the seconds field instead of printing "1000" ms.
    total_ms = max(0, int(round(float(s) * 1000)))
    total_seconds, ms = divmod(total_ms, 1000)
    total_minutes, seconds = divmod(total_seconds, 60)
    hours, minutes = divmod(total_minutes, 60)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d},{ms:03d}"

def format_timestamp_vtt(s: float) -> str:
    """Convert seconds to a WebVTT-style timestamp ``HH:MM:SS.mmm``.

    Args:
        s: Time offset in seconds. Negative values are clamped to zero.

    Returns:
        The formatted timestamp, rounded to the nearest millisecond.
    """
    # Round once on the total millisecond count so that a fraction such as
    # 0.9996 carries into the seconds field instead of printing "1000" ms.
    total_ms = max(0, int(round(float(s) * 1000)))
    total_seconds, ms = divmod(total_ms, 1000)
    total_minutes, seconds = divmod(total_seconds, 60)
    hours, minutes = divmod(total_minutes, 60)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d}.{ms:03d}"

def segments_to_srt(segments):
    """Render whisper segments as SRT text.

    Each segment becomes a cue made of a 1-based index, a
    ``start --> end`` line and the stripped segment text. Cues are joined
    with a newline, which leaves a blank line between them.
    """
    def _cue(index, seg):
        begin = seconds_to_timestamp(seg['start'])
        finish = seconds_to_timestamp(seg['end'])
        body = seg['text'].strip()
        return f"{index}\n{begin} --> {finish}\n{body}\n"

    return "\n".join(_cue(n, seg) for n, seg in enumerate(segments, start=1))

def segments_to_vtt(segments):
    """Render whisper segments as WebVTT text.

    The output starts with the ``WEBVTT`` header. Each segment adds a
    ``start --> end`` line followed by its stripped text.
    """
    cues = [
        f"{format_timestamp_vtt(seg['start'])} --> "
        f"{format_timestamp_vtt(seg['end'])}\n{seg['text'].strip()}\n"
        for seg in segments
    ]
    return "\n".join(["WEBVTT\n", *cues])

def segments_to_txt(segments):
    """Render whisper segments as plain text, one line per segment.

    Every line has the form ``[<start timestamp>] <stripped text>``.
    """
    return "\n".join(
        f"[{seconds_to_timestamp(seg['start'])}] {seg['text'].strip()}"
        for seg in segments
    )

def segments_to_json(segments, language=None, metadata=None):
    """Serialize segments, language and optional metadata to a JSON string.

    Args:
        segments: Value stored under the ``"segments"`` key.
        language: Value stored under the ``"language"`` key.
        metadata: Stored under ``"metadata"`` only when truthy.

    Returns:
        JSON text indented by two spaces, with non-ASCII characters kept
        as-is.
    """
    payload = dict(language=language, segments=segments)
    # Falsy metadata (None, empty dict, ...) is left out of the document.
    extra = {"metadata": metadata} if metadata else {}
    payload.update(extra)
    return json.dumps(payload, ensure_ascii=False, indent=2)

# Haupt-Workflow ----------------------------------------------------------

def transcribe_pipeline(file_obj, url, model_size, keep_video=False):
    """
    file_obj: uploaded file (temp path) or None
    url: optional URL to download via yt-dlp
    model_size: whisper model size
    """
    if whisper is None:
        return "Fehler: lokales whisper nicht verfügbar. Stelle sicher, dass das Repo installiert ist.", None, None, None, None, None

    tmpdir = tempfile.mkdtemp(prefix="whisper_space_")
    try:
        # 1) Get video path either from uploaded file or by downloading URL
        if url:
            video_path = download_video_with_ytdlp(url, tmpdir)
        elif file_obj:
            # file_obj is a tuple (name, file-like) or a path depending on Gradio version.
            # Gradio typically supplies a filesystem path.
            if isinstance(file_obj, str) and os.path.exists(file_obj):
                video_path = file_obj
            else:
                # try to write content to temp file
                uploaded_path = Path(tmpdir) / Path(getattr(file_obj, "name", "upload")).name
                with open(uploaded_path, "wb") as f:
                    # file_obj may be a SpooledTemporaryFile or similar with .read()
                    f.write(file_obj.read())
                video_path = str(uploaded_path)
        else:
            return "Kein Video angegeben (weder Datei noch URL).", None, None, None, None, None

        # 2) Extract audio
        audio_wav = str(Path(tmpdir) / "audio.wav")
        extract_audio_ffmpeg(video_path, audio_wav)

        # 3) Load whisper model and transcribe
        model = whisper.load_model(model_size)
        # transcribe: get segments to generate SRT/VTT etc.
        result = model.transcribe(audio_wav, verbose=False)
        segments = result.get("segments", [])
        language = result.get("language", None)

        # 4) Create output strings
        srt_text = segments_to_srt(segments)
        vtt_text = segments_to_vtt(segments)
        txt_text = segments_to_txt(segments)
        json_text = segments_to_json(segments, language=language, metadata={"model": model_size})

        # 5) Save files to tmpdir for download via Gradio
        out_files = {}
        base_name = Path(video_path).stem
        files_map = {
            f"{base_name}.srt": srt_text,
            f"{base_name}.vtt": vtt_text,
            f"{base_name}.txt": txt_text,
            f"{base_name}.json": json_text
        }
        for fname, content in files_map.items():
            path = Path(tmpdir) / fname
            path.write_text(content, encoding="utf-8")
            out_files[fname] = str(path)

        # 6) prepare display text with timestamps for UI (simple combined view)
        display_lines = []
        for seg in segments:
            start = seconds_to_timestamp(seg['start'])
            display_lines.append(f"[{start}] {seg['text'].strip()}")
        display_text = "\n".join(display_lines)

        # Optionally remove video to save space
        if not keep_video and url:
            try:
                os.remove(video_path)
            except Exception:
                pass

        return display_text, out_files[f"{base_name}.srt"], out_files[f"{base_name}.vtt"], out_files[f"{base_name}.txt"], out_files[f"{base_name}.json"], f"Model: {model_size}, Language: {language}"
    except Exception as e:
        return f"Fehler während Verarbeitung: {e}", None, None, None, None, None
    finally:
        # Do not delete tmpdir immediately if the user wants to download