# Spaces: Build error
# (Status text captured from the hosting page together with this file; it is not part of the program.)
import asyncio
import json
import logging
import os
import re
import shutil
import subprocess
import tempfile
import time
from concurrent.futures import ThreadPoolExecutor
from functools import lru_cache
from pathlib import Path
from typing import List, Tuple

import aiofiles
import aiohttp
import gradio as gr
import pandas as pd
import requests
import tenacity
from gradio import utils
from tqdm import tqdm
from yt_dlp import YoutubeDL
from yt_dlp.utils import DownloadError
# Configure the root logger so that DEBUG-level records and above are emitted.
logging.basicConfig(level=logging.DEBUG)
# Module-level logger used by the functions in this file.
logger = logging.getLogger(__name__)
def sanitize_title(title):
    """Return *title* with path/wildcard punctuation removed.

    The characters dropped are backslash, slash, ``*``, ``?``, ``:``,
    double quote, ``<``, ``>`` and ``|``. Everything else (including
    whitespace and apostrophes) is kept unchanged.
    """
    forbidden = re.compile(r'[\\/*?:"<>|]')
    return forbidden.sub("", title)
def format_time(seconds):
    """Format a non-negative number of seconds as ``HH:MM:SS``.

    Fractional seconds are truncated. The hour field is not wrapped at 24,
    so 90061 seconds becomes ``"25:01:01"`` rather than rolling over to
    ``"01:01:01"`` as a clock-time formatter would.

    Args:
        seconds: Offset or duration in seconds (int or float).

    Returns:
        The zero-padded ``HH:MM:SS`` string.

    Raises:
        ValueError: If *seconds* is negative.
    """
    total = int(seconds)
    if total < 0:
        raise ValueError(f"seconds must be non-negative, got {seconds!r}")
    hours, remainder = divmod(total, 3600)
    minutes, secs = divmod(remainder, 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d}"
def get_video_info(video_url):
    """Look up a video's title and the stream URLs to download from.

    Metadata is fetched with yt-dlp without downloading any media. Separate
    video-only and audio-only formats are preferred (highest ``vbr`` and
    ``abr`` respectively). If either kind is missing, the combined format
    with the highest ``tbr`` is used instead.

    Args:
        video_url: URL of the video page.

    Returns:
        ``(title, video_stream_url, audio_stream_url)``. The audio URL is
        ``None`` when a combined video+audio format was selected.

    Raises:
        Exception: ``"Error extracting video info: ..."`` when yt-dlp raises
            ``DownloadError``, ``"Unexpected error: ..."`` for any other
            failure while reading the metadata, and
            ``"No suitable video formats found"`` when no usable format
            exists.
    """

    def get_bitrate(format_dict, key):
        # A missing key or a falsy value (e.g. None) counts as 0 so that
        # max() never has to compare None with a number.
        return format_dict.get(key, 0) or 0

    selection = None
    with YoutubeDL({'quiet': True, 'no_warnings': True}) as ydl:
        try:
            info = ydl.extract_info(video_url, download=False)
            formats = info.get('formats', [])
            # Prefer adaptive formats (separate video and audio)
            video_formats = [f for f in formats if f.get('vcodec') != 'none' and f.get('acodec') == 'none']
            audio_formats = [f for f in formats if f.get('acodec') != 'none' and f.get('vcodec') == 'none']
            if video_formats and audio_formats:
                video_format = max(video_formats, key=lambda f: get_bitrate(f, 'vbr'))
                audio_format = max(audio_formats, key=lambda f: get_bitrate(f, 'abr'))
                selection = (info['title'], video_format['url'], audio_format['url'])
            else:
                # Fallback to best combined format
                combined_formats = [f for f in formats if f.get('vcodec') != 'none' and f.get('acodec') != 'none']
                if combined_formats:
                    best_format = max(combined_formats, key=lambda f: get_bitrate(f, 'tbr'))
                    selection = (info['title'], best_format['url'], None)
        except DownloadError as e:
            raise Exception(f"Error extracting video info: {str(e)}") from e
        except Exception as e:
            raise Exception(f"Unexpected error: {str(e)}") from e
    # Raised outside the try block so this expected condition is not
    # re-labelled as "Unexpected error: ..." by the blanket handler above.
    if selection is None:
        raise Exception("No suitable video formats found")
    return selection
