rkihacker committed
Commit 4118768 · verified · 1 parent: 4d61bdb

Update main.py

Files changed (1):
  1. main.py +201 -158
main.py CHANGED
@@ -34,11 +34,11 @@ else:
  # --- Constants & Headers ---
  LLM_API_URL = "https://api.typegpt.net/v1/chat/completions"
  LLM_MODEL = "meta-llama/Llama-4-Maverick-17B-128E-Instruct-FP8"
- MAX_SOURCES_TO_PROCESS = 10  # Increased to get more comprehensive results
- MAX_CONCURRENT_REQUESTS = 5  # Increased for faster processing
- SEARCH_TIMEOUT = 120  # 2 minutes for searching (adjustable)
- TOTAL_TIMEOUT = 180  # 3 minutes total
- REQUEST_DELAY = 1.0  # Shorter delay between requests
  USER_AGENT_ROTATION = True

  # Initialize fake user agent generator
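The initialization itself is outside this hunk; a minimal sketch of what user-agent rotation with the fake_useragent package typically looks like (the helper name is hypothetical, not from this commit):

import random
from fake_useragent import UserAgent

ua = UserAgent()

def pick_user_agent() -> str:
    # Illustrative only: return a rotating User-Agent when USER_AGENT_ROTATION is enabled.
    return ua.random if USER_AGENT_ROTATION else ua.chrome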
@@ -62,12 +62,12 @@ LLM_HEADERS = {
 
  class DeepResearchRequest(BaseModel):
      query: str
-     search_time: int = 120  # Default to 2 minutes

  app = FastAPI(
      title="AI Deep Research API",
-     description="Provides comprehensive research reports from real web searches within 1-2 minutes.",
-     version="3.0.0"
  )
  app.add_middleware(
      CORSMiddleware,
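For reference, a request body matching the DeepResearchRequest model would look roughly like this (values are illustrative only):

# Example payload accepted by the /deep-research endpoint.
payload = {
    "query": "quantum computing startups",
    "search_time": 120,  # seconds; optional, the model supplies a default
}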
@@ -79,7 +79,9 @@ app.add_middleware(
 
  def extract_json_from_llm_response(text: str) -> Optional[list]:
      """Extract JSON array from LLM response text."""
-     match = re.search(r'\[.*\]', text, re.DOTALL)
      if match:
          try:
              return json.loads(match.group(0))
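A quick illustration of how this helper behaves on a raw completion (the sample string is invented; the None fallback is implied by the Optional[list] signature rather than shown in this hunk):

# The regex grabs the bracketed block and json.loads parses it.
raw = 'Here is the plan:\n["What is X?", "How does X compare to Y?"]'
print(extract_json_from_llm_response(raw))            # ["What is X?", "How does X compare to Y?"]
print(extract_json_from_llm_response("no json here"))  # presumably None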
@@ -105,19 +107,19 @@ def clean_url(url: str) -> str:
      if url.startswith('//duckduckgo.com/l/'):
          url = f"https:{url}"  # Make it a proper URL
          try:
-             # Extract the real URL from DuckDuckGo's redirect
              parsed = urlparse(url)
              query_params = parsed.query
              if 'uddg=' in query_params:
-                 # Extract the actual URL from the parameter
                  match = re.search(r'uddg=([^&]+)', query_params)
                  if match:
                      encoded_url = match.group(1)
                      try:
-                         url = quote_plus(encoded_url)  # This might need better decoding
-                         # For simplicity, we'll just return the decoded URL
-                         # In production, you'd want to properly URL-decode this
-                         return encoded_url
                      except:
                          pass
          except:
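The removed branch notes that the uddg parameter still needs proper URL-decoding; a minimal sketch of how that decoding could be done with urllib.parse.unquote_plus (not part of this commit):

from urllib.parse import unquote_plus

def decode_uddg(encoded_url: str) -> str:
    # Sketch only: unquote_plus reverses percent-encoding (quote_plus would encode instead).
    decoded = unquote_plus(encoded_url)
    if '%25' in decoded:          # handle an occasionally double-encoded value
        decoded = unquote_plus(decoded)
    return decoded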
@@ -193,7 +195,7 @@ async def fetch_search_results(query: str, max_results: int = 5) -> List[dict]:
                      continue

                  link = title_elem['href']
-                 snippet_elem = result.select_one('.result__snippet')

                  # Clean the URL
                  clean_link = clean_url(link)
@@ -205,10 +207,15 @@ async def fetch_search_results(query: str, max_results: int = 5) -> List[dict]:
                  # Get snippet if available
                  snippet = snippet_elem.get_text(strip=True) if snippet_elem else ""

                  results.append({
                      'title': title_elem.get_text(strip=True),
                      'link': clean_link,
-                     'snippet': snippet
                  })
              except Exception as e:
                  logging.warning(f"Error parsing search result: {e}")
@@ -372,7 +379,7 @@ async def generate_research_plan(query: str, session: aiohttp.ClientSession) ->
      cleaned = []
      for q in sub_questions:
          if isinstance(q, str) and q.strip():
-             cleaned_q = re.sub(r'^[^a-zA-Z0-9]*|[^a-zA-Z0-9]*$', '', q)
              if cleaned_q:
                  cleaned.append(cleaned_q)
      return cleaned[:6]  # Limit to 6 questions max
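The regex trims any run of non-alphanumeric characters from both ends of each sub-question; a small illustration (sample string invented):

import re

q = '- What are the key features?'
cleaned_q = re.sub(r'^[^a-zA-Z0-9]*|[^a-zA-Z0-9]*$', '', q)
# -> 'What are the key features'  (leading "- " and trailing "?" are stripped)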
@@ -397,10 +404,13 @@ async def generate_research_plan(query: str, session: aiohttp.ClientSession) ->
 
  async def continuous_search(query: str, search_time: int = 120) -> List[dict]:
      """
      Perform continuous searching for better results within time constraints.
      """
      start_time = time.time()
      all_results = []
      seen_urls = set()

      # Generate multiple variations of the query
      query_variations = [
@@ -409,43 +419,70 @@ async def continuous_search(query: str, search_time: int = 120) -> List[dict]:
          f"{query} analysis",
          f"{query} review",
          f"{query} features",
-         f"{query} vs alternatives"
      ]

      async with aiohttp.ClientSession() as session:
          while time.time() - start_time < search_time:
              # Shuffle the query variations to get diverse results
              random.shuffle(query_variations)

-             for q in query_variations[:3]:  # Only use first 3 variations in each iteration
                  if time.time() - start_time >= search_time:
                      break

                  try:
                      results = await fetch_search_results(q, max_results=5)
-                     for result in results:
-                         clean_link = clean_url(result['link'])
-                         if clean_link and clean_link not in seen_urls:
                              seen_urls.add(clean_link)
                              result['link'] = clean_link
                              all_results.append(result)
-                             logging.info(f"Found new result: {result['title']}")

                      # Small delay between searches
                      await asyncio.sleep(1.0)

                      # If we have enough unique results, we can stop early
-                     if len(all_results) >= MAX_SOURCES_TO_PROCESS * 1.5:  # Get more than we need for selection
                          break
                  except Exception as e:
                      logging.error(f"Error during continuous search: {e}")
                      await asyncio.sleep(2.0)  # Wait a bit before trying again

      # Filter and sort results by relevance
      if all_results:
-         # Simple relevance scoring (could be enhanced with more sophisticated methods)
          def score_result(result):
-             # Score based on how many query terms appear in title/snippet
              query_terms = set(query.lower().split())
              title = result['title'].lower()
              snippet = result['snippet'].lower()
@@ -458,7 +495,11 @@ async def continuous_search(query: str, search_time: int = 120) -> List[dict]:
              # Also consider length of snippet as a proxy for content richness
              snippet_length = len(result['snippet'].split())

-             return matches * 10 + snippet_length

          # Sort by score, descending
          all_results.sort(key=lambda x: score_result(x), reverse=True)
@@ -468,19 +509,21 @@ async def continuous_search(query: str, search_time: int = 120) -> List[dict]:
  async def filter_and_select_sources(results: List[dict]) -> List[dict]:
      """
      Filter and select the best sources from search results.
      """
      if not results:
-         return []

      # Group by domain to ensure diversity
      domain_counts = defaultdict(int)
      domain_results = defaultdict(list)
      for result in results:
-         domain = urlparse(result['link']).netloc
          domain_counts[domain] += 1
          domain_results[domain].append(result)

      selected = []

      # First pass: take the top result from each domain
      for domain, domain_res in domain_results.items():
@@ -488,6 +531,8 @@ async def filter_and_select_sources(results: List[dict]) -> List[dict]:
              break
          # Take the best result from this domain (sorted by position in original results)
          if domain_res:
              selected.append(domain_res[0])

      # Second pass: if we need more, take additional results from domains with good content
@@ -495,6 +540,8 @@ async def filter_and_select_sources(results: List[dict]) -> List[dict]:
      # Calculate average snippet length as a proxy for content quality
      domain_quality = {}
      for domain, domain_res in domain_results.items():
          avg_length = sum(len(r['snippet'].split()) for r in domain_res) / len(domain_res)
          domain_quality[domain] = avg_length

@@ -511,16 +558,19 @@ async def filter_and_select_sources(results: List[dict]) -> List[dict]:
          if len(selected) >= MAX_SOURCES_TO_PROCESS:
              break

-     # Final pass: if still need more, add remaining high-snippet-length results
      if len(selected) < MAX_SOURCES_TO_PROCESS:
-         all_results_sorted = sorted(results, key=lambda x: len(x['snippet'].split()), reverse=True)
-         for res in all_results_sorted:
-             if res not in selected:
-                 selected.append(res)
-                 if len(selected) >= MAX_SOURCES_TO_PROCESS:
-                     break

-     return selected[:MAX_SOURCES_TO_PROCESS]

  async def run_deep_research_stream(query: str, search_time: int = 120) -> AsyncGenerator[str, None]:
      def format_sse(data: dict) -> str:
@@ -535,27 +585,66 @@ async def run_deep_research_stream(query: str, search_time: int = 120) -> AsyncG
          # Initialize the SSE stream with start message
          yield format_sse({
              "event": "status",
-             "data": f"Starting deep research on '{query}'. Search time limit: {search_time} seconds."
          })

          async with aiohttp.ClientSession() as session:
              # Step 1: Generate research plan
              yield format_sse({"event": "status", "data": "Generating comprehensive research plan..."})
              sub_questions = await generate_research_plan(query, session)
-             yield format_sse({"event": "plan", "data": sub_questions})

              # Step 2: Continuous search for better results
              yield format_sse({
                  "event": "status",
-                 "data": f"Performing continuous search for up to {search_time} seconds..."
              })

              search_results = await continuous_search(query, search_time)
              yield format_sse({
                  "event": "status",
-                 "data": f"Found {len(search_results)} potential sources. Selecting the best ones..."
              })

              if not search_results:
                  yield format_sse({
                      "event": "error",
@@ -564,10 +653,13 @@ async def run_deep_research_stream(query: str, search_time: int = 120) -> AsyncG
                  return

              # Select the best sources
-             selected_sources = await filter_and_select_sources(search_results)
              yield format_sse({
                  "event": "status",
-                 "data": f"Selected {len(selected_sources)} high-quality sources to process."
              })

              if not selected_sources:
@@ -577,6 +669,20 @@ async def run_deep_research_stream(query: str, search_time: int = 120) -> AsyncG
                  })
                  return

              # Step 3: Process selected sources with concurrency control
              semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
              consolidated_context = ""
@@ -595,7 +701,7 @@ async def run_deep_research_stream(query: str, search_time: int = 120) -> AsyncG
                  if elapsed > TOTAL_TIMEOUT * 0.8:  # Leave 20% of time for synthesis
                      yield format_sse({
                          "event": "status",
-                         "data": f"Approaching time limit, stopping source processing at {i}/{len(selected_sources)}"
                      })
                      break

@@ -603,26 +709,57 @@ async def run_deep_research_stream(query: str, search_time: int = 120) -> AsyncG
                  if i > 0:
                      await asyncio.sleep(REQUEST_DELAY * 0.5)

                  task = asyncio.create_task(process_with_semaphore(source))
                  processing_tasks.append(task)

-                 if (i + 1) % 2 == 0 or (i + 1) == len(selected_sources):
-                     yield format_sse({
-                         "event": "status",
-                         "data": f"Processed {min(i+1, len(selected_sources))}/{len(selected_sources)} sources..."
-                     })
-
              # Process completed tasks as they finish
              for future in asyncio.as_completed(processing_tasks):
                  processed_sources += 1
                  content, source_info = await future
                  if content and content.strip():
                      consolidated_context += f"Source: {source_info['link']}\nContent: {content}\n\n---\n\n"
                      all_sources_used.append(source_info)
                      successful_sources += 1
-                     total_tokens += len(content.split())  # Rough token count
                  else:
                      processing_errors += 1

              if not consolidated_context.strip():
                  yield format_sse({
@@ -631,120 +768,26 @@ async def run_deep_research_stream(query: str, search_time: int = 120) -> AsyncG
                  })
                  return

              # Step 4: Synthesize comprehensive report
              time_remaining = max(0, TOTAL_TIMEOUT - (time.time() - start_time))
              yield format_sse({
                  "event": "status",
-                 "data": f"Synthesizing comprehensive report from {successful_sources} sources..."
              })

              max_output_tokens = min(2000, int(time_remaining * 6))  # More aggressive token count

-             report_prompt = f"""Compose an in-depth analysis report on "{query}".

  Structure the report with these sections:
- 1. Introduction and Background
  2. Key Features and Capabilities
- 3. Comparative Analysis with Alternatives
- 4. Current Developments and Trends
- 5. Challenges and Limitations
- 6. Future Outlook
- 7. Conclusion and Recommendations
-
- For each section, provide detailed analysis based on the source material.
- Include specific examples and data points from the sources when available.
- Compare and contrast different viewpoints from various sources.
-
- Use markdown formatting for headings, subheadings, lists, and emphasis.
- Cite sources where appropriate using inline citations like [1][2].
-
- Available information from {successful_sources} sources:
- {consolidated_context[:20000]}  # Increased context size
-
- Generate a comprehensive report of approximately {max_output_tokens//4} words.
- Focus on providing deep insights, analysis, and actionable information.
- """
-
-             report_payload = {
-                 "model": LLM_MODEL,
-                 "messages": [{"role": "user", "content": report_prompt}],
-                 "stream": True,
-                 "max_tokens": max_output_tokens
-             }
-
-             # Stream the report generation
-             async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=report_payload) as response:
-                 response.raise_for_status()
-                 async for line in response.content:
-                     if time.time() - start_time > TOTAL_TIMEOUT:
-                         yield format_sse({
-                             "event": "warning",
-                             "data": "Time limit reached, ending report generation early."
-                         })
-                         break
-
-                     line_str = line.decode('utf-8').strip()
-                     if line_str.startswith('data:'):
-                         line_str = line_str[5:].strip()
-                         if line_str == "[DONE]":
-                             break
-                         try:
-                             chunk = json.loads(line_str)
-                             choices = chunk.get("choices")
-                             if choices and isinstance(choices, list) and len(choices) > 0:
-                                 content = choices[0].get("delta", {}).get("content")
-                                 if content:
-                                     yield format_sse({"event": "chunk", "data": content})
-                         except Exception as e:
-                             logging.warning(f"Error processing stream chunk: {e}")
-                             continue
-
-             # Final status update
-             duration = time.time() - start_time
-             stats = {
-                 "total_time_seconds": round(duration),
-                 "sources_processed": processed_sources,
-                 "sources_successful": successful_sources,
-                 "estimated_tokens": total_tokens,
-                 "sources_used": len(all_sources_used)
-             }
-             yield format_sse({
-                 "event": "status",
-                 "data": f"Research completed successfully in {duration:.1f} seconds."
-             })
-             yield format_sse({"event": "stats", "data": stats})
-             yield format_sse({"event": "sources", "data": all_sources_used})
-
-     except asyncio.TimeoutError:
-         yield format_sse({
-             "event": "error",
-             "data": f"Research process timed out after {TOTAL_TIMEOUT} seconds."
-         })
-     except Exception as e:
-         logging.error(f"Critical error in research process: {e}", exc_info=True)
-         yield format_sse({
-             "event": "error",
-             "data": f"An unexpected error occurred: {str(e)[:200]}"
-         })
-     finally:
-         duration = time.time() - start_time
-         yield format_sse({
-             "event": "complete",
-             "data": f"Research process finished after {duration:.1f} seconds."
-         })
-
- @app.post("/deep-research", response_class=StreamingResponse)
- async def deep_research_endpoint(request: DeepResearchRequest):
-     """Endpoint for deep research that streams SSE responses."""
-     if not request.query or len(request.query.strip()) < 3:
-         raise HTTPException(status_code=400, detail="Query must be at least 3 characters long")
-
-     search_time = min(max(request.search_time, 60), 180)  # Clamp between 60 and 180 seconds
-     return StreamingResponse(
-         run_deep_research_stream(request.query.strip(), search_time),
-         media_type="text/event-stream"
-     )
-
- if __name__ == "__main__":
-     import uvicorn
-     uvicorn.run(app, host="0.0.0.0", port=8000)
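The removed endpoint streamed text/event-stream responses; a minimal client sketch for consuming it (the URL, payload, and the assumption that format_sse emits JSON after "data:" are not part of this commit):

import json
import requests

# Sketch only: stream the /deep-research SSE response line by line.
resp = requests.post(
    "http://localhost:8000/deep-research",
    json={"query": "quantum computing startups", "search_time": 120},
    stream=True,
)
for line in resp.iter_lines(decode_unicode=True):
    if line and line.startswith("data:"):
        event = json.loads(line[5:].strip())  # assumes each frame carries a JSON object
        print(event.get("event"), "->", event.get("data"))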
 
  # --- Constants & Headers ---
  LLM_API_URL = "https://api.typegpt.net/v1/chat/completions"
  LLM_MODEL = "meta-llama/Llama-4-Maverick-17B-128E-Instruct-FP8"
+ MAX_SOURCES_TO_PROCESS = 10
+ MAX_CONCURRENT_REQUESTS = 5
+ SEARCH_TIMEOUT = 120  # Default search time in seconds
+ TOTAL_TIMEOUT = 180  # Total time limit in seconds
+ REQUEST_DELAY = 1.0  # Delay between requests in seconds
  USER_AGENT_ROTATION = True

  # Initialize fake user agent generator

  class DeepResearchRequest(BaseModel):
      query: str
+     search_time: int = SEARCH_TIMEOUT  # Default search time

  app = FastAPI(
      title="AI Deep Research API",
+     description="Provides comprehensive research reports from real web searches.",
+     version="3.1.0"
  )
  app.add_middleware(
      CORSMiddleware,

  def extract_json_from_llm_response(text: str) -> Optional[list]:
      """Extract JSON array from LLM response text."""
+     match = re.search(r'\[
+     .*
+     \]', text, re.DOTALL)
      if match:
          try:
              return json.loads(match.group(0))

      if url.startswith('//duckduckgo.com/l/'):
          url = f"https:{url}"  # Make it a proper URL
          try:
              parsed = urlparse(url)
              query_params = parsed.query
              if 'uddg=' in query_params:
                  match = re.search(r'uddg=([^&]+)', query_params)
                  if match:
                      encoded_url = match.group(1)
                      try:
+                         # URL decode the parameter
+                         decoded_url = quote_plus(encoded_url)
+                         # Sometimes it's double-encoded
+                         if '%25' in decoded_url:
+                             decoded_url = quote_plus(decoded_url)
+                         return decoded_url
                      except:
                          pass
          except:

                      continue

                  link = title_elem['href']
+                 snippet_elem = result.select_one('.result__snippet') or result.select_one('.result__body')

                  # Clean the URL
                  clean_link = clean_url(link)

                  # Get snippet if available
                  snippet = snippet_elem.get_text(strip=True) if snippet_elem else ""

+                 # Skip if we already have this URL
+                 if any(r['link'] == clean_link for r in results):
+                     continue
+
                  results.append({
                      'title': title_elem.get_text(strip=True),
                      'link': clean_link,
+                     'snippet': snippet,
+                     'source': 'duckduckgo'
                  })
              except Exception as e:
                  logging.warning(f"Error parsing search result: {e}")

      cleaned = []
      for q in sub_questions:
          if isinstance(q, str) and q.strip():
+             cleaned_q = re.sub(r'^[^a-zA-Z0-9]*|[^a-zA-Z0-9]*$', '', q)
              if cleaned_q:
                  cleaned.append(cleaned_q)
      return cleaned[:6]  # Limit to 6 questions max

  async def continuous_search(query: str, search_time: int = 120) -> List[dict]:
      """
      Perform continuous searching for better results within time constraints.
+     Provides detailed feedback about the search process.
      """
      start_time = time.time()
      all_results = []
      seen_urls = set()
+     seen_domains = defaultdict(int)
+     search_iterations = 0

      # Generate multiple variations of the query
      query_variations = [

          f"{query} analysis",
          f"{query} review",
          f"{query} features",
+         f"{query} vs alternatives",
+         f"latest {query} news",
+         f"{query} pros and cons"
      ]

      async with aiohttp.ClientSession() as session:
          while time.time() - start_time < search_time:
+             search_iterations += 1
              # Shuffle the query variations to get diverse results
              random.shuffle(query_variations)

+             # Use only a subset of queries each iteration
+             queries_for_this_iteration = query_variations[:min(3, len(query_variations))]
+
+             for q in queries_for_this_iteration:
                  if time.time() - start_time >= search_time:
                      break

                  try:
+                     # Notify about current search
+                     logging.info(f"Searching for: '{q}'")
                      results = await fetch_search_results(q, max_results=5)
+
+                     if results:
+                         for result in results:
+                             clean_link = clean_url(result['link'])
+                             domain = urlparse(clean_link).netloc if clean_link else ""
+
+                             # Skip if we've already seen this URL
+                             if clean_link in seen_urls:
+                                 continue
+
+                             # Skip if we have too many results from this domain
+                             if domain and seen_domains[domain] >= 2:  # Max 2 results per domain
+                                 continue
+
                              seen_urls.add(clean_link)
+                             if domain:
+                                 seen_domains[domain] += 1
+
                              result['link'] = clean_link
                              all_results.append(result)
+                             logging.info(f"Found new result: {result['title']} ({domain})")

                      # Small delay between searches
                      await asyncio.sleep(1.0)

                      # If we have enough unique results, we can stop early
+                     if len(all_results) >= MAX_SOURCES_TO_PROCESS * 2:  # Get more than we need for selection
+                         logging.info(f"Found enough unique results ({len(all_results)})")
                          break
+
                  except Exception as e:
                      logging.error(f"Error during continuous search: {e}")
                      await asyncio.sleep(2.0)  # Wait a bit before trying again

+             # Break if we've done several iterations
+             if search_iterations >= 4:  # Limit to 4 search iterations
+                 break
+
      # Filter and sort results by relevance
      if all_results:
+         # Simple relevance scoring
          def score_result(result):
              query_terms = set(query.lower().split())
              title = result['title'].lower()
              snippet = result['snippet'].lower()

              # Also consider length of snippet as a proxy for content richness
              snippet_length = len(result['snippet'].split())

+             # Prefer results from diverse domains
+             domain = urlparse(result['link']).netloc if result['link'] else ""
+             domain_score = 10 if seen_domains[domain] <= 1 else 5  # Bonus for unique domains
+
+             return matches * 10 + snippet_length + domain_score

          # Sort by score, descending
          all_results.sort(key=lambda x: score_result(x), reverse=True)

  async def filter_and_select_sources(results: List[dict]) -> List[dict]:
      """
      Filter and select the best sources from search results.
+     Returns a tuple of (selected_sources, rejected_sources_with_reasons)
      """
      if not results:
+         return [], []

      # Group by domain to ensure diversity
      domain_counts = defaultdict(int)
      domain_results = defaultdict(list)
      for result in results:
+         domain = urlparse(result['link']).netloc if result['link'] else ""
          domain_counts[domain] += 1
          domain_results[domain].append(result)

      selected = []
+     rejected = []

      # First pass: take the top result from each domain
      for domain, domain_res in domain_results.items():

              break
          # Take the best result from this domain (sorted by position in original results)
          if domain_res:
+             # Sort domain results by snippet length (proxy for content richness)
+             domain_res.sort(key=lambda x: len(x['snippet'].split()), reverse=True)
              selected.append(domain_res[0])

      # Second pass: if we need more, take additional results from domains with good content

      # Calculate average snippet length as a proxy for content quality
      domain_quality = {}
      for domain, domain_res in domain_results.items():
+         if not domain_res:
+             continue
          avg_length = sum(len(r['snippet'].split()) for r in domain_res) / len(domain_res)
          domain_quality[domain] = avg_length

          if len(selected) >= MAX_SOURCES_TO_PROCESS:
              break

+     # Third pass: if still need more, add remaining high-snippet-length results
      if len(selected) < MAX_SOURCES_TO_PROCESS:
+         # Sort all results by snippet length
+         remaining_results = [res for res in results if res not in selected]
+         remaining_results.sort(key=lambda x: len(x['snippet'].split()), reverse=True)
+
+         while len(selected) < MAX_SOURCES_TO_PROCESS and remaining_results:
+             selected.append(remaining_results.pop(0))
+
+     # The remaining results are our rejected ones (for now we won't track reasons)
+     rejected = [res for res in results if res not in selected]

+     return selected, rejected

  async def run_deep_research_stream(query: str, search_time: int = 120) -> AsyncGenerator[str, None]:
      def format_sse(data: dict) -> str:

          # Initialize the SSE stream with start message
          yield format_sse({
              "event": "status",
+             "data": f"Starting deep research on '{query}'. Searching for comprehensive sources..."
          })

          async with aiohttp.ClientSession() as session:
              # Step 1: Generate research plan
              yield format_sse({"event": "status", "data": "Generating comprehensive research plan..."})
              sub_questions = await generate_research_plan(query, session)
+             yield format_sse({
+                 "event": "plan",
+                 "data": {
+                     "sub_questions": sub_questions,
+                     "message": f"Research will focus on these {len(sub_questions)} key aspects"
+                 }
+             })

              # Step 2: Continuous search for better results
              yield format_sse({
                  "event": "status",
+                 "data": "Performing intelligent search for high-quality sources..."
+             })
+
+             # Show search variations we'll use
+             query_variations = [
+                 query,
+                 f"{query} comparison",
+                 f"{query} analysis",
+                 f"{query} review",
+                 f"{query} features",
+                 f"{query} vs alternatives"
+             ]
+             yield format_sse({
+                 "event": "status",
+                 "data": f"Using {len(query_variations)} different search variations to find diverse sources"
              })

              search_results = await continuous_search(query, search_time)
+
+             # Report on search results
+             unique_domains = len({urlparse(r['link']).netloc for r in search_results if r['link']})
              yield format_sse({
                  "event": "status",
+                 "data": f"Found {len(search_results)} potential sources from {unique_domains} unique domains"
              })

+             # Display some of the top sources found
+             if search_results:
+                 top_sources = search_results[:5]  # Show top 5
+                 sources_list = []
+                 for i, source in enumerate(top_sources, 1):
+                     domain = urlparse(source['link']).netloc if source['link'] else "Unknown"
+                     sources_list.append(f"{i}. {source['title']} ({domain})")
+
+                 yield format_sse({
+                     "event": "sources_found",
+                     "data": {
+                         "top_sources": sources_list,
+                         "total_sources": len(search_results)
+                     }
+                 })
+
              if not search_results:
                  yield format_sse({
                      "event": "error",

                  return

              # Select the best sources
+             selected_sources, rejected_sources = await filter_and_select_sources(search_results)
+
+             # Report on selected sources
+             unique_selected_domains = len({urlparse(r['link']).netloc for r in selected_sources if r['link']})
              yield format_sse({
                  "event": "status",
+                 "data": f"Selected {len(selected_sources)} high-quality sources from {unique_selected_domains} unique domains for in-depth analysis"
              })

              if not selected_sources:

                  })
                  return

+             # Show selected sources
+             selected_sources_list = []
+             for i, source in enumerate(selected_sources, 1):
+                 domain = urlparse(source['link']).netloc if source['link'] else "Unknown"
+                 selected_sources_list.append(f"{i}. {source['title']} ({domain})")
+
+             yield format_sse({
+                 "event": "sources_selected",
+                 "data": {
+                     "selected_sources": selected_sources_list,
+                     "message": "Proceeding with in-depth analysis of these sources"
+                 }
+             })
+
              # Step 3: Process selected sources with concurrency control
              semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
              consolidated_context = ""

                  if elapsed > TOTAL_TIMEOUT * 0.8:  # Leave 20% of time for synthesis
                      yield format_sse({
                          "event": "status",
+                         "data": f"Approaching time limit, stopping source processing after {i}/{len(selected_sources)} sources"
                      })
                      break

                  if i > 0:
                      await asyncio.sleep(REQUEST_DELAY * 0.5)

+                 # Notify about processing this source
+                 domain = urlparse(source['link']).netloc if source['link'] else "Unknown"
+                 yield format_sse({
+                     "event": "processing_source",
+                     "data": {
+                         "index": i + 1,
+                         "total": len(selected_sources),
+                         "title": source['title'],
+                         "domain": domain,
+                         "url": source['link']
+                     }
+                 })
+
                  task = asyncio.create_task(process_with_semaphore(source))
                  processing_tasks.append(task)

              # Process completed tasks as they finish
              for future in asyncio.as_completed(processing_tasks):
                  processed_sources += 1
                  content, source_info = await future
+
                  if content and content.strip():
+                     # Report successful processing
+                     domain = urlparse(source_info['link']).netloc if source_info['link'] else "Unknown"
+                     word_count = len(content.split())
+
+                     yield format_sse({
+                         "event": "source_processed",
+                         "data": {
+                             "title": source_info['title'],
+                             "domain": domain,
+                             "word_count": word_count,
+                             "status": "success"
+                         }
+                     })
+
+                     # Add to our consolidated context
                      consolidated_context += f"Source: {source_info['link']}\nContent: {content}\n\n---\n\n"
                      all_sources_used.append(source_info)
                      successful_sources += 1
+                     total_tokens += word_count  # Add to token count
                  else:
                      processing_errors += 1
+                     yield format_sse({
+                         "event": "source_processed",
+                         "data": {
+                             "title": source_info['title'],
+                             "status": "failed",
+                             "reason": "Could not extract sufficient content"
+                         }
+                     })

              if not consolidated_context.strip():
                  yield format_sse({

                  })
                  return

+             # Report on processing results
+             yield format_sse({
+                 "event": "status",
+                 "data": f"Successfully processed {successful_sources} of {processed_sources} sources, extracting approximately {total_tokens} words of content"
+             })
+
              # Step 4: Synthesize comprehensive report
              time_remaining = max(0, TOTAL_TIMEOUT - (time.time() - start_time))
              yield format_sse({
                  "event": "status",
+                 "data": f"Generating comprehensive analysis report from {successful_sources} sources..."
              })

              max_output_tokens = min(2000, int(time_remaining * 6))  # More aggressive token count

+             report_prompt = f"""Compose a comprehensive analysis report on "{query}".

  Structure the report with these sections:
+ 1. Executive Summary
  2. Key Features and Capabilities
+ 3. Comparative Analysis
+ 4. Strengths and Weaknesses
+ 5. Current Trends and