rkihacker committed on
Commit 4d61bdb · verified · 1 Parent(s): b7afcad

Update main.py

Files changed (1)
  1. main.py +301 -128
main.py CHANGED
@@ -6,7 +6,7 @@ import random
6
  import re
7
  import time
8
  from typing import AsyncGenerator, Optional, Tuple, List, Dict
9
- from urllib.parse import quote_plus
10
  from fastapi import FastAPI, HTTPException
11
  from fastapi.responses import StreamingResponse
12
  from fastapi.middleware.cors import CORSMiddleware
@@ -15,6 +15,7 @@ from dotenv import load_dotenv
15
  import aiohttp
16
  from bs4 import BeautifulSoup
17
  from fake_useragent import UserAgent
 
18
 
19
  # --- Configuration ---
20
  logging.basicConfig(
@@ -33,16 +34,17 @@ else:
33
  # --- Constants & Headers ---
34
  LLM_API_URL = "https://api.typegpt.net/v1/chat/completions"
35
  LLM_MODEL = "meta-llama/Llama-4-Maverick-17B-128E-Instruct-FP8"
36
- MAX_SOURCES_TO_PROCESS = 6
37
- MAX_CONCURRENT_REQUESTS = 3
38
- RESEARCH_TIMEOUT = 180 # 3 minutes maximum
39
- REQUEST_DELAY = 2.0
 
 
40
 
41
  # Initialize fake user agent generator
42
  try:
43
  ua = UserAgent()
44
  except:
45
- # Fallback if fake_useragent isn't available
46
  class SimpleUA:
47
  def random(self):
48
  return random.choice([
@@ -60,11 +62,12 @@ LLM_HEADERS = {
60
 
61
  class DeepResearchRequest(BaseModel):
62
  query: str
 
63
 
64
  app = FastAPI(
65
  title="AI Deep Research API",
66
- description="Provides robust, long-form, streaming deep research completions using real web searches.",
67
- version="2.1.0"
68
  )
69
  app.add_middleware(
70
  CORSMiddleware,
@@ -88,11 +91,46 @@ async def get_real_user_agent() -> str:
88
  """Get a realistic user agent string."""
89
  try:
90
  if isinstance(ua, UserAgent):
91
- return ua.random
92
  return ua.random() # For our fallback class
93
  except:
94
  return "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36"
95
 
 
 
 
 
 
 
96
  async def check_robots_txt(url: str) -> bool:
97
  """Check if scraping is allowed by robots.txt."""
98
  try:
@@ -121,7 +159,7 @@ async def check_robots_txt(url: str) -> bool:
121
 
122
  async def fetch_search_results(query: str, max_results: int = 5) -> List[dict]:
123
  """
124
- Perform a real search using DuckDuckGo's HTML interface.
125
  """
126
  try:
127
  search_url = f"https://html.duckduckgo.com/html/?q={quote_plus(query)}"
@@ -143,37 +181,41 @@ async def fetch_search_results(query: str, max_results: int = 5) -> List[dict]:
143
  soup = BeautifulSoup(html, 'html.parser')
144
 
145
  results = []
146
- # Updated selectors for DuckDuckGo's current HTML structure
147
- for result in soup.select('.result__body')[:max_results]:
148
- try:
149
- title_elem = result.select_one('.result__title .result__a')
150
- link_elem = title_elem if title_elem else result.select_one('a')
151
- snippet_elem = result.select_one('.result__snippet')
152
-
153
- if title_elem and link_elem and snippet_elem:
154
- # Handle DuckDuckGo's redirect URLs
155
- link = link_elem['href']
156
- if link.startswith('/l/'):
157
- redirect_url = f"https://duckduckgo.com{link}"
158
- try:
159
- async with session.get(redirect_url, headers=headers, timeout=5, allow_redirects=False) as redirect_resp:
160
- if redirect_resp.status == 302:
161
- link = redirect_resp.headers.get('Location', link)
162
- except Exception as e:
163
- logging.warning(f"Could not follow redirect for {link}: {e}")
164
- continue
 
 
 
 
165
 
166
  results.append({
167
  'title': title_elem.get_text(strip=True),
168
- 'link': link,
169
- 'snippet': snippet_elem.get_text(strip=True)
170
  })
171
- except Exception as e:
172
- logging.warning(f"Error parsing search result: {e}")
173
- continue
174
 
175
  logging.info(f"Found {len(results)} real search results for '{query}'")
176
- return results
177
  except Exception as e:
178
  logging.error(f"Real search failed: {e}")
179
  return []
@@ -184,32 +226,37 @@ async def process_web_source(session: aiohttp.ClientSession, source: dict, timeo
184
  """
185
  headers = {'User-Agent': await get_real_user_agent()}
186
  source_info = source.copy()
 
 
 
 
 
187
 
188
  # Check robots.txt first
189
- if not await check_robots_txt(source['link']):
190
- logging.info(f"Scraping disallowed by robots.txt for {source['link']}")
191
  return source.get('snippet', ''), source_info
192
 
193
  try:
194
- logging.info(f"Processing source: {source['link']}")
195
  start_time = time.time()
196
 
197
  # Skip non-HTML content
198
- if any(source['link'].lower().endswith(ext) for ext in ['.pdf', '.doc', '.docx', '.ppt', '.pptx', '.xls', '.xlsx']):
199
- logging.info(f"Skipping non-HTML content at {source['link']}")
200
  return source.get('snippet', ''), source_info
201
 
202
  # Add delay between requests to be polite
203
  await asyncio.sleep(REQUEST_DELAY)
204
 
205
- async with session.get(source['link'], headers=headers, timeout=timeout, ssl=False) as response:
206
  if response.status != 200:
207
- logging.warning(f"HTTP {response.status} for {source['link']}")
208
  return source.get('snippet', ''), source_info
209
 
210
  content_type = response.headers.get('Content-Type', '').lower()
211
  if 'text/html' not in content_type:
212
- logging.info(f"Non-HTML content at {source['link']} (type: {content_type})")
213
  return source.get('snippet', ''), source_info
214
 
215
  html = await response.text()
@@ -229,7 +276,10 @@ async def process_web_source(session: aiohttp.ClientSession, source: dict, timeo
229
  '.article-body',
230
  '.post-content',
231
  '.entry-content',
232
- '#content'
 
 
 
233
  ]
234
 
235
  main_content = None
@@ -265,8 +315,21 @@ async def process_web_source(session: aiohttp.ClientSession, source: dict, timeo
265
  content = " ".join(soup.stripped_strings)
266
  content = re.sub(r'\s+', ' ', content).strip()
267
 
 
268
  if len(content.split()) < 30:
269
- logging.warning(f"Very little content extracted from {source['link']}")
 
 
 
 
270
  return source.get('snippet', ''), source_info
271
 
272
  source_info['word_count'] = len(content.split())
@@ -274,10 +337,10 @@ async def process_web_source(session: aiohttp.ClientSession, source: dict, timeo
274
  return content, source_info
275
 
276
  except asyncio.TimeoutError:
277
- logging.warning(f"Timeout while processing {source['link']}")
278
  return source.get('snippet', ''), source_info
279
  except Exception as e:
280
- logging.warning(f"Error processing {source['link']}: {str(e)[:200]}")
281
  return source.get('snippet', ''), source_info
282
 
283
  async def generate_research_plan(query: str, session: aiohttp.ClientSession) -> List[str]:
@@ -287,9 +350,8 @@ async def generate_research_plan(query: str, session: aiohttp.ClientSession) ->
287
  "model": LLM_MODEL,
288
  "messages": [{
289
  "role": "user",
290
- "content": f"""Generate 4-5 focused sub-questions for in-depth research on '{query}'.
291
- The questions should cover different aspects and perspectives of the topic.
292
- Ensure the questions are specific enough to guide web searches effectively.
293
  Your response MUST be ONLY the raw JSON array with no additional text.
294
  Example: ["What is the historical background of X?", "What are the current trends in X?"]"""
295
  }],
@@ -313,26 +375,154 @@ async def generate_research_plan(query: str, session: aiohttp.ClientSession) ->
313
  cleaned_q = re.sub(r'^[^a-zA-Z0-9]*|[^a-zA-Z0-9]*$', '', q)
314
  if cleaned_q:
315
  cleaned.append(cleaned_q)
316
- return cleaned[:5]
317
 
318
  # Fallback if we couldn't get good questions from LLM
319
  return [
320
- f"What is {query} and its key characteristics?",
321
- f"What are the main aspects or components of {query}?",
322
- f"What is the history and development of {query}?",
323
- f"What are the current trends or recent developments in {query}?",
324
- f"What are common challenges or controversies related to {query}?"
325
  ]
326
  except Exception as e:
327
  logging.error(f"Failed to generate research plan: {e}")
328
  return [
329
  f"What is {query}?",
330
- f"What are the key features of {query}?",
331
- f"What is the history of {query}?",
332
- f"What are current developments in {query}?"
333
  ]
334
 
335
- async def run_deep_research_stream(query: str) -> AsyncGenerator[str, None]:
 
 
 
 
 
 
336
  def format_sse(data: dict) -> str:
337
  return f"data: {json.dumps(data)}\n\n"
338
 
@@ -345,7 +535,7 @@ async def run_deep_research_stream(query: str) -> AsyncGenerator[str, None]:
345
  # Initialize the SSE stream with start message
346
  yield format_sse({
347
  "event": "status",
348
- "data": f"Starting deep research on '{query}'. Target completion time: 2-3 minutes."
349
  })
350
 
351
  async with aiohttp.ClientSession() as session:
@@ -354,68 +544,40 @@ async def run_deep_research_stream(query: str) -> AsyncGenerator[str, None]:
354
  sub_questions = await generate_research_plan(query, session)
355
  yield format_sse({"event": "plan", "data": sub_questions})
356
 
357
- # Step 2: Search for sources for each sub-question
358
  yield format_sse({
359
  "event": "status",
360
- "data": f"Searching for sources across {len(sub_questions)} research topics..."
361
  })
362
 
363
- all_search_results = []
364
- for sub_question in sub_questions:
365
- try:
366
- # Add delay between searches to be polite
367
- if len(all_search_results) > 0:
368
- await asyncio.sleep(REQUEST_DELAY)
369
-
370
- results = await fetch_search_results(sub_question, max_results=3)
371
- if results:
372
- all_search_results.extend(results)
373
- yield format_sse({
374
- "event": "status",
375
- "data": f"Found {len(results)} sources for question: '{sub_question[:60]}...'"
376
- })
377
- else:
378
- yield format_sse({
379
- "event": "warning",
380
- "data": f"No search results found for: '{sub_question[:60]}...'"
381
- })
382
- except Exception as e:
383
- logging.error(f"Search failed for '{sub_question}': {e}")
384
- yield format_sse({
385
- "event": "warning",
386
- "data": f"Search failed for one sub-topic: {str(e)[:100]}"
387
- })
388
 
389
- if not all_search_results:
390
  yield format_sse({
391
  "event": "error",
392
  "data": "No search results found. Check your query and try again."
393
  })
394
  return
395
 
396
- # Deduplicate results by URL
397
- unique_sources = []
398
- seen_urls = set()
399
- for result in all_search_results:
400
- if result['link'] not in seen_urls:
401
- seen_urls.add(result['link'])
402
- unique_sources.append(result)
403
-
404
- # Limit to max sources we want to process
405
- unique_sources = unique_sources[:MAX_SOURCES_TO_PROCESS]
406
  yield format_sse({
407
  "event": "status",
408
- "data": f"Found {len(unique_sources)} unique sources to process."
409
  })
410
 
411
- if not unique_sources:
412
  yield format_sse({
413
  "event": "error",
414
- "data": "No valid sources found after deduplication."
415
  })
416
  return
417
 
418
- # Step 3: Process sources with concurrency control
419
  semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
420
  consolidated_context = ""
421
  all_sources_used = []
@@ -427,13 +589,13 @@ async def run_deep_research_stream(query: str) -> AsyncGenerator[str, None]:
427
 
428
  # Process sources with progress updates
429
  processing_tasks = []
430
- for i, source in enumerate(unique_sources):
431
  # Check if we're running out of time
432
  elapsed = time.time() - start_time
433
- if elapsed > RESEARCH_TIMEOUT * 0.7:
434
  yield format_sse({
435
  "event": "status",
436
- "data": f"Approaching time limit, stopping source processing at {i}/{len(unique_sources)}"
437
  })
438
  break
439
 
@@ -444,10 +606,10 @@ async def run_deep_research_stream(query: str) -> AsyncGenerator[str, None]:
444
  task = asyncio.create_task(process_with_semaphore(source))
445
  processing_tasks.append(task)
446
 
447
- if (i + 1) % 2 == 0 or (i + 1) == len(unique_sources):
448
  yield format_sse({
449
  "event": "status",
450
- "data": f"Processed {min(i+1, len(unique_sources))}/{len(unique_sources)} sources..."
451
  })
452
 
453
  # Process completed tasks as they finish
@@ -458,7 +620,7 @@ async def run_deep_research_stream(query: str) -> AsyncGenerator[str, None]:
458
  consolidated_context += f"Source: {source_info['link']}\nContent: {content}\n\n---\n\n"
459
  all_sources_used.append(source_info)
460
  successful_sources += 1
461
- total_tokens += len(content.split())
462
  else:
463
  processing_errors += 1
464
 
@@ -469,30 +631,38 @@ async def run_deep_research_stream(query: str) -> AsyncGenerator[str, None]:
469
  })
470
  return
471
 
472
- # Step 4: Synthesize report
473
- time_remaining = max(0, RESEARCH_TIMEOUT - (time.time() - start_time))
474
  yield format_sse({
475
  "event": "status",
476
- "data": f"Synthesizing report with content from {successful_sources} sources..."
477
  })
478
 
479
- max_output_tokens = min(1500, int(time_remaining * 5))
 
 
 
480
 
481
- report_prompt = f"""Compose a comprehensive research report on "{query}".
482
- Structure the report with clear sections based on the research questions.
483
- Use markdown formatting for headings, lists, and emphasis.
484
 
485
- Key requirements:
486
- 1. Start with an introduction that explains what {query} is and why it's important
487
- 2. Include well-organized sections with clear headings
488
- 3. Cite specific information from sources where appropriate
489
- 4. End with a conclusion that summarizes key findings and insights
490
- 5. Keep the report concise but comprehensive
491
 
492
- Available information (summarized from {successful_sources} sources):
493
- {consolidated_context[:18000]}
494
 
495
- Generate a report that is approximately {max_output_tokens//4} words long.
 
496
  """
497
 
498
  report_payload = {
@@ -502,10 +672,11 @@ async def run_deep_research_stream(query: str) -> AsyncGenerator[str, None]:
502
  "max_tokens": max_output_tokens
503
  }
504
 
 
505
  async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=report_payload) as response:
506
  response.raise_for_status()
507
  async for line in response.content:
508
- if time.time() - start_time > RESEARCH_TIMEOUT:
509
  yield format_sse({
510
  "event": "warning",
511
  "data": "Time limit reached, ending report generation early."
@@ -528,6 +699,7 @@ async def run_deep_research_stream(query: str) -> AsyncGenerator[str, None]:
528
  logging.warning(f"Error processing stream chunk: {e}")
529
  continue
530
 
 
531
  duration = time.time() - start_time
532
  stats = {
533
  "total_time_seconds": round(duration),
@@ -546,7 +718,7 @@ async def run_deep_research_stream(query: str) -> AsyncGenerator[str, None]:
546
  except asyncio.TimeoutError:
547
  yield format_sse({
548
  "event": "error",
549
- "data": f"Research process timed out after {RESEARCH_TIMEOUT} seconds."
550
  })
551
  except Exception as e:
552
  logging.error(f"Critical error in research process: {e}", exc_info=True)
@@ -567,8 +739,9 @@ async def deep_research_endpoint(request: DeepResearchRequest):
567
  if not request.query or len(request.query.strip()) < 3:
568
  raise HTTPException(status_code=400, detail="Query must be at least 3 characters long")
569
 
 
570
  return StreamingResponse(
571
- run_deep_research_stream(request.query.strip()),
572
  media_type="text/event-stream"
573
  )
574
 
 
6
  import re
7
  import time
8
  from typing import AsyncGenerator, Optional, Tuple, List, Dict
9
+ from urllib.parse import quote_plus, urlparse
10
  from fastapi import FastAPI, HTTPException
11
  from fastapi.responses import StreamingResponse
12
  from fastapi.middleware.cors import CORSMiddleware
 
15
  import aiohttp
16
  from bs4 import BeautifulSoup
17
  from fake_useragent import UserAgent
18
+ from collections import defaultdict
19
 
20
  # --- Configuration ---
21
  logging.basicConfig(
 
34
  # --- Constants & Headers ---
35
  LLM_API_URL = "https://api.typegpt.net/v1/chat/completions"
36
  LLM_MODEL = "meta-llama/Llama-4-Maverick-17B-128E-Instruct-FP8"
37
+ MAX_SOURCES_TO_PROCESS = 10 # Increased to get more comprehensive results
38
+ MAX_CONCURRENT_REQUESTS = 5 # Increased for faster processing
39
+ SEARCH_TIMEOUT = 120 # 2 minutes for searching (adjustable)
40
+ TOTAL_TIMEOUT = 180 # 3 minutes total
41
+ REQUEST_DELAY = 1.0 # Shorter delay between requests
42
+ USER_AGENT_ROTATION = True
43
 
44
  # Initialize fake user agent generator
45
  try:
46
  ua = UserAgent()
47
  except:
 
48
  class SimpleUA:
49
  def random(self):
50
  return random.choice([
 
62
 
63
  class DeepResearchRequest(BaseModel):
64
  query: str
65
+ search_time: int = 120 # Default to 2 minutes
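With the new optional field, a client request body carries both values; a minimal illustrative payload (field names taken from the model above, values invented for the example):

# Illustrative request payload for DeepResearchRequest (values are made up).
example_request = {
    "query": "solid-state batteries",
    "search_time": 90,  # seconds; the endpoint below clamps this to the 60-180 range
}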
66
 
67
  app = FastAPI(
68
  title="AI Deep Research API",
69
+ description="Provides comprehensive research reports from real web searches within 1-2 minutes.",
70
+ version="3.0.0"
71
  )
72
  app.add_middleware(
73
  CORSMiddleware,
 
91
  """Get a realistic user agent string."""
92
  try:
93
  if isinstance(ua, UserAgent):
94
+ return ua.random()
95
  return ua.random() # For our fallback class
96
  except:
97
  return "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36"
98
 
99
+ def clean_url(url: str) -> str:
100
+ """Clean up and normalize URLs."""
101
+ if not url:
102
+ return ""
103
+
104
+ # Handle DuckDuckGo redirect URLs
105
+ if url.startswith('//duckduckgo.com/l/'):
106
+ url = f"https:{url}" # Make it a proper URL
107
+ try:
108
+ # Extract the real URL from DuckDuckGo's redirect
109
+ parsed = urlparse(url)
110
+ query_params = parsed.query
111
+ if 'uddg=' in query_params:
112
+ # Extract the actual URL from the parameter
113
+ match = re.search(r'uddg=([^&]+)', query_params)
114
+ if match:
115
+ encoded_url = match.group(1)
116
+ try:
117
+ url = quote_plus(encoded_url) # This might need better decoding
118
+ # For simplicity, we'll just return the decoded URL
119
+ # In production, you'd want to properly URL-decode this
120
+ return encoded_url
121
+ except:
122
+ pass
123
+ except:
124
+ pass
125
+
126
+ # Ensure URL has proper scheme
127
+ if url.startswith('//'):
128
+ url = 'https:' + url
129
+ elif not url.startswith(('http://', 'https://')):
130
+ url = 'https://' + url
131
+
132
+ return url
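The uddg= value extracted above is percent-encoded, and the comment in the committed code notes that the decoding is left rough. A minimal sketch of decoding such a value with urllib.parse.unquote_plus (an illustration, not the committed behaviour):

# Sketch: fully decode a DuckDuckGo redirect target taken from the uddg= parameter.
from urllib.parse import unquote_plus

encoded_target = "https%3A%2F%2Fexample.com%2Fpage%3Fref%3Dddg"
decoded_target = unquote_plus(encoded_target)
# decoded_target == "https://example.com/page?ref=ddg"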
133
+
134
  async def check_robots_txt(url: str) -> bool:
135
  """Check if scraping is allowed by robots.txt."""
136
  try:
 
159
 
160
  async def fetch_search_results(query: str, max_results: int = 5) -> List[dict]:
161
  """
162
+ Perform a real search using DuckDuckGo's HTML interface with improved URL handling.
163
  """
164
  try:
165
  search_url = f"https://html.duckduckgo.com/html/?q={quote_plus(query)}"
 
181
  soup = BeautifulSoup(html, 'html.parser')
182
 
183
  results = []
184
+ # Try multiple selectors as DuckDuckGo may change their HTML structure
185
+ for selector in ['.result__body', '.result__a', '.result']:
186
+ if len(results) >= max_results:
187
+ break
188
+
189
+ for result in soup.select(selector)[:max_results]:
190
+ try:
191
+ title_elem = result.select_one('.result__title .result__a') or result.select_one('.result__a')
192
+ if not title_elem:
193
+ continue
194
+
195
+ link = title_elem['href']
196
+ snippet_elem = result.select_one('.result__snippet')
197
+
198
+ # Clean the URL
199
+ clean_link = clean_url(link)
200
+
201
+ # Skip if we couldn't get a clean URL
202
+ if not clean_link or clean_link.startswith('javascript:'):
203
+ continue
204
+
205
+ # Get snippet if available
206
+ snippet = snippet_elem.get_text(strip=True) if snippet_elem else ""
207
 
208
  results.append({
209
  'title': title_elem.get_text(strip=True),
210
+ 'link': clean_link,
211
+ 'snippet': snippet
212
  })
213
+ except Exception as e:
214
+ logging.warning(f"Error parsing search result: {e}")
215
+ continue
216
 
217
  logging.info(f"Found {len(results)} real search results for '{query}'")
218
+ return results[:max_results]
219
  except Exception as e:
220
  logging.error(f"Real search failed: {e}")
221
  return []
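Each entry appended above is a plain dict, so a successful search returns a list shaped like this (illustrative values):

# Example of the list returned by fetch_search_results (values invented).
[
    {
        "title": "Example page title",
        "link": "https://example.com/article",
        "snippet": "Short text summary taken from the result card.",
    },
]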
 
226
  """
227
  headers = {'User-Agent': await get_real_user_agent()}
228
  source_info = source.copy()
229
+ source_info['link'] = clean_url(source['link']) # Ensure URL is clean
230
+
231
+ # Skip if URL is invalid
232
+ if not source_info['link'] or not source_info['link'].startswith(('http://', 'https://')):
233
+ return source.get('snippet', ''), source_info
234
 
235
  # Check robots.txt first
236
+ if not await check_robots_txt(source_info['link']):
237
+ logging.info(f"Scraping disallowed by robots.txt for {source_info['link']}")
238
  return source.get('snippet', ''), source_info
239
 
240
  try:
241
+ logging.info(f"Processing source: {source_info['link']}")
242
  start_time = time.time()
243
 
244
  # Skip non-HTML content
245
+ if any(source_info['link'].lower().endswith(ext) for ext in ['.pdf', '.doc', '.docx', '.ppt', '.pptx', '.xls', '.xlsx']):
246
+ logging.info(f"Skipping non-HTML content at {source_info['link']}")
247
  return source.get('snippet', ''), source_info
248
 
249
  # Add delay between requests to be polite
250
  await asyncio.sleep(REQUEST_DELAY)
251
 
252
+ async with session.get(source_info['link'], headers=headers, timeout=timeout, ssl=False) as response:
253
  if response.status != 200:
254
+ logging.warning(f"HTTP {response.status} for {source_info['link']}")
255
  return source.get('snippet', ''), source_info
256
 
257
  content_type = response.headers.get('Content-Type', '').lower()
258
  if 'text/html' not in content_type:
259
+ logging.info(f"Non-HTML content at {source_info['link']} (type: {content_type})")
260
  return source.get('snippet', ''), source_info
261
 
262
  html = await response.text()
 
276
  '.article-body',
277
  '.post-content',
278
  '.entry-content',
279
+ '#content',
280
+ '#main',
281
+ '.main',
282
+ '.article'
283
  ]
284
 
285
  main_content = None
 
315
  content = " ".join(soup.stripped_strings)
316
  content = re.sub(r'\s+', ' ', content).strip()
317
 
318
+ # If content is still too short, try to extract from specific tags
319
  if len(content.split()) < 30:
320
+ # Try to get content from divs with certain classes
321
+ for tag in ['div', 'section', 'article']:
322
+ for element in soup.find_all(tag):
323
+ if len(element.get_text().split()) > 200: # If this element has substantial content
324
+ content = " ".join(element.stripped_strings)
325
+ content = re.sub(r'\s+', ' ', content).strip()
326
+ if len(content.split()) >= 30: # If we got enough content
327
+ break
328
+ if len(content.split()) >= 30:
329
+ break
330
+
331
+ if len(content.split()) < 30:
332
+ logging.warning(f"Very little content extracted from {source_info['link']}")
333
  return source.get('snippet', ''), source_info
334
 
335
  source_info['word_count'] = len(content.split())
 
337
  return content, source_info
338
 
339
  except asyncio.TimeoutError:
340
+ logging.warning(f"Timeout while processing {source_info['link']}")
341
  return source.get('snippet', ''), source_info
342
  except Exception as e:
343
+ logging.warning(f"Error processing {source_info['link']}: {str(e)[:200]}")
344
  return source.get('snippet', ''), source_info
345
 
346
  async def generate_research_plan(query: str, session: aiohttp.ClientSession) -> List[str]:
 
350
  "model": LLM_MODEL,
351
  "messages": [{
352
  "role": "user",
353
+ "content": f"""Generate 4-6 comprehensive sub-questions for in-depth research on '{query}'.
354
+ Focus on key aspects that would provide a complete understanding of the topic.
 
355
  Your response MUST be ONLY the raw JSON array with no additional text.
356
  Example: ["What is the historical background of X?", "What are the current trends in X?"]"""
357
  }],
 
375
  cleaned_q = re.sub(r'^[^a-zA-Z0-9]*|[^a-zA-Z0-9]*$', '', q)
376
  if cleaned_q:
377
  cleaned.append(cleaned_q)
378
+ return cleaned[:6] # Limit to 6 questions max
379
 
380
  # Fallback if we couldn't get good questions from LLM
381
  return [
382
+ f"What is {query} and its key features?",
383
+ f"How does {query} compare to alternatives?",
384
+ f"What are the current developments in {query}?",
385
+ f"What are the main challenges with {query}?",
386
+ f"What does the future hold for {query}?"
387
  ]
388
  except Exception as e:
389
  logging.error(f"Failed to generate research plan: {e}")
390
  return [
391
  f"What is {query}?",
392
+ f"What are the key aspects of {query}?",
393
+ f"What are current trends in {query}?",
394
+ f"What are the challenges with {query}?"
395
  ]
396
 
397
+ async def continuous_search(query: str, search_time: int = 120) -> List[dict]:
398
+ """
399
+ Perform continuous searching for better results within time constraints.
400
+ """
401
+ start_time = time.time()
402
+ all_results = []
403
+ seen_urls = set()
404
+
405
+ # Generate multiple variations of the query
406
+ query_variations = [
407
+ query,
408
+ f"{query} comparison",
409
+ f"{query} analysis",
410
+ f"{query} review",
411
+ f"{query} features",
412
+ f"{query} vs alternatives"
413
+ ]
414
+
415
+ async with aiohttp.ClientSession() as session:
416
+ while time.time() - start_time < search_time:
417
+ # Shuffle the query variations to get diverse results
418
+ random.shuffle(query_variations)
419
+
420
+ for q in query_variations[:3]: # Only use first 3 variations in each iteration
421
+ if time.time() - start_time >= search_time:
422
+ break
423
+
424
+ try:
425
+ results = await fetch_search_results(q, max_results=5)
426
+ for result in results:
427
+ clean_link = clean_url(result['link'])
428
+ if clean_link and clean_link not in seen_urls:
429
+ seen_urls.add(clean_link)
430
+ result['link'] = clean_link
431
+ all_results.append(result)
432
+ logging.info(f"Found new result: {result['title']}")
433
+
434
+ # Small delay between searches
435
+ await asyncio.sleep(1.0)
436
+
437
+ # If we have enough unique results, we can stop early
438
+ if len(all_results) >= MAX_SOURCES_TO_PROCESS * 1.5: # Get more than we need for selection
439
+ break
440
+ except Exception as e:
441
+ logging.error(f"Error during continuous search: {e}")
442
+ await asyncio.sleep(2.0) # Wait a bit before trying again
443
+
444
+ # Filter and sort results by relevance
445
+ if all_results:
446
+ # Simple relevance scoring (could be enhanced with more sophisticated methods)
447
+ def score_result(result):
448
+ # Score based on how many query terms appear in title/snippet
449
+ query_terms = set(query.lower().split())
450
+ title = result['title'].lower()
451
+ snippet = result['snippet'].lower()
452
+
453
+ matches = 0
454
+ for term in query_terms:
455
+ if term in title or term in snippet:
456
+ matches += 1
457
+
458
+ # Also consider length of snippet as a proxy for content richness
459
+ snippet_length = len(result['snippet'].split())
460
+
461
+ return matches * 10 + snippet_length
462
+
463
+ # Sort by score, descending
464
+ all_results.sort(key=lambda x: score_result(x), reverse=True)
465
+
466
+ return all_results[:MAX_SOURCES_TO_PROCESS * 2] # Return more than we need for selection
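To make the scoring concrete: each query term found in the title or snippet adds 10, and the snippet's word count is added on top. The same arithmetic applied to one made-up result (illustrative only):

# Sketch: the relevance scoring above applied to a single toy result.
query = "open source vector databases"
result = {
    "title": "Open-source vector databases compared",
    "snippet": "A look at several open source engines for vector search, "
               "covering indexing, filtering and operational trade-offs today.",
}

terms = set(query.lower().split())
matches = sum(1 for t in terms if t in result["title"].lower() or t in result["snippet"].lower())
score = matches * 10 + len(result["snippet"].split())
# score == 4 * 10 + 17 == 57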
467
+
468
+ async def filter_and_select_sources(results: List[dict]) -> List[dict]:
469
+ """
470
+ Filter and select the best sources from search results.
471
+ """
472
+ if not results:
473
+ return []
474
+
475
+ # Group by domain to ensure diversity
476
+ domain_counts = defaultdict(int)
477
+ domain_results = defaultdict(list)
478
+ for result in results:
479
+ domain = urlparse(result['link']).netloc
480
+ domain_counts[domain] += 1
481
+ domain_results[domain].append(result)
482
+
483
+ selected = []
484
+
485
+ # First pass: take the top result from each domain
486
+ for domain, domain_res in domain_results.items():
487
+ if len(selected) >= MAX_SOURCES_TO_PROCESS:
488
+ break
489
+ # Take the best result from this domain (sorted by position in original results)
490
+ if domain_res:
491
+ selected.append(domain_res[0])
492
+
493
+ # Second pass: if we need more, take additional results from domains with good content
494
+ if len(selected) < MAX_SOURCES_TO_PROCESS:
495
+ # Calculate average snippet length as a proxy for content quality
496
+ domain_quality = {}
497
+ for domain, domain_res in domain_results.items():
498
+ avg_length = sum(len(r['snippet'].split()) for r in domain_res) / len(domain_res)
499
+ domain_quality[domain] = avg_length
500
+
501
+ # Sort domains by quality
502
+ sorted_domains = sorted(domain_quality.items(), key=lambda x: x[1], reverse=True)
503
+
504
+ # Add more results from high-quality domains
505
+ for domain, _ in sorted_domains:
506
+ if len(selected) >= MAX_SOURCES_TO_PROCESS:
507
+ break
508
+ for res in domain_results[domain]:
509
+ if res not in selected:
510
+ selected.append(res)
511
+ if len(selected) >= MAX_SOURCES_TO_PROCESS:
512
+ break
513
+
514
+ # Final pass: if still need more, add remaining high-snippet-length results
515
+ if len(selected) < MAX_SOURCES_TO_PROCESS:
516
+ all_results_sorted = sorted(results, key=lambda x: len(x['snippet'].split()), reverse=True)
517
+ for res in all_results_sorted:
518
+ if res not in selected:
519
+ selected.append(res)
520
+ if len(selected) >= MAX_SOURCES_TO_PROCESS:
521
+ break
522
+
523
+ return selected[:MAX_SOURCES_TO_PROCESS]
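The first selection pass above keys everything on the host that urlparse reports via netloc. A tiny sketch of that grouping step in isolation (illustrative URLs):

# Sketch: grouping search result links by host, as the first selection pass does.
from collections import defaultdict
from urllib.parse import urlparse

links = [
    "https://example.com/a",
    "https://example.com/b",
    "https://docs.example.org/intro",
]

by_domain = defaultdict(list)
for link in links:
    by_domain[urlparse(link).netloc].append(link)
# by_domain now has two keys: "example.com" (2 links) and "docs.example.org" (1 link)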
524
+
525
+ async def run_deep_research_stream(query: str, search_time: int = 120) -> AsyncGenerator[str, None]:
526
  def format_sse(data: dict) -> str:
527
  return f"data: {json.dumps(data)}\n\n"
528
 
 
535
  # Initialize the SSE stream with start message
536
  yield format_sse({
537
  "event": "status",
538
+ "data": f"Starting deep research on '{query}'. Search time limit: {search_time} seconds."
539
  })
540
 
541
  async with aiohttp.ClientSession() as session:
 
544
  sub_questions = await generate_research_plan(query, session)
545
  yield format_sse({"event": "plan", "data": sub_questions})
546
 
547
+ # Step 2: Continuous search for better results
548
  yield format_sse({
549
  "event": "status",
550
+ "data": f"Performing continuous search for up to {search_time} seconds..."
551
  })
552
 
553
+ search_results = await continuous_search(query, search_time)
554
+ yield format_sse({
555
+ "event": "status",
556
+ "data": f"Found {len(search_results)} potential sources. Selecting the best ones..."
557
+ })
 
 
 
 
 
 
558
 
559
+ if not search_results:
560
  yield format_sse({
561
  "event": "error",
562
  "data": "No search results found. Check your query and try again."
563
  })
564
  return
565
 
566
+ # Select the best sources
567
+ selected_sources = await filter_and_select_sources(search_results)
 
 
 
 
568
  yield format_sse({
569
  "event": "status",
570
+ "data": f"Selected {len(selected_sources)} high-quality sources to process."
571
  })
572
 
573
+ if not selected_sources:
574
  yield format_sse({
575
  "event": "error",
576
+ "data": "No valid sources found after filtering."
577
  })
578
  return
579
 
580
+ # Step 3: Process selected sources with concurrency control
581
  semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
582
  consolidated_context = ""
583
  all_sources_used = []
 
589
 
590
  # Process sources with progress updates
591
  processing_tasks = []
592
+ for i, source in enumerate(selected_sources):
593
  # Check if we're running out of time
594
  elapsed = time.time() - start_time
595
+ if elapsed > TOTAL_TIMEOUT * 0.8: # Leave 20% of time for synthesis
596
  yield format_sse({
597
  "event": "status",
598
+ "data": f"Approaching time limit, stopping source processing at {i}/{len(selected_sources)}"
599
  })
600
  break
601
 
 
606
  task = asyncio.create_task(process_with_semaphore(source))
607
  processing_tasks.append(task)
608
 
609
+ if (i + 1) % 2 == 0 or (i + 1) == len(selected_sources):
610
  yield format_sse({
611
  "event": "status",
612
+ "data": f"Processed {min(i+1, len(selected_sources))}/{len(selected_sources)} sources..."
613
  })
614
 
615
  # Process completed tasks as they finish
 
620
  consolidated_context += f"Source: {source_info['link']}\nContent: {content}\n\n---\n\n"
621
  all_sources_used.append(source_info)
622
  successful_sources += 1
623
+ total_tokens += len(content.split()) # Rough token count
624
  else:
625
  processing_errors += 1
626
 
 
631
  })
632
  return
633
 
634
+ # Step 4: Synthesize comprehensive report
635
+ time_remaining = max(0, TOTAL_TIMEOUT - (time.time() - start_time))
636
  yield format_sse({
637
  "event": "status",
638
+ "data": f"Synthesizing comprehensive report from {successful_sources} sources..."
639
  })
640
 
641
+ max_output_tokens = min(2000, int(time_remaining * 6)) # More aggressive token count
642
+
643
+ report_prompt = f"""Compose an in-depth analysis report on "{query}".
644
+
645
+ Structure the report with these sections:
646
+ 1. Introduction and Background
647
+ 2. Key Features and Capabilities
648
+ 3. Comparative Analysis with Alternatives
649
+ 4. Current Developments and Trends
650
+ 5. Challenges and Limitations
651
+ 6. Future Outlook
652
+ 7. Conclusion and Recommendations
653
 
654
+ For each section, provide detailed analysis based on the source material.
655
+ Include specific examples and data points from the sources when available.
656
+ Compare and contrast different viewpoints from various sources.
657
 
658
+ Use markdown formatting for headings, subheadings, lists, and emphasis.
659
+ Cite sources where appropriate using inline citations like [1][2].
 
 
 
 
660
 
661
+ Available information from {successful_sources} sources:
662
+ {consolidated_context[:20000]} # Increased context size
663
 
664
+ Generate a comprehensive report of approximately {max_output_tokens//4} words.
665
+ Focus on providing deep insights, analysis, and actionable information.
666
  """
667
 
668
  report_payload = {
 
672
  "max_tokens": max_output_tokens
673
  }
674
 
675
+ # Stream the report generation
676
  async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=report_payload) as response:
677
  response.raise_for_status()
678
  async for line in response.content:
679
+ if time.time() - start_time > TOTAL_TIMEOUT:
680
  yield format_sse({
681
  "event": "warning",
682
  "data": "Time limit reached, ending report generation early."
 
699
  logging.warning(f"Error processing stream chunk: {e}")
700
  continue
701
 
702
+ # Final status update
703
  duration = time.time() - start_time
704
  stats = {
705
  "total_time_seconds": round(duration),
 
718
  except asyncio.TimeoutError:
719
  yield format_sse({
720
  "event": "error",
721
+ "data": f"Research process timed out after {TOTAL_TIMEOUT} seconds."
722
  })
723
  except Exception as e:
724
  logging.error(f"Critical error in research process: {e}", exc_info=True)
 
739
  if not request.query or len(request.query.strip()) < 3:
740
  raise HTTPException(status_code=400, detail="Query must be at least 3 characters long")
741
 
742
+ search_time = min(max(request.search_time, 60), 180) # Clamp between 60 and 180 seconds
743
  return StreamingResponse(
744
+ run_deep_research_stream(request.query.strip(), search_time),
745
  media_type="text/event-stream"
746
  )
747
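For completeness, a minimal client sketch for consuming the endpoint's SSE stream; the route path "/deep-research" and the host are assumptions for illustration, since the route decorator is not shown in this diff:

# Sketch of a client reading the SSE stream (route path and host are assumed, not taken from the diff).
import asyncio
import aiohttp

async def consume(query: str) -> None:
    payload = {"query": query, "search_time": 90}
    async with aiohttp.ClientSession() as session:
        async with session.post("http://localhost:8000/deep-research", json=payload) as resp:
            async for raw_line in resp.content:
                line = raw_line.decode("utf-8").strip()
                if line.startswith("data: "):
                    print(line[len("data: "):])  # each frame carries one JSON event

asyncio.run(consume("solid-state batteries"))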