async def download_segment_async(url, start_time, end_time, output_path):
    """Cut one time range out of a stream with ffmpeg (async generator).

    ffmpeg is invoked with ``-ss`` before ``-i`` and ``-c copy``, so the
    range is copied without re-encoding. An existing output file is
    overwritten (``-y``).

    Args:
        url: Stream URL (or path) handed to ffmpeg as its input.
        start_time: Start offset in seconds.
        end_time: End offset in seconds; the clip lasts
            ``end_time - start_time`` seconds.
        output_path: Destination file path (str or ``Path``).

    Yields:
        A single status string: the success message, or the error message
        immediately before the matching exception is raised.

    Raises:
        Exception: If ffmpeg exits with a non-zero status.
        FileNotFoundError: If ffmpeg reported success but the output file
            does not exist.
    """
    output_path = Path(output_path)
    command = [
        'ffmpeg',
        '-ss', format_time(start_time),
        '-i', url,
        '-t', format_time(end_time - start_time),
        '-c', 'copy',
        '-avoid_negative_ts', 'make_zero',
        '-y',
        str(output_path)
    ]
    logger.debug(f"Executing command: {' '.join(command)}")
    process = await asyncio.create_subprocess_exec(
        *command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )
    _, stderr = await process.communicate()
    if process.returncode != 0:
        # ffmpeg's diagnostics can contain bytes that are not valid UTF-8
        # (e.g. stream metadata); decode leniently so the real error is
        # reported instead of a UnicodeDecodeError.
        error_message = f"FFmpeg error: {stderr.decode(errors='replace')}"
        logger.error(error_message)
        yield error_message
        raise Exception(error_message)
    if not output_path.exists():
        error_message = f"Output file not created: {output_path}"
        logger.error(error_message)
        yield error_message
        raise FileNotFoundError(error_message)
    logger.info(f"Successfully downloaded segment to {output_path}")
    yield f"Successfully downloaded segment to {output_path}"
def _concat_list_entry(segment):
    """Return one ffmpeg concat-demuxer ``file`` line for *segment*."""
    # Use an absolute path: the list file is written to a temporary
    # directory, and the concat demuxer resolves relative entries against
    # the directory of the list file, not the working directory.
    path = str(Path(segment).resolve())
    # Inside single quotes only the quote itself is special; it has to be
    # written as '\'' (close quote, escaped quote, reopen quote).
    return "file '" + path.replace("'", "'\\''") + "'"


async def _run_ffmpeg(args, failure_prefix):
    """Run ``ffmpeg -y <args>`` and raise ``Exception`` if it fails."""
    process = await asyncio.create_subprocess_exec(
        'ffmpeg', '-y', *args,
        stdin=asyncio.subprocess.DEVNULL,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )
    _, stderr = await process.communicate()
    if process.returncode != 0:
        raise Exception(f"{failure_prefix}: {stderr.decode(errors='replace')}")


async def _concat_segments(segments, list_path, destination, failure_prefix):
    """Write a concat list for *segments* and stream-copy them into *destination*."""
    async with aiofiles.open(list_path, 'w', encoding='utf-8') as f:
        await f.write('\n'.join(_concat_list_entry(segment) for segment in segments))
    await _run_ffmpeg(
        ['-f', 'concat', '-safe', '0', '-i', str(list_path), '-c', 'copy', str(destination)],
        failure_prefix,
    )


async def combine_segments_async(video_segments, audio_segments, output_path):
    """Join downloaded segments into a single file with ffmpeg.

    With video segments, the video parts and the audio parts are each
    concatenated and the two results are then muxed into *output_path*.
    Without video segments, the audio parts are concatenated straight into
    *output_path*. All steps use ``-c copy`` (no re-encoding).

    Concat lists and intermediate files live in a private temporary
    directory that is removed when the function finishes, so concurrent or
    previously aborted runs cannot collide on fixed file names.

    Args:
        video_segments: Paths of the video parts, in playback order; may be
            empty for audio-only output.
        audio_segments: Paths of the audio parts, in playback order.
        output_path: Destination file path.

    Returns:
        *output_path*, unchanged.

    Raises:
        Exception: If *audio_segments* is empty, or (prefixed with
            ``"Error in combine_segments_async: "``) if any step fails.
    """
    if not audio_segments:
        raise Exception("No audio segments to combine")
    try:
        with tempfile.TemporaryDirectory(prefix='combine_') as work_dir:
            work = Path(work_dir)
            if video_segments:
                temp_video = work / 'temp_video.mp4'
                temp_audio = work / 'temp_audio.m4a'
                await _concat_segments(
                    video_segments, work / 'video_list.txt', temp_video,
                    "Error concatenating video segments",
                )
                await _concat_segments(
                    audio_segments, work / 'audio_list.txt', temp_audio,
                    "Error concatenating audio segments",
                )
                # Combine video and audio
                await _run_ffmpeg(
                    ['-i', str(temp_video), '-i', str(temp_audio), '-c', 'copy', str(output_path)],
                    "Error combining video and audio",
                )
            else:
                # Audio only
                await _concat_segments(
                    audio_segments, work / 'audio_list.txt', output_path,
                    "Error combining audio segments",
                )
        return output_path
    except Exception as e:
        raise Exception(f"Error in combine_segments_async: {str(e)}") from e
