tuan3335 commited on
Commit
a9b5cb5
·
1 Parent(s): 1cf80b8

Add structured output with Pydantic, fix tool selection logic, add YouTube cookies support, disable thinking mode

Browse files
__pycache__/app.cpython-312.pyc ADDED
Binary file (6.18 kB). View file
 
agent.py CHANGED
@@ -30,6 +30,9 @@ from huggingface_hub import InferenceClient
30
  # Groq imports for fallback
31
  from groq import Groq
32
 
 
 
 
33
  # Utils system imports
34
  from utils import (
35
  process_question_with_tools,
@@ -66,6 +69,25 @@ class AgentState(TypedDict):
66
  final_answer: str
67
  processing_complete: bool
68
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
69
  # =============================================================================
70
  # AI BRAIN WITH LANGCHAIN
71
  # =============================================================================
@@ -93,6 +115,30 @@ class LangChainQwen3Brain:
93
 
94
  print("🧠 LangChain Hybrid Brain initialized (HF + Groq fallback)")
95
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
96
  def _invoke_model(self, messages: List[Dict[str, str]]) -> str:
97
  """Invoke model with messages - try HF first, fallback to Groq"""
98
 
@@ -120,63 +166,48 @@ class LangChainQwen3Brain:
120
  return completion.choices[0].message.content
121
  except Exception as groq_error:
122
  return f"AI Error: Both HF ({str(hf_error)[:50]}) and Groq ({str(groq_error)[:50]}) failed"
123
-
124
  def analyze_question(self, question: str, task_id: str = "") -> Dict[str, Any]:
125
- """AI analyzes question and decides approach"""
126
 
127
- system_prompt = get_system_prompt("main_agent")
 
128
 
129
- analysis_prompt = f"""
 
130
  Analyze this question and decide the approach:
131
 
132
  Question: "{question}"
133
  Task ID: "{task_id}"
134
 
135
- Provide your analysis in JSON format:
136
- {{
137
- "question_type": "youtube|image|audio|wiki|file|text|math",
138
- "needs_tools": true/false,
139
- "reasoning": "your reasoning",
140
- "confidence": "high|medium|low",
141
- "can_answer_directly": true/false,
142
- "suggested_approach": "brief description"
143
- }}
144
-
145
- Important:
146
- - If task_id is provided, likely has file attachment
147
- - Look for URLs, especially YouTube
148
- - Consider if question seems reversed/malformed
149
- - Be intelligent about what tools are actually needed
150
  """
 
 
 
 
 
 
151
 
152
- messages = [
153
- {"role": "system", "content": system_prompt},
154
- {"role": "user", "content": analysis_prompt}
155
- ]
156
-
157
- response = self._invoke_model(messages)
158
-
159
- # Try to parse JSON
160
- try:
161
- # Extract JSON from response
162
- import re
163
- json_match = re.search(r'\{.*\}', response, re.DOTALL)
164
- if json_match:
165
- analysis = json.loads(json_match.group())
166
- return analysis
167
- else:
168
- raise ValueError("No JSON found")
169
- except:
170
- # Fallback analysis
171
- question_type = analyze_question_type(question)
172
- return {
173
- "question_type": question_type,
174
- "needs_tools": bool(task_id) or question_type != "text",
175
- "reasoning": "JSON parsing failed, using fallback analysis",
176
- "confidence": "medium",
177
- "can_answer_directly": question_type == "text" and not task_id,
178
- "suggested_approach": f"Use {question_type} processing"
179
- }
180
 
181
  def generate_final_answer(self, question: str, tool_results: Dict[str, Any], context: str = "") -> str:
182
  """Generate final answer using LangChain"""
@@ -194,7 +225,7 @@ Important:
194
  "final_answer",
195
  question=question,
196
  context_summary=context_summary
197
- )
198
 
