rkihacker committed on
Commit
ad8dd04
·
verified ·
1 Parent(s): 58de22e

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +46 -23
main.py CHANGED
@@ -6,7 +6,7 @@ import random
6
  import re
7
  import time
8
  from typing import AsyncGenerator, Optional, Tuple, List, Dict
9
- from urllib.parse import quote_plus, urlparse
10
  from fastapi import FastAPI, HTTPException
11
  from fastapi.responses import StreamingResponse
12
  from fastapi.middleware.cors import CORSMiddleware
@@ -35,13 +35,13 @@ else:
35
  LLM_API_URL = "https://api.typegpt.net/v1/chat/completions"
36
  LLM_MODEL = "meta-llama/Llama-4-Maverick-17B-128E-Instruct-FP8"
37
  MAX_SOURCES_TO_PROCESS = 10
38
- MAX_CONCURRENT_REQUESTS = 3 # Reduced to avoid rate-limiting
39
- SEARCH_TIMEOUT = 90 # Reduced to ensure time for processing
40
  TOTAL_TIMEOUT = 180
41
- REQUEST_DELAY = 2.0 # Increased delay to avoid rate-limiting
 
 
42
  USER_AGENT_ROTATION = True
43
- RETRY_ATTEMPTS = 3 # Number of retries for failed search requests
44
- RETRY_DELAY = 3.0 # Delay between retries
45
 
46
  # Initialize fake user agent generator
47
  try:
