rkihacker committed
Commit bc2abd9 · verified · 1 Parent(s): 46a015f

Update main.py

Files changed (1)
  1. main.py +65 -57
main.py CHANGED
@@ -31,74 +31,89 @@ SNAPZION_API_URL = "https://search.snapzion.com/get-snippets"
 LLM_API_URL = "https://api.typegpt.net/v1/chat/completions"
 LLM_MODEL = "gpt-4.1-mini"

-# Automatic Context Sizing based on Tokens
-TARGET_TOKEN_LIMIT = 28000 # Safe limit for models with ~32k context windows
+# Automatic Context Sizing (No more fixed limits)
+TARGET_TOKEN_LIMIT = 28000
 ESTIMATED_CHARS_PER_TOKEN = 4
 MAX_CONTEXT_CHAR_LENGTH = TARGET_TOKEN_LIMIT * ESTIMATED_CHARS_PER_TOKEN

-# Real Browser User Agents for Rotation
+# ***** THE CRITICAL FIX: Full, legitimate headers for the Snapzion API call *****
+SNAPZION_HEADERS = {
+    'accept': '*/*',
+    'accept-language': 'en-US,en;q=0.9',
+    'content-type': 'application/json',
+    'origin': 'https://search.snapzion.com',
+    'priority': 'u=1, i',
+    'referer': 'https://search.snapzion.com/docs',
+    'sec-ch-ua': '"Chromium";v="140", "Not=A?Brand";v="24", "Google Chrome";v="140"',
+    'sec-ch-ua-mobile': '?0',
+    'sec-ch-ua-platform': '"Windows"',
+    'sec-fetch-dest': 'empty',
+    'sec-fetch-mode': 'cors',
+    'sec-fetch-site': 'same-origin',
+    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36',
+}
+
+# Real Browser User Agents for SCRAPING ROTATION
 USER_AGENTS = [
     "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36",
     "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36",
-    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:129.0) Gecko/20100101 Firefox/129.0",
-    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36",
-    "Mozilla/5.0 (iPhone; CPU iPhone OS 17_5_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.5 Mobile/15E148 Safari/604.1"
+    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:129.0) Gecko/20100101 Firefox/129.0"
 ]

-# Headers
-SNAPZION_HEADERS = {'Content-Type': 'application/json'}
 LLM_HEADERS = {"Authorization": f"Bearer {LLM_API_KEY}", "Content-Type": "application/json", "Accept": "application/json"}

-# --- Pydantic Models & Helper Functions ---
 class DeepResearchRequest(BaseModel):
     query: str

 def extract_json_from_llm_response(text: str) -> Optional[list]:
     match = re.search(r'\[.*\]', text, re.DOTALL)
     if match:
-        json_str = match.group(0)
-        try: return json.loads(json_str)
+        try: return json.loads(match.group(0))
         except json.JSONDecodeError: return None
     return None

-# --- FastAPI App ---
 app = FastAPI(
     title="AI Deep Research API",
     description="Provides robust, streaming deep research completions.",
-    version="3.0.0" # Major version bump for robustness overhaul
+    version="4.0.0" # Final Production Version
 )

 # --- Core Service Functions ---
 async def call_snapzion_search(session: aiohttp.ClientSession, query: str) -> List[dict]:
+    logger.info(f"Searching Snapzion for: '{query}'")
     try:
-        async with session.post(SNAPZION_API_URL, headers=SNAPZION_HEADERS, json={"query": query}, timeout=15) as response:
-            response.raise_for_status(); data = await response.json()
-            return data.get("organic_results", [])
+        async with session.post(SNAPZION_API_URL, headers=SNAPZION_HEADERS, json={"query": query}, timeout=20) as response:
+            response.raise_for_status()
+            data = await response.json()
+            results = data.get("organic_results", [])
+            logger.info(f"Found {len(results)} sources for: '{query}'")
+            return results
     except Exception as e:
         logger.error(f"Snapzion search failed for query '{query}': {e}"); return []

 async def scrape_url(session: aiohttp.ClientSession, url: str) -> str:
-    if url.lower().endswith('.pdf'): return "Error: PDF content cannot be scraped."
+    if url.lower().endswith('.pdf'): return "Error: PDF"
     try:
-        # Rotate user agents for each request
         headers = {'User-Agent': random.choice(USER_AGENTS)}
         async with session.get(url, headers=headers, timeout=10, ssl=False) as response:
-            if response.status != 200: return f"Error: HTTP status {response.status}"
-            html = await response.text()
-            soup = BeautifulSoup(html, "html.parser")
-            for tag in soup(['script', 'style', 'nav', 'footer', 'header', 'aside']): tag.decompose()
-            return " ".join(soup.stripped_strings)
+            if response.status != 200: return f"Error: HTTP {response.status}"
+            return await response.text() # Return full HTML for parsing
     except Exception as e:
-        logger.warning(f"Scraping failed for {url}: {e}"); return f"Error: {e}"
+        return f"Error: {e}"
+
+def parse_html(html: str) -> str:
+    soup = BeautifulSoup(html, "html.parser")
+    for tag in soup(['script', 'style', 'nav', 'footer', 'header', 'aside']): tag.decompose()
+    return " ".join(soup.stripped_strings)

 async def research_and_process_source(session: aiohttp.ClientSession, source: dict) -> Tuple[str, dict]:
-    """Scrapes a single source and falls back to its snippet if scraping fails."""
-    scraped_content = await scrape_url(session, source['link'])
-    if scraped_content.startswith("Error:"):
-        # SNIPPET FALLBACK LOGIC
-        logger.warning(f"Scraping failed for {source['link']}. Falling back to snippet.")
-        return source['snippet'], source
-    return scraped_content, source
+    html_or_error = await scrape_url(session, source['link'])
+    if html_or_error.startswith("Error:"):
+        logger.warning(f"Scraping failed for {source['link']} ({html_or_error}). Falling back to snippet.")
+        return source.get('snippet', ''), source
+
+    content = parse_html(html_or_error)
+    return content, source

 # --- Streaming Deep Research Logic ---
 async def run_deep_research_stream(query: str) -> AsyncGenerator[str, None]:
@@ -107,36 +122,32 @@ async def run_deep_research_stream(query: str) -> AsyncGenerator[str, None]:
         async with aiohttp.ClientSession() as session:
             # Step 1: Generate Research Plan
             yield format_sse({"event": "status", "data": "Generating research plan..."})
-            plan_prompt = {"model": LLM_MODEL, "messages": [{"role": "user", "content": f"Generate 3-4 key sub-questions for a research report on '{query}'. Your response MUST be ONLY the raw JSON array, without markdown. Example: [\"Question 1?\"]"}]}
+            plan_prompt = {"model": LLM_MODEL, "messages": [{"role": "user", "content": f"Generate 3-4 key sub-questions for a research report on '{query}'. Your response MUST be ONLY the raw JSON array. Example: [\"Question 1?\"]"}]}
             try:
-                async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=plan_prompt, timeout=20) as response:
+                async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=plan_prompt, timeout=25) as response:
                     response.raise_for_status(); result = await response.json()
                     sub_questions = result if isinstance(result, list) else extract_json_from_llm_response(result['choices'][0]['message']['content'])
-                    if not isinstance(sub_questions, list): raise ValueError(f"Could not extract a valid list from LLM response: {result}")
+                    if not isinstance(sub_questions, list): raise ValueError(f"Invalid plan from LLM: {result}")
             except Exception as e:
-                logger.error(f"Failed to generate research plan: {e}")
                 yield format_sse({"event": "error", "data": f"Could not generate research plan. Reason: {e}"}); return

             yield format_sse({"event": "plan", "data": sub_questions})

             # Step 2: Conduct Research in Parallel
-            yield format_sse({"event": "status", "data": f"Searching for sources for {len(sub_questions)} topics..."})
+            yield format_sse({"event": "status", "data": f"Searching sources for {len(sub_questions)} topics..."})
             search_tasks = [call_snapzion_search(session, sq) for sq in sub_questions]
             all_search_results = await asyncio.gather(*search_tasks)

-            # Deduplicate sources by link to avoid scraping the same page multiple times
-            unique_sources = list({source['link']: source for results in all_search_results for source in results}.values())
+            unique_sources = list({source['link']: source for results in all_search_results for source in results if 'link' in source and 'snippet' in source}.values())

             if not unique_sources:
-                yield format_sse({"event": "error", "data": "Search did not return any usable sources."}); return
+                yield format_sse({"event": "error", "data": "All search queries returned zero usable sources. The search provider might be blocking requests or the topic is too obscure."}); return

-            yield format_sse({"event": "status", "data": f"Found {len(unique_sources)} unique sources. Scraping and processing..."})
+            yield format_sse({"event": "status", "data": f"Found {len(unique_sources)} unique sources. Processing..."})

-            # Process all unique sources concurrently with snippet fallback
             processing_tasks = [research_and_process_source(session, source) for source in unique_sources]

-            consolidated_context = ""
-            all_sources_used = []
+            consolidated_context, all_sources_used = "", []
             successful_scrapes = 0

             for task in asyncio.as_completed(processing_tasks):
@@ -144,10 +155,9 @@
                 if content:
                     consolidated_context += f"Source: {source_info['link']}\nContent: {content}\n\n---\n\n"
                     all_sources_used.append(source_info)
-                    if not content == source_info['snippet']: # Count as success only if not a snippet
-                        successful_scrapes += 1
+                    if not content == source_info.get('snippet'): successful_scrapes += 1

-            logger.info(f"Context gathering complete. Successfully scraped {successful_scrapes}/{len(unique_sources)} pages. Used {len(all_sources_used)} total sources (including snippets).")
+            logger.info(f"Context complete. Scraped {successful_scrapes}/{len(unique_sources)} pages. Used {len(all_sources_used)} total sources (with snippet fallbacks).")

             if not consolidated_context.strip():
                 yield format_sse({"event": "error", "data": "Failed to gather any research context from scraping or snippets."}); return
@@ -155,7 +165,6 @@ async def run_deep_research_stream(query: str) -> AsyncGenerator[str, None]:
             # Step 3: Synthesize Final Report
             yield format_sse({"event": "status", "data": "Synthesizing final report..."})
             if len(consolidated_context) > MAX_CONTEXT_CHAR_LENGTH:
-                logger.warning(f"Context truncated from {len(consolidated_context)} to {MAX_CONTEXT_CHAR_LENGTH} chars.")
                 consolidated_context = consolidated_context[:MAX_CONTEXT_CHAR_LENGTH]

             report_prompt = f'Synthesize the provided context into a comprehensive, well-structured report on "{query}". Use markdown. Context:\n{consolidated_context}'
@@ -164,19 +173,18 @@ async def run_deep_research_stream(query: str) -> AsyncGenerator[str, None]:
             async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=report_payload) as response:
                 response.raise_for_status()
                 async for line in response.content:
-                    if line.strip():
-                        line_str = line.decode('utf-8').strip()
-                        if line_str.startswith('data:'): line_str = line_str[5:].strip()
-                        if line_str == "[DONE]": break
-                        try:
-                            chunk = json.loads(line_str)
-                            content = chunk.get("choices", [{}])[0].get("delta", {}).get("content")
-                            if content: yield format_sse({"event": "chunk", "data": content})
-                        except json.JSONDecodeError: continue
+                    line_str = line.decode('utf-8').strip()
+                    if line_str.startswith('data:'): line_str = line_str[5:].strip()
+                    if line_str == "[DONE]": break
+                    try:
+                        chunk = json.loads(line_str)
+                        content = chunk.get("choices", [{}])[0].get("delta", {}).get("content")
+                        if content: yield format_sse({"event": "chunk", "data": content})
+                    except json.JSONDecodeError: continue

             yield format_sse({"event": "sources", "data": all_sources_used})
     except Exception as e:
-        logger.error(f"A critical error occurred in the main research stream: {e}", exc_info=True)
+        logger.error(f"A critical error occurred: {e}", exc_info=True)
         yield format_sse({"event": "error", "data": str(e)})
     finally:
         yield format_sse({"event": "done", "data": "Deep research complete."})
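
For reference, a minimal client sketch for consuming the stream that run_deep_research_stream produces. This is illustrative only: it assumes the generator is exposed as a POST endpoint accepting a DeepResearchRequest body ({"query": ...}) and that format_sse emits standard "data: <json>" SSE lines carrying the event/data objects seen above; the route path and port below are hypothetical and not part of this commit.

# Hypothetical client for the streaming endpoint; API_URL is an assumption, not defined in this diff.
import asyncio
import json

import aiohttp

API_URL = "http://localhost:8000/deep-research"  # assumed route; adjust to the real one

async def consume_report(query: str) -> None:
    async with aiohttp.ClientSession() as session:
        async with session.post(API_URL, json={"query": query}) as response:
            response.raise_for_status()
            async for raw_line in response.content:
                line = raw_line.decode("utf-8").strip()
                if not line.startswith("data:"):
                    continue  # skip blank keep-alive lines between SSE messages
                event = json.loads(line[5:].strip())
                kind, data = event.get("event"), event.get("data")
                if kind == "chunk":
                    print(data, end="", flush=True)  # report text streams incrementally
                else:
                    print(f"\n[{kind}] {data}")      # status / plan / sources / error / done
                if kind == "done":
                    break

if __name__ == "__main__":
    asyncio.run(consume_report("example research topic"))

The event names handled here (status, plan, chunk, sources, error, done) match the ones yielded by run_deep_research_stream in the diff above.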