|
|
from __future__ import annotations |
|
|
|
|
|
import os |
|
|
|
|
|
try: |
|
|
import yt_dlp |
|
|
has_yt_dlp = True |
|
|
except ImportError: |
|
|
has_yt_dlp = False |
|
|
|
|
|
from ...typing import AsyncResult, Messages |
|
|
from ..base_provider import AsyncGeneratorProvider, ProviderModelMixin |
|
|
from ...providers.response import AudioResponse, VideoResponse, YouTubeResponse |
|
|
from ...image.copy_images import get_media_dir |
|
|
from ..helper import format_media_prompt |
|
|
|
|
|
class YouTube(AsyncGeneratorProvider, ProviderModelMixin):
    """Provider that searches YouTube or downloads media through YouTubeProvider."""

    url = "https://youtube.com"
    # Only usable when the optional yt_dlp import at module load succeeded.
    working = has_yt_dlp

    default_model = "search"
    models = ["mp3", "1080p", "720p", "480p", "search"]

    @classmethod
    async def create_async_generator(
        cls,
        model: str,
        messages: Messages,
        prompt: str = None,
        **kwargs
    ) -> AsyncResult:
        """
        Yield search results or a downloaded media response.

        Watch URLs found at the start of a prompt line are used directly;
        when there are none, the prompt is sent to YouTube search. Candidates
        whose URL already appears in a message (together with the model name,
        unless the model is "search") are skipped.

        :param model: "search" to list video ids, otherwise a download model
        :param messages: Conversation messages, also used to skip known videos
        :param prompt: Optional prompt passed on to format_media_prompt
        :return: Async generator of response objects and a markdown link string
        """
        prompt = format_media_prompt(messages, prompt)

        # Collect explicit watch links, one per prompt line.
        watch_prefix = "https://www.youtube.com/watch?v="
        candidates = []
        for line in prompt.splitlines():
            if line.startswith(watch_prefix):
                candidates.append({
                    "id": line.split("?v=")[-1].split("&")[0],
                    "url": line
                })

        provider = YouTubeProvider()
        if not candidates:
            candidates = await provider.search(prompt, max_results=10)

        def was_already_sent(link):
            # True when some string message mentions this link (and the model,
            # unless we are only searching).
            return any(
                isinstance(msg.get("content"), str)
                and link in msg["content"]
                and (model == "search" or model in msg["content"])
                for msg in messages
            )

        pending = [item for item in candidates if not was_already_sent(item['url'])]

        if model == "search":
            yield YouTubeResponse([item["id"] for item in pending[:5]], True)
            return

        if not pending:
            return

        # Download only the first candidate that has not been handled yet.
        video_url = pending[0]['url']
        path = await provider.download(video_url, model=model, output_dir=get_media_dir())
        media_path = f"/media/{os.path.basename(path)}"
        if path.endswith('.mp3'):
            yield AudioResponse(media_path)
        else:
            yield VideoResponse(media_path, prompt)
        yield f"\n\n[{video_url}]({video_url})\n\n"
|
|
|
|
|
class YouTubeProvider:
    """
    Search and download YouTube videos through yt-dlp.

    Supported download models: "mp3" (audio only, converted to MP3) and
    "1080p", "720p" or "480p" (video capped at that height, merged to MP4).
    """

    # Maximum video height requested for each video model.
    _VIDEO_HEIGHTS = {"1080p": 1080, "720p": 720, "480p": 480}

    @classmethod
    def _format_options(cls, model: str) -> dict:
        """Return fresh yt-dlp format options for *model*, or raise ValueError."""
        if model == "mp3":
            return {
                'format': 'bestaudio/best',
                'postprocessors': [{
                    'key': 'FFmpegExtractAudio',
                    'preferredcodec': 'mp3',
                    'preferredquality': '192'
                }]
            }
        height = cls._VIDEO_HEIGHTS.get(model)
        if height is None:
            raise ValueError("model must be one of 'mp3', '1080p', '720p' or '480p'")
        # "<=" for every height so a video without an exact match still
        # downloads at the best available resolution below the cap.
        return {
            'format': f'bestvideo[height<={height}]+bestaudio/best[height<={height}]',
            'merge_output_format': 'mp4',
        }

    @staticmethod
    def _search_sync(query: str, max_results: int) -> list:
        """Blocking search; returns one dict per entry reported by yt-dlp."""
        ydl_opts = {
            'quiet': True,
            'extract_flat': True,
            'skip_download': True,
        }
        search_url = f"ytsearch{max_results}:{query}"
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(search_url, download=False)
        # Tolerate a missing result or empty placeholders in the entry list.
        entries = (info or {}).get('entries') or []
        return [
            {
                'title': entry.get('title'),
                'url': f"https://www.youtube.com/watch?v={entry.get('id')}",
                'id': entry.get('id'),
                'duration': entry.get('duration'),
            }
            for entry in entries
            if entry
        ]

    @staticmethod
    def _download_sync(video_url: str, ydl_opts: dict, is_audio: bool) -> str:
        """Blocking download; returns the path of the resulting file."""
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            # A single extract_info(download=True) both downloads the media and
            # returns the metadata needed to compute the file name.
            info = ydl.extract_info(video_url, download=True)
            filename = ydl.prepare_filename(info)
        if is_audio:
            # The audio post-processor re-encodes to MP3, so the final file has
            # an .mp3 suffix whatever container the source stream used.
            return os.path.splitext(filename)[0] + ".mp3"
        return filename

    async def search(self, query: str, max_results: int = 5) -> list[dict]:
        """
        Search YouTube for videos matching the query.

        The blocking yt-dlp call runs in the default executor so the event
        loop stays responsive.

        :param query: Free-text search query
        :param max_results: Maximum number of results to request
        :return: A list of dicts with keys: title, url, id, duration
        """
        import asyncio  # local import keeps this class self-contained

        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._search_sync, query, max_results)

    async def download(self, video_url: str, model: str = "720p", output_dir: str = ".") -> str:
        """
        Download a YouTube video.

        :param video_url: The video URL or video id
        :param model: "mp3" for audio, or "1080p" / "720p" / "480p" for video
        :param output_dir: Download location
        :return: The path to the downloaded file
        :raises ValueError: If model is not one of the supported values
        """
        import asyncio  # local import keeps this class self-contained

        # Validate the model before doing any network work.
        format_options = self._format_options(model)
        ydl_opts = {
            'outtmpl': f"{output_dir}/%(title)s{'' if model == 'mp3' else (' ' + model)}.%(ext)s",
            'quiet': True,
        }
        ydl_opts.update(format_options)

        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, self._download_sync, video_url, ydl_opts, model == "mp3"
        )
|
|
|
|
|
|
|
|
|
|
|
async def demo():
    """Manual smoke test: search for two videos and fetch the first one as MP3."""
    yt = YouTubeProvider()
    found = await yt.search("Python programming", max_results=2)
    print("Search results:", found)
    if not found:
        # Nothing to download.
        return
    first_url = found[0]['url']
    saved_to = await yt.download(first_url, model="mp3")
    print("Downloaded to:", saved_to)
|
|
|
|
|
|
|
|
|
|
|
|