@@ -64,7 +64,7 @@ LLM_HEADERS = {
64
 
65
  class DeepResearchRequest(BaseModel):
66
  query: str
67
- search_time: int = 90 # Default to 90 seconds
68
 
69
  app = FastAPI(
70
  title="AI Deep Research API",
@@ -103,7 +103,6 @@ def clean_url(url: str) -> str:
103
  if not url:
104
  return ""
105
 
106
- # Handle DuckDuckGo redirect URLs
107
  if url.startswith('//duckduckgo.com/l/'):
108
  url = f"https:{url}"
109
  try:
@@ -112,17 +111,10 @@ def clean_url(url: str) -> str:
112
  if 'uddg=' in query_params:
113
  match = re.search(r'uddg=([^&]+)', query_params)
114
  if match:
115
- encoded_url = match.group(1)
116
- try:
117
- # Properly decode the URL
118
- from urllib.parse import unquote
119
- return unquote(encoded_url)
120
- except:
121
- pass
122
  except:
123
  pass
124
 
125
- # Ensure URL has proper scheme
126
  if url.startswith('//'):
127
  url = 'https:' + url
128
  elif not url.startswith(('http://', 'https://')):
@@ -157,7 +149,7 @@ async def check_robots_txt(url: str) -> bool:
157
 
158
  async def fetch_search_results(query: str, max_results: int = 5) -> List[dict]:
159
  """
160
- Perform a real search using DuckDuckGo's HTML interface with retry logic.
161
  """
162
  headers = {
163
  "User-Agent": await get_real_user_agent(),
@@ -218,7 +210,7 @@ async def fetch_search_results(query: str, max_results: int = 5) -> List[dict]:
218
  logging.error(f"Search attempt {attempt + 1} failed for '{query}': {e}")
219
  if attempt < RETRY_ATTEMPTS - 1:
220
  await asyncio.sleep(RETRY_DELAY)
221
- continue
222
  logging.error(f"All {RETRY_ATTEMPTS} search attempts failed for '{query}'")
223
  return []
224
 
@@ -376,6 +368,10 @@ async def generate_research_plan(query: str, session: aiohttp.ClientSession) ->
376
  ]
377
  except Exception as e:
378
  logging.error(f"Failed to generate research plan: {e}")
 
 
 
 
379
  return [
380
  f"What is {query}?",
381
  f"What are the key aspects of {query}?",
@@ -390,6 +386,7 @@ async def continuous_search(query: str, search_time: int = 90) -> List[dict]:
390
  start_time = time.time()
391
  all_results = []
392
  seen_urls = set()
 
393
 
394
  query_variations = [
395
  query,
@@ -405,7 +402,7 @@ async def continuous_search(query: str, search_time: int = 90) -> List[dict]:
405
  while time.time() - start_time < search_time:
406
  iteration += 1
407
  random.shuffle(query_variations)
408
- for q in query_variations[:3]:
409
  if time.time() - start_time >= search_time:
410
  logger.info(f"Search timed out after {search_time} seconds. Found {len(all_results)} results.")
411
  break
@@ -421,6 +418,7 @@ async def continuous_search(query: str, search_time: int = 90) -> List[dict]:
421
  result['link'] = clean_link
422
  all_results.append(result)
423
  logger.info(f"Added new result: {result['title']} ({result['link']})")
 
424
 
425
  await asyncio.sleep(REQUEST_DELAY)
426
  if len(all_results) >= MAX_SOURCES_TO_PROCESS * 1.5:
@@ -435,6 +433,11 @@ async def continuous_search(query: str, search_time: int = 90) -> List[dict]:
435
 
436
  logger.info(f"Completed continuous search. Total results: {len(all_results)}")
437
 
 
 
 
 
 
438
  if all_results:
439
  def score_result(result):
440
  query_terms = set(query.lower().split())
@@ -606,6 +609,10 @@ async def run_deep_research_stream(query: str, search_time: int = 90) -> AsyncGe
606
  })
607
  else:
608
  processing_errors += 1
 
 
 
 
609
 
610
  if not consolidated_context.strip():
611
  yield format_sse({
@@ -655,7 +662,14 @@ async def run_deep_research_stream(query: str, search_time: int = 90) -> AsyncGe
655
  }
656
 
657
  async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=report_payload) as response:
658
- response.raise_for_status()
 
 
 
 
 
 
 
659
  async for line in response.content:
660
  if time.time() - start_time > TOTAL_TIMEOUT:
661
  yield format_sse({
@@ -668,6 +682,8 @@ async def run_deep_research_stream(query: str, search_time: int = 90) -> AsyncGe
668
  if line_str.startswith('data:'):
669
  line_str = line_str[5:].strip()
670
  if line_str == "[DONE]":
 
 
671
  break
672
  try:
673
  chunk = json.loads(line_str)
@@ -675,11 +691,17 @@ async def run_deep_research_stream(query: str, search_time: int = 90) -> AsyncGe
675
  if choices and isinstance(choices, list) and len(choices) > 0:
676
  content = choices[0].get("delta", {}).get("content")
677
  if content:
678
- yield format_sse({"event": "chunk", "data": content})
 
 
 
679
  except Exception as e:
680
  logging.warning(f"Error processing stream chunk: {e}")
681
  continue
682
 
 
 
 
683
  duration = time.time() - start_time
684
  stats = {
685
  "total_time_seconds": round(duration),
@@ -722,9 +744,10 @@ async def deep_research_endpoint(request: DeepResearchRequest):
722
  search_time = min(max(request.search_time, 60), 180)
723
  return StreamingResponse(
724
  run_deep_research_stream(request.query.strip(), search_time),
725
- media_type="text/event-stream"
 
726
  )
727
 
728
  if __name__ == "__main__":
729
  import uvicorn
730
- uvicorn.run(app, host="0.0.0.0", port=8000)
 
6
  import re
7
  import time
8
  from typing import AsyncGenerator, Optional, Tuple, List, Dict
9
+ from urllib.parse import quote_plus, urlparse, unquote
10
  from fastapi import FastAPI, HTTPException
11
  from fastapi.responses import StreamingResponse
12
  from fastapi.middleware.cors import CORSMiddleware
 
35
  LLM_API_URL = "https://api.typegpt.net/v1/chat/completions"
36
  LLM_MODEL = "meta-llama/Llama-4-Maverick-17B-128E-Instruct-FP8"
37
  MAX_SOURCES_TO_PROCESS = 10
38
+ MAX_CONCURRENT_REQUESTS = 2 # Further reduced to avoid rate-limiting
39
+ SEARCH_TIMEOUT = 90
40
  TOTAL_TIMEOUT = 180
41
+ REQUEST_DELAY = 3.0 # Increased to avoid rate-limiting
42
+ RETRY_ATTEMPTS = 5 # Increased retry attempts
43
+ RETRY_DELAY = 5.0 # Increased delay between retries
44
  USER_AGENT_ROTATION = True
 
 
45
 
46
  # Initialize fake user agent generator
47
  try:
 
64
 
65
  class DeepResearchRequest(BaseModel):
66
  query: str
67
+ search_time: int = 90
68
 
69
  app = FastAPI(
70
  title="AI Deep Research API",
 
103
  if not url:
104
  return ""
105
 
 
106
  if url.startswith('//duckduckgo.com/l/'):
107
  url = f"https:{url}"
108
  try:
 
111
  if 'uddg=' in query_params:
112
  match = re.search(r'uddg=([^&]+)', query_params)
113
  if match:
114
+ return unquote(match.group(1))
 
 
 
 
 
 
115
  except:
116
  pass
117
 
 
118
  if url.startswith('//'):
119
  url = 'https:' + url
120
  elif not url.startswith(('http://', 'https://')):
 
149
 
150
  async def fetch_search_results(query: str, max_results: int = 5) -> List[dict]:
151
  """
152
+ Perform a real search using DuckDuckGo's HTML interface with robust retry logic.
153
  """
154
  headers = {
155
  "User-Agent": await get_real_user_agent(),
 
210
  logging.error(f"Search attempt {attempt + 1} failed for '{query}': {e}")
211
  if attempt < RETRY_ATTEMPTS - 1:
212
  await asyncio.sleep(RETRY_DELAY)
213
+ continue
214
  logging.error(f"All {RETRY_ATTEMPTS} search attempts failed for '{query}'")
215
  return []
216
 
 
368
  ]
369
  except Exception as e:
370
  logging.error(f"Failed to generate research plan: {e}")
371
+ yield format_sse({
372
+ "event": "error",
373
+ "data": f"Failed to generate research plan: {str(e)[:200]}"
374
+ })
375
  return [
376
  f"What is {query}?",
377
  f"What are the key aspects of {query}?",
 
386
  start_time = time.time()
387
  all_results = []
388
  seen_urls = set()
389
+ fallback_results = []
390
 
391
  query_variations = [
392
  query,
 
402
  while time.time() - start_time < search_time:
403
  iteration += 1
404
  random.shuffle(query_variations)
405
+ for q in query_variations:
406
  if time.time() - start_time >= search_time:
407
  logger.info(f"Search timed out after {search_time} seconds. Found {len(all_results)} results.")
408
  break
 
418
  result['link'] = clean_link
419
  all_results.append(result)
420
  logger.info(f"Added new result: {result['title']} ({result['link']})")
421
+ fallback_results.append(result) # Store for fallback
422
 
423
  await asyncio.sleep(REQUEST_DELAY)
424
  if len(all_results) >= MAX_SOURCES_TO_PROCESS * 1.5:
 
433
 
434
  logger.info(f"Completed continuous search. Total results: {len(all_results)}")
435
 
436
+ # Fallback if insufficient results
437
+ if len(all_results) < MAX_SOURCES_TO_PROCESS:
438
+ logger.warning(f"Insufficient results ({len(all_results)}), using fallback results")
439
+ all_results.extend(fallback_results[:MAX_SOURCES_TO_PROCESS - len(all_results)])
440
+
441
  if all_results:
442
  def score_result(result):
443
  query_terms = set(query.lower().split())
 
609
  })
610
  else:
611
  processing_errors += 1
612
+ yield format_sse({
613
+ "event": "warning",
614
+ "data": f"Failed to extract content from {source_info['link']}"
615
+ })
616
 
617
  if not consolidated_context.strip():
618
  yield format_sse({
 
662
  }
663
 
664
  async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=report_payload) as response:
665
+ if response.status != 200:
666
+ yield format_sse({
667
+ "event": "error",
668
+ "data": f"Failed to generate report: HTTP {response.status}"
669
+ })
670
+ return
671
+
672
+ buffer = ""
673
  async for line in response.content:
674
  if time.time() - start_time > TOTAL_TIMEOUT:
675
  yield format_sse({
 
682
  if line_str.startswith('data:'):
683
  line_str = line_str[5:].strip()
684
  if line_str == "[DONE]":
685
+ if buffer:
686
+ yield format_sse({"event": "chunk", "data": buffer})
687
  break
688
  try:
689
  chunk = json.loads(line_str)
 
691
  if choices and isinstance(choices, list) and len(choices) > 0:
692
  content = choices[0].get("delta", {}).get("content")
693
  if content:
694
+ buffer += content
695
+ if len(buffer) > 100: # Flush buffer periodically
696
+ yield format_sse({"event": "chunk", "data": buffer})
697
+ buffer = ""
698
  except Exception as e:
699
  logging.warning(f"Error processing stream chunk: {e}")
700
  continue
701
 
702
+ if buffer:
703
+ yield format_sse({"event": "chunk", "data": buffer})
704
+
705
  duration = time.time() - start_time
706
  stats = {
707
  "total_time_seconds": round(duration),
 
744
  search_time = min(max(request.search_time, 60), 180)
745
  return StreamingResponse(
746
  run_deep_research_stream(request.query.strip(), search_time),
747
+ media_type="text/event-stream",
748
+ headers={"Cache-Control": "no-cache", "Connection": "keep-alive"}
749
  )
750
 
751
  if __name__ == "__main__":
752
  import uvicorn
753
+ uvicorn.run(app, host="0.0.0.0", port=8000)