rkihacker committed (verified)
Commit 58de22e · 1 Parent(s): cffab53

Update main.py

Files changed (1)
  1. main.py +105 -150
main.py CHANGED
@@ -34,12 +34,14 @@ else:
34
  # --- Constants & Headers ---
35
  LLM_API_URL = "https://api.typegpt.net/v1/chat/completions"
36
  LLM_MODEL = "meta-llama/Llama-4-Maverick-17B-128E-Instruct-FP8"
37
- MAX_SOURCES_TO_PROCESS = 10 # Increased to get more comprehensive results
38
- MAX_CONCURRENT_REQUESTS = 5 # Increased for faster processing
39
- SEARCH_TIMEOUT = 120 # 2 minutes for searching (adjustable)
40
- TOTAL_TIMEOUT = 180 # 3 minutes total
41
- REQUEST_DELAY = 1.0 # Shorter delay between requests
42
  USER_AGENT_ROTATION = True
 
 
43
 
44
  # Initialize fake user agent generator
45
  try:
@@ -62,7 +64,7 @@ LLM_HEADERS = {
62
 
63
  class DeepResearchRequest(BaseModel):
64
  query: str
65
- search_time: int = 120 # Default to 2 minutes
66
 
67
  app = FastAPI(
68
  title="AI Deep Research API",
@@ -91,8 +93,8 @@ async def get_real_user_agent() -> str:
91
  """Get a realistic user agent string."""
92
  try:
93
  if isinstance(ua, UserAgent):
94
- return ua.random()
95
- return ua.random() # For our fallback class
96
  except:
97
  return "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36"
98
 
@@ -103,21 +105,18 @@ def clean_url(url: str) -> str:
103
 
104
  # Handle DuckDuckGo redirect URLs
105
  if url.startswith('//duckduckgo.com/l/'):
106
- url = f"https:{url}" # Make it a proper URL
107
  try:
108
- # Extract the real URL from DuckDuckGo's redirect
109
  parsed = urlparse(url)
110
  query_params = parsed.query
111
  if 'uddg=' in query_params:
112
- # Extract the actual URL from the parameter
113
  match = re.search(r'uddg=([^&]+)', query_params)
114
  if match:
115
  encoded_url = match.group(1)
116
  try:
117
- url = quote_plus(encoded_url) # This might need better decoding
118
- # For simplicity, we'll just return the decoded URL
119
- # In production, you'd want to properly URL-decode this
120
- return encoded_url
121
  except:
122
  pass
123
  except:
@@ -148,7 +147,6 @@ async def check_robots_txt(url: str) -> bool:
148
  robots = await response.text()
149
  if "Disallow: /" in robots:
150
  return False
151
- # Check for specific path disallows
152
  path = re.sub(r'https?://[^/]+', '', url)
153
  if any(f"Disallow: {p}" in robots for p in [path, path.rstrip('/') + '/']):
154
  return False
@@ -159,66 +157,70 @@ async def check_robots_txt(url: str) -> bool:
159
 
160
  async def fetch_search_results(query: str, max_results: int = 5) -> List[dict]:
161
  """
162
- Perform a real search using DuckDuckGo's HTML interface with improved URL handling.
163
  """
164
- try:
165
- search_url = f"https://html.duckduckgo.com/html/?q={quote_plus(query)}"
166
- headers = {
167
- "User-Agent": await get_real_user_agent(),
168
- "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
169
- "Accept-Language": "en-US,en;q=0.5",
170
- "Referer": "https://duckduckgo.com/",
171
- "DNT": "1"
172
- }
173
-
174
- async with aiohttp.ClientSession() as session:
175
- async with session.get(search_url, headers=headers, timeout=10) as response:
176
- if response.status != 200:
177
- logging.warning(f"Search failed with status {response.status}")
178
- return []
179
-
180
- html = await response.text()
181
- soup = BeautifulSoup(html, 'html.parser')
182
-
183
- results = []
184
- # Try multiple selectors as DuckDuckGo may change their HTML structure
185
- for selector in ['.result__body', '.result__a', '.result']:
186
- if len(results) >= max_results:
187
- break
188
-
189
- for result in soup.select(selector)[:max_results]:
190
- try:
191
- title_elem = result.select_one('.result__title .result__a') or result.select_one('.result__a')
192
- if not title_elem:
193
  continue
194
-
195
- link = title_elem['href']
196
- snippet_elem = result.select_one('.result__snippet')
197
-
198
- # Clean the URL
199
- clean_link = clean_url(link)
200
-
201
- # Skip if we couldn't get a clean URL
202
- if not clean_link or clean_link.startswith('javascript:'):
 
203
  continue
204
 
205
- # Get snippet if available
206
- snippet = snippet_elem.get_text(strip=True) if snippet_elem else ""
207
-
208
- results.append({
209
- 'title': title_elem.get_text(strip=True),
210
- 'link': clean_link,
211
- 'snippet': snippet
212
- })
213
- except Exception as e:
214
- logging.warning(f"Error parsing search result: {e}")
215
- continue
216
-
217
- logging.info(f"Found {len(results)} real search results for '{query}'")
218
- return results[:max_results]
219
- except Exception as e:
220
- logging.error(f"Real search failed: {e}")
221
- return []
222
 
223
  async def process_web_source(session: aiohttp.ClientSession, source: dict, timeout: int = 15) -> Tuple[str, dict]:
224
  """
@@ -226,13 +228,12 @@ async def process_web_source(session: aiohttp.ClientSession, source: dict, timeo
226
  """
227
  headers = {'User-Agent': await get_real_user_agent()}
228
  source_info = source.copy()
229
- source_info['link'] = clean_url(source['link']) # Ensure URL is clean
230
 
231
- # Skip if URL is invalid
232
  if not source_info['link'] or not source_info['link'].startswith(('http://', 'https://')):
 
233
  return source.get('snippet', ''), source_info
234
 
235
- # Check robots.txt first
236
  if not await check_robots_txt(source_info['link']):
237
  logging.info(f"Scraping disallowed by robots.txt for {source_info['link']}")
238
  return source.get('snippet', ''), source_info
@@ -241,12 +242,10 @@ async def process_web_source(session: aiohttp.ClientSession, source: dict, timeo
241
  logging.info(f"Processing source: {source_info['link']}")
242
  start_time = time.time()
243
 
244
- # Skip non-HTML content
245
  if any(source_info['link'].lower().endswith(ext) for ext in ['.pdf', '.doc', '.docx', '.ppt', '.pptx', '.xls', '.xlsx']):
246
  logging.info(f"Skipping non-HTML content at {source_info['link']}")
247
  return source.get('snippet', ''), source_info
248
 
249
- # Add delay between requests to be polite
250
  await asyncio.sleep(REQUEST_DELAY)
251
 
252
  async with session.get(source_info['link'], headers=headers, timeout=timeout, ssl=False) as response:
@@ -262,11 +261,9 @@ async def process_web_source(session: aiohttp.ClientSession, source: dict, timeo
262
  html = await response.text()
263
  soup = BeautifulSoup(html, "html.parser")
264
 
265
- # Remove unwanted elements
266
  for tag in soup(['script', 'style', 'nav', 'footer', 'header', 'aside', 'iframe', 'noscript', 'form']):
267
  tag.decompose()
268
 
269
- # Try to find main content by common patterns
270
  selectors_to_try = [
271
  'main',
272
  'article',
@@ -289,7 +286,6 @@ async def process_web_source(session: aiohttp.ClientSession, source: dict, timeo
289
  break
290
 
291
  if not main_content:
292
- # If no main content found, try to find the largest text block
293
  all_elements = soup.find_all()
294
  candidates = [el for el in all_elements if el.name not in ['script', 'style', 'nav', 'footer', 'header']]
295
  if candidates:
@@ -299,31 +295,25 @@ async def process_web_source(session: aiohttp.ClientSession, source: dict, timeo
299
  if not main_content:
300
  main_content = soup.find('body') or soup
301
 
302
- # Clean up the content
303
  content = " ".join(main_content.stripped_strings)
304
  content = re.sub(r'\s+', ' ', content).strip()
305
 
306
- # If content is too short, try alternative extraction methods
307
  if len(content.split()) < 50 and len(html) > 10000:
308
- # Try extracting all paragraphs
309
  paras = soup.find_all('p')
310
  content = " ".join([p.get_text() for p in paras if p.get_text().strip()])
311
  content = re.sub(r'\s+', ' ', content).strip()
312
 
313
- # If still too short, try getting all text nodes
314
  if len(content.split()) < 50:
315
  content = " ".join(soup.stripped_strings)
316
  content = re.sub(r'\s+', ' ', content).strip()
317
 
318
- # If content is still too short, try to extract from specific tags
319
  if len(content.split()) < 30:
320
- # Try to get content from divs with certain classes
321
  for tag in ['div', 'section', 'article']:
322
  for element in soup.find_all(tag):
323
- if len(element.get_text().split()) > 200: # If this element has substantial content
324
  content = " ".join(element.stripped_strings)
325
  content = re.sub(r'\s+', ' ', content).strip()
326
- if len(content.split()) >= 30: # If we got enough content
327
  break
328
  if len(content.split()) >= 30:
329
  break
@@ -375,9 +365,8 @@ async def generate_research_plan(query: str, session: aiohttp.ClientSession) ->
375
  cleaned_q = re.sub(r'^[^a-zA-Z0-9]*|[^a-zA-Z0-9]*$', '', q)
376
  if cleaned_q:
377
  cleaned.append(cleaned_q)
378
- return cleaned[:6] # Limit to 6 questions max
379
 
380
- # Fallback if we couldn't get good questions from LLM
381
  return [
382
  f"What is {query} and its key features?",
383
  f"How does {query} compare to alternatives?",
@@ -394,35 +383,34 @@ async def generate_research_plan(query: str, session: aiohttp.ClientSession) ->
394
  f"What are the challenges with {query}?"
395
  ]
396
 
397
- async def continuous_search(query: str, search_time: int = 120) -> List[dict]:
398
  """
399
- Perform continuous searching for better results within time constraints.
400
  """
401
  start_time = time.time()
402
  all_results = []
403
  seen_urls = set()
404
 
405
- # Generate multiple variations of the query
406
  query_variations = [
407
  query,
408
  f"{query} comparison",
409
- f"{query} analysis",
410
  f"{query} review",
411
- f"{query} features",
412
- f"{query} vs alternatives"
 
413
  ]
414
 
415
  async with aiohttp.ClientSession() as session:
 
416
  while time.time() - start_time < search_time:
417
- # Shuffle the query variations to get diverse results
418
  random.shuffle(query_variations)
419
-
420
- for q in query_variations[:3]: # Only use first 3 variations in each iteration
421
  if time.time() - start_time >= search_time:
422
  logger.info(f"Search timed out after {search_time} seconds. Found {len(all_results)} results.")
423
  break
424
 
425
- logger.info(f"Searching for query variation: {q}")
426
  try:
427
  results = await fetch_search_results(q, max_results=5)
428
  logger.info(f"Retrieved {len(results)} results for query '{q}'")
@@ -434,42 +422,31 @@ async def continuous_search(query: str, search_time: int = 120) -> List[dict]:
434
  all_results.append(result)
435
  logger.info(f"Added new result: {result['title']} ({result['link']})")
436
 
437
- # Small delay between searches
438
- await asyncio.sleep(1.0)
439
-
440
- # If we have enough unique results, we can stop early
441
- if len(all_results) >= MAX_SOURCES_TO_PROCESS * 1.5: # Get more than we need for selection
442
  logger.info(f"Reached sufficient results: {len(all_results)}")
443
  break
444
  except Exception as e:
445
  logger.error(f"Error during search for '{q}': {e}")
446
- await asyncio.sleep(2.0) # Wait a bit before trying again
 
 
 
447
 
448
- logger.info(f"Completed continuous search. Total results: {len(all_results)}")
449
 
450
- # Filter and sort results by relevance
451
  if all_results:
452
- # Simple relevance scoring (could be enhanced with more sophisticated methods)
453
  def score_result(result):
454
- # Score based on how many query terms appear in title/snippet
455
  query_terms = set(query.lower().split())
456
  title = result['title'].lower()
457
  snippet = result['snippet'].lower()
458
-
459
- matches = 0
460
- for term in query_terms:
461
- if term in title or term in snippet:
462
- matches += 1
463
-
464
- # Also consider length of snippet as a proxy for content richness
465
  snippet_length = len(result['snippet'].split())
466
-
467
  return matches * 10 + snippet_length
468
 
469
- # Sort by score, descending
470
- all_results.sort(key=lambda x: score_result(x), reverse=True)
471
 
472
- return all_results[:MAX_SOURCES_TO_PROCESS * 2] # Return more than we need for selection
473
 
474
  async def filter_and_select_sources(results: List[dict]) -> List[dict]:
475
  """
@@ -481,7 +458,6 @@ async def filter_and_select_sources(results: List[dict]) -> List[dict]:
481
 
482
  logger.info(f"Filtering {len(results)} search results...")
483
 
484
- # Group by domain to ensure diversity
485
  domain_counts = defaultdict(int)
486
  domain_results = defaultdict(list)
487
  for result in results:
@@ -490,28 +466,20 @@ async def filter_and_select_sources(results: List[dict]) -> List[dict]:
490
  domain_results[domain].append(result)
491
 
492
  selected = []
493
-
494
- # First pass: take the top result from each domain
495
  for domain, domain_res in domain_results.items():
496
  if len(selected) >= MAX_SOURCES_TO_PROCESS:
497
  break
498
- # Take the best result from this domain (sorted by position in original results)
499
  if domain_res:
500
  selected.append(domain_res[0])
501
  logger.info(f"Selected top result from domain {domain}: {domain_res[0]['link']}")
502
 
503
- # Second pass: if we need more, take additional results from domains with good content
504
  if len(selected) < MAX_SOURCES_TO_PROCESS:
505
- # Calculate average snippet length as a proxy for content quality
506
  domain_quality = {}
507
  for domain, domain_res in domain_results.items():
508
  avg_length = sum(len(r['snippet'].split()) for r in domain_res) / len(domain_res)
509
  domain_quality[domain] = avg_length
510
 
511
- # Sort domains by quality
512
  sorted_domains = sorted(domain_quality.items(), key=lambda x: x[1], reverse=True)
513
-
514
- # Add more results from high-quality domains
515
  for domain, _ in sorted_domains:
516
  if len(selected) >= MAX_SOURCES_TO_PROCESS:
517
  break
@@ -522,7 +490,6 @@ async def filter_and_select_sources(results: List[dict]) -> List[dict]:
522
  if len(selected) >= MAX_SOURCES_TO_PROCESS:
523
  break
524
 
525
- # Final pass: if still need more, add remaining high-snippet-length results
526
  if len(selected) < MAX_SOURCES_TO_PROCESS:
527
  all_results_sorted = sorted(results, key=lambda x: len(x['snippet'].split()), reverse=True)
528
  for res in all_results_sorted:
@@ -535,7 +502,7 @@ async def filter_and_select_sources(results: List[dict]) -> List[dict]:
535
  logger.info(f"Selected {len(selected)} sources after filtering.")
536
  return selected[:MAX_SOURCES_TO_PROCESS]
537
 
538
- async def run_deep_research_stream(query: str, search_time: int = 120) -> AsyncGenerator[str, None]:
539
  def format_sse(data: dict) -> str:
540
  return f"data: {json.dumps(data)}\n\n"
541
 
@@ -545,19 +512,16 @@ async def run_deep_research_stream(query: str, search_time: int = 120) -> AsyncG
545
  total_tokens = 0
546
 
547
  try:
548
- # Initialize the SSE stream with start message
549
  yield format_sse({
550
  "event": "status",
551
  "data": f"Starting deep research on '{query}'. Search time limit: {search_time} seconds."
552
  })
553
 
554
  async with aiohttp.ClientSession() as session:
555
- # Step 1: Generate research plan
556
  yield format_sse({"event": "status", "data": "Generating comprehensive research plan..."})
557
  sub_questions = await generate_research_plan(query, session)
558
  yield format_sse({"event": "plan", "data": sub_questions})
559
 
560
- # Step 2: Continuous search for better results
561
  yield format_sse({
562
  "event": "status",
563
  "data": f"Performing continuous search for up to {search_time} seconds..."
@@ -580,7 +544,6 @@ async def run_deep_research_stream(query: str, search_time: int = 120) -> AsyncG
580
  })
581
  return
582
 
583
- # Select the best sources
584
  selected_sources = await filter_and_select_sources(search_results)
585
  yield format_sse({
586
  "event": "status",
@@ -598,7 +561,6 @@ async def run_deep_research_stream(query: str, search_time: int = 120) -> AsyncG
598
  })
599
  return
600
 
601
- # Step 3: Process selected sources with concurrency control
602
  semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
603
  consolidated_context = ""
604
  all_sources_used = []
@@ -608,19 +570,16 @@ async def run_deep_research_stream(query: str, search_time: int = 120) -> AsyncG
608
  async with semaphore:
609
  return await process_web_source(session, source, timeout=20)
610
 
611
- # Process sources with progress updates
612
  processing_tasks = []
613
  for i, source in enumerate(selected_sources):
614
- # Check if we're running out of time
615
  elapsed = time.time() - start_time
616
- if elapsed > TOTAL_TIMEOUT * 0.8: # Leave 20% of time for synthesis
617
  yield format_sse({
618
  "event": "status",
619
  "data": f"Approaching time limit, stopping source processing at {i}/{len(selected_sources)}"
620
  })
621
  break
622
 
623
- # Add delay between processing each source to be polite
624
  if i > 0:
625
  await asyncio.sleep(REQUEST_DELAY * 0.5)
626
 
@@ -633,7 +592,6 @@ async def run_deep_research_stream(query: str, search_time: int = 120) -> AsyncG
633
  "data": f"Processed {min(i+1, len(selected_sources))}/{len(selected_sources)} sources..."
634
  })
635
 
636
- # Process completed tasks as they finish
637
  for future in asyncio.as_completed(processing_tasks):
638
  processed_sources += 1
639
  content, source_info = await future
@@ -641,7 +599,7 @@ async def run_deep_research_stream(query: str, search_time: int = 120) -> AsyncG
641
  consolidated_context += f"Source: {source_info['link']}\nContent: {content}\n\n---\n\n"
642
  all_sources_used.append(source_info)
643
  successful_sources += 1
644
- total_tokens += len(content.split()) # Rough token count
645
  yield format_sse({
646
  "event": "processed_source",
647
  "data": source_info
@@ -656,14 +614,13 @@ async def run_deep_research_stream(query: str, search_time: int = 120) -> AsyncG
656
  })
657
  return
658
 
659
- # Step 4: Synthesize comprehensive report
660
  time_remaining = max(0, TOTAL_TIMEOUT - (time.time() - start_time))
661
  yield format_sse({
662
  "event": "status",
663
  "data": f"Synthesizing comprehensive report from {successful_sources} sources..."
664
  })
665
 
666
- max_output_tokens = min(2000, int(time_remaining * 6)) # More aggressive token count
667
 
668
  report_prompt = f"""Compose an in-depth analysis report on "{query}".
669
 
@@ -684,7 +641,7 @@ async def run_deep_research_stream(query: str, search_time: int = 120) -> AsyncG
684
  Cite sources where appropriate using inline citations like [1][2].
685
 
686
  Available information from {successful_sources} sources:
687
- {consolidated_context[:20000]} # Increased context size
688
 
689
  Generate a comprehensive report of approximately {max_output_tokens//4} words.
690
  Focus on providing deep insights, analysis, and actionable information.
@@ -697,7 +654,6 @@ async def run_deep_research_stream(query: str, search_time: int = 120) -> AsyncG
697
  "max_tokens": max_output_tokens
698
  }
699
 
700
- # Stream the report generation
701
  async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=report_payload) as response:
702
  response.raise_for_status()
703
  async for line in response.content:
@@ -724,7 +680,6 @@ async def run_deep_research_stream(query: str, search_time: int = 120) -> AsyncG
724
  logging.warning(f"Error processing stream chunk: {e}")
725
  continue
726
 
727
- # Final status update
728
  duration = time.time() - start_time
729
  stats = {
730
  "total_time_seconds": round(duration),
@@ -764,7 +719,7 @@ async def deep_research_endpoint(request: DeepResearchRequest):
764
  if not request.query or len(request.query.strip()) < 3:
765
  raise HTTPException(status_code=400, detail="Query must be at least 3 characters long")
766
 
767
- search_time = min(max(request.search_time, 60), 180) # Clamp between 60 and 180 seconds
768
  return StreamingResponse(
769
  run_deep_research_stream(request.query.strip(), search_time),
770
  media_type="text/event-stream"
@@ -772,4 +727,4 @@ async def deep_research_endpoint(request: DeepResearchRequest):
772
 
773
  if __name__ == "__main__":
774
  import uvicorn
775
- uvicorn.run(app, host="0.0.0.0", port=8000)
 
34
  # --- Constants & Headers ---
35
  LLM_API_URL = "https://api.typegpt.net/v1/chat/completions"
36
  LLM_MODEL = "meta-llama/Llama-4-Maverick-17B-128E-Instruct-FP8"
37
+ MAX_SOURCES_TO_PROCESS = 10
38
+ MAX_CONCURRENT_REQUESTS = 3 # Reduced to avoid rate-limiting
39
+ SEARCH_TIMEOUT = 90 # Reduced to ensure time for processing
40
+ TOTAL_TIMEOUT = 180
41
+ REQUEST_DELAY = 2.0 # Increased delay to avoid rate-limiting
42
  USER_AGENT_ROTATION = True
43
+ RETRY_ATTEMPTS = 3 # Number of retries for failed search requests
44
+ RETRY_DELAY = 3.0 # Delay between retries
45
 
46
  # Initialize fake user agent generator
47
  try:
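
A rough sanity check on the retuned constants above (purely illustrative, not part of the committed file): with the 10-second per-request timeout used in fetch_search_results, a search in which every attempt fails costs roughly

    # Back-of-the-envelope worst case for one fully failing search (illustrative only)
    RETRY_ATTEMPTS, RETRY_DELAY, PER_REQUEST_TIMEOUT = 3, 3.0, 10
    worst_case = RETRY_ATTEMPTS * PER_REQUEST_TIMEOUT + (RETRY_ATTEMPTS - 1) * RETRY_DELAY
    print(worst_case)  # 36.0 seconds, comfortably inside SEARCH_TIMEOUT = 90

which is why SEARCH_TIMEOUT can drop to 90 while still fitting inside TOTAL_TIMEOUT = 180.
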
 
64
 
65
  class DeepResearchRequest(BaseModel):
66
  query: str
67
+ search_time: int = 90 # Default to 90 seconds
68
 
69
  app = FastAPI(
70
  title="AI Deep Research API",
 
93
  """Get a realistic user agent string."""
94
  try:
95
  if isinstance(ua, UserAgent):
96
+ return ua.random
97
+ return ua.random()
98
  except:
99
  return "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36"
100
 
 
105
 
106
  # Handle DuckDuckGo redirect URLs
107
  if url.startswith('//duckduckgo.com/l/'):
108
+ url = f"https:{url}"
109
  try:
 
110
  parsed = urlparse(url)
111
  query_params = parsed.query
112
  if 'uddg=' in query_params:
 
113
  match = re.search(r'uddg=([^&]+)', query_params)
114
  if match:
115
  encoded_url = match.group(1)
116
  try:
117
+ # Properly decode the URL
118
+ from urllib.parse import unquote
119
+ return unquote(encoded_url)
 
120
  except:
121
  pass
122
  except:
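
For reference, the uddg decoding added here behaves like this on a typical DuckDuckGo redirect link (standalone sketch; the sample URL is made up):

    # Illustrative sketch of the redirect decoding above; the redirect string is invented.
    import re
    from urllib.parse import unquote

    redirect = "//duckduckgo.com/l/?uddg=https%3A%2F%2Fexample.com%2Fpage&rut=abc123"
    match = re.search(r'uddg=([^&]+)', redirect)
    if match:
        print(unquote(match.group(1)))  # https://example.com/page
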
 
147
  robots = await response.text()
148
  if "Disallow: /" in robots:
149
  return False
 
150
  path = re.sub(r'https?://[^/]+', '', url)
151
  if any(f"Disallow: {p}" in robots for p in [path, path.rstrip('/') + '/']):
152
  return False
 
157
 
158
  async def fetch_search_results(query: str, max_results: int = 5) -> List[dict]:
159
  """
160
+ Perform a real search using DuckDuckGo's HTML interface with retry logic.
161
  """
162
+ headers = {
163
+ "User-Agent": await get_real_user_agent(),
164
+ "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
165
+ "Accept-Language": "en-US,en;q=0.5",
166
+ "Referer": "https://duckduckgo.com/",
167
+ "DNT": "1"
168
+ }
169
+
170
+ for attempt in range(RETRY_ATTEMPTS):
171
+ try:
172
+ search_url = f"https://html.duckduckgo.com/html/?q={quote_plus(query)}"
173
+ async with aiohttp.ClientSession() as session:
174
+ async with session.get(search_url, headers=headers, timeout=10) as response:
175
+ if response.status != 200:
176
+ if response.status == 202:
177
+ logging.warning(f"Search attempt {attempt + 1} failed with status 202 for query '{query}'")
178
+ if attempt < RETRY_ATTEMPTS - 1:
179
+ await asyncio.sleep(RETRY_DELAY)
 
180
  continue
181
+ logging.warning(f"Search failed with status {response.status} for query '{query}'")
182
+ return []
183
+
184
+ html = await response.text()
185
+ soup = BeautifulSoup(html, 'html.parser')
186
+
187
+ results = []
188
+ for selector in ['.result__body', '.result__a', '.result']:
189
+ if len(results) >= max_results:
190
+ break
191
+
192
+ for result in soup.select(selector)[:max_results]:
193
+ try:
194
+ title_elem = result.select_one('.result__title .result__a') or result.select_one('.result__a')
195
+ if not title_elem:
196
+ continue
197
+
198
+ link = title_elem['href']
199
+ snippet_elem = result.select_one('.result__snippet')
200
+
201
+ clean_link = clean_url(link)
202
+ if not clean_link or clean_link.startswith('javascript:'):
203
+ continue
204
+
205
+ snippet = snippet_elem.get_text(strip=True) if snippet_elem else ""
206
+ results.append({
207
+ 'title': title_elem.get_text(strip=True),
208
+ 'link': clean_link,
209
+ 'snippet': snippet
210
+ })
211
+ except Exception as e:
212
+ logging.warning(f"Error parsing search result: {e}")
213
  continue
214
 
215
+ logging.info(f"Found {len(results)} real search results for '{query}'")
216
+ return results[:max_results]
217
+ except Exception as e:
218
+ logging.error(f"Search attempt {attempt + 1} failed for '{query}': {e}")
219
+ if attempt < RETRY_ATTEMPTS - 1:
220
+ await asyncio.sleep(RETRY_DELAY)
221
+ continue
222
+ logging.error(f"All {RETRY_ATTEMPTS} search attempts failed for '{query}'")
223
+ return []
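
With the retry logic in place, a single call may block through several attempts before returning an empty list. A minimal usage sketch, assuming this file is importable as a module named main:

    # Illustrative usage only; assumes main.py imports cleanly as a module named main.
    import asyncio
    import main

    async def demo():
        results = await main.fetch_search_results("open source vector databases", max_results=3)
        for r in results:
            print(r['title'], '->', r['link'])

    asyncio.run(demo())
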
 
224
 
225
  async def process_web_source(session: aiohttp.ClientSession, source: dict, timeout: int = 15) -> Tuple[str, dict]:
226
  """
 
228
  """
229
  headers = {'User-Agent': await get_real_user_agent()}
230
  source_info = source.copy()
231
+ source_info['link'] = clean_url(source['link'])
232
 
 
233
  if not source_info['link'] or not source_info['link'].startswith(('http://', 'https://')):
234
+ logging.warning(f"Invalid URL: {source_info['link']}")
235
  return source.get('snippet', ''), source_info
236
 
 
237
  if not await check_robots_txt(source_info['link']):
238
  logging.info(f"Scraping disallowed by robots.txt for {source_info['link']}")
239
  return source.get('snippet', ''), source_info
 
242
  logging.info(f"Processing source: {source_info['link']}")
243
  start_time = time.time()
244
 
 
245
  if any(source_info['link'].lower().endswith(ext) for ext in ['.pdf', '.doc', '.docx', '.ppt', '.pptx', '.xls', '.xlsx']):
246
  logging.info(f"Skipping non-HTML content at {source_info['link']}")
247
  return source.get('snippet', ''), source_info
248
 
 
249
  await asyncio.sleep(REQUEST_DELAY)
250
 
251
  async with session.get(source_info['link'], headers=headers, timeout=timeout, ssl=False) as response:
 
261
  html = await response.text()
262
  soup = BeautifulSoup(html, "html.parser")
263
 
 
264
  for tag in soup(['script', 'style', 'nav', 'footer', 'header', 'aside', 'iframe', 'noscript', 'form']):
265
  tag.decompose()
266
 
 
267
  selectors_to_try = [
268
  'main',
269
  'article',
 
286
  break
287
 
288
  if not main_content:
 
289
  all_elements = soup.find_all()
290
  candidates = [el for el in all_elements if el.name not in ['script', 'style', 'nav', 'footer', 'header']]
291
  if candidates:
 
295
  if not main_content:
296
  main_content = soup.find('body') or soup
297
 
 
298
  content = " ".join(main_content.stripped_strings)
299
  content = re.sub(r'\s+', ' ', content).strip()
300
 
 
301
  if len(content.split()) < 50 and len(html) > 10000:
 
302
  paras = soup.find_all('p')
303
  content = " ".join([p.get_text() for p in paras if p.get_text().strip()])
304
  content = re.sub(r'\s+', ' ', content).strip()
305
 
 
306
  if len(content.split()) < 50:
307
  content = " ".join(soup.stripped_strings)
308
  content = re.sub(r'\s+', ' ', content).strip()
309
 
 
310
  if len(content.split()) < 30:
 
311
  for tag in ['div', 'section', 'article']:
312
  for element in soup.find_all(tag):
313
+ if len(element.get_text().split()) > 200:
314
  content = " ".join(element.stripped_strings)
315
  content = re.sub(r'\s+', ' ', content).strip()
316
+ if len(content.split()) >= 30:
317
  break
318
  if len(content.split()) >= 30:
319
  break
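
The fallback chain in process_web_source ultimately relies on one pattern: drop boilerplate tags, join the remaining text nodes, and collapse whitespace. A standalone illustration on a made-up HTML snippet:

    # Illustrative sketch of the extraction pattern used above; the HTML is invented.
    import re
    from bs4 import BeautifulSoup

    html = "<html><body><nav>menu</nav><main><p>First paragraph.</p><p>Second one.</p></main><script>x=1</script></body></html>"
    soup = BeautifulSoup(html, "html.parser")
    for tag in soup(['script', 'style', 'nav', 'footer', 'header']):
        tag.decompose()
    main_content = soup.find('main') or soup
    text = re.sub(r'\s+', ' ', " ".join(main_content.stripped_strings)).strip()
    print(text)  # First paragraph. Second one.
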
 
365
  cleaned_q = re.sub(r'^[^a-zA-Z0-9]*|[^a-zA-Z0-9]*$', '', q)
366
  if cleaned_q:
367
  cleaned.append(cleaned_q)
368
+ return cleaned[:6]
369
 
 
370
  return [
371
  f"What is {query} and its key features?",
372
  f"How does {query} compare to alternatives?",
 
383
  f"What are the challenges with {query}?"
384
  ]
385
 
386
+ async def continuous_search(query: str, search_time: int = 90) -> List[dict]:
387
  """
388
+ Perform continuous searching with retries and diverse queries.
389
  """
390
  start_time = time.time()
391
  all_results = []
392
  seen_urls = set()
393
 
 
394
  query_variations = [
395
  query,
396
  f"{query} comparison",
 
397
  f"{query} review",
398
+ f"{query} latest developments",
399
+ f"{query} features and benefits",
400
+ f"{query} challenges and limitations"
401
  ]
402
 
403
  async with aiohttp.ClientSession() as session:
404
+ iteration = 0
405
  while time.time() - start_time < search_time:
406
+ iteration += 1
407
  random.shuffle(query_variations)
408
+ for q in query_variations[:3]:
 
409
  if time.time() - start_time >= search_time:
410
  logger.info(f"Search timed out after {search_time} seconds. Found {len(all_results)} results.")
411
  break
412
 
413
+ logger.info(f"Iteration {iteration}: Searching for query variation: {q}")
414
  try:
415
  results = await fetch_search_results(q, max_results=5)
416
  logger.info(f"Retrieved {len(results)} results for query '{q}'")
 
422
  all_results.append(result)
423
  logger.info(f"Added new result: {result['title']} ({result['link']})")
424
 
425
+ await asyncio.sleep(REQUEST_DELAY)
426
+ if len(all_results) >= MAX_SOURCES_TO_PROCESS * 1.5:
 
 
 
427
  logger.info(f"Reached sufficient results: {len(all_results)}")
428
  break
429
  except Exception as e:
430
  logger.error(f"Error during search for '{q}': {e}")
431
+ await asyncio.sleep(RETRY_DELAY)
432
+
433
+ if len(all_results) >= MAX_SOURCES_TO_PROCESS * 1.5:
434
+ break
435
 
436
+ logger.info(f"Completed continuous search. Total results: {len(all_results)}")
437
 
 
438
  if all_results:
 
439
  def score_result(result):
 
440
  query_terms = set(query.lower().split())
441
  title = result['title'].lower()
442
  snippet = result['snippet'].lower()
443
+ matches = sum(1 for term in query_terms if term in title or term in snippet)
 
 
 
 
 
 
444
  snippet_length = len(result['snippet'].split())
 
445
  return matches * 10 + snippet_length
446
 
447
+ all_results.sort(key=score_result, reverse=True)
 
448
 
449
+ return all_results[:MAX_SOURCES_TO_PROCESS * 2]
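
To make the new scoring concrete, here is score_result applied by hand to one made-up result: term matches dominate the score and snippet length acts as a tie-breaker.

    # Worked example of the scoring above; the query and result are invented.
    query = "rust web frameworks"
    result = {
        "title": "Comparing Rust web frameworks in 2024",
        "snippet": "A look at Actix, Axum and Rocket with benchmarks and ergonomics notes",
    }
    query_terms = set(query.lower().split())
    title, snippet = result["title"].lower(), result["snippet"].lower()
    matches = sum(1 for t in query_terms if t in title or t in snippet)  # 3
    snippet_length = len(result["snippet"].split())                      # 12
    print(matches * 10 + snippet_length)                                 # 42
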
450
 
451
  async def filter_and_select_sources(results: List[dict]) -> List[dict]:
452
  """
 
458
 
459
  logger.info(f"Filtering {len(results)} search results...")
460
 
 
461
  domain_counts = defaultdict(int)
462
  domain_results = defaultdict(list)
463
  for result in results:
 
466
  domain_results[domain].append(result)
467
 
468
  selected = []
 
 
469
  for domain, domain_res in domain_results.items():
470
  if len(selected) >= MAX_SOURCES_TO_PROCESS:
471
  break
 
472
  if domain_res:
473
  selected.append(domain_res[0])
474
  logger.info(f"Selected top result from domain {domain}: {domain_res[0]['link']}")
475
 
 
476
  if len(selected) < MAX_SOURCES_TO_PROCESS:
 
477
  domain_quality = {}
478
  for domain, domain_res in domain_results.items():
479
  avg_length = sum(len(r['snippet'].split()) for r in domain_res) / len(domain_res)
480
  domain_quality[domain] = avg_length
481
 
 
482
  sorted_domains = sorted(domain_quality.items(), key=lambda x: x[1], reverse=True)
 
 
483
  for domain, _ in sorted_domains:
484
  if len(selected) >= MAX_SOURCES_TO_PROCESS:
485
  break
 
490
  if len(selected) >= MAX_SOURCES_TO_PROCESS:
491
  break
492
 
 
493
  if len(selected) < MAX_SOURCES_TO_PROCESS:
494
  all_results_sorted = sorted(results, key=lambda x: len(x['snippet'].split()), reverse=True)
495
  for res in all_results_sorted:
 
502
  logger.info(f"Selected {len(selected)} sources after filtering.")
503
  return selected[:MAX_SOURCES_TO_PROCESS]
504
 
505
+ async def run_deep_research_stream(query: str, search_time: int = 90) -> AsyncGenerator[str, None]:
506
  def format_sse(data: dict) -> str:
507
  return f"data: {json.dumps(data)}\n\n"
508
 
 
512
  total_tokens = 0
513
 
514
  try:
 
515
  yield format_sse({
516
  "event": "status",
517
  "data": f"Starting deep research on '{query}'. Search time limit: {search_time} seconds."
518
  })
519
 
520
  async with aiohttp.ClientSession() as session:
 
521
  yield format_sse({"event": "status", "data": "Generating comprehensive research plan..."})
522
  sub_questions = await generate_research_plan(query, session)
523
  yield format_sse({"event": "plan", "data": sub_questions})
524
 
 
525
  yield format_sse({
526
  "event": "status",
527
  "data": f"Performing continuous search for up to {search_time} seconds..."
 
544
  })
545
  return
546
 
 
547
  selected_sources = await filter_and_select_sources(search_results)
548
  yield format_sse({
549
  "event": "status",
 
561
  })
562
  return
563
 
 
564
  semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
565
  consolidated_context = ""
566
  all_sources_used = []
 
570
  async with semaphore:
571
  return await process_web_source(session, source, timeout=20)
572
 
 
573
  processing_tasks = []
574
  for i, source in enumerate(selected_sources):
 
575
  elapsed = time.time() - start_time
576
+ if elapsed > TOTAL_TIMEOUT * 0.8:
577
  yield format_sse({
578
  "event": "status",
579
  "data": f"Approaching time limit, stopping source processing at {i}/{len(selected_sources)}"
580
  })
581
  break
582
 
 
583
  if i > 0:
584
  await asyncio.sleep(REQUEST_DELAY * 0.5)
585
 
 
592
  "data": f"Processed {min(i+1, len(selected_sources))}/{len(selected_sources)} sources..."
593
  })
594
 
 
595
  for future in asyncio.as_completed(processing_tasks):
596
  processed_sources += 1
597
  content, source_info = await future
 
599
  consolidated_context += f"Source: {source_info['link']}\nContent: {content}\n\n---\n\n"
600
  all_sources_used.append(source_info)
601
  successful_sources += 1
602
+ total_tokens += len(content.split())
603
  yield format_sse({
604
  "event": "processed_source",
605
  "data": source_info
 
614
  })
615
  return
616
 
 
617
  time_remaining = max(0, TOTAL_TIMEOUT - (time.time() - start_time))
618
  yield format_sse({
619
  "event": "status",
620
  "data": f"Synthesizing comprehensive report from {successful_sources} sources..."
621
  })
622
 
623
+ max_output_tokens = min(2000, int(time_remaining * 6))
624
 
625
  report_prompt = f"""Compose an in-depth analysis report on "{query}".
626
 
 
641
  Cite sources where appropriate using inline citations like [1][2].
642
 
643
  Available information from {successful_sources} sources:
644
+ {consolidated_context[:20000]}
645
 
646
  Generate a comprehensive report of approximately {max_output_tokens//4} words.
647
  Focus on providing deep insights, analysis, and actionable information.
 
654
  "max_tokens": max_output_tokens
655
  }
656
 
 
657
  async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=report_payload) as response:
658
  response.raise_for_status()
659
  async for line in response.content:
 
680
  logging.warning(f"Error processing stream chunk: {e}")
681
  continue
682
 
 
683
  duration = time.time() - start_time
684
  stats = {
685
  "total_time_seconds": round(duration),
 
719
  if not request.query or len(request.query.strip()) < 3:
720
  raise HTTPException(status_code=400, detail="Query must be at least 3 characters long")
721
 
722
+ search_time = min(max(request.search_time, 60), 180)
723
  return StreamingResponse(
724
  run_deep_research_stream(request.query.strip(), search_time),
725
  media_type="text/event-stream"
 
727
 
728
  if __name__ == "__main__":
729
  import uvicorn
730
+ uvicorn.run(app, host="0.0.0.0", port=8000)
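
Because the endpoint streams Server-Sent Events (each message is a single "data: <json>" line carrying an event/data pair, and search_time is clamped to 60 to 180 seconds on the server), a client only needs to read the response line by line. A sketch of such a client; the /deep-research path is an assumption, since the route decorator is not part of this diff:

    # Illustrative client sketch; the route path below is assumed, not taken from the diff.
    import asyncio
    import json
    import aiohttp

    async def consume(query: str):
        payload = {"query": query, "search_time": 90}
        async with aiohttp.ClientSession() as session:
            async with session.post("http://localhost:8000/deep-research", json=payload) as resp:
                async for raw in resp.content:
                    line = raw.decode().strip()
                    if line.startswith("data: "):
                        event = json.loads(line[len("data: "):])
                        print(event.get("event"), "->", str(event.get("data"))[:80])

    asyncio.run(consume("quantum error correction"))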