File size: 7,412 Bytes
92d2175
040a6c6
 
92d2175
 
040a6c6
92d2175
040a6c6
 
 
 
92d2175
040a6c6
 
 
 
 
 
92d2175
040a6c6
92d2175
040a6c6
 
32dd219
 
 
 
 
92d2175
040a6c6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
92d2175
040a6c6
 
 
 
92d2175
040a6c6
 
 
 
 
92d2175
040a6c6
 
 
 
a9b5cb5
040a6c6
 
 
 
 
 
 
 
 
 
 
92d2175
040a6c6
 
 
 
92d2175
040a6c6
92d2175
040a6c6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
92d2175
040a6c6
92d2175
 
040a6c6
 
 
 
 
 
 
 
 
 
 
92d2175
 
040a6c6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
92d2175
040a6c6
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
"""
YOUTUBE PROCESSING TOOL
Enhanced with cookies support for bot detection bypass
"""

import os
import re
import json
import tempfile
from typing import Dict, Any, Optional, List
from urllib.parse import urlparse, parse_qs

try:
    from pytube import YouTube
    from youtube_transcript_api import YouTubeTranscriptApi
    import yt_dlp
except ImportError as e:
    print(f"⚠️ YouTube dependencies missing: {e}")

from .state_manager import get_agent_state

