""" YOUTUBE PROCESSING TOOL Enhanced with cookies support for bot detection bypass """ import os import re import json import tempfile from typing import Dict, Any, Optional, List from urllib.parse import urlparse, parse_qs try: from pytube import YouTube from youtube_transcript_api import YouTubeTranscriptApi import yt_dlp except ImportError as e: print(f"⚠️ YouTube dependencies missing: {e}") from .state_manager import get_agent_state class YouTubeTool: def __init__(self): # Lấy path cookie từ biến môi trường self.cookies_path = os.environ.get("YOUTUBE_COOKIES_PATH") if not self.cookies_path: raise ValueError("YOUTUBE_COOKIES_PATH environment variable is not set. Please set it to the path of your cookies.txt file.") print(f"🎬 YouTube Tool with cookies support initialized. Cookie path: {self.cookies_path}") def process_youtube(self, youtube_input: str, **kwargs) -> Dict[str, Any]: """ Process YouTube content with cookie authentication """ try: # Extract video ID from URL or use as-is video_id = self._extract_video_id(youtube_input) if not video_id: return self._error_result("Invalid YouTube URL or video ID") print(f"🎬 Processing YouTube video: {video_id}") # Try multiple extraction methods video_data = self._extract_with_cookies(video_id) or self._extract_with_pytube(video_id) if not video_data: return self._error_result("Could not extract video data") # Update agent state state = get_agent_state() state.cached_data["youtube_analysis"] = video_data return { "success": True, "data": video_data, "summary": f"YouTube video processed: {video_data.get('title', 'Unknown')[:50]}..." } except Exception as e: error_msg = f"YouTube processing failed: {str(e)}" print(f"❌ {error_msg}") return self._error_result(error_msg) def _extract_video_id(self, url_or_id: str) -> Optional[str]: """Extract video ID from YouTube URL or return if already ID""" if len(url_or_id) == 11 and url_or_id.isalnum(): return url_or_id # Extract from various YouTube URL formats patterns = [ r'(?:youtube\.com/watch\?v=|youtu\.be/|youtube\.com/embed/)([a-zA-Z0-9_-]{11})', r'youtube\.com/.*[?&]v=([a-zA-Z0-9_-]{11})', ] for pattern in patterns: match = re.search(pattern, url_or_id) if match: return match.group(1) return None def _extract_with_cookies(self, video_id: str) -> Optional[Dict[str, Any]]: """Extract using yt-dlp with cookies for better success rate""" try: ydl_opts = { 'quiet': True, 'no_warnings': True, 'extractaudio': False, 'extract_flat': False, } # Add cookies if file exists if os.path.exists(self.cookies_path): ydl_opts['cookiefile'] = self.cookies_path print(f"🍪 Using cookies from: {self.cookies_path}") url = f"https://www.youtube.com/watch?v={video_id}" with yt_dlp.YoutubeDL(ydl_opts) as ydl: info = ydl.extract_info(url, download=False) # Extract transcript using youtube-transcript-api transcript = self._get_transcript(video_id) return { "video_id": video_id, "title": info.get('title', ''), "description": info.get('description', ''), "channel": info.get('uploader', ''), "duration": info.get('duration', 0), "view_count": info.get('view_count', 0), "transcript": transcript, "thumbnail_url": info.get('thumbnail', ''), "upload_date": info.get('upload_date', ''), "url": url, "extraction_method": "yt-dlp_with_cookies" } except Exception as e: print(f"⚠️ yt-dlp extraction failed: {str(e)}") return None def _extract_with_pytube(self, video_id: str) -> Optional[Dict[str, Any]]: """Fallback extraction using pytube""" try: url = f"https://www.youtube.com/watch?v={video_id}" yt = YouTube(url) transcript = self._get_transcript(video_id) return { "video_id": video_id, "title": yt.title or '', "description": yt.description or '', "channel": yt.author or '', "duration": yt.length or 0, "view_count": yt.views or 0, "transcript": transcript, "thumbnail_url": yt.thumbnail_url or '', "upload_date": str(yt.publish_date) if yt.publish_date else '', "url": url, "extraction_method": "pytube_fallback" } except Exception as e: print(f"⚠️ PyTube extraction failed: {str(e)}") return None def _get_transcript(self, video_id: str) -> str: """Get video transcript using youtube-transcript-api""" try: # Try to get transcript in multiple languages languages = ['en', 'en-US', 'auto', 'vi'] for lang in languages: try: transcript_list = YouTubeTranscriptApi.get_transcript(video_id, languages=[lang]) transcript_text = ' '.join([entry['text'] for entry in transcript_list]) if transcript_text.strip(): return transcript_text except: continue # If no manual transcript, try auto-generated try: transcript_list = YouTubeTranscriptApi.get_transcript(video_id) return ' '.join([entry['text'] for entry in transcript_list]) except: return "No transcript available" except Exception as e: print(f"⚠️ Transcript extraction failed: {str(e)}") return "Transcript extraction failed" def is_youtube_url(self, text: str) -> bool: """Check if text contains YouTube URL""" youtube_patterns = [ r'youtube\.com/watch\?v=', r'youtu\.be/', r'youtube\.com/embed/', r'youtube\.com/.*[?&]v=' ] return any(re.search(pattern, text, re.IGNORECASE) for pattern in youtube_patterns) def _error_result(self, error_msg: str) -> Dict[str, Any]: """Standard error result format""" return { "success": False, "error": error_msg, "data": None, "summary": f"YouTube processing failed: {error_msg}" }