rkihacker committed
Commit 6c6c904 · verified · 1 Parent(s): 277b708

Update main.py

Files changed (1):
  1. main.py +515 -89
main.py CHANGED
@@ -4,23 +4,27 @@ import json
 import logging
 import random
 import re
-from typing import AsyncGenerator, Optional, Tuple, List
-
-from fastapi import FastAPI
+import time
+from typing import AsyncGenerator, Optional, Tuple, List, Dict
+from urllib.parse import quote_plus
+from fastapi import FastAPI, HTTPException
 from fastapi.responses import StreamingResponse
 from fastapi.middleware.cors import CORSMiddleware
 from pydantic import BaseModel
 from dotenv import load_dotenv
 import aiohttp
 from bs4 import BeautifulSoup
+from fake_useragent import UserAgent
 
 # --- Configuration ---
-logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
-logger = aiohttp.log.access_logger  # Use aiohttp's logger for better async context
-
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(levelname)s - %(message)s'
+)
+logger = logging.getLogger(__name__)
 load_dotenv()
-LLM_API_KEY = os.getenv("LLM_API_KEY")
 
+LLM_API_KEY = os.getenv("LLM_API_KEY")
 if not LLM_API_KEY:
     raise RuntimeError("LLM_API_KEY must be set in a .env file.")
 else:
@@ -29,141 +33,563 @@ else:
 # --- Constants & Headers ---
 LLM_API_URL = "https://api.typegpt.net/v1/chat/completions"
 LLM_MODEL = "meta-llama/Llama-4-Maverick-17B-128E-Instruct-FP8"
-MAX_SOURCES_TO_PROCESS = 15
+MAX_SOURCES_TO_PROCESS = 6  # Reduced to stay within time limits with real requests
+MAX_CONCURRENT_REQUESTS = 3  # Be conservative with real websites
+RESEARCH_TIMEOUT = 180  # 3 minutes maximum
+REQUEST_DELAY = 2.0  # Longer delay between requests to be more polite
+USER_AGENT_ROTATION = True
 
-# Real Browser User Agents for SCRAPING
-USER_AGENTS = [
-    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36",
-    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36",
-    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:129.0) Gecko/20100101 Firefox/129.0"
-]
+# Initialize fake user agent generator
+try:
+    ua = UserAgent()
+except Exception:
+    # Fallback if fake_useragent isn't available. Exposed as a property so
+    # that `ua.random` behaves like fake_useragent's UserAgent.random
+    # (a property, not a method).
+    class SimpleUA:
+        @property
+        def random(self):
+            return random.choice([
+                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36",
+                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36",
+                "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:129.0) Gecko/20100101 Firefox/129.0"
+            ])
+    ua = SimpleUA()
 
-LLM_HEADERS = {"Authorization": f"Bearer {LLM_API_KEY}", "Content-Type": "application/json", "Accept": "application/json"}
+LLM_HEADERS = {
+    "Authorization": f"Bearer {LLM_API_KEY}",
+    "Content-Type": "application/json",
+    "Accept": "application/json"
+}
 
 class DeepResearchRequest(BaseModel):
     query: str
 
 app = FastAPI(
     title="AI Deep Research API",
-    description="Provides robust, long-form, streaming deep research completions using a simulated search.",
-    version="10.0.0"  # Final: Using simulated search to bypass external blocking.
+    description="Provides robust, long-form, streaming deep research completions using real web searches.",
+    version="2.1.0"  # Updated version
+)
+app.add_middleware(
+    CORSMiddleware,
+    allow_origins=["*"],
+    allow_credentials=True,
+    allow_methods=["*"],
+    allow_headers=["*"]
 )
-
-app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"])
 
 def extract_json_from_llm_response(text: str) -> Optional[list]:
+    """Extract JSON array from LLM response text."""
     match = re.search(r'\[.*\]', text, re.DOTALL)
     if match:
-        try: return json.loads(match.group(0))
-        except json.JSONDecodeError: return None
+        try:
+            return json.loads(match.group(0))
+        except json.JSONDecodeError:
+            return None
     return None
 
-async def call_duckduckgo_search(query: str, max_results: int = 10) -> List[dict]:
+async def get_real_user_agent() -> str:
+    """Get a realistic user agent string."""
+    if USER_AGENT_ROTATION:
+        return ua.random  # property on both UserAgent and the SimpleUA fallback
+    return "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36"
+
+async def check_robots_txt(url: str) -> bool:
+    """Check if scraping is allowed by robots.txt."""
+    try:
+        domain = re.search(r'https?://([^/]+)', url)
+        if not domain:
+            return False
+
+        domain = domain.group(1)
+        robots_url = f"https://{domain}/robots.txt"
+
+        async with aiohttp.ClientSession() as session:
+            headers = {'User-Agent': await get_real_user_agent()}
+            async with session.get(robots_url, headers=headers, timeout=5) as response:
+                if response.status == 200:
+                    robots = await response.text()
+                    # Simple check - a blanket "Disallow: /" rule blocks everything.
+                    # Match the exact rule rather than the substring, which would
+                    # also trigger on rules like "Disallow: /admin".
+                    if re.search(r'^Disallow:\s*/\s*$', robots, re.MULTILINE):
+                        return False
+                    # Check for specific disallow rules for our path
+                    path = re.sub(r'https?://[^/]+', '', url)
+                    if f"Disallow: {path}" in robots:
+                        return False
+        return True
+    except Exception as e:
+        logging.warning(f"Could not check robots.txt for {url}: {e}")
+        return False  # Default to not scraping if we can't check
+
+async def fetch_search_results(query: str, max_results: int = 5) -> List[dict]:
+    """
+    Perform a real search using DuckDuckGo's HTML interface.
+    Note: This may break if DuckDuckGo changes their HTML structure.
+    """
+    try:
+        search_url = f"https://html.duckduckgo.com/html/?q={quote_plus(query)}"
+        headers = {
+            "User-Agent": await get_real_user_agent(),
+            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
+            "Accept-Language": "en-US,en;q=0.5",
+            "Referer": "https://duckduckgo.com/",
+            "DNT": "1"
+        }
+
+        async with aiohttp.ClientSession() as session:
+            async with session.get(search_url, headers=headers, timeout=10) as response:
+                if response.status != 200:
+                    logging.warning(f"Search failed with status {response.status}")
+                    return []
+
+                html = await response.text()
+                soup = BeautifulSoup(html, 'html.parser')
+
+                results = []
+                # Updated selectors for DuckDuckGo's current HTML structure
+                for result in soup.select('.result')[:max_results]:
+                    try:
+                        title_elem = result.select_one('.result__title .result__a')
+                        link_elem = title_elem if title_elem else result.select_one('a')
+                        snippet_elem = result.select_one('.result__snippet')
+
+                        if title_elem and link_elem and snippet_elem:
+                            # Clean up the URL
+                            link = link_elem['href']
+                            if link.startswith('/l/'):
+                                # DuckDuckGo returns relative links that redirect;
+                                # we need to follow these to get the actual URL.
+                                try:
+                                    redirect_url = f"https://duckduckgo.com{link}"
+                                    async with session.get(redirect_url, headers=headers, timeout=5, allow_redirects=False) as redirect_resp:
+                                        if redirect_resp.status == 302:
+                                            link = redirect_resp.headers.get('Location', link)
+                                except Exception as e:
+                                    logging.warning(f"Could not follow redirect for {link}: {e}")
+                                    continue
+
+                            results.append({
+                                'title': title_elem.get_text(strip=True),
+                                'link': link,
+                                'snippet': snippet_elem.get_text(strip=True)
+                            })
+                    except Exception as e:
+                        logging.warning(f"Error parsing search result: {e}")
+                        continue
+
+                logging.info(f"Found {len(results)} real search results for '{query}'")
+                return results
+
+    except Exception as e:
+        logging.error(f"Real search failed: {e}")
+        return []
+
+async def process_web_source(session: aiohttp.ClientSession, source: dict, timeout: int = 15) -> Tuple[str, dict]:
     """
-    Simulates a successful DuckDuckGo search to bypass anti-scraping measures.
-    This function returns a static, hardcoded list of relevant search results
-    for the topic "Nian" (Chinese New Year beast), allowing the rest of the
-    application pipeline to be tested.
+    Process a real web source with improved content extraction and error handling.
     """
-    logging.info(f"Simulating search for: '{query}'")
-
-    # Static results related to "Nian" myth, as "niansuh" yields no results.
-    # This provides the scraper with valid URLs to process.
-    simulated_results = [
-        {'title': 'Nian - Wikipedia', 'link': 'https://en.wikipedia.org/wiki/Nian', 'snippet': 'The Nian is a beast from Chinese mythology. The Nian is said to have the body of a bull, the head of a lion with a single horn, and sharp teeth.'},
-        {'title': 'The Legend of Nian and the Origins of Chinese New Year', 'link': 'https://www.chinahighlights.com/travelguide/festivals/story-of-nian.htm', 'snippet': 'Learn about the monster Nian and how the traditions of wearing red, setting off firecrackers, and staying up late came to be part of Chinese New Year.'},
-        {'title': 'Nian: The Beast That Invented Chinese New Year - Culture Trip', 'link': 'https://theculturetrip.com/asia/china/articles/nian-the-beast-that-invented-chinese-new-year', 'snippet': 'Once a year, at the beginning of Chinese New Year, a beast named Nian would terrorize a small village in China, eating their crops, livestock, and children.'},
-        {'title': 'Chinese New Year mythology: The story of Nian - British Museum', 'link': 'https://www.britishmuseum.org/blog/chinese-new-year-mythology-story-nian', 'snippet': 'Discover the mythical origins of the Chinese New Year celebration and the fearsome beast, Nian.'},
-        {'title': 'Year of the Nian Monster - Asian Art Museum', 'link': 'https://education.asianart.org/resources/year-of-the-nian-monster/', 'snippet': 'A summary of the story of the Nian monster for educators and children, explaining the connection to modern traditions.'}
-    ]
-
-    logging.info(f"Returning {len(simulated_results)} static sources.")
-    return simulated_results[:max_results]
-
-
-async def research_and_process_source(session: aiohttp.ClientSession, source: dict) -> Tuple[str, dict]:
-    headers = {'User-Agent': random.choice(USER_AGENTS)}
+    headers = {'User-Agent': await get_real_user_agent()}
+    source_info = source.copy()
+
+    # Check robots.txt first
+    if not await check_robots_txt(source['link']):
+        logging.info(f"Scraping disallowed by robots.txt for {source['link']}")
+        return source.get('snippet', ''), source_info
+
     try:
-        logging.info(f"Scraping: {source['link']}")
-        if source['link'].lower().endswith('.pdf'): raise ValueError("PDF content")
-        async with session.get(source['link'], headers=headers, timeout=10, ssl=False) as response:
-            if response.status != 200: raise ValueError(f"HTTP status {response.status}")
+        logging.info(f"Processing source: {source['link']}")
+        start_time = time.time()
+
+        # Skip non-HTML content
+        if any(source['link'].lower().endswith(ext) for ext in ['.pdf', '.doc', '.docx', '.ppt', '.pptx', '.xls', '.xlsx']):
+            logging.info(f"Skipping non-HTML content at {source['link']}")
+            return source.get('snippet', ''), source_info
+
+        # Add delay between requests to be polite
+        await asyncio.sleep(REQUEST_DELAY)
+
+        async with session.get(source['link'], headers=headers, timeout=timeout, ssl=False) as response:
+            if response.status != 200:
+                logging.warning(f"HTTP {response.status} for {source['link']}")
+                return source.get('snippet', ''), source_info
+
+            content_type = response.headers.get('Content-Type', '').lower()
+            if 'text/html' not in content_type:
+                logging.info(f"Non-HTML content at {source['link']} (type: {content_type})")
+                return source.get('snippet', ''), source_info
+
             html = await response.text()
             soup = BeautifulSoup(html, "html.parser")
-            for tag in soup(['script', 'style', 'nav', 'footer', 'header', 'aside']): tag.decompose()
-            content = " ".join(soup.stripped_strings)
-            if not content.strip(): raise ValueError("Parsed content is empty.")
-            return content, source
+
+            # Remove unwanted elements
+            for tag in soup(['script', 'style', 'nav', 'footer', 'header', 'aside', 'iframe', 'noscript', 'form']):
+                tag.decompose()
+
+            # Try to find main content by common patterns
+            main_content = None
+            selectors_to_try = [
+                'main',
+                'article',
+                '[role="main"]',
+                '.main-content',
+                '.content',
+                '.article-body',
+                '.post-content',
+                '.entry-content',
+                '#content'
+            ]
+
+            for selector in selectors_to_try:
+                main_content = soup.select_one(selector)
+                if main_content:
+                    break
+
+            if not main_content:
+                # If no main content found, try to find the largest text block
+                all_elements = soup.find_all()
+                # Filter out elements that are likely not main content
+                candidates = [el for el in all_elements if el.name not in ['script', 'style', 'nav', 'footer', 'header']]
+                if candidates:
+                    # Sort by text length
+                    candidates.sort(key=lambda x: len(x.get_text()), reverse=True)
+                    main_content = candidates[0] if candidates else soup
+
+            if not main_content:
+                main_content = soup.find('body') or soup
+
+            # Clean up the content
+            content = " ".join(main_content.stripped_strings)
+            content = re.sub(r'\s+', ' ', content).strip()
+
+            # If content is too short, try alternative extraction methods
+            if len(content.split()) < 50 and len(html) > 10000:
+                # Try extracting all paragraphs
+                paras = soup.find_all('p')
+                content = " ".join([p.get_text() for p in paras if p.get_text().strip()])
+                content = re.sub(r'\s+', ' ', content).strip()

+                # If still too short, try getting all text nodes
+                if len(content.split()) < 50:
+                    content = " ".join(soup.stripped_strings)
+                    content = re.sub(r'\s+', ' ', content).strip()
+
+            if len(content.split()) < 30:  # Minimum threshold for useful content
+                logging.warning(f"Very little content extracted from {source['link']}")
+                return source.get('snippet', ''), source_info
+
+            source_info['word_count'] = len(content.split())
+            source_info['processing_time'] = time.time() - start_time
+            return content, source_info
+
+    except asyncio.TimeoutError:
+        logging.warning(f"Timeout while processing {source['link']}")
+        return source.get('snippet', ''), source_info
     except Exception as e:
-        logging.warning(f"Scraping failed for {source['link']} ({e}). Falling back to snippet.")
-        return source.get('snippet', ''), source
+        logging.warning(f"Error processing {source['link']}: {str(e)[:200]}")
+        return source.get('snippet', ''), source_info
+
+async def generate_research_plan(query: str, session: aiohttp.ClientSession) -> List[str]:
+    """Generate a comprehensive research plan with sub-questions."""
+    try:
+        plan_prompt = {
+            "model": LLM_MODEL,
+            "messages": [{
+                "role": "user",
+                "content": f"""Generate 4-5 focused sub-questions for in-depth research on '{query}'.
+The questions should cover different aspects and perspectives of the topic.
+Ensure the questions are specific enough to guide web searches effectively.
+Your response MUST be ONLY the raw JSON array with no additional text.
+Example: ["What is the historical background of X?", "What are the current trends in X?"]"""
+            }],
+            "temperature": 0.7,
+            "max_tokens": 300
+        }
+
+        async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=plan_prompt, timeout=30) as response:
+            response.raise_for_status()
+            result = await response.json()
+
+            if isinstance(result, list):
+                return result
+            elif isinstance(result, dict) and 'choices' in result:
+                content = result['choices'][0]['message']['content']
+                sub_questions = extract_json_from_llm_response(content)
+                if sub_questions and isinstance(sub_questions, list):
+                    # Clean up the questions
+                    cleaned = []
+                    for q in sub_questions:
+                        if isinstance(q, str) and q.strip():
+                            cleaned_q = re.sub(r'^[^a-zA-Z0-9]*|[^a-zA-Z0-9]*$', '', q)
+                            if cleaned_q:
+                                cleaned.append(cleaned_q)
+                    return cleaned[:5]  # Limit to 5 questions max
+
+        # Fallback if we couldn't get good questions from the LLM
+        default_questions = [
+            f"What is {query} and its key characteristics?",
+            f"What are the main aspects or components of {query}?",
+            f"What is the history and development of {query}?",
+            f"What are the current trends or recent developments in {query}?",
+            f"What are common challenges or controversies related to {query}?"
+        ]
+        return default_questions[:4]
+
+    except Exception as e:
+        logging.error(f"Failed to generate research plan: {e}")
+        return [
+            f"What is {query}?",
+            f"What are the key features of {query}?",
+            f"What is the history of {query}?",
+            f"What are current developments in {query}?"
+        ]
 
 async def run_deep_research_stream(query: str) -> AsyncGenerator[str, None]:
-    def format_sse(data: dict) -> str: return f"data: {json.dumps(data)}\n\n"
+    def format_sse(data: dict) -> str:
+        return f"data: {json.dumps(data)}\n\n"
+
+    start_time = time.time()
+    processed_sources = 0
+    successful_sources = 0
+    total_tokens = 0
+
     try:
-        async with aiohttp.ClientSession() as session:
-            yield format_sse({"event": "status", "data": "Generating research plan..."})
-            plan_prompt = {"model": LLM_MODEL, "messages": [{"role": "user", "content": f"Generate 3-4 key sub-questions for a research report on '{query}'. Your response MUST be ONLY the raw JSON array. Example: [\"Question 1?\"]"}]}
-            try:
-                async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=plan_prompt, timeout=25) as response:
-                    response.raise_for_status(); result = await response.json()
-                    sub_questions = result if isinstance(result, list) else extract_json_from_llm_response(result['choices'][0]['message']['content'])
-                    if not isinstance(sub_questions, list) or not sub_questions: raise ValueError(f"Invalid plan from LLM: {result}")
-            except Exception as e:
-                yield format_sse({"event": "error", "data": f"Could not generate research plan. Reason: {e}"}); return
+        # Initialize the SSE stream with start message
+        yield format_sse({
+            "event": "status",
+            "data": f"Starting deep research on '{query}'. Target completion time: 2-3 minutes."
+        })
 
+        async with aiohttp.ClientSession() as session:
+            # Step 1: Generate research plan
+            yield format_sse({"event": "status", "data": "Generating comprehensive research plan..."})
+            sub_questions = await generate_research_plan(query, session)
             yield format_sse({"event": "plan", "data": sub_questions})
 
-            yield format_sse({"event": "status", "data": f"Searching sources for {len(sub_questions)} topics..."})
-            search_tasks = [call_duckduckgo_search(sq) for sq in sub_questions]
-            all_search_results = await asyncio.gather(*search_tasks)
-            unique_sources = list({source['link']: source for results in all_search_results for source in results}.values())
+            # Step 2: Search for sources for each sub-question
+            yield format_sse({
+                "event": "status",
+                "data": f"Searching for sources across {len(sub_questions)} research topics..."
+            })
 
+            all_search_results = []
+            for sub_question in sub_questions:
+                try:
+                    # Add delay between searches to be polite
+                    if len(all_search_results) > 0:
+                        await asyncio.sleep(REQUEST_DELAY)
+
+                    results = await fetch_search_results(sub_question, max_results=3)
+                    if results:
+                        all_search_results.extend(results)
+                        yield format_sse({
+                            "event": "status",
+                            "data": f"Found {len(results)} sources for question: '{sub_question[:60]}...'"
+                        })
+                    else:
+                        yield format_sse({
+                            "event": "warning",
+                            "data": f"No search results found for: '{sub_question[:60]}...'"
+                        })
+                except Exception as e:
+                    logging.error(f"Search failed for '{sub_question}': {e}")
+                    yield format_sse({
+                        "event": "warning",
+                        "data": f"Search failed for one sub-topic: {str(e)[:100]}"
+                    })
+
+            if not all_search_results:
+                yield format_sse({
+                    "event": "error",
+                    "data": "No search results found. Check your query and try again."
+                })
+                return
+
+            # Deduplicate results by URL
+            unique_sources = []
+            seen_urls = set()
+            for result in all_search_results:
+                if result['link'] not in seen_urls:
+                    seen_urls.add(result['link'])
+                    unique_sources.append(result)
+
+            # Limit to max sources we want to process
+            unique_sources = unique_sources[:MAX_SOURCES_TO_PROCESS]
+            yield format_sse({
+                "event": "status",
+                "data": f"Found {len(unique_sources)} unique sources to process."
+            })
+
+            # If we have no sources, return early
             if not unique_sources:
-                yield format_sse({"event": "error", "data": "The simulated search returned no sources. Check the hardcoded list."}); return
+                yield format_sse({
+                    "event": "error",
+                    "data": "No valid sources found after deduplication."
+                })
+                return
+
+            # Step 3: Process sources with concurrency control
+            semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
+            consolidated_context = ""
+            all_sources_used = []
+            processing_errors = 0
+
+            async def process_with_semaphore(source):
+                async with semaphore:
+                    return await process_web_source(session, source, timeout=20)
+
+            # Process sources with progress updates
+            processing_tasks = []
+            for i, source in enumerate(unique_sources):
+                # Check if we're running out of time
+                elapsed = time.time() - start_time
+                if elapsed > RESEARCH_TIMEOUT * 0.7:  # Leave 30% of the time for synthesis
+                    yield format_sse({
+                        "event": "status",
+                        "data": f"Approaching time limit, stopping source processing at {i}/{len(unique_sources)}"
+                    })
+                    break
+
+                # Add delay between processing each source to be polite
+                if i > 0:
+                    await asyncio.sleep(REQUEST_DELAY * 0.5)  # Shorter delay between same-domain requests
+
+                task = asyncio.create_task(process_with_semaphore(source))
+                processing_tasks.append(task)
 
-            sources_to_process = unique_sources[:MAX_SOURCES_TO_PROCESS]
-            yield format_sse({"event": "status", "data": f"Found {len(unique_sources)} unique sources. Processing the top {len(sources_to_process)}..."})
+                # Yield progress updates periodically
+                if (i + 1) % 2 == 0 or (i + 1) == len(unique_sources):
+                    yield format_sse({
+                        "event": "status",
+                        "data": f"Processed {min(i + 1, len(unique_sources))}/{len(unique_sources)} sources..."
+                    })
 
-            processing_tasks = [research_and_process_source(session, source) for source in sources_to_process]
-            consolidated_context, all_sources_used = "", []
-
-            for task in asyncio.as_completed(processing_tasks):
-                content, source_info = await task
+            # Process completed tasks as they finish
+            for future in asyncio.as_completed(processing_tasks):
+                processed_sources += 1
+                content, source_info = await future
                 if content and content.strip():
+                    # Add source content to our consolidated context
                    consolidated_context += f"Source: {source_info['link']}\nContent: {content}\n\n---\n\n"
                    all_sources_used.append(source_info)
+                    successful_sources += 1
+                    total_tokens += len(content.split())  # Rough token count
+                else:
+                    processing_errors += 1
 
            if not consolidated_context.strip():
-                yield format_sse({"event": "error", "data": "Failed to scrape content from any of the discovered sources."}); return
+                yield format_sse({
+                    "event": "error",
+                    "data": f"Failed to extract content from any sources. {processing_errors} errors occurred."
+                })
+                return
 
-            yield format_sse({"event": "status", "data": "Synthesizing final report..."})
-            report_prompt = f'Synthesize the provided context into a long-form, comprehensive, multi-page report on "{query}". Use markdown. Elaborate extensively on each point. Base your entire report ONLY on the provided context.\n\n## Research Context ##\n{consolidated_context}'
-            report_payload = {"model": LLM_MODEL, "messages": [{"role": "user", "content": report_prompt}], "stream": True}
+            # Step 4: Synthesize report with improved prompt
+            time_remaining = max(0, RESEARCH_TIMEOUT - (time.time() - start_time))
+            yield format_sse({
+                "event": "status",
+                "data": f"Synthesizing report with content from {successful_sources} sources..."
+            })
 
+            # Estimate how many tokens we can generate based on remaining time
+            max_output_tokens = min(1500, int(time_remaining * 5))
+
+            # The context below is capped at 18,000 characters; keep commentary
+            # out of the f-string so it doesn't leak into the prompt text.
+            report_prompt = f"""Compose a comprehensive research report on "{query}".
+Structure the report with clear sections based on the research questions.
+Use markdown formatting for headings, lists, and emphasis.
+
+Key requirements:
+1. Start with an introduction that explains what {query} is and why it's important
+2. Include well-organized sections with clear headings based on the research questions
+3. Cite specific information from sources where appropriate
+4. End with a conclusion that summarizes key findings and insights
+5. Keep the report concise but comprehensive
+
+Available information (summarized from {successful_sources} sources):
+{consolidated_context[:18000]}
+
+Generate a report that is approximately {max_output_tokens // 4} words long (about {max_output_tokens // 4 // 200} paragraphs).
+Focus on the most important and relevant information.
+"""
+
+            report_payload = {
+                "model": LLM_MODEL,
+                "messages": [{"role": "user", "content": report_prompt}],
+                "stream": True,
+                "max_tokens": max_output_tokens
+            }
+
+            # Stream the report generation
             async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=report_payload) as response:
                 response.raise_for_status()
                 async for line in response.content:
+                    # Check if we're running out of time
+                    if time.time() - start_time > RESEARCH_TIMEOUT:
+                        yield format_sse({
+                            "event": "warning",
+                            "data": "Time limit reached, ending report generation early."
+                        })
+                        break
+
                    line_str = line.decode('utf-8').strip()
-                    if line_str.startswith('data:'): line_str = line_str[5:].strip()
-                    if line_str == "[DONE]": break
+                    if line_str.startswith('data:'):
+                        line_str = line_str[5:].strip()
+                    if line_str == "[DONE]":
+                        break
                    try:
                        chunk = json.loads(line_str)
                        choices = chunk.get("choices")
                        if choices and isinstance(choices, list) and len(choices) > 0:
                            content = choices[0].get("delta", {}).get("content")
-                            if content: yield format_sse({"event": "chunk", "data": content})
-                    except json.JSONDecodeError: continue
+                            if content:
+                                yield format_sse({"event": "chunk", "data": content})
+                    except json.JSONDecodeError:
+                        continue
+                    except Exception as e:
+                        logging.warning(f"Error processing stream chunk: {e}")
+                        continue
 
+            # Final status update
+            duration = time.time() - start_time
+            stats = {
+                "total_time_seconds": round(duration),
+                "sources_processed": processed_sources,
+                "sources_successful": successful_sources,
+                "estimated_tokens": total_tokens,
+                "sources_used": len(all_sources_used)
+            }
+            yield format_sse({
+                "event": "status",
+                "data": f"Research completed successfully in {duration:.1f} seconds."
+            })
+            yield format_sse({"event": "stats", "data": stats})
            yield format_sse({"event": "sources", "data": all_sources_used})
+
+    except asyncio.TimeoutError:
+        yield format_sse({
+            "event": "error",
+            "data": f"Research process timed out after {RESEARCH_TIMEOUT} seconds."
+        })
    except Exception as e:
-        logging.error(f"A critical error occurred: {e}", exc_info=True)
-        yield format_sse({"event": "error", "data": f"An unexpected error occurred: {str(e)}"})
+        logging.error(f"Critical error in research process: {e}", exc_info=True)
+        yield format_sse({
+            "event": "error",
+            "data": f"An unexpected error occurred: {str(e)[:200]}"
+        })
+    finally:
+        duration = time.time() - start_time
+        yield format_sse({
+            "event": "complete",
+            "data": f"Research process finished after {duration:.1f} seconds."
+        })
 
 @app.post("/deep-research", response_class=StreamingResponse)
 async def deep_research_endpoint(request: DeepResearchRequest):
-    return StreamingResponse(run_deep_research_stream(request.query), media_type="text/event-stream")
+    """Endpoint for deep research that streams SSE responses."""
+    if not request.query or len(request.query.strip()) < 3:
+        raise HTTPException(status_code=400, detail="Query must be at least 3 characters long")
+
+    return StreamingResponse(
+        run_deep_research_stream(request.query.strip()),
+        media_type="text/event-stream"
+    )
 
 if __name__ == "__main__":
    import uvicorn
-    uvicorn.run(app, host="0.0.0.0", port=8000)
+    uvicorn.run(app, host="0.0.0.0", port=8000)
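
For quick verification of the new streaming behavior, a minimal client sketch. This is not part of the commit; it assumes the server is running locally on port 8000, and the event names ("status", "plan", "chunk", "sources", "stats", "complete", "error") match the format_sse payloads emitted above.

# client_example.py - minimal sketch of consuming the /deep-research SSE stream.
import asyncio
import json

import aiohttp

async def main() -> None:
    payload = {"query": "history of the Nian legend"}
    async with aiohttp.ClientSession() as session:
        async with session.post("http://localhost:8000/deep-research", json=payload) as resp:
            resp.raise_for_status()
            async for raw_line in resp.content:
                line = raw_line.decode("utf-8").strip()
                if not line.startswith("data:"):
                    continue  # skip the blank separator lines between SSE events
                event = json.loads(line[5:].strip())
                if event.get("event") == "chunk":
                    # report text arrives in small pieces; print without newlines
                    print(event["data"], end="", flush=True)
                else:
                    print(f"\n[{event.get('event')}] {event.get('data')}")

if __name__ == "__main__":
    asyncio.run(main())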
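The hand-rolled check_robots_txt above only matches literal Disallow strings. A sketch of an alternative that delegates rule evaluation to the standard library's urllib.robotparser; this is a hypothetical drop-in, not part of the commit, and it keeps the original's conservative default (treat fetch errors as "not allowed") while treating a missing robots.txt as "allowed".

# robots_check_sketch.py - stdlib robots.txt parsing instead of substring matching.
from urllib.parse import urlsplit
from urllib.robotparser import RobotFileParser

import aiohttp

async def check_robots_txt(url: str, user_agent: str = "*") -> bool:
    parts = urlsplit(url)
    robots_url = f"{parts.scheme}://{parts.netloc}/robots.txt"
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(robots_url, timeout=aiohttp.ClientTimeout(total=5)) as resp:
                if resp.status != 200:
                    return True  # no readable robots.txt: assume scraping is allowed
                text = await resp.text()
    except Exception:
        return False  # mirror the original's conservative default on errors
    parser = RobotFileParser()
    parser.parse(text.splitlines())  # evaluate path- and agent-specific rules
    return parser.can_fetch(user_agent, url)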