def add_segment(start_hours, start_minutes, start_seconds, end_hours, end_minutes, end_seconds, segments):
    """Append a ``HH:MM:SS-HH:MM:SS`` row to the segments table.

    The six time fields come from numeric UI inputs, which may deliver
    floats or ``None`` (cleared field). Each value is truncated to an int,
    and ``None`` is treated as 0, before formatting.

    Args:
        start_hours, start_minutes, start_seconds: Start of the segment.
        end_hours, end_minutes, end_seconds: End of the segment.
        segments: Existing DataFrame with a ``"Segment"`` column.

    Returns:
        A new DataFrame with the segment appended and a fresh 0..n-1 index.
    """

    def as_int(value):
        # ':02d' rejects floats and None, so normalise first.
        return 0 if value is None else int(value)

    start_time = f"{as_int(start_hours):02d}:{as_int(start_minutes):02d}:{as_int(start_seconds):02d}"
    end_time = f"{as_int(end_hours):02d}:{as_int(end_minutes):02d}:{as_int(end_seconds):02d}"
    new_segment = f"{start_time}-{end_time}"
    new_row = pd.DataFrame([new_segment], columns=["Segment"])
    return pd.concat([segments, new_row], ignore_index=True)
def remove_segment(segments, index):
    """Remove the row at position *index* from the segments table.

    Args:
        segments: DataFrame of segments.
        index: Zero-based row position; floats are truncated to int.

    Returns:
        A DataFrame without that row, re-indexed from 0. If *index* is
        ``None`` or out of range, *segments* is returned unchanged instead
        of raising ``KeyError``.
    """
    if index is None:
        return segments
    position = int(index)
    if not 0 <= position < len(segments):
        return segments
    # Drop by position (via the label found at that position) so the result
    # does not depend on the frame carrying a default 0..n-1 index.
    return segments.drop(segments.index[position]).reset_index(drop=True)
def move_segment(segments, old_index, new_index):
    """Move one row of the segments table to a different position.

    Args:
        segments: DataFrame of segments.
        old_index: Current zero-based position of the row.
        new_index: Zero-based position the row should occupy afterwards.
            Floats are truncated to int for both indices.

    Returns:
        The reordered DataFrame, re-indexed from 0. If either index is
        ``None`` or out of range, *segments* is returned unchanged.
    """
    if old_index is None or new_index is None:
        return segments
    source, target = int(old_index), int(new_index)
    count = len(segments)
    if not (0 <= source < count and 0 <= target < count):
        return segments
    # Build the new row order positionally; this keeps column dtypes intact
    # and does not mix label-based and position-based indexing.
    order = list(range(count))
    order.insert(target, order.pop(source))
    return segments.iloc[order].reset_index(drop=True)
def parse_segments(segments: pd.DataFrame) -> List[Tuple[int, int]]:
    """Convert the ``"Segment"`` column into ``(start, end)`` second pairs.

    Each usable cell looks like ``"HH:MM:SS-HH:MM:SS"``; fewer
    colon-separated fields are accepted (``"1:30-2:00"``) and empty fields
    count as zero. Cells are skipped silently when they are not strings,
    do not contain exactly one ``-``, contain a non-integer field, or do
    not describe a start strictly before the end.

    Args:
        segments: DataFrame with a ``"Segment"`` column.

    Returns:
        The valid segments, in table order, as integer second offsets.
    """

    def to_seconds(stamp):
        # The rightmost field is seconds; each field further left weighs
        # 60 times more than its right-hand neighbour.
        total = 0
        for power, field in enumerate(reversed(stamp.split(':'))):
            if field:
                total += int(field) * 60 ** power
        return total

    result = []
    for entry in segments['Segment']:
        if not (isinstance(entry, str) and '-' in entry):
            continue
        bounds = entry.split('-')
        if len(bounds) != 2:
            continue  # more than one '-' cannot be split into start/end
        try:
            begin = to_seconds(bounds[0])
            finish = to_seconds(bounds[1])
        except ValueError:
            continue  # Skip invalid segments
        if begin < finish:
            result.append((begin, finish))
    return result
