Spaces:
Sleeping
Sleeping
File size: 7,412 Bytes
92d2175 040a6c6 92d2175 040a6c6 92d2175 040a6c6 92d2175 040a6c6 92d2175 040a6c6 92d2175 040a6c6 32dd219 92d2175 040a6c6 92d2175 040a6c6 92d2175 040a6c6 92d2175 040a6c6 a9b5cb5 040a6c6 92d2175 040a6c6 92d2175 040a6c6 92d2175 040a6c6 92d2175 040a6c6 92d2175 040a6c6 92d2175 040a6c6 92d2175 040a6c6 |
|
"""
YOUTUBE PROCESSING TOOL
Enhanced with cookies support for bot detection bypass
"""
import os
import re
import json
import tempfile
from typing import Dict, Any, Optional, List
from urllib.parse import urlparse, parse_qs
try:
from pytube import YouTube
from youtube_transcript_api import YouTubeTranscriptApi
import yt_dlp
except ImportError as e:
print(f"⚠️ YouTube dependencies missing: {e}")
from .state_manager import get_agent_state
class YouTubeTool:
    """Fetch metadata and transcript text for a single YouTube video.

    Extraction is attempted with yt-dlp first (using a cookies file when one
    is present on disk) and falls back to pytube. Successful results are also
    stored in the shared agent state under ``cached_data["youtube_analysis"]``.
    """

    # A bare video ID: 11 characters from the same alphabet the URL patterns
    # below capture (letters, digits, '-' and '_').
    _VIDEO_ID_RE = re.compile(r'[a-zA-Z0-9_-]{11}')

    # URL shapes a video ID can be pulled from. Matching is case-insensitive
    # so that every URL accepted by ``is_youtube_url`` can also be parsed.
    _URL_ID_PATTERNS = (
        re.compile(
            r'(?:youtube\.com/watch\?v=|youtu\.be/|youtube\.com/embed/|youtube\.com/shorts/)'
            r'([a-zA-Z0-9_-]{11})',
            re.IGNORECASE,
        ),
        re.compile(r'youtube\.com/.*[?&]v=([a-zA-Z0-9_-]{11})', re.IGNORECASE),
    )

    # Cheap "does this text mention a YouTube link" probes.
    _URL_DETECT_PATTERNS = (
        re.compile(r'youtube\.com/watch\?v=', re.IGNORECASE),
        re.compile(r'youtu\.be/', re.IGNORECASE),
        re.compile(r'youtube\.com/embed/', re.IGNORECASE),
        re.compile(r'youtube\.com/shorts/', re.IGNORECASE),
        re.compile(r'youtube\.com/.*[?&]v=', re.IGNORECASE),
    )

    # Transcript languages tried in order, one request per entry.
    # NOTE(review): 'auto' does not look like a real language code; it is kept
    # so the request sequence matches the previous behaviour - confirm and drop.
    TRANSCRIPT_LANGUAGES = ('en', 'en-US', 'auto', 'vi')

    def __init__(self):
        """Read the cookies file location from the environment.

        Raises:
            ValueError: if ``YOUTUBE_COOKIES_PATH`` is unset or empty.
        """
        # Cookie file path comes from an environment variable.
        self.cookies_path = os.environ.get("YOUTUBE_COOKIES_PATH")
        if not self.cookies_path:
            raise ValueError("YOUTUBE_COOKIES_PATH environment variable is not set. Please set it to the path of your cookies.txt file.")
        print(f"🎬 YouTube Tool with cookies support initialized. Cookie path: {self.cookies_path}")

    def process_youtube(self, youtube_input: str, **kwargs) -> Dict[str, Any]:
        """Resolve a URL or video ID and return its extracted data.

        Args:
            youtube_input: A YouTube URL or a bare 11-character video ID.
            **kwargs: Accepted for interface compatibility; not used.

        Returns:
            On success ``{"success": True, "data": ..., "summary": ...}``;
            otherwise the dict produced by ``_error_result``. Exceptions are
            caught and reported through the error dict, never raised.
        """
        try:
            video_id = self._extract_video_id(youtube_input)
            if not video_id:
                return self._error_result("Invalid YouTube URL or video ID")
            print(f"🎬 Processing YouTube video: {video_id}")

            # yt-dlp first, pytube only if that produced nothing.
            video_data = self._extract_with_cookies(video_id) or self._extract_with_pytube(video_id)
            if not video_data:
                return self._error_result("Could not extract video data")

            # Share the result with the rest of the agent.
            state = get_agent_state()
            state.cached_data["youtube_analysis"] = video_data

            # The title may be present but None/empty; slicing None would
            # raise and turn a successful extraction into an error result.
            title = video_data.get('title') or 'Unknown'
            return {
                "success": True,
                "data": video_data,
                "summary": f"YouTube video processed: {title[:50]}..."
            }
        except Exception as e:
            error_msg = f"YouTube processing failed: {str(e)}"
            print(f"❌ {error_msg}")
            return self._error_result(error_msg)

    def _extract_video_id(self, url_or_id: str) -> Optional[str]:
        """Return the 11-character video ID found in ``url_or_id``.

        Accepts a bare ID (including IDs containing '-' or '_') or a
        watch / youtu.be / embed / shorts URL. Surrounding whitespace is
        ignored. Returns ``None`` for non-string, empty, or unrecognised input.
        """
        if not isinstance(url_or_id, str):
            return None
        candidate = url_or_id.strip()
        if not candidate:
            return None

        # Bare ID: validate against the real ID alphabet instead of isalnum(),
        # which rejects '-'/'_' and accepts non-ASCII letters.
        if self._VIDEO_ID_RE.fullmatch(candidate):
            return candidate

        for pattern in self._URL_ID_PATTERNS:
            match = pattern.search(candidate)
            if match:
                return match.group(1)
        return None

    def _extract_with_cookies(self, video_id: str) -> Optional[Dict[str, Any]]:
        """Extract video metadata with yt-dlp, passing the cookies file if present.

        Returns the metadata dict, or ``None`` when yt-dlp fails for any
        reason (the failure is printed, not raised).
        """
        try:
            ydl_opts = {
                'quiet': True,
                'no_warnings': True,
                'extractaudio': False,
                'extract_flat': False,
            }
            # Only hand yt-dlp a cookie file that actually exists as a file.
            if self.cookies_path and os.path.isfile(self.cookies_path):
                ydl_opts['cookiefile'] = self.cookies_path
                print(f"🍪 Using cookies from: {self.cookies_path}")

            url = f"https://www.youtube.com/watch?v={video_id}"
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(url, download=False)
            if not info:
                print("⚠️ yt-dlp extraction failed: no video info returned")
                return None

            transcript = self._get_transcript(video_id)
            # `or` (not a .get default) so keys present with a None value are
            # normalised the same way as in the pytube fallback.
            return {
                "video_id": video_id,
                "title": info.get('title') or '',
                "description": info.get('description') or '',
                "channel": info.get('uploader') or '',
                "duration": info.get('duration') or 0,
                "view_count": info.get('view_count') or 0,
                "transcript": transcript,
                "thumbnail_url": info.get('thumbnail') or '',
                "upload_date": info.get('upload_date') or '',
                "url": url,
                "extraction_method": "yt-dlp_with_cookies"
            }
        except Exception as e:
            print(f"⚠️ yt-dlp extraction failed: {str(e)}")
            return None

    def _extract_with_pytube(self, video_id: str) -> Optional[Dict[str, Any]]:
        """Fallback metadata extraction using pytube.

        Returns the metadata dict, or ``None`` when pytube fails for any
        reason (the failure is printed, not raised).
        """
        try:
            url = f"https://www.youtube.com/watch?v={video_id}"
            yt = YouTube(url)
            transcript = self._get_transcript(video_id)
            return {
                "video_id": video_id,
                "title": yt.title or '',
                "description": yt.description or '',
                "channel": yt.author or '',
                "duration": yt.length or 0,
                "view_count": yt.views or 0,
                "transcript": transcript,
                "thumbnail_url": yt.thumbnail_url or '',
                "upload_date": str(yt.publish_date) if yt.publish_date else '',
                "url": url,
                "extraction_method": "pytube_fallback"
            }
        except Exception as e:
            print(f"⚠️ PyTube extraction failed: {str(e)}")
            return None

    def _get_transcript(self, video_id: str) -> str:
        """Return the video's transcript as one space-joined string.

        Each language in ``TRANSCRIPT_LANGUAGES`` is tried in order and the
        first non-blank transcript wins; after that one request is made with
        the library's default language selection. Returns
        ``"No transcript available"`` when every attempt fails.
        """
        for lang in self.TRANSCRIPT_LANGUAGES:
            try:
                entries = YouTubeTranscriptApi.get_transcript(video_id, languages=[lang])
            except Exception:
                # Best effort: a missing language is expected, try the next.
                # (Exception, not a bare except, so Ctrl-C still propagates.)
                continue
            text = ' '.join(entry['text'] for entry in entries)
            if text.strip():
                return text

        # Last attempt without an explicit language preference.
        try:
            entries = YouTubeTranscriptApi.get_transcript(video_id)
            return ' '.join(entry['text'] for entry in entries)
        except Exception as e:
            print(f"⚠️ Transcript extraction failed: {str(e)}")
            return "No transcript available"

    def is_youtube_url(self, text: str) -> bool:
        """Return True if ``text`` contains something shaped like a YouTube video URL."""
        if not isinstance(text, str):
            return False
        return any(pattern.search(text) for pattern in self._URL_DETECT_PATTERNS)

    def _error_result(self, error_msg: str) -> Dict[str, Any]:
        """Build the standard failure dict returned by ``process_youtube``."""
        return {
            "success": False,
            "error": error_msg,
            "data": None,
            "summary": f"YouTube processing failed: {error_msg}"
        }