199
  messages = [
200
  {"role": "system", "content": get_system_prompt("reasoning_agent")},
@@ -204,9 +235,13 @@ Important:
204
  return self._invoke_model(messages)
205
 
206
  def decide_on_reversed_text(self, original: str, reversed: str) -> Dict[str, Any]:
207
- """AI decides which version of text to use"""
 
 
 
208
 
209
- decision_prompt = f"""
 
210
  You are analyzing two versions of the same text to determine which makes more sense:
211
 
212
  Original: "{original}"
@@ -215,28 +250,14 @@ Reversed: "{reversed}"
215
  Analyze both versions and decide which one is more likely to be the correct question.
216
  Consider grammar, word order, and meaning.
217
 
218
- Respond in JSON format:
219
- {{
220
- "chosen_version": "original|reversed",
221
- "reasoning": "your reasoning",
222
- "confidence": "high|medium|low"
223
- }}
224
  """
225
-
226
- messages = [
227
- {"role": "system", "content": "You are a text analysis expert."},
228
- {"role": "user", "content": decision_prompt}
229
- ]
230
-
231
- response = self._invoke_model(messages)
232
-
233
- try:
234
- import re
235
- json_match = re.search(r'\{.*\}', response, re.DOTALL)
236
- if json_match:
237
- return json.loads(json_match.group())
238
- except:
239
- pass
240
 
241
  # Fallback decision
242
  return {
 
30
  # Groq imports for fallback
31
  from groq import Groq
32
 
33
+ # Pydantic for structured output
34
+ from pydantic import BaseModel, Field
35
+
36
  # Utils system imports
37
  from utils import (
38
  process_question_with_tools,
 
69
  final_answer: str
70
  processing_complete: bool
71
 
72
+ # =============================================================================
73
+ # PYDANTIC SCHEMAS FOR STRUCTURED OUTPUT
74
+ # =============================================================================
75
+
76
+ class QuestionAnalysis(BaseModel):
77
+ """Schema for AI question analysis"""
78
+ question_type: str = Field(description="Type: youtube|image|audio|wiki|file|text|math")
79
+ needs_tools: bool = Field(description="Whether tools are needed")
80
+ reasoning: str = Field(description="AI reasoning for the decision")
81
+ confidence: str = Field(description="Confidence level: high|medium|low")
82
+ can_answer_directly: bool = Field(description="Can answer without tools")
83
+ suggested_approach: str = Field(description="Brief description of approach")
84
+
85
+ class TextDecision(BaseModel):
86
+ """Schema for reversed text decision"""
87
+ chosen_version: str = Field(description="original|reversed")
88
+ reasoning: str = Field(description="Reasoning for the choice")
89
+ confidence: str = Field(description="Confidence level: high|medium|low")
90
+
91
  # =============================================================================
92
  # AI BRAIN WITH LANGCHAIN
93
  # =============================================================================
 
115
 
116
  print("🧠 LangChain Hybrid Brain initialized (HF + Groq fallback)")
117
 
118
+ def _create_structured_model(self, schema: BaseModel):
119
+ """Create model with structured output"""
120
+ try:
121
+ # Try HuggingFace with structured output
122
+ from langchain_huggingface import ChatHuggingFace
123
+ hf_model = ChatHuggingFace(
124
+ llm=self.hf_client,
125
+ model_id=self.hf_model
126
+ )
127
+ return hf_model.with_structured_output(schema)
128
+ except Exception as hf_error:
129
+ print(f"⚠️ HF structured output failed: {str(hf_error)[:50]}...")
130
+ try:
131
+ # Fallback to Groq with structured output
132
+ from langchain_groq import ChatGroq
133
+ groq_model = ChatGroq(
134
+ api_key=os.environ.get("GROQ_API_KEY", ""),
135
+ model=self.groq_model
136
+ )
137
+ return groq_model.with_structured_output(schema)
138
+ except Exception as groq_error:
139
+ print(f"⚠️ Both structured output failed")
140
+ return None
141
+
142
  def _invoke_model(self, messages: List[Dict[str, str]]) -> str:
143
  """Invoke model with messages - try HF first, fallback to Groq"""
144
 
 
166
  return completion.choices[0].message.content
167
  except Exception as groq_error:
168
  return f"AI Error: Both HF ({str(hf_error)[:50]}) and Groq ({str(groq_error)[:50]}) failed"
169
+
170
  def analyze_question(self, question: str, task_id: str = "") -> Dict[str, Any]:
171
+ """AI analyzes question and decides approach with structured output"""
172
 
173
+ # Create structured model
174
+ structured_model = self._create_structured_model(QuestionAnalysis)
175
 
176
+ if structured_model:
177
+ analysis_prompt = f"""
178
  Analyze this question and decide the approach:
179
 
180
  Question: "{question}"
181
  Task ID: "{task_id}"
182
 
183
+ Important rules:
184
+ - If question asks about Mercedes Sosa albums, Wikipedia, historical facts -> use "wiki"
185
+ - If YouTube URL present -> use "youtube"
186
+ - If mentions image, photo, chess position -> use "image"
187
+ - If mentions audio, voice, mp3 -> use "audio"
188
+ - If mentions file attachment, Excel, CSV -> use "file"
189
+ - For math, tables, logic problems -> use "text" but needs_tools=false
190
+ - Be accurate about question_type to trigger correct tools
191
+
192
+ /no_thinking
 
 
 
 
 
193
  """
194
+
195
+ try:
196
+ result = structured_model.invoke(analysis_prompt)
197
+ return result.dict()
198
+ except Exception as e:
199
+ print(f"⚠️ Structured analysis failed: {str(e)[:50]}...")
200
 
201
+ # Fallback analysis
202
+ question_type = analyze_question_type(question)
203
+ return {
204
+ "question_type": question_type,
205
+ "needs_tools": bool(task_id) or question_type in ["wiki", "youtube", "image", "audio", "file"],
206
+ "reasoning": "Fallback analysis - structured output failed",
207
+ "confidence": "medium",
208
+ "can_answer_directly": question_type == "text" and not task_id,
209
+ "suggested_approach": f"Use {question_type} processing"
210
+ }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
211
 
212
  def generate_final_answer(self, question: str, tool_results: Dict[str, Any], context: str = "") -> str:
213
  """Generate final answer using LangChain"""
 
225
  "final_answer",
226
  question=question,
227
  context_summary=context_summary
228
+ ) + "\n\n/no_thinking"
229
 
230
  messages = [
231
  {"role": "system", "content": get_system_prompt("reasoning_agent")},
 
235
  return self._invoke_model(messages)
236
 
237
  def decide_on_reversed_text(self, original: str, reversed: str) -> Dict[str, Any]:
238
+ """AI decides which version of text to use with structured output"""
239
+
240
+ # Create structured model
241
+ structured_model = self._create_structured_model(TextDecision)
242
 
243
+ if structured_model:
244
+ decision_prompt = f"""
245
  You are analyzing two versions of the same text to determine which makes more sense:
246
 
247
  Original: "{original}"
 
250
  Analyze both versions and decide which one is more likely to be the correct question.
251
  Consider grammar, word order, and meaning.
252
 
253
+ /no_thinking
 
 
 
 
 
254
  """
255
+
256
+ try:
257
+ result = structured_model.invoke(decision_prompt)
258
+ return result.dict()
259
+ except Exception as e:
260
+ print(f"⚠️ Structured decision failed: {str(e)[:50]}...")
 
 
 
 
 
 
 
 
 
261
 
262
  # Fallback decision
263
  return {
cookies.txt ADDED
The diff for this file is too large to render. See raw diff
 
requirements.txt CHANGED
@@ -13,6 +13,9 @@ yt-dlp>=2024.12.23
13
  langchain==0.3.13
14
  langchain-core==0.3.29
15
  langgraph==0.2.61
 
 
 
16
 
17
  # Transformers for multimodal models
18
  transformers>=4.44.0
 
13
  langchain==0.3.13
14
  langchain-core==0.3.29
15
  langgraph==0.2.61
16
+ langchain-huggingface>=0.1.0
17
+ langchain-groq>=0.2.0
18
+ pydantic>=2.0.0
19
 
20
  # Transformers for multimodal models
21
  transformers>=4.44.0
utils/tool_orchestrator.py CHANGED
@@ -71,30 +71,29 @@ class ToolOrchestrator:
71
 
72
  def determine_tools_to_run(self, question_type: str, has_file: bool, has_urls: bool) -> List[str]:
73
  """
74
- Xác định tools cần chạy dựa trên question type
75
  """
76
  tools_to_run = []
77
 
78
- # Luôn chạy text processor nếu cần
79
- if question_type in ["text", "unknown"]:
80
- tools_to_run.append("text_processor")
81
-
82
- # Tools theo loại câu hỏi
83
- if question_type == "youtube" or "youtube" in str(has_urls):
84
  tools_to_run.append("youtube_tool")
85
- elif question_type == "image" or has_file:
86
  tools_to_run.append("image_ocr")
87
- elif question_type == "audio" or has_file:
88
  tools_to_run.append("audio_transcript")
89
  elif question_type == "wiki":
90
  tools_to_run.append("wiki_search")
91
- elif question_type == "file" or has_file:
92
  tools_to_run.append("file_reader")
 
 
93
 
94
- # Fallback: nếu không xác định được, thử wiki search
95
- if not tools_to_run or question_type == "unknown":
96
- tools_to_run.append("wiki_search")
97
 
 
98
  return tools_to_run
99
 
100
  def run_tools_sequence(self, tools_list: List[str], question: str, task_id: str = "") -> List[ToolResult]:
 
71
 
72
  def determine_tools_to_run(self, question_type: str, has_file: bool, has_urls: bool) -> List[str]:
73
  """
74
+ Xác định tools cần chạy dựa trên question type - FIXED LOGIC
75
  """
76
  tools_to_run = []
77
 
78
+ # Tools theo loại câu hỏi từ AI analysis
79
+ if question_type == "youtube":
 
 
 
 
80
  tools_to_run.append("youtube_tool")
81
+ elif question_type == "image":
82
  tools_to_run.append("image_ocr")
83
+ elif question_type == "audio":
84
  tools_to_run.append("audio_transcript")
85
  elif question_type == "wiki":
86
  tools_to_run.append("wiki_search")
87
+ elif question_type == "file":
88
  tools_to_run.append("file_reader")
89
+ elif question_type == "text":
90
+ tools_to_run.append("text_processor")
91
 
92
+ # Fallback cho math hoặc unknown
93
+ if question_type in ["math", "unknown"] or not tools_to_run:
94
+ tools_to_run.append("text_processor")
95
 
96
+ print(f"🎯 Tools to run: {tools_to_run}")
97
  return tools_to_run
98
 
99
  def run_tools_sequence(self, tools_list: List[str], question: str, task_id: str = "") -> List[ToolResult]:
utils/youtube_tool.py CHANGED
@@ -71,8 +71,12 @@ def get_youtube_content(question: str) -> Dict[str, Any]:
71
  print(f"Found YouTube URL: {youtube_url}")
72
 
73
  try:
74
- # Sử dụng yt-dlp để lấy metadata an toàn
75
  import yt_dlp
 
 
 
 
76
 
77
  ydl_opts = {
78
  'writesubtitles': True,
@@ -83,6 +87,13 @@ def get_youtube_content(question: str) -> Dict[str, Any]:
83
  'no_warnings': True
84
  }
85
 
 
 
 
 
 
 
 
86
  with yt_dlp.YoutubeDL(ydl_opts) as ydl:
87
  info = ydl.extract_info(youtube_url, download=False)
88
 
 
71
  print(f"Found YouTube URL: {youtube_url}")
72
 
73
  try:
74
+ # Sử dụng yt-dlp để lấy metadata an toàn với cookies
75
  import yt_dlp
76
+ import os
77
+
78
+ # Path to cookies file
79
+ cookies_path = "cookies.txt"
80
 
81
  ydl_opts = {
82
  'writesubtitles': True,
 
87
  'no_warnings': True
88
  }
89
 
90
+ # Add cookies if file exists
91
+ if os.path.exists(cookies_path):
92
+ ydl_opts['cookiefile'] = cookies_path
93
+ print(f"🍪 Using cookies from {cookies_path}")
94
+ else:
95
+ print("⚠️ No cookies.txt found, trying without cookies")
96
+
97
  with yt_dlp.YoutubeDL(ydl_opts) as ydl:
98
  info = ydl.extract_info(youtube_url, download=False)
99