class YouTubeTool:
    """Fetch metadata and transcripts for a single YouTube video.

    Extraction is attempted with yt-dlp first (passing the configured cookie
    file when it exists on disk) and falls back to pytube. A successful result
    is also stored in the agent state under
    ``cached_data["youtube_analysis"]``.
    """

    # A bare video ID: exactly 11 characters from the same alphabet the URL
    # patterns below capture.
    _VIDEO_ID_RE = re.compile(r'[A-Za-z0-9_-]{11}')

    # Patterns whose first group captures the video ID. Host matching is
    # case-insensitive so that every URL accepted by ``is_youtube_url`` can
    # also be parsed; the captured ID keeps its original case.
    _URL_ID_PATTERNS = (
        re.compile(
            r'(?:youtube\.com/watch\?v=|youtu\.be/|youtube\.com/embed/'
            r'|youtube\.com/shorts/|youtube\.com/live/)([a-zA-Z0-9_-]{11})',
            re.IGNORECASE,
        ),
        re.compile(r'youtube\.com/.*[?&]v=([a-zA-Z0-9_-]{11})', re.IGNORECASE),
    )

    # Cheap "does this text mention a YouTube link" patterns.
    _URL_HINT_PATTERNS = (
        re.compile(r'youtube\.com/watch\?v=', re.IGNORECASE),
        re.compile(r'youtu\.be/', re.IGNORECASE),
        re.compile(r'youtube\.com/embed/', re.IGNORECASE),
        re.compile(r'youtube\.com/shorts/', re.IGNORECASE),
        re.compile(r'youtube\.com/live/', re.IGNORECASE),
        re.compile(r'youtube\.com/.*[?&]v=', re.IGNORECASE),
    )

    # Language codes tried one at a time, in order, when fetching a transcript.
    _TRANSCRIPT_LANGUAGES = ('en', 'en-US', 'auto', 'vi')

    def __init__(self):
        """Read the cookie file location from ``YOUTUBE_COOKIES_PATH``.

        Raises:
            ValueError: If the environment variable is unset or empty.
        """
        # Read the cookie file path from the environment variable.
        self.cookies_path = os.environ.get("YOUTUBE_COOKIES_PATH")
        if not self.cookies_path:
            raise ValueError("YOUTUBE_COOKIES_PATH environment variable is not set. Please set it to the path of your cookies.txt file.")
        print(f"🎬 YouTube Tool with cookies support initialized. Cookie path: {self.cookies_path}")

    def process_youtube(self, youtube_input: str, **kwargs) -> Dict[str, Any]:
        """Extract video data for a YouTube URL or bare video ID.

        Args:
            youtube_input: A YouTube URL or an 11-character video ID.
            **kwargs: Accepted for call compatibility; not used.

        Returns:
            On success, a dict with ``success=True``, the extracted ``data``
            and a short ``summary``. On any failure, the dict produced by
            ``_error_result``. This method does not raise; unexpected
            exceptions are reported through the error result.
        """
        try:
            video_id = self._extract_video_id(youtube_input)
            if not video_id:
                return self._error_result("Invalid YouTube URL or video ID")

            print(f"🎬 Processing YouTube video: {video_id}")

            # yt-dlp first; pytube only when yt-dlp produced nothing.
            video_data = self._extract_with_cookies(video_id) or self._extract_with_pytube(video_id)
            if not video_data:
                return self._error_result("Could not extract video data")

            # Update agent state
            state = get_agent_state()
            state.cached_data["youtube_analysis"] = video_data

            # ``or`` (rather than a .get default) so a missing, None or empty
            # title cannot break the slice below after the state was updated.
            title = str(video_data.get('title') or 'Unknown')
            return {
                "success": True,
                "data": video_data,
                "summary": f"YouTube video processed: {title[:50]}..."
            }

        except Exception as e:
            error_msg = f"YouTube processing failed: {str(e)}"
            print(f"❌ {error_msg}")
            return self._error_result(error_msg)

    def _extract_video_id(self, url_or_id: str) -> Optional[str]:
        """Return the 11-character video ID found in ``url_or_id``.

        Accepts a bare ID (surrounding whitespace is ignored) or a URL in one
        of the watch / youtu.be / embed / shorts / live forms, or any
        ``youtube.com`` URL carrying a ``v=`` query parameter.

        Returns:
            The video ID, or ``None`` when the input is not a string or no ID
            can be found.
        """
        if not isinstance(url_or_id, str):
            return None

        candidate = url_or_id.strip()
        # fullmatch instead of isalnum(): IDs may contain '-' and '_', and
        # isalnum() would also accept non-ASCII letters and digits.
        if self._VIDEO_ID_RE.fullmatch(candidate):
            return candidate

        for pattern in self._URL_ID_PATTERNS:
            match = pattern.search(candidate)
            if match:
                return match.group(1)

        return None

    def _extract_with_cookies(self, video_id: str) -> Optional[Dict[str, Any]]:
        """Extract video metadata with yt-dlp, using the cookie file if present.

        Returns:
            A dict of video fields plus the transcript, or ``None`` if yt-dlp
            raised or returned no info (a warning is printed on exceptions).
        """
        try:
            ydl_opts = {
                'quiet': True,
                'no_warnings': True,
                'extractaudio': False,
                'extract_flat': False,
            }

            # Add cookies if the file exists; otherwise continue without them
            # instead of failing, but make the situation visible.
            if self.cookies_path and os.path.exists(self.cookies_path):
                ydl_opts['cookiefile'] = self.cookies_path
                print(f"🍪 Using cookies from: {self.cookies_path}")
            else:
                print(f"⚠️ Cookie file not found, continuing without cookies: {self.cookies_path}")

            url = f"https://www.youtube.com/watch?v={video_id}"

            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(url, download=False)

            if not info:
                return None

            # Extract transcript using youtube-transcript-api
            transcript = self._get_transcript(video_id)

            # ``or`` defaults: a key that is present with a None value would
            # bypass the default of dict.get().
            return {
                "video_id": video_id,
                "title": info.get('title') or '',
                "description": info.get('description') or '',
                "channel": info.get('uploader') or '',
                "duration": info.get('duration') or 0,
                "view_count": info.get('view_count') or 0,
                "transcript": transcript,
                "thumbnail_url": info.get('thumbnail') or '',
                "upload_date": info.get('upload_date') or '',
                "url": url,
                "extraction_method": "yt-dlp_with_cookies"
            }

        except Exception as e:
            print(f"⚠️ yt-dlp extraction failed: {str(e)}")
            return None

    def _extract_with_pytube(self, video_id: str) -> Optional[Dict[str, Any]]:
        """Fallback extraction using pytube.

        Returns:
            A dict with the same keys as ``_extract_with_cookies`` produces,
            or ``None`` if pytube raised (a warning is printed).
        """
        try:
            url = f"https://www.youtube.com/watch?v={video_id}"
            yt = YouTube(url)

            transcript = self._get_transcript(video_id)

            return {
                "video_id": video_id,
                "title": yt.title or '',
                "description": yt.description or '',
                "channel": yt.author or '',
                "duration": yt.length or 0,
                "view_count": yt.views or 0,
                "transcript": transcript,
                "thumbnail_url": yt.thumbnail_url or '',
                "upload_date": str(yt.publish_date) if yt.publish_date else '',
                "url": url,
                "extraction_method": "pytube_fallback"
            }

        except Exception as e:
            print(f"⚠️ PyTube extraction failed: {str(e)}")
            return None

    def _get_transcript(self, video_id: str) -> str:
        """Return the video transcript as one space-joined string.

        Each language in ``_TRANSCRIPT_LANGUAGES`` is tried in order and the
        first non-blank transcript wins. If none succeeds, one last request is
        made without a language list.

        Returns:
            The transcript text, or ``"No transcript available"`` when every
            attempt failed or produced only blank text. This method does not
            raise for ordinary errors.
        """
        for lang in self._TRANSCRIPT_LANGUAGES:
            try:
                entries = YouTubeTranscriptApi.get_transcript(video_id, languages=[lang])
                text = ' '.join(entry['text'] for entry in entries)
            except Exception:
                # Best effort: this language is unavailable or the request
                # failed. ``Exception`` (not a bare except) so Ctrl-C and
                # interpreter shutdown still propagate.
                continue
            if text.strip():
                return text

        # Last attempt without an explicit language list, leaving the choice
        # to the library's default.
        try:
            entries = YouTubeTranscriptApi.get_transcript(video_id)
            text = ' '.join(entry['text'] for entry in entries)
        except Exception:
            return "No transcript available"
        return text if text.strip() else "No transcript available"

    def is_youtube_url(self, text: str) -> bool:
        """Return True if ``text`` contains something that looks like a YouTube URL.

        Matching is case-insensitive. Non-string input yields False.
        """
        if not isinstance(text, str):
            return False
        return any(pattern.search(text) for pattern in self._URL_HINT_PATTERNS)

    def _error_result(self, error_msg: str) -> Dict[str, Any]:
        """Build the standard failure payload for ``error_msg``."""
        return {
            "success": False,
            "error": error_msg,
            "data": None,
            "summary": f"YouTube processing failed: {error_msg}"
        }