File size: 5,754 Bytes
a4b70d9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 |
from __future__ import annotations
import os
try:
import yt_dlp
has_yt_dlp = True
except ImportError:
has_yt_dlp = False
from ...typing import AsyncResult, Messages
from ..base_provider import AsyncGeneratorProvider, ProviderModelMixin
from ...providers.response import AudioResponse, VideoResponse, YouTubeResponse
from ...image.copy_images import get_media_dir
from ..helper import format_media_prompt
class YouTube(AsyncGeneratorProvider, ProviderModelMixin):
    """Provider that searches YouTube or downloads one video/audio file."""

    url = "https://youtube.com"
    working = has_yt_dlp
    default_model = "search"
    models = ["mp3", "1080p", "720p", "480p", "search"]

    @classmethod
    async def create_async_generator(
        cls,
        model: str,
        messages: Messages,
        prompt: str = None,
        **kwargs
    ) -> AsyncResult:
        """
        Yield search results or a downloaded media file for the prompt.

        Prompt lines that start with a YouTube watch URL are used directly
        as candidates; when there are none, the whole prompt is sent to
        ``YouTubeProvider.search`` (up to 10 results). Candidates whose URL
        already occurs in a string message content (together with the model
        name, unless the model is "search") are dropped.

        :param model: "search" to list video ids, otherwise a download model
        :param messages: Conversation messages, also used to build the prompt
        :param prompt: Optional explicit prompt passed to format_media_prompt
        :return: Async generator of response objects and markdown text
        """
        prompt = format_media_prompt(messages, prompt)
        youtube = YouTubeProvider()

        watch_prefix = "https://www.youtube.com/watch?v="
        candidates = []
        for line in prompt.splitlines():
            if line.startswith(watch_prefix):
                candidates.append({
                    "id": line.split("?v=")[-1].split("&")[0],
                    "url": line,
                })
        if not candidates:
            candidates = await youtube.search(prompt, max_results=10)

        def already_handled(link: str) -> bool:
            # True when some textual message mentions this link (and, for
            # download models, also mentions the model name).
            for message in messages:
                content = message.get("content")
                if not isinstance(content, str):
                    continue
                if link in content and (model == "search" or model in content):
                    return True
            return False

        fresh = [item for item in candidates if not already_handled(item['url'])]

        if model == "search":
            yield YouTubeResponse([item["id"] for item in fresh[:5]], True)
            return
        if not fresh:
            return

        # Only the first remaining candidate is downloaded.
        video_url = fresh[0]['url']
        path = await youtube.download(video_url, model=model, output_dir=get_media_dir())
        media_path = f"/media/{os.path.basename(path)}"
        if path.endswith('.mp3'):
            yield AudioResponse(media_path)
        else:
            yield VideoResponse(media_path, prompt)
        yield f"\n\n[{video_url}]({video_url})\n\n"
class YouTubeProvider:
    """
    Search and download YouTube videos through yt-dlp.

    Supported download models: "mp3" for audio only, or "1080p", "720p",
    "480p" for video limited to that maximum height.
    """

    # Maximum video height (in pixels) for each video model.
    _VIDEO_HEIGHTS = {"1080p": 1080, "720p": 720, "480p": 480}

    def __init__(self):
        pass

    @staticmethod
    async def _run_blocking(func):
        """Run a blocking callable in the default executor and return its result."""
        # Imported locally so the block does not depend on a file-level import.
        import asyncio
        return await asyncio.get_running_loop().run_in_executor(None, func)

    @classmethod
    def _download_options(cls, model: str, output_dir: str) -> dict:
        """
        Build the yt-dlp options for a download model.

        :raises ValueError: If the model is not a supported download model
        """
        suffix = '' if model == 'mp3' else (' ' + model)
        ydl_opts = {
            'outtmpl': os.path.join(output_dir, f"%(title)s{suffix}.%(ext)s"),
            'quiet': True,
        }
        if model == "mp3":
            # Audio only, best quality, converted to mp3 after download.
            ydl_opts.update({
                'format': 'bestaudio/best',
                'postprocessors': [{
                    'key': 'FFmpegExtractAudio',
                    'preferredcodec': 'mp3',
                    'preferredquality': '192'
                }]
            })
            return ydl_opts
        height = cls._VIDEO_HEIGHTS.get(model)
        if height is None:
            raise ValueError("model must be one of: 'mp3', '1080p', '720p', '480p'")
        # "<=" for every height, so a video without an exact match still
        # resolves to the best stream below the limit.
        ydl_opts.update({
            'format': f'bestvideo[height<={height}]+bestaudio/best[height<={height}]',
            'merge_output_format': 'mp4',
        })
        return ydl_opts

    @staticmethod
    def _final_path(filename: str, model: str) -> str:
        """Return the path of the finished file for a prepared filename."""
        if model == "mp3":
            # The audio post-processor replaces the source extension
            # (webm, m4a, opus, ...) with mp3.
            return os.path.splitext(filename)[0] + '.mp3'
        return filename

    async def search(self, query: str, max_results: int = 5) -> list[dict]:
        """
        Search YouTube for videos matching the query.

        :param query: Free-text search query
        :param max_results: Maximum number of results requested
        :return: A list of dicts with keys: title, url, id, duration
        """
        ydl_opts = {
            'quiet': True,
            'extract_flat': True,
            'skip_download': True,
        }
        search_url = f"ytsearch{max_results}:{query}"

        def _extract():
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                return ydl.extract_info(search_url, download=False)

        # yt-dlp is blocking; keep it off the event loop.
        info = await self._run_blocking(_extract)
        # Tolerate a missing result or an 'entries' value of None.
        entries = (info or {}).get('entries') or []
        return [
            {
                'title': entry.get('title'),
                'url': f"https://www.youtube.com/watch?v={entry.get('id')}",
                'id': entry.get('id'),
                'duration': entry.get('duration'),
            }
            for entry in entries
            if entry
        ]

    async def download(self, video_url: str, model: str = "720p", output_dir: str = ".") -> str:
        """
        Download a YouTube video.

        :param video_url: The video URL or video id
        :param model: "mp3" for audio, or "1080p", "720p", "480p" for video
        :param output_dir: Download location
        :return: The path to the downloaded file
        :raises ValueError: If the model is not supported
        """
        # Validate the model before any network work is started.
        ydl_opts = self._download_options(model, output_dir)

        def _download():
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                # A single extract_info call both downloads and returns the
                # metadata needed to derive the output filename.
                info = ydl.extract_info(video_url, download=True)
                return ydl.prepare_filename(info)

        filename = await self._run_blocking(_download)
        return self._final_path(filename, model)
# Example usage (async function to test)
async def demo():
    """Search for a sample query and download the first hit as mp3."""
    youtube = YouTubeProvider()
    found = await youtube.search("Python programming", max_results=2)
    print("Search results:", found)
    if not found:
        return
    first_url = found[0]['url']
    saved_to = await youtube.download(first_url, model="mp3")
    print("Downloaded to:", saved_to)
# To actually run demo()
# asyncio.run(demo())
|