# LoRDxdd's picture
# Add gpt4free API for Hugging Face
# a4b70d9
from __future__ import annotations

import asyncio
import os

try:
    import yt_dlp
    has_yt_dlp = True
except ImportError:
    has_yt_dlp = False

from ...typing import AsyncResult, Messages
from ..base_provider import AsyncGeneratorProvider, ProviderModelMixin
from ...providers.response import AudioResponse, VideoResponse, YouTubeResponse
from ...image.copy_images import get_media_dir
from ..helper import format_media_prompt
class YouTube(AsyncGeneratorProvider, ProviderModelMixin):
    """Provider that searches YouTube or downloads a video as audio/video.

    The ``"search"`` model yields a ``YouTubeResponse`` with up to five video
    ids; every other model downloads the first not-yet-handled video through
    ``YouTubeProvider.download`` and yields it as a media response.
    """

    url = "https://youtube.com"
    # Only usable when the optional yt_dlp import succeeded.
    working = has_yt_dlp
    default_model = "search"
    models = ["mp3", "1080p", "720p", "480p", "search"]

    @classmethod
    async def create_async_generator(
        cls,
        model: str,
        messages: Messages,
        prompt: str = None,
        **kwargs
    ) -> AsyncResult:
        """Yield search results or a downloaded media file for the prompt.

        :param model: One of ``models``; ``"search"`` lists videos, anything
            else is forwarded to ``YouTubeProvider.download``.
        :param messages: Conversation history; used to build the prompt and to
            skip videos whose URL already appears in an earlier message.
        :param prompt: Optional explicit prompt, passed to
            ``format_media_prompt`` together with ``messages``.
        """
        prompt = format_media_prompt(messages, prompt)

        # Watch URLs given one-per-line in the prompt take priority over a search.
        results = []
        for line in prompt.splitlines():
            if line.startswith("https://www.youtube.com/watch?v="):
                results.append({
                    "id": line.split("?v=")[-1].split("&")[0],
                    "url": line,
                })

        provider = YouTubeProvider()
        if not results:
            results = await provider.search(prompt, max_results=10)

        def already_in_history(candidate_url):
            # A video counts as handled when some string message contains its
            # URL and, for download models, also contains the model name.
            for message in messages:
                content = message.get("content")
                if not isinstance(content, str):
                    continue
                if candidate_url in content and (model == "search" or model in content):
                    return True
            return False

        new_results = [
            candidate for candidate in results
            if not already_in_history(candidate['url'])
        ]

        if model == "search":
            yield YouTubeResponse([candidate["id"] for candidate in new_results[:5]], True)
            return

        if not new_results:
            # Nothing new to download: the generator finishes without output.
            return

        video_url = new_results[0]['url']
        path = await provider.download(video_url, model=model, output_dir=get_media_dir())
        media_url = f"/media/{os.path.basename(path)}"
        if path.endswith('.mp3'):
            yield AudioResponse(media_url)
        else:
            yield VideoResponse(media_url, prompt)
        # Follow the media with a markdown link back to the source video.
        yield f"\n\n[{video_url}]({video_url})\n\n"