async def process_video(video_url, segments, combine, audio_only, progress=gr.Progress()):
    """Download the requested segments of a video and optionally merge them.

    Async generator intended as a Gradio event handler. Every yielded item
    is a ``(progress_value, status_message, file_path_or_None)`` tuple, with
    ``progress_value`` on a 0-100 scale. The *progress* tracker is driven
    separately with fractions between 0 and 1.

    The ``output`` directory is deleted and recreated on every call. For
    each parsed segment the audio is always downloaded; the video is
    downloaded too unless *audio_only* is set.

    Args:
        video_url: URL of the video page.
        segments: DataFrame with a ``"Segment"`` column of
            ``HH:MM:SS-HH:MM:SS`` strings.
        combine: If true, merge all segments into one file and delete the
            per-segment files afterwards. Otherwise only the first
            downloaded segment file is returned.
        audio_only: If true, skip video and produce ``.m4a`` output.
        progress: Gradio progress tracker.

    Yields:
        Status tuples as described above. On any failure an error message
        is yielded with ``None`` as the file and the generator ends.
    """
    if not video_url or not video_url.strip():
        yield 0, "Error: Please provide a valid YouTube URL", None
        return
    parsed_segments = parse_segments(segments)
    if not parsed_segments:
        yield 0, "Error: No valid segments provided", None
        return
    output_dir = Path('output')
    # Clean up the output directory
    if output_dir.exists():
        try:
            shutil.rmtree(output_dir)
            yield 0, "Cleaned up existing output directory", None
        except Exception as e:
            yield 0, f"Error cleaning up output directory: {str(e)}", None
            return
    output_dir.mkdir(exist_ok=True)
    try:
        progress(0, "Extracting video info...")
        # From here on video_url is the direct stream URL, not the page URL.
        video_title, video_url, audio_url = get_video_info(video_url)
    except Exception as e:
        yield 0, f"Error: {str(e)}", None
        return
    safe_title = sanitize_title(video_title)
    video_segments = []
    audio_segments = []
    total_segments = len(parsed_segments)
    for i, (start_time, end_time) in enumerate(parsed_segments):
        # Downloads account for the first 80% of the progress bar.
        progress((i / total_segments) * 0.8, f"Downloading segment {i+1}/{total_segments}")
        try:
            if not audio_only:
                video_output = output_dir / f"{safe_title}_video_segment_{i+1}.mp4"
                async for output in download_segment_async(video_url, start_time, end_time, str(video_output)):
                    progress((i / total_segments) * 0.8 + (1 / total_segments) * 0.4, f"Downloading video segment {i+1}/{total_segments}: {output}")
                if video_output.exists():
                    video_segments.append(str(video_output))
                else:
                    raise FileNotFoundError(f"Video segment file not found: {video_output}")
            audio_output = output_dir / f"{safe_title}_audio_segment_{i+1}.m4a"
            # Without a separate audio stream, fall back to the combined stream URL.
            async for output in download_segment_async(audio_url or video_url, start_time, end_time, str(audio_output)):
                progress((i / total_segments) * 0.8 + (1 / total_segments) * 0.8, f"Downloading audio segment {i+1}/{total_segments}: {output}")
            if audio_output.exists():
                audio_segments.append(str(audio_output))
            else:
                raise FileNotFoundError(f"Audio segment file not found: {audio_output}")
        except Exception as e:
            yield (i / total_segments) * 100, f"Error downloading segment {i+1}: {str(e)}", None
            return
    try:
        if combine:
            # The tracker takes a 0-1 fraction like every other call above,
            # so "90%" is 0.9 (not 90).
            progress(0.9, "Combining segments...")
            if audio_only:
                output_path = output_dir / f"{safe_title}_combined.m4a"
            else:
                output_path = output_dir / f"{safe_title}_combined.mp4"
            output_path = await combine_segments_async(video_segments if not audio_only else [], audio_segments, str(output_path))
            yield 100, f"Segments combined and saved as {output_path}", output_path
        else:
            # Only the first segment file is handed back to the UI.
            output_path = audio_segments[0] if audio_only else video_segments[0]
            yield 100, "All segments downloaded successfully", output_path
    except Exception as e:
        yield 100, f"Error: {str(e)}", None
    finally:
        # Clean up individual segment files if they were combined
        if combine:
            for segment in video_segments + audio_segments:
                try:
                    Path(segment).unlink(missing_ok=True)
                except Exception as e:
                    logger.warning(f"Error removing segment file {segment}: {str(e)}")
