rkihacker committed on
Commit cffab53 · verified · 1 Parent(s): 4118768

Update main.py

Files changed (1)
  1. main.py +182 -200
main.py CHANGED
@@ -34,11 +34,11 @@ else:
34
  # --- Constants & Headers ---
35
  LLM_API_URL = "https://api.typegpt.net/v1/chat/completions"
36
  LLM_MODEL = "meta-llama/Llama-4-Maverick-17B-128E-Instruct-FP8"
37
- MAX_SOURCES_TO_PROCESS = 10
38
- MAX_CONCURRENT_REQUESTS = 5
39
- SEARCH_TIMEOUT = 120 # Default search time in seconds
40
- TOTAL_TIMEOUT = 180 # Total time limit in seconds
41
- REQUEST_DELAY = 1.0 # Delay between requests in seconds
42
  USER_AGENT_ROTATION = True
43
 
44
  # Initialize fake user agent generator
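The rotation behind USER_AGENT_ROTATION presumably relies on the fake_useragent package (the initialization itself sits above this hunk); a minimal sketch of that pattern, under that assumption:

```python
# Sketch only: assumes the fake_useragent package provides the "fake user agent
# generator" mentioned above. UserAgent().random returns a random browser UA string.
from fake_useragent import UserAgent

ua = UserAgent()

def pick_user_agent(rotate: bool = True) -> str:
    # Rotate through random user agents when enabled, else fall back to a fixed one.
    return ua.random if rotate else ua.chrome
```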
@@ -62,12 +62,12 @@ LLM_HEADERS = {
62
 
63
  class DeepResearchRequest(BaseModel):
64
  query: str
65
- search_time: int = SEARCH_TIMEOUT # Default search time
66
 
67
  app = FastAPI(
68
  title="AI Deep Research API",
69
- description="Provides comprehensive research reports from real web searches.",
70
- version="3.1.0"
71
  )
72
  app.add_middleware(
73
  CORSMiddleware,
@@ -79,9 +79,7 @@ app.add_middleware(
79
 
80
  def extract_json_from_llm_response(text: str) -> Optional[list]:
81
  """Extract JSON array from LLM response text."""
82
- match = re.search(r'$$.*$$', text, re.DOTALL)
85
  if match:
86
  try:
87
  return json.loads(match.group(0))
@@ -107,19 +105,19 @@ def clean_url(url: str) -> str:
107
  if url.startswith('//duckduckgo.com/l/'):
108
  url = f"https:{url}" # Make it a proper URL
109
  try:
 
110
  parsed = urlparse(url)
111
  query_params = parsed.query
112
  if 'uddg=' in query_params:
 
113
  match = re.search(r'uddg=([^&]+)', query_params)
114
  if match:
115
  encoded_url = match.group(1)
116
  try:
117
- # URL decode the parameter
118
- decoded_url = quote_plus(encoded_url)
119
- # Sometimes it's double-encoded
120
- if '%25' in decoded_url:
121
- decoded_url = quote_plus(decoded_url)
122
- return decoded_url
123
  except:
124
  pass
125
  except:
@@ -195,7 +193,7 @@ async def fetch_search_results(query: str, max_results: int = 5) -> List[dict]:
195
  continue
196
 
197
  link = title_elem['href']
198
- snippet_elem = result.select_one('.result__snippet') or result.select_one('.result__body')
199
 
200
  # Clean the URL
201
  clean_link = clean_url(link)
@@ -207,15 +205,10 @@ async def fetch_search_results(query: str, max_results: int = 5) -> List[dict]:
207
  # Get snippet if available
208
  snippet = snippet_elem.get_text(strip=True) if snippet_elem else ""
209
 
210
- # Skip if we already have this URL
211
- if any(r['link'] == clean_link for r in results):
212
- continue
213
-
214
  results.append({
215
  'title': title_elem.get_text(strip=True),
216
  'link': clean_link,
217
- 'snippet': snippet,
218
- 'source': 'duckduckgo'
219
  })
220
  except Exception as e:
221
  logging.warning(f"Error parsing search result: {e}")
@@ -379,7 +372,7 @@ async def generate_research_plan(query: str, session: aiohttp.ClientSession) ->
379
  cleaned = []
380
  for q in sub_questions:
381
  if isinstance(q, str) and q.strip():
382
- cleaned_q = re.sub(r'^[^a-zA-Z0-9]*|[^a-zA-Z0-9]*\$', '', q)
383
  if cleaned_q:
384
  cleaned.append(cleaned_q)
385
  return cleaned[:6] # Limit to 6 questions max
@@ -404,13 +397,10 @@ async def generate_research_plan(query: str, session: aiohttp.ClientSession) ->
404
  async def continuous_search(query: str, search_time: int = 120) -> List[dict]:
405
  """
406
  Perform continuous searching for better results within time constraints.
407
- Provides detailed feedback about the search process.
408
  """
409
  start_time = time.time()
410
  all_results = []
411
  seen_urls = set()
412
- seen_domains = defaultdict(int)
413
- search_iterations = 0
414
 
415
  # Generate multiple variations of the query
416
  query_variations = [
@@ -419,70 +409,49 @@ async def continuous_search(query: str, search_time: int = 120) -> List[dict]:
419
  f"{query} analysis",
420
  f"{query} review",
421
  f"{query} features",
422
- f"{query} vs alternatives",
423
- f"latest {query} news",
424
- f"{query} pros and cons"
425
  ]
426
 
427
  async with aiohttp.ClientSession() as session:
428
  while time.time() - start_time < search_time:
429
- search_iterations += 1
430
  # Shuffle the query variations to get diverse results
431
  random.shuffle(query_variations)
432
 
433
- # Use only a subset of queries each iteration
434
- queries_for_this_iteration = query_variations[:min(3, len(query_variations))]
435
-
436
- for q in queries_for_this_iteration:
437
  if time.time() - start_time >= search_time:
 
438
  break
439
 
 
440
  try:
441
- # Notify about current search
442
- logging.info(f"Searching for: '{q}'")
443
  results = await fetch_search_results(q, max_results=5)
444
-
445
- if results:
446
- for result in results:
447
- clean_link = clean_url(result['link'])
448
- domain = urlparse(clean_link).netloc if clean_link else ""
449
-
450
- # Skip if we've already seen this URL
451
- if clean_link in seen_urls:
452
- continue
453
-
454
- # Skip if we have too many results from this domain
455
- if domain and seen_domains[domain] >= 2: # Max 2 results per domain
456
- continue
457
-
458
  seen_urls.add(clean_link)
459
- if domain:
460
- seen_domains[domain] += 1
461
-
462
  result['link'] = clean_link
463
  all_results.append(result)
464
- logging.info(f"Found new result: {result['title']} ({domain})")
465
 
466
  # Small delay between searches
467
  await asyncio.sleep(1.0)
468
 
469
  # If we have enough unique results, we can stop early
470
- if len(all_results) >= MAX_SOURCES_TO_PROCESS * 2: # Get more than we need for selection
471
- logging.info(f"Found enough unique results ({len(all_results)})")
472
  break
473
-
474
  except Exception as e:
475
- logging.error(f"Error during continuous search: {e}")
476
  await asyncio.sleep(2.0) # Wait a bit before trying again
477
 
478
- # Break if we've done several iterations
479
- if search_iterations >= 4: # Limit to 4 search iterations
480
- break
481
 
482
  # Filter and sort results by relevance
483
  if all_results:
484
- # Simple relevance scoring
485
  def score_result(result):
 
486
  query_terms = set(query.lower().split())
487
  title = result['title'].lower()
488
  snippet = result['snippet'].lower()
@@ -495,11 +464,7 @@ async def continuous_search(query: str, search_time: int = 120) -> List[dict]:
495
  # Also consider length of snippet as a proxy for content richness
496
  snippet_length = len(result['snippet'].split())
497
 
498
- # Prefer results from diverse domains
499
- domain = urlparse(result['link']).netloc if result['link'] else ""
500
- domain_score = 10 if seen_domains[domain] <= 1 else 5 # Bonus for unique domains
501
-
502
- return matches * 10 + snippet_length + domain_score
503
 
504
  # Sort by score, descending
505
  all_results.sort(key=lambda x: score_result(x), reverse=True)
@@ -509,21 +474,22 @@ async def continuous_search(query: str, search_time: int = 120) -> List[dict]:
509
  async def filter_and_select_sources(results: List[dict]) -> List[dict]:
510
  """
511
  Filter and select the best sources from search results.
512
- Returns a tuple of (selected_sources, rejected_sources_with_reasons)
513
  """
514
  if not results:
515
- return [], []
516
 
517
  # Group by domain to ensure diversity
518
  domain_counts = defaultdict(int)
519
  domain_results = defaultdict(list)
520
  for result in results:
521
- domain = urlparse(result['link']).netloc if result['link'] else ""
522
  domain_counts[domain] += 1
523
  domain_results[domain].append(result)
524
 
525
  selected = []
526
- rejected = []
527
 
528
  # First pass: take the top result from each domain
529
  for domain, domain_res in domain_results.items():
@@ -531,17 +497,14 @@ async def filter_and_select_sources(results: List[dict]) -> List[dict]:
531
  break
532
  # Take the best result from this domain (sorted by position in original results)
533
  if domain_res:
534
- # Sort domain results by snippet length (proxy for content richness)
535
- domain_res.sort(key=lambda x: len(x['snippet'].split()), reverse=True)
536
  selected.append(domain_res[0])
 
537
 
538
  # Second pass: if we need more, take additional results from domains with good content
539
  if len(selected) < MAX_SOURCES_TO_PROCESS:
540
  # Calculate average snippet length as a proxy for content quality
541
  domain_quality = {}
542
  for domain, domain_res in domain_results.items():
543
- if not domain_res:
544
- continue
545
  avg_length = sum(len(r['snippet'].split()) for r in domain_res) / len(domain_res)
546
  domain_quality[domain] = avg_length
547
 
@@ -555,22 +518,22 @@ async def filter_and_select_sources(results: List[dict]) -> List[dict]:
555
  for res in domain_results[domain]:
556
  if res not in selected:
557
  selected.append(res)
 
558
  if len(selected) >= MAX_SOURCES_TO_PROCESS:
559
  break
560
 
561
- # Third pass: if still need more, add remaining high-snippet-length results
562
  if len(selected) < MAX_SOURCES_TO_PROCESS:
563
- # Sort all results by snippet length
564
- remaining_results = [res for res in results if res not in selected]
565
- remaining_results.sort(key=lambda x: len(x['snippet'].split()), reverse=True)
566
-
567
- while len(selected) < MAX_SOURCES_TO_PROCESS and remaining_results:
568
- selected.append(remaining_results.pop(0))
569
-
570
- # The remaining results are our rejected ones (for now we won't track reasons)
571
- rejected = [res for res in results if res not in selected]
572
 
573
- return selected, rejected
 
574
 
575
  async def run_deep_research_stream(query: str, search_time: int = 120) -> AsyncGenerator[str, None]:
576
  def format_sse(data: dict) -> str:
@@ -585,66 +548,31 @@ async def run_deep_research_stream(query: str, search_time: int = 120) -> AsyncG
585
  # Initialize the SSE stream with start message
586
  yield format_sse({
587
  "event": "status",
588
- "data": f"Starting deep research on '{query}'. Searching for comprehensive sources..."
589
  })
590
 
591
  async with aiohttp.ClientSession() as session:
592
  # Step 1: Generate research plan
593
  yield format_sse({"event": "status", "data": "Generating comprehensive research plan..."})
594
  sub_questions = await generate_research_plan(query, session)
595
- yield format_sse({
596
- "event": "plan",
597
- "data": {
598
- "sub_questions": sub_questions,
599
- "message": f"Research will focus on these {len(sub_questions)} key aspects"
600
- }
601
- })
602
 
603
  # Step 2: Continuous search for better results
604
  yield format_sse({
605
  "event": "status",
606
- "data": "Performing intelligent search for high-quality sources..."
607
  })
608
 
609
- # Show search variations we'll use
610
- query_variations = [
611
- query,
612
- f"{query} comparison",
613
- f"{query} analysis",
614
- f"{query} review",
615
- f"{query} features",
616
- f"{query} vs alternatives"
617
- ]
618
  yield format_sse({
619
  "event": "status",
620
- "data": f"Using {len(query_variations)} different search variations to find diverse sources"
621
  })
622
-
623
- search_results = await continuous_search(query, search_time)
624
-
625
- # Report on search results
626
- unique_domains = len({urlparse(r['link']).netloc for r in search_results if r['link']})
627
  yield format_sse({
628
- "event": "status",
629
- "data": f"Found {len(search_results)} potential sources from {unique_domains} unique domains"
630
  })
631
 
632
- # Display some of the top sources found
633
- if search_results:
634
- top_sources = search_results[:5] # Show top 5
635
- sources_list = []
636
- for i, source in enumerate(top_sources, 1):
637
- domain = urlparse(source['link']).netloc if source['link'] else "Unknown"
638
- sources_list.append(f"{i}. {source['title']} ({domain})")
639
-
640
- yield format_sse({
641
- "event": "sources_found",
642
- "data": {
643
- "top_sources": sources_list,
644
- "total_sources": len(search_results)
645
- }
646
- })
647
-
648
  if not search_results:
649
  yield format_sse({
650
  "event": "error",
@@ -653,13 +581,14 @@ async def run_deep_research_stream(query: str, search_time: int = 120) -> AsyncG
653
  return
654
 
655
  # Select the best sources
656
- selected_sources, rejected_sources = await filter_and_select_sources(search_results)
657
-
658
- # Report on selected sources
659
- unique_selected_domains = len({urlparse(r['link']).netloc for r in selected_sources if r['link']})
660
  yield format_sse({
661
  "event": "status",
662
- "data": f"Selected {len(selected_sources)} high-quality sources from {unique_selected_domains} unique domains for in-depth analysis"
663
  })
664
 
665
  if not selected_sources:
@@ -669,20 +598,6 @@ async def run_deep_research_stream(query: str, search_time: int = 120) -> AsyncG
669
  })
670
  return
671
 
672
- # Show selected sources
673
- selected_sources_list = []
674
- for i, source in enumerate(selected_sources, 1):
675
- domain = urlparse(source['link']).netloc if source['link'] else "Unknown"
676
- selected_sources_list.append(f"{i}. {source['title']} ({domain})")
677
-
678
- yield format_sse({
679
- "event": "sources_selected",
680
- "data": {
681
- "selected_sources": selected_sources_list,
682
- "message": "Proceeding with in-depth analysis of these sources"
683
- }
684
- })
685
-
686
  # Step 3: Process selected sources with concurrency control
687
  semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
688
  consolidated_context = ""
@@ -701,7 +616,7 @@ async def run_deep_research_stream(query: str, search_time: int = 120) -> AsyncG
701
  if elapsed > TOTAL_TIMEOUT * 0.8: # Leave 20% of time for synthesis
702
  yield format_sse({
703
  "event": "status",
704
- "data": f"Approaching time limit, stopping source processing after {i}/{len(selected_sources)} sources"
705
  })
706
  break
707
 
@@ -709,57 +624,30 @@ async def run_deep_research_stream(query: str, search_time: int = 120) -> AsyncG
709
  if i > 0:
710
  await asyncio.sleep(REQUEST_DELAY * 0.5)
711
 
712
- # Notify about processing this source
713
- domain = urlparse(source['link']).netloc if source['link'] else "Unknown"
714
- yield format_sse({
715
- "event": "processing_source",
716
- "data": {
717
- "index": i + 1,
718
- "total": len(selected_sources),
719
- "title": source['title'],
720
- "domain": domain,
721
- "url": source['link']
722
- }
723
- })
724
-
725
  task = asyncio.create_task(process_with_semaphore(source))
726
  processing_tasks.append(task)
727
 
  # Process completed tasks as they finish
729
  for future in asyncio.as_completed(processing_tasks):
730
  processed_sources += 1
731
  content, source_info = await future
732
-
733
  if content and content.strip():
734
- # Report successful processing
735
- domain = urlparse(source_info['link']).netloc if source_info['link'] else "Unknown"
736
- word_count = len(content.split())
737
-
738
- yield format_sse({
739
- "event": "source_processed",
740
- "data": {
741
- "title": source_info['title'],
742
- "domain": domain,
743
- "word_count": word_count,
744
- "status": "success"
745
- }
746
- })
747
-
748
- # Add to our consolidated context
749
  consolidated_context += f"Source: {source_info['link']}\nContent: {content}\n\n---\n\n"
750
  all_sources_used.append(source_info)
751
  successful_sources += 1
752
- total_tokens += word_count # Add to token count
753
- else:
754
- processing_errors += 1
755
  yield format_sse({
756
- "event": "source_processed",
757
- "data": {
758
- "title": source_info['title'],
759
- "status": "failed",
760
- "reason": "Could not extract sufficient content"
761
- }
762
  })
 
 
763
 
764
  if not consolidated_context.strip():
765
  yield format_sse({
@@ -768,26 +656,120 @@ async def run_deep_research_stream(query: str, search_time: int = 120) -> AsyncG
768
  })
769
  return
770
 
771
- # Report on processing results
772
- yield format_sse({
773
- "event": "status",
774
- "data": f"Successfully processed {successful_sources} of {processed_sources} sources, extracting approximately {total_tokens} words of content"
775
- })
776
-
777
  # Step 4: Synthesize comprehensive report
778
  time_remaining = max(0, TOTAL_TIMEOUT - (time.time() - start_time))
779
  yield format_sse({
780
  "event": "status",
781
- "data": f"Generating comprehensive analysis report from {successful_sources} sources..."
782
  })
783
 
784
  max_output_tokens = min(2000, int(time_remaining * 6)) # More aggressive token count
785
 
786
- report_prompt = f"""Compose a comprehensive analysis report on "{query}".
787
 
788
  Structure the report with these sections:
789
- 1. Executive Summary
790
  2. Key Features and Capabilities
791
- 3. Comparative Analysis
792
- 4. Strengths and Weaknesses
793
- 5. Current Trends and
34
  # --- Constants & Headers ---
35
  LLM_API_URL = "https://api.typegpt.net/v1/chat/completions"
36
  LLM_MODEL = "meta-llama/Llama-4-Maverick-17B-128E-Instruct-FP8"
37
+ MAX_SOURCES_TO_PROCESS = 10 # Increased to get more comprehensive results
38
+ MAX_CONCURRENT_REQUESTS = 5 # Increased for faster processing
39
+ SEARCH_TIMEOUT = 120 # 2 minutes for searching (adjustable)
40
+ TOTAL_TIMEOUT = 180 # 3 minutes total
41
+ REQUEST_DELAY = 1.0 # Shorter delay between requests
42
  USER_AGENT_ROTATION = True
43
 
44
  # Initialize fake user agent generator
 
62
 
63
  class DeepResearchRequest(BaseModel):
64
  query: str
65
+ search_time: int = 120 # Default to 2 minutes
66
 
67
  app = FastAPI(
68
  title="AI Deep Research API",
69
+ description="Provides comprehensive research reports from real web searches within 1-2 minutes.",
70
+ version="3.0.0"
71
  )
72
  app.add_middleware(
73
  CORSMiddleware,
 
79
 
80
  def extract_json_from_llm_response(text: str) -> Optional[list]:
81
  """Extract JSON array from LLM response text."""
82
+ match = re.search(r'\[.*\]', text, re.DOTALL)
 
 
83
  if match:
84
  try:
85
  return json.loads(match.group(0))
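An illustrative call (the sample reply below is made up) shows what the `\[.*\]` pattern is meant to pull out of a chatty LLM response:

```python
# Hypothetical input: an LLM reply that wraps the JSON array in prose.
sample = 'Here is the plan:\n["What is X?", "How does X compare to Y?"]\nThanks!'
print(extract_json_from_llm_response(sample))
# -> ['What is X?', 'How does X compare to Y?']
```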
 
105
  if url.startswith('//duckduckgo.com/l/'):
106
  url = f"https:{url}" # Make it a proper URL
107
  try:
108
+ # Extract the real URL from DuckDuckGo's redirect
109
  parsed = urlparse(url)
110
  query_params = parsed.query
111
  if 'uddg=' in query_params:
112
+ # Extract the actual URL from the parameter
113
  match = re.search(r'uddg=([^&]+)', query_params)
114
  if match:
115
  encoded_url = match.group(1)
116
  try:
117
+ url = quote_plus(encoded_url) # This might need better decoding
118
+ # For simplicity, we'll just return the decoded URL
119
+ # In production, you'd want to properly URL-decode this
120
+ return encoded_url
 
 
121
  except:
122
  pass
123
  except:
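As the inline comments concede, `quote_plus` percent-encodes rather than decodes, so the `uddg=` value is returned still encoded. A minimal sketch of proper decoding, offered as an assumption rather than as part of this commit:

```python
# Sketch: uddg= carries a percent-encoded target URL, so it needs unquote
# (decode), not quote_plus (encode); handle the occasional double encoding.
from urllib.parse import unquote

def decode_uddg(encoded_url: str) -> str:
    decoded = unquote(encoded_url)
    if decoded.startswith(("http%3A", "https%3A")):  # still encoded once more
        decoded = unquote(decoded)
    return decoded
```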
 
193
  continue
194
 
195
  link = title_elem['href']
196
+ snippet_elem = result.select_one('.result__snippet')
197
 
198
  # Clean the URL
199
  clean_link = clean_url(link)
 
205
  # Get snippet if available
206
  snippet = snippet_elem.get_text(strip=True) if snippet_elem else ""
207
208
  results.append({
209
  'title': title_elem.get_text(strip=True),
210
  'link': clean_link,
211
+ 'snippet': snippet
 
212
  })
213
  except Exception as e:
214
  logging.warning(f"Error parsing search result: {e}")
 
372
  cleaned = []
373
  for q in sub_questions:
374
  if isinstance(q, str) and q.strip():
375
+ cleaned_q = re.sub(r'^[^a-zA-Z0-9]*|[^a-zA-Z0-9]*$', '', q)
376
  if cleaned_q:
377
  cleaned.append(cleaned_q)
378
  return cleaned[:6] # Limit to 6 questions max
 
397
  async def continuous_search(query: str, search_time: int = 120) -> List[dict]:
398
  """
399
  Perform continuous searching for better results within time constraints.
 
400
  """
401
  start_time = time.time()
402
  all_results = []
403
  seen_urls = set()
 
 
404
 
405
  # Generate multiple variations of the query
406
  query_variations = [
 
409
  f"{query} analysis",
410
  f"{query} review",
411
  f"{query} features",
412
+ f"{query} vs alternatives"
 
 
413
  ]
414
 
415
  async with aiohttp.ClientSession() as session:
416
  while time.time() - start_time < search_time:
 
417
  # Shuffle the query variations to get diverse results
418
  random.shuffle(query_variations)
419
 
420
+ for q in query_variations[:3]: # Only use first 3 variations in each iteration
 
 
 
421
  if time.time() - start_time >= search_time:
422
+ logger.info(f"Search timed out after {search_time} seconds. Found {len(all_results)} results.")
423
  break
424
 
425
+ logger.info(f"Searching for query variation: {q}")
426
  try:
 
 
427
  results = await fetch_search_results(q, max_results=5)
428
+ logger.info(f"Retrieved {len(results)} results for query '{q}'")
429
+ for result in results:
430
+ clean_link = clean_url(result['link'])
431
+ if clean_link and clean_link not in seen_urls:
432
  seen_urls.add(clean_link)
 
 
 
433
  result['link'] = clean_link
434
  all_results.append(result)
435
+ logger.info(f"Added new result: {result['title']} ({result['link']})")
436
 
437
  # Small delay between searches
438
  await asyncio.sleep(1.0)
439
 
440
  # If we have enough unique results, we can stop early
441
+ if len(all_results) >= MAX_SOURCES_TO_PROCESS * 1.5: # Get more than we need for selection
442
+ logger.info(f"Reached sufficient results: {len(all_results)}")
443
  break
 
444
  except Exception as e:
445
+ logger.error(f"Error during search for '{q}': {e}")
446
  await asyncio.sleep(2.0) # Wait a bit before trying again
447
 
448
+ logger.info(f"Completed continuous search. Total results: {len(all_results)}")
 
 
449
 
450
  # Filter and sort results by relevance
451
  if all_results:
452
+ # Simple relevance scoring (could be enhanced with more sophisticated methods)
453
  def score_result(result):
454
+ # Score based on how many query terms appear in title/snippet
455
  query_terms = set(query.lower().split())
456
  title = result['title'].lower()
457
  snippet = result['snippet'].lower()
 
464
  # Also consider length of snippet as a proxy for content richness
465
  snippet_length = len(result['snippet'].split())
466
 
467
+ return matches * 10 + snippet_length
468
 
469
  # Sort by score, descending
470
  all_results.sort(key=lambda x: score_result(x), reverse=True)
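For instance, assuming `matches` counts how many query terms appear in the title or snippet, a result matching 2 terms with a 15-word snippet scores 2 × 10 + 15 = 35, so term overlap dominates snippet length in the ranking.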
 
474
  async def filter_and_select_sources(results: List[dict]) -> List[dict]:
475
  """
476
  Filter and select the best sources from search results.
 
477
  """
478
  if not results:
479
+ logger.warning("No search results to filter.")
480
+ return []
481
+
482
+ logger.info(f"Filtering {len(results)} search results...")
483
 
484
  # Group by domain to ensure diversity
485
  domain_counts = defaultdict(int)
486
  domain_results = defaultdict(list)
487
  for result in results:
488
+ domain = urlparse(result['link']).netloc
489
  domain_counts[domain] += 1
490
  domain_results[domain].append(result)
491
 
492
  selected = []
 
493
 
494
  # First pass: take the top result from each domain
495
  for domain, domain_res in domain_results.items():
 
497
  break
498
  # Take the best result from this domain (sorted by position in original results)
499
  if domain_res:
 
 
500
  selected.append(domain_res[0])
501
+ logger.info(f"Selected top result from domain {domain}: {domain_res[0]['link']}")
502
 
503
  # Second pass: if we need more, take additional results from domains with good content
504
  if len(selected) < MAX_SOURCES_TO_PROCESS:
505
  # Calculate average snippet length as a proxy for content quality
506
  domain_quality = {}
507
  for domain, domain_res in domain_results.items():
 
 
508
  avg_length = sum(len(r['snippet'].split()) for r in domain_res) / len(domain_res)
509
  domain_quality[domain] = avg_length
510
 
 
518
  for res in domain_results[domain]:
519
  if res not in selected:
520
  selected.append(res)
521
+ logger.info(f"Added additional result from high-quality domain {domain}: {res['link']}")
522
  if len(selected) >= MAX_SOURCES_TO_PROCESS:
523
  break
524
 
525
+ # Final pass: if still need more, add remaining high-snippet-length results
526
  if len(selected) < MAX_SOURCES_TO_PROCESS:
527
+ all_results_sorted = sorted(results, key=lambda x: len(x['snippet'].split()), reverse=True)
528
+ for res in all_results_sorted:
529
+ if res not in selected:
530
+ selected.append(res)
531
+ logger.info(f"Added fallback high-snippet result: {res['link']}")
532
+ if len(selected) >= MAX_SOURCES_TO_PROCESS:
533
+ break
 
 
534
 
535
+ logger.info(f"Selected {len(selected)} sources after filtering.")
536
+ return selected[:MAX_SOURCES_TO_PROCESS]
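A hypothetical invocation with toy results (domains and snippets invented) illustrates the per-domain diversity passes:

```python
# Toy data: two results share a domain, so the first pass keeps one per
# domain and later passes only backfill up to MAX_SOURCES_TO_PROCESS.
import asyncio

toy_results = [
    {"title": "Overview", "link": "https://site-a.example/overview", "snippet": "deep research api " * 8},
    {"title": "Changelog", "link": "https://site-a.example/changelog", "snippet": "short note"},
    {"title": "Review", "link": "https://site-b.example/review", "snippet": "independent review of the api " * 4},
]

selected = asyncio.run(filter_and_select_sources(toy_results))
print([r["link"] for r in selected])
```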
537
 
538
  async def run_deep_research_stream(query: str, search_time: int = 120) -> AsyncGenerator[str, None]:
539
  def format_sse(data: dict) -> str:
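The body of format_sse is outside this hunk; the stream consumer expects standard server-sent-event framing, so a typical implementation would look roughly like this (an assumption, not the committed code):

```python
# Assumed shape of the helper: serialize the dict and frame it as one SSE message.
import json

def format_sse(data: dict) -> str:
    return f"data: {json.dumps(data)}\n\n"
```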
 
548
  # Initialize the SSE stream with start message
549
  yield format_sse({
550
  "event": "status",
551
+ "data": f"Starting deep research on '{query}'. Search time limit: {search_time} seconds."
552
  })
553
 
554
  async with aiohttp.ClientSession() as session:
555
  # Step 1: Generate research plan
556
  yield format_sse({"event": "status", "data": "Generating comprehensive research plan..."})
557
  sub_questions = await generate_research_plan(query, session)
558
+ yield format_sse({"event": "plan", "data": sub_questions})
559
 
560
  # Step 2: Continuous search for better results
561
  yield format_sse({
562
  "event": "status",
563
+ "data": f"Performing continuous search for up to {search_time} seconds..."
564
  })
565
 
566
+ search_results = await continuous_search(query, search_time)
567
  yield format_sse({
568
  "event": "status",
569
+ "data": f"Found {len(search_results)} potential sources. Selecting the best ones..."
570
  })
571
  yield format_sse({
572
+ "event": "found_sources",
573
+ "data": search_results
574
  })
575
576
  if not search_results:
577
  yield format_sse({
578
  "event": "error",
 
581
  return
582
 
583
  # Select the best sources
584
+ selected_sources = await filter_and_select_sources(search_results)
 
 
 
585
  yield format_sse({
586
  "event": "status",
587
+ "data": f"Selected {len(selected_sources)} high-quality sources to process."
588
+ })
589
+ yield format_sse({
590
+ "event": "selected_sources",
591
+ "data": selected_sources
592
  })
593
 
594
  if not selected_sources:
 
598
  })
599
  return
600
601
  # Step 3: Process selected sources with concurrency control
602
  semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
603
  consolidated_context = ""
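process_with_semaphore is defined outside this hunk; judging by how it is awaited below (returning a (content, source_info) pair), its shape is presumably something like the following sketch, with the fetch helper name invented:

```python
# Assumed shape only: bound concurrent fetches with the semaphore and return
# the extracted text together with its source record. fetch_page_content is
# a hypothetical stand-in for whatever extractor main.py actually uses.
async def process_with_semaphore(source: dict):
    async with semaphore:
        content = await fetch_page_content(source["link"])
        return content, source
```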
 
616
  if elapsed > TOTAL_TIMEOUT * 0.8: # Leave 20% of time for synthesis
617
  yield format_sse({
618
  "event": "status",
619
+ "data": f"Approaching time limit, stopping source processing at {i}/{len(selected_sources)}"
620
  })
621
  break
622
 
 
624
  if i > 0:
625
  await asyncio.sleep(REQUEST_DELAY * 0.5)
626
627
  task = asyncio.create_task(process_with_semaphore(source))
628
  processing_tasks.append(task)
629
 
630
+ if (i + 1) % 2 == 0 or (i + 1) == len(selected_sources):
631
+ yield format_sse({
632
+ "event": "status",
633
+ "data": f"Processed {min(i+1, len(selected_sources))}/{len(selected_sources)} sources..."
634
+ })
635
+
636
  # Process completed tasks as they finish
637
  for future in asyncio.as_completed(processing_tasks):
638
  processed_sources += 1
639
  content, source_info = await future
 
640
  if content and content.strip():
641
  consolidated_context += f"Source: {source_info['link']}\nContent: {content}\n\n---\n\n"
642
  all_sources_used.append(source_info)
643
  successful_sources += 1
644
+ total_tokens += len(content.split()) # Rough token count
 
 
645
  yield format_sse({
646
+ "event": "processed_source",
647
+ "data": source_info
648
  })
649
+ else:
650
+ processing_errors += 1
651
 
652
  if not consolidated_context.strip():
653
  yield format_sse({
 
656
  })
657
  return
658
 
659
  # Step 4: Synthesize comprehensive report
660
  time_remaining = max(0, TOTAL_TIMEOUT - (time.time() - start_time))
661
  yield format_sse({
662
  "event": "status",
663
+ "data": f"Synthesizing comprehensive report from {successful_sources} sources..."
664
  })
665
 
666
  max_output_tokens = min(2000, int(time_remaining * 6)) # More aggressive token count
667
 
668
+ report_prompt = f"""Compose an in-depth analysis report on "{query}".
669
 
670
  Structure the report with these sections:
671
+ 1. Introduction and Background
672
  2. Key Features and Capabilities
673
+ 3. Comparative Analysis with Alternatives
674
+ 4. Current Developments and Trends
675
+ 5. Challenges and Limitations
676
+ 6. Future Outlook
677
+ 7. Conclusion and Recommendations
678
+
679
+ For each section, provide detailed analysis based on the source material.
680
+ Include specific examples and data points from the sources when available.
681
+ Compare and contrast different viewpoints from various sources.
682
+
683
+ Use markdown formatting for headings, subheadings, lists, and emphasis.
684
+ Cite sources where appropriate using inline citations like [1][2].
685
+
686
+ Available information from {successful_sources} sources:
687
+ {consolidated_context[:20000]} # Increased context size
688
+
689
+ Generate a comprehensive report of approximately {max_output_tokens//4} words.
690
+ Focus on providing deep insights, analysis, and actionable information.
691
+ """
692
+
693
+ report_payload = {
694
+ "model": LLM_MODEL,
695
+ "messages": [{"role": "user", "content": report_prompt}],
696
+ "stream": True,
697
+ "max_tokens": max_output_tokens
698
+ }
699
+
700
+ # Stream the report generation
701
+ async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=report_payload) as response:
702
+ response.raise_for_status()
703
+ async for line in response.content:
704
+ if time.time() - start_time > TOTAL_TIMEOUT:
705
+ yield format_sse({
706
+ "event": "warning",
707
+ "data": "Time limit reached, ending report generation early."
708
+ })
709
+ break
710
+
711
+ line_str = line.decode('utf-8').strip()
712
+ if line_str.startswith('data:'):
713
+ line_str = line_str[5:].strip()
714
+ if line_str == "[DONE]":
715
+ break
716
+ try:
717
+ chunk = json.loads(line_str)
718
+ choices = chunk.get("choices")
719
+ if choices and isinstance(choices, list) and len(choices) > 0:
720
+ content = choices[0].get("delta", {}).get("content")
721
+ if content:
722
+ yield format_sse({"event": "chunk", "data": content})
723
+ except Exception as e:
724
+ logging.warning(f"Error processing stream chunk: {e}")
725
+ continue
726
+
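For reference, each streamed line the loop above parses is expected to follow the OpenAI-style chunk format; a self-contained illustration with a made-up payload:

```python
# Made-up sample line; shows how the delta content is pulled out before the
# stream terminates with "data: [DONE]".
import json

line_str = 'data: {"choices": [{"delta": {"content": "Hello"}}]}'
if line_str.startswith('data:'):
    payload = line_str[5:].strip()
    if payload != "[DONE]":
        chunk = json.loads(payload)
        print(chunk["choices"][0].get("delta", {}).get("content"))  # -> Hello
```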
727
+ # Final status update
728
+ duration = time.time() - start_time
729
+ stats = {
730
+ "total_time_seconds": round(duration),
731
+ "sources_processed": processed_sources,
732
+ "sources_successful": successful_sources,
733
+ "estimated_tokens": total_tokens,
734
+ "sources_used": len(all_sources_used)
735
+ }
736
+ yield format_sse({
737
+ "event": "status",
738
+ "data": f"Research completed successfully in {duration:.1f} seconds."
739
+ })
740
+ yield format_sse({"event": "stats", "data": stats})
741
+ yield format_sse({"event": "sources", "data": all_sources_used})
742
+
743
+ except asyncio.TimeoutError:
744
+ yield format_sse({
745
+ "event": "error",
746
+ "data": f"Research process timed out after {TOTAL_TIMEOUT} seconds."
747
+ })
748
+ except Exception as e:
749
+ logging.error(f"Critical error in research process: {e}", exc_info=True)
750
+ yield format_sse({
751
+ "event": "error",
752
+ "data": f"An unexpected error occurred: {str(e)[:200]}"
753
+ })
754
+ finally:
755
+ duration = time.time() - start_time
756
+ yield format_sse({
757
+ "event": "complete",
758
+ "data": f"Research process finished after {duration:.1f} seconds."
759
+ })
760
+
761
+ @app.post("/deep-research", response_class=StreamingResponse)
762
+ async def deep_research_endpoint(request: DeepResearchRequest):
763
+ """Endpoint for deep research that streams SSE responses."""
764
+ if not request.query or len(request.query.strip()) < 3:
765
+ raise HTTPException(status_code=400, detail="Query must be at least 3 characters long")
766
+
767
+ search_time = min(max(request.search_time, 60), 180) # Clamp between 60 and 180 seconds
768
+ return StreamingResponse(
769
+ run_deep_research_stream(request.query.strip(), search_time),
770
+ media_type="text/event-stream"
771
+ )
772
+
773
+ if __name__ == "__main__":
774
+ import uvicorn
775
+ uvicorn.run(app, host="0.0.0.0", port=8000)
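A hypothetical client for the new endpoint, consuming the SSE stream with httpx (server assumed to be running locally on port 8000; search_time is clamped server-side to 60-180 seconds):

```python
# Sketch of a streaming client; prints each SSE data payload as it arrives.
import httpx

def consume_deep_research(query: str, search_time: int = 120) -> None:
    payload = {"query": query, "search_time": search_time}
    with httpx.stream("POST", "http://localhost:8000/deep-research",
                      json=payload, timeout=None) as response:
        for line in response.iter_lines():
            if line.startswith("data:"):
                print(line[5:].strip())

if __name__ == "__main__":
    consume_deep_research("open source vector databases")
```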