class YouTubeProvider:
    """
    Search and download YouTube videos through yt-dlp.

    Supported download models: "mp3" (audio only) and "1080p" / "720p" /
    "480p" (MP4 video whose height is capped at that value).
    """

    # Maximum frame height for each video model.
    _VIDEO_HEIGHTS = {"1080p": 1080, "720p": 720, "480p": 480}

    async def search(self, query: str, max_results: int = 5) -> list[dict]:
        """
        Search YouTube for videos matching the query.

        :param query: Free-text search query
        :param max_results: Maximum number of results requested from yt-dlp
        :return: A list of dicts with keys: title, url, id, duration
        """
        ydl_opts = {
            'quiet': True,
            'extract_flat': True,
            'skip_download': True,
        }
        search_url = f"ytsearch{max_results}:{query}"

        def run_search():
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                return ydl.extract_info(search_url, download=False)

        # yt-dlp is blocking; run it in a worker thread so the event loop
        # keeps serving other tasks while the request is in flight.
        info = await asyncio.get_running_loop().run_in_executor(None, run_search)

        # Tolerate a missing result or empty/None entries instead of crashing.
        entries = (info or {}).get('entries') or []
        return [
            {
                'title': entry.get('title'),
                'url': f"https://www.youtube.com/watch?v={entry.get('id')}",
                'id': entry.get('id'),
                'duration': entry.get('duration'),
            }
            for entry in entries
            if entry
        ]

    async def download(self, video_url: str, model: str = "720p", output_dir: str = ".") -> str:
        """
        Download a YouTube video.

        :param video_url: The video URL or video id
        :param model: "mp3" for audio, or "1080p" / "720p" / "480p" for video
        :param output_dir: Download location
        :return: The path to the downloaded file
        :raises ValueError: If ``model`` is not one of the supported values
        """
        # Validate before any network work so bad input fails fast.
        ydl_opts = self._build_download_options(model, output_dir)

        def run_download():
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                # One call both downloads and returns the metadata; the
                # previous extra ydl.download() made yt-dlp process the
                # video twice.
                info = ydl.extract_info(video_url, download=True)
                return info, ydl.prepare_filename(info)

        # Blocking download runs off the event loop (see search()).
        info, template_path = await asyncio.get_running_loop().run_in_executor(
            None, run_download
        )
        return self._resolve_output_path(info, template_path, model)

    @classmethod
    def _build_download_options(cls, model: str, output_dir: str) -> dict:
        """Return the yt-dlp options for ``model``, or raise ValueError."""
        if model == "mp3":
            # Audio only, best quality, converted to MP3 by yt-dlp's
            # FFmpegExtractAudio post-processor.
            name_suffix = ''
            model_opts = {
                'format': 'bestaudio/best',
                'postprocessors': [{
                    'key': 'FFmpegExtractAudio',
                    'preferredcodec': 'mp3',
                    'preferredquality': '192'
                }]
            }
        elif model in cls._VIDEO_HEIGHTS:
            # "<=" for every resolution: an exact-height match fails on
            # videos that have no stream with precisely that height.
            height = cls._VIDEO_HEIGHTS[model]
            name_suffix = ' ' + model
            model_opts = {
                'format': f'bestvideo[height<={height}]+bestaudio/best[height<={height}]',
                'merge_output_format': 'mp4',
            }
        else:
            raise ValueError(
                f"model must be one of 'mp3', '1080p', '720p' or '480p', got {model!r}"
            )
        return {
            # Video files carry the model in their name; audio files do not.
            'outtmpl': f"{output_dir}/%(title)s{name_suffix}.%(ext)s",
            'quiet': True,
            **model_opts,
        }

    @staticmethod
    def _resolve_output_path(info: dict, template_path: str, model: str) -> str:
        """Return the final on-disk path of the finished download."""
        # yt-dlp stores the post-processed file path of each finished
        # download under requested_downloads[...]['filepath'].
        downloads = (info or {}).get('requested_downloads') or []
        final_path = downloads[0].get('filepath') if downloads else None
        if final_path:
            return final_path
        if model == "mp3":
            # Fallback: the template path still has the source container's
            # extension (webm, m4a, ...), but the extracted audio is .mp3.
            return os.path.splitext(template_path)[0] + '.mp3'
        return template_path
# Example usage (manual smoke test, never called on import)
async def demo():
    """Search YouTube for two videos and download the first hit as MP3.

    Prints the search results and, when there is at least one, the path
    returned by the download. Run by hand with ``asyncio.run(demo())``.
    """
    youtube = YouTubeProvider()
    hits = await youtube.search("Python programming", max_results=2)
    print("Search results:", hits)
    if not hits:
        return
    first_url = hits[0]['url']
    saved_path = await youtube.download(first_url, model="mp3")
    print("Downloaded to:", saved_path)