def get_video_qualities(video_url):
    """List the resolutions offered by formats that carry video and audio.

    Args:
        video_url: URL of the video page.

    Returns:
        Distinct labels such as ``"720p"``, sorted from highest to lowest.
        An empty list is returned for a blank URL, when yt-dlp raises
        ``DownloadError``, or on any other error while reading the
        metadata (the latter is also printed).
    """
    if not video_url.strip():
        return []
    ydl_options = {'quiet': True, 'no_warnings': True}
    with YoutubeDL(ydl_options) as ydl:
        try:
            metadata = ydl.extract_info(video_url, download=False)
            labels = {
                f"{fmt.get('height')}p"
                for fmt in metadata.get('formats', [])
                if fmt.get('vcodec') != 'none'
                and fmt.get('acodec') != 'none'
                and fmt.get('height')
            }
            # Order by the numeric part of the label, largest first.
            return sorted(labels, key=lambda label: int(label[:-1]), reverse=True)
        except DownloadError:
            return []
        except Exception as e:
            print(f"Error in get_video_qualities: {str(e)}")
            return []
# Disable Gradio analytics
# Replaces gradio.utils.colab_check with a stub that always returns True.
# NOTE(review): confirm that this actually turns analytics off in the
# installed Gradio version; only the monkeypatch itself is visible here.
utils.colab_check = lambda: True
# Stylesheet passed to gr.Blocks(css=...) below. It defines a full-height
# flex layout for the classes assigned through elem_classes
# (header, main-content, *-section, scroll-area, ...).
custom_css = """
/* Reset and base styles */
* { box-sizing: border-box; margin: 0; padding: 0; }
body, #component-0 {
height: 100vh;
max-height: 100vh;
overflow: hidden;
display: flex;
flex-direction: column;
}
/* Gradio container modifications */
.gradio-container {
flex-grow: 1;
display: flex;
flex-direction: column;
max-height: 100vh;
overflow: hidden;
}
/* Header */
.header {
padding: 1rem;
background-color: #f0f0f0;
}
/* Main content area */
.main-content {
flex-grow: 1;
display: flex;
overflow: hidden;
padding: 1rem;
gap: 1rem;
}
/* Columns */
.input-section, .options-section, .output-section {
flex: 1;
display: flex;
flex-direction: column;
overflow: hidden;
background-color: #ffffff;
border-radius: 8px;
box-shadow: 0 2px 10px rgba(0,0,0,0.1);
}
/* Scrollable areas */
.scroll-area {
flex-grow: 1;
overflow-y: auto;
padding: 1rem;
}
/* Responsive adjustments */
@media (max-width: 768px) {
.main-content {
flex-direction: column;
}
}
/* Additional styles */
.segment-input, .time-input, .button-row {
display: flex;
gap: 0.5rem;
margin-bottom: 1rem;
}
.time-input { width: 60px !important; }
.url-input { flex-grow: 1; }
.quality-dropdown { width: 100px !important; }
"""
# Build the web UI: three columns (inputs, options, output) inside one row,
# followed by the event wiring.
# NOTE(review): the source had lost its indentation; the nesting below is
# reconstructed from the order of the context managers — confirm which
# components belong inside each gr.Row().
with gr.Blocks(title="YouTube Segment Downloader", theme=gr.themes.Default(), css=custom_css) as iface:
    gr.Markdown("# 🎬 YouTube Segment Downloader", elem_classes="header")
    with gr.Row(elem_classes="main-content"):
        # Column 1: URL entry and segment editor.
        with gr.Column(elem_classes="input-section"):
            with gr.Column(elem_classes="scroll-area"):
                with gr.Row():
                    video_url = gr.Textbox(label="YouTube URL", placeholder="Paste URL here", elem_classes="url-input")
                    # Hidden until update_qualities() finds qualities for the URL.
                    quality = gr.Dropdown(label="Quality", choices=[], interactive=True, elem_classes="quality-dropdown", visible=False)
                # Shows the "unable to fetch" message when the lookup fails.
                url_status = gr.Markdown(visible=False)
                gr.Markdown("### Add Segments")
                with gr.Row(elem_classes="segment-input"):
                    start_hours = gr.Number(label="Start HH", minimum=0, maximum=23, step=1, value=0, elem_classes="time-input")
                    start_minutes = gr.Number(label="MM", minimum=0, maximum=59, step=1, value=0, elem_classes="time-input")
                    start_seconds = gr.Number(label="SS", minimum=0, maximum=59, step=1, value=0, elem_classes="time-input")
                    gr.Markdown("to")
                    end_hours = gr.Number(label="End HH", minimum=0, maximum=23, step=1, value=0, elem_classes="time-input")
                    end_minutes = gr.Number(label="MM", minimum=0, maximum=59, step=1, value=0, elem_classes="time-input")
                    end_seconds = gr.Number(label="SS", minimum=0, maximum=59, step=1, value=0, elem_classes="time-input")
                add_btn = gr.Button("Add Segment", variant="primary")
                # Editable one-column table of "HH:MM:SS-HH:MM:SS" strings.
                segments = gr.Dataframe(headers=["Segment"], row_count=3, col_count=1, datatype=["str"], interactive=True, label="Segments")
        # Column 2: download options and segment list maintenance.
        with gr.Column(elem_classes="options-section"):
            with gr.Column(elem_classes="scroll-area"):
                combine = gr.Checkbox(label="Combine segments", value=True)
                audio_only = gr.Checkbox(label="Audio only", value=False)
                remove_index = gr.Number(label="Remove segment", minimum=0, step=1, value=0)
                remove_btn = gr.Button("Remove", variant="secondary")
                old_index = gr.Number(label="Move from", minimum=0, step=1, value=0)
                new_index = gr.Number(label="to", minimum=0, step=1, value=0)
                move_btn = gr.Button("Move", variant="secondary")
                submit_btn = gr.Button("🚀 Download", variant="primary")
        # Column 3: progress, status text and the resulting file.
        with gr.Column(elem_classes="output-section"):
            with gr.Column(elem_classes="scroll-area"):
                progress = gr.Slider(label="Progress", minimum=0, maximum=100, step=1, interactive=False)
                status = gr.Textbox(label="Status", lines=1)
                output_file = gr.File(label="Downloaded File")
    # Event wiring: each handler receives the listed inputs and its return
    # value(s) update the listed outputs.
    add_btn.click(
        add_segment,
        inputs=[start_hours, start_minutes, start_seconds, end_hours, end_minutes, end_seconds, segments],
        outputs=[segments]
    )
    # process_video yields (progress, status, file) tuples matching these outputs.
    submit_btn.click(
        process_video,
        inputs=[video_url, segments, combine, audio_only],
        outputs=[progress, status, output_file]
    )
    remove_btn.click(
        remove_segment,
        inputs=[segments, remove_index],
        outputs=[segments]
    )
    move_btn.click(
        move_segment,
        inputs=[segments, old_index, new_index],
        outputs=[segments]
    )
    def update_qualities(url):
        """Refresh the quality dropdown and the URL status message for *url*.

        Returns a ``(Dropdown, Markdown)`` pair: the dropdown is shown with
        the fetched qualities when any were found; otherwise it is hidden,
        and the error message is shown only for a non-blank URL.
        """
        qualities = get_video_qualities(url)
        if qualities:
            return (
                gr.Dropdown(choices=qualities, value=qualities[0], visible=True),
                gr.Markdown(visible=False)
            )
        elif url.strip():  # Only show error if URL is not empty
            return (
                gr.Dropdown(choices=[], visible=False),
                gr.Markdown("Unable to fetch video qualities. The URL might be invalid or the video might be unavailable.", visible=True)
            )
        else:
            return (
                gr.Dropdown(choices=[], visible=False),
                gr.Markdown(visible=False)
            )
    # Re-run the quality lookup whenever the URL text changes.
    video_url.change(
        update_qualities,
        inputs=[video_url],
        outputs=[quality, url_status]
    )
# Start the server on all interfaces, port 7860.
iface.launch(server_name="0.0.0.0", server_port=7860)