rkihacker committed · verified
Commit 132c134 · 1 Parent(s): 1e679fd

Update main.py

Files changed (1)
  1. main.py +71 -72
main.py CHANGED
@@ -1,8 +1,30 @@
 import os
 import asyncio
 import json
 import logging
-from typing import AsyncGenerator

 from fastapi import FastAPI
 from fastapi.responses import StreamingResponse
@@ -21,47 +43,55 @@ LLM_API_KEY = os.getenv("LLM_API_KEY")
 if not LLM_API_KEY:
     raise RuntimeError("LLM_API_KEY must be set in a .env file.")
 else:
-    logger.info(f"LLM API Key loaded successfully.")

-# ***** CHANGE 1: Update constants to match your new API provider *****
 SNAPZION_API_URL = "https://search.snapzion.com/get-snippets"
 LLM_API_URL = "https://api.typegpt.net/v1/chat/completions"
 LLM_MODEL = "gpt-4.1-mini"
 MAX_CONTEXT_CHAR_LENGTH = 120000

-# Headers for external services
 SNAPZION_HEADERS = { 'Content-Type': 'application/json', 'User-Agent': 'AI-Deep-Research-Agent/1.0' }
 SCRAPING_HEADERS = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36' }

-# ***** CHANGE 2: Create more standard and robust headers for the LLM call *****
-LLM_HEADERS = {
-    "Authorization": f"Bearer {LLM_API_KEY}",
-    "Content-Type": "application/json",
-    "Accept": "application/json",  # Explicitly request a JSON response
-    "User-Agent": "AI-Deep-Research-Client/2.3"
-}
-
-# --- Pydantic Models ---
 class DeepResearchRequest(BaseModel):
     query: str

 # --- FastAPI App ---
 app = FastAPI(
     title="AI Deep Research API",
     description="Provides streaming deep research completions.",
-    version="2.3.0"  # Version bump for advanced error handling
 )

 # --- Core Service Functions (Unchanged) ---
 async def call_snapzion_search(session: aiohttp.ClientSession, query: str) -> list:
     try:
         async with session.post(SNAPZION_API_URL, headers=SNAPZION_HEADERS, json={"query": query}, timeout=15) as response:
-            response.raise_for_status()
-            data = await response.json()
             return data.get("organic_results", [])
     except Exception as e:
-        logger.error(f"Snapzion search failed for query '{query}': {e}")
-        return []

 async def scrape_url(session: aiohttp.ClientSession, url: str) -> str:
     if url.lower().endswith('.pdf'): return "Error: PDF content cannot be scraped."
@@ -70,74 +100,48 @@ async def scrape_url(session: aiohttp.ClientSession, url: str) -> str:
         if response.status != 200: return f"Error: HTTP status {response.status}"
         html = await response.text()
         soup = BeautifulSoup(html, "html.parser")
-        for tag in soup(['script', 'style', 'nav', 'footer', 'header', 'aside']):
-            tag.decompose()
         return " ".join(soup.stripped_strings)
     except Exception as e:
-        logger.warning(f"Scraping failed for {url}: {e}")
-        return f"Error: {e}"

 async def search_and_scrape(session: aiohttp.ClientSession, query: str) -> tuple[str, list]:
-    search_results = await call_snapzion_search(session, query)
-    sources = search_results[:4]
     if not sources: return "", []
     scrape_tasks = [scrape_url(session, source["link"]) for source in sources]
     scraped_contents = await asyncio.gather(*scrape_tasks)
-    context = "\n\n".join(
-        f"Source Details: Title '{sources[i]['title']}', URL '{sources[i]['link']}'\nContent:\n{content}"
-        for i, content in enumerate(scraped_contents) if not content.startswith("Error:")
-    )
     return context, sources

 # --- Streaming Deep Research Logic ---
-
 async def run_deep_research_stream(query: str) -> AsyncGenerator[str, None]:
-
-    def format_sse(data: dict) -> str:
-        return f"data: {json.dumps(data)}\n\n"
-
-    raw_response_text_for_debugging = ""  # Variable to hold response text for logging
     try:
         async with aiohttp.ClientSession() as session:
             # Step 1: Generate Sub-Questions
             yield format_sse({"event": "status", "data": "Generating research plan..."})

             sub_question_prompt = {
-                "model": LLM_MODEL,
-                "messages": [{ "role": "user", "content": f"You are a research planner. For the topic '{query}', create a JSON array of 3-4 key sub-questions for a research report. Respond ONLY with the JSON array. Example: [\"Question 1?\", \"Question 2?\"]" }]
             }
-
-            # ***** CHANGE 3: The most critical fix. Heavily reinforced error handling. *****
             try:
-                logger.info(f"Sending request to LLM for planning. Model: {LLM_MODEL}, URL: {LLM_API_URL}")
                 async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=sub_question_prompt, timeout=20) as response:
-                    raw_response_text_for_debugging = await response.text()
-
-                    if response.status != 200:
-                        logger.error(f"LLM API for planning failed! Status: {response.status}, Headers: {response.headers}, Body: {raw_response_text_for_debugging}")
-                        raise Exception(f"LLM provider returned non-200 status: {response.status}")
-
-                    if not raw_response_text_for_debugging:
-                        raise Exception("LLM provider returned an empty response body.")
-
-                    result = json.loads(raw_response_text_for_debugging)
-                    llm_content = result.get('choices', [{}])[0].get('message', {}).get('content', '')

-                    if not llm_content or not llm_content.strip().startswith('['):
-                        logger.error(f"LLM did not return a valid JSON array string. Received: {llm_content}")
-                        raise Exception("LLM failed to generate a valid research plan.")
-
-                    sub_questions = json.loads(llm_content)
-
             except Exception as e:
-                # This will now catch the JSON error and log the problematic text
-                logger.error(f"Failed to generate/parse research plan. Error: {e}. Raw API Response: '{raw_response_text_for_debugging}'")
-                yield format_sse({"event": "error", "data": f"Could not generate research plan. Reason: {e}"})
-                return

             yield format_sse({"event": "plan", "data": sub_questions})

-            # (The rest of the logic remains the same, as it was not the point of failure)
             research_tasks = [search_and_scrape(session, sq) for sq in sub_questions]
             yield format_sse({"event": "status", "data": f"Starting research on {len(sub_questions)} topics..."})
@@ -148,22 +152,18 @@ async def run_deep_research_stream(query: str) -> AsyncGenerator[str, None]:
                 if context: consolidated_context += context + "\n\n---\n\n"
                 if sources: all_sources.extend(sources)

-            yield format_sse({"event": "status", "data": "Consolidating research..."})
-            if len(consolidated_context) > MAX_CONTEXT_CHAR_LENGTH:
-                consolidated_context = consolidated_context[:MAX_CONTEXT_CHAR_LENGTH]
-
             if not consolidated_context.strip():
-                yield format_sse({"event": "error", "data": "Failed to gather any research context."})
-                return

             yield format_sse({"event": "status", "data": "Generating final report..."})

             final_report_prompt = f'Synthesize the provided context into a comprehensive report on "{query}". Use markdown. Context:\n{consolidated_context}'
             final_report_payload = {"model": LLM_MODEL, "messages": [{"role": "user", "content": final_report_prompt}], "stream": True}

             async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=final_report_payload) as response:
-                if response.status != 200:
-                    error_text = await response.text()
-                    raise Exception(f"LLM API Error for final report: {response.status}, {error_text}")
                 async for line in response.content:
                     if line.strip():
                         line_str = line.decode('utf-8').strip()
@@ -171,13 +171,12 @@ async def run_deep_research_stream(query: str) -> AsyncGenerator[str, None]:
                         if line_str == "[DONE]": break
                         try:
                             chunk = json.loads(line_str)
-                            content = chunk.get("choices", [{}])[0].get("delta", {}).get("content")
                             if content: yield format_sse({"event": "chunk", "data": content})
                         except json.JSONDecodeError: continue

             unique_sources = list({s['link']: s for s in all_sources}.values())
             yield format_sse({"event": "sources", "data": unique_sources})
-
     except Exception as e:
         logger.error(f"A critical error occurred in the main research stream: {e}")
         yield format_sse({"event": "error", "data": str(e)})
+
+Your Python code (`json.loads()`) sees the triple backticks ` ``` ` and the word `json` and correctly determines that this is *not* a valid JSON array. It's a string containing a code block.
+
+The fix is to make our code smarter by cleaning this "helpful" formatting *before* attempting to parse it as JSON.
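+
+A minimal sketch of the failure mode (the string is illustrative, not a captured API response):
+
+```python
+import json
+
+# What the model tends to return: a Markdown code block wrapping the array
+raw = '```json\n["Question 1?", "Question 2?"]\n```'
+
+try:
+    json.loads(raw)  # the leading backticks make this invalid JSON
+except json.JSONDecodeError as e:
+    print(f"Parse failed as expected: {e}")
+```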
+
+### The Solution
+
+We will implement two key changes in `main.py`:
+
+1. **Smart JSON Extraction:** We'll add a function that uses a regular expression to find and extract the JSON array (`[...]`) from the LLM's response string, reliably ignoring any Markdown fences or other text (see the standalone sketch after this list).
+2. **Improved Prompting:** We will make our instruction in the prompt even more explicit to reduce the chance of the model adding extra formatting in the first place.
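+
+A standalone sketch of the extraction idea; the helper in the updated `main.py` below wraps the same logic with logging:
+
+```python
+import json
+import re
+
+def extract_array(text: str):
+    # Greedy match from the first '[' to the last ']', so surrounding
+    # fences and chatter are ignored and nested brackets stay inside
+    match = re.search(r'\[.*\]', text, re.DOTALL)
+    return json.loads(match.group(0)) if match else None
+
+print(extract_array('```json\n["Q1?", "Q2?"]\n```'))  # ['Q1?', 'Q2?']
+```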
+
+Regarding your request to "allow streaming for llm": the final report generation step **already does this correctly.** The `typegpt.net` API, being OpenAI-compatible, uses the exact streaming format that the code is built to handle. The previous error was simply preventing the process from ever *reaching* the final streaming step. This fix will unblock it.
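+
+For reference, each non-empty line of an OpenAI-compatible stream carries a `data: ` payload shaped roughly like this (field values are illustrative), which is what the `async for line in response.content` loop below consumes:
+
+```python
+import json
+
+line_str = 'data: {"choices": [{"delta": {"content": "Hello"}}]}'  # one raw SSE line
+
+if line_str.startswith("data: "):
+    payload = line_str[len("data: "):]
+    if payload != "[DONE]":  # the provider signals end-of-stream with [DONE]
+        chunk = json.loads(payload)
+        content = chunk.get("choices", [{}])[0].get("delta", {}).get("content")
+        print(content)  # -> Hello
+```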
+
+The `Dockerfile` and `requirements.txt` do not need any changes.
+
+### Updated `main.py`
+
+Replace the entire content of your `main.py` with this definitive, robust version.
+
+```python
 import os
 import asyncio
 import json
 import logging
+import re
+from typing import AsyncGenerator, Optional

 from fastapi import FastAPI
 from fastapi.responses import StreamingResponse
 
 if not LLM_API_KEY:
     raise RuntimeError("LLM_API_KEY must be set in a .env file.")
 else:
+    logger.info("LLM API Key loaded successfully.")

+# API Provider Constants
 SNAPZION_API_URL = "https://search.snapzion.com/get-snippets"
 LLM_API_URL = "https://api.typegpt.net/v1/chat/completions"
 LLM_MODEL = "gpt-4.1-mini"
 MAX_CONTEXT_CHAR_LENGTH = 120000

+# Headers
 SNAPZION_HEADERS = { 'Content-Type': 'application/json', 'User-Agent': 'AI-Deep-Research-Agent/1.0' }
 SCRAPING_HEADERS = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36' }
+LLM_HEADERS = { "Authorization": f"Bearer {LLM_API_KEY}", "Content-Type": "application/json", "Accept": "application/json" }

+# --- Pydantic Models & Helper Functions ---
 class DeepResearchRequest(BaseModel):
     query: str

+# ***** CHANGE 1: The core of the fix. A robust JSON extraction function. *****
+def extract_json_from_llm_response(text: str) -> Optional[list]:
+    """
+    Finds and parses a JSON array within a string, ignoring Markdown fences.
+    """
+    # Greedy match from the first '[' to the last ']', so nested brackets stay inside the match
+    match = re.search(r'\[.*\]', text, re.DOTALL)
+    if match:
+        json_str = match.group(0)
+        try:
+            return json.loads(json_str)
+        except json.JSONDecodeError:
+            logger.error(f"Failed to parse extracted JSON string: {json_str}")
+            return None
+    logger.warning(f"No JSON array found in LLM response: {text}")
+    return None
+
 # --- FastAPI App ---
 app = FastAPI(
     title="AI Deep Research API",
     description="Provides streaming deep research completions.",
+    version="2.4.0"  # Version bump for Markdown parsing fix
 )

 # --- Core Service Functions (Unchanged) ---
 async def call_snapzion_search(session: aiohttp.ClientSession, query: str) -> list:
     try:
         async with session.post(SNAPZION_API_URL, headers=SNAPZION_HEADERS, json={"query": query}, timeout=15) as response:
+            response.raise_for_status()
+            data = await response.json()
             return data.get("organic_results", [])
     except Exception as e:
+        logger.error(f"Snapzion search failed for query '{query}': {e}")
+        return []

 async def scrape_url(session: aiohttp.ClientSession, url: str) -> str:
     if url.lower().endswith('.pdf'): return "Error: PDF content cannot be scraped."
 
         if response.status != 200: return f"Error: HTTP status {response.status}"
         html = await response.text()
         soup = BeautifulSoup(html, "html.parser")
+        for tag in soup(['script', 'style', 'nav', 'footer', 'header', 'aside']):
+            tag.decompose()
         return " ".join(soup.stripped_strings)
     except Exception as e:
+        logger.warning(f"Scraping failed for {url}: {e}")
+        return f"Error: {e}"

 async def search_and_scrape(session: aiohttp.ClientSession, query: str) -> tuple[str, list]:
+    search_results = await call_snapzion_search(session, query)
+    sources = search_results[:4]
     if not sources: return "", []
     scrape_tasks = [scrape_url(session, source["link"]) for source in sources]
     scraped_contents = await asyncio.gather(*scrape_tasks)
+    context = "\n\n".join(
+        f"Source: {sources[i]['link']}\nContent: {content}"
+        for i, content in enumerate(scraped_contents)
+        if not content.startswith("Error:")
+    )
     return context, sources

 # --- Streaming Deep Research Logic ---
 async def run_deep_research_stream(query: str) -> AsyncGenerator[str, None]:
+    def format_sse(data: dict) -> str:
+        return f"data: {json.dumps(data)}\n\n"

     try:
         async with aiohttp.ClientSession() as session:
             # Step 1: Generate Sub-Questions
             yield format_sse({"event": "status", "data": "Generating research plan..."})

+            # ***** CHANGE 2: Improved, stricter prompt *****
             sub_question_prompt = {
+                "model": LLM_MODEL,
+                "messages": [{"role": "user", "content": f"Generate 3-4 key sub-questions for a research report on '{query}'. Your response MUST be ONLY the raw JSON array, without markdown, explanations, or any other text. Example: [\"Question 1?\", \"Question 2?\"]"}]
             }

             try:
                 async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=sub_question_prompt, timeout=20) as response:
+                    response.raise_for_status()
+                    raw_response_text = await response.text()
+                    result = json.loads(raw_response_text)
+                    # 'choices' is a list, so index the first element before reading 'message'
+                    llm_content = result.get('choices', [{}])[0].get('message', {}).get('content', '')

+                    sub_questions = extract_json_from_llm_response(llm_content)
+                    if not sub_questions:
+                        raise ValueError(f"Could not extract valid JSON from LLM content: {llm_content}")
             except Exception as e:
+                logger.error(f"Failed to generate research plan: {e}")
+                yield format_sse({"event": "error", "data": f"Could not generate research plan. Reason: {e}"})
+                return

             yield format_sse({"event": "plan", "data": sub_questions})

+            # Steps 2, 3, 4 will now execute correctly
             research_tasks = [search_and_scrape(session, sq) for sq in sub_questions]
             yield format_sse({"event": "status", "data": f"Starting research on {len(sub_questions)} topics..."})

                 if context: consolidated_context += context + "\n\n---\n\n"
                 if sources: all_sources.extend(sources)

             if not consolidated_context.strip():
+                yield format_sse({"event": "error", "data": "Failed to gather any research context."})
+                return

             yield format_sse({"event": "status", "data": "Generating final report..."})
+            if len(consolidated_context) > MAX_CONTEXT_CHAR_LENGTH:
+                consolidated_context = consolidated_context[:MAX_CONTEXT_CHAR_LENGTH]
+
             final_report_prompt = f'Synthesize the provided context into a comprehensive report on "{query}". Use markdown. Context:\n{consolidated_context}'
             final_report_payload = {"model": LLM_MODEL, "messages": [{"role": "user", "content": final_report_prompt}], "stream": True}

             async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=final_report_payload) as response:
+                response.raise_for_status()
                 async for line in response.content:
                     if line.strip():
                         line_str = line.decode('utf-8').strip()

                         if line_str == "[DONE]": break
                         try:
                             chunk = json.loads(line_str)
+                            # Index the first choice before reading the streaming delta
+                            content = chunk.get("choices", [{}])[0].get("delta", {}).get("content")
                             if content: yield format_sse({"event": "chunk", "data": content})
                         except json.JSONDecodeError: continue

             unique_sources = list({s['link']: s for s in all_sources}.values())
             yield format_sse({"event": "sources", "data": unique_sources})
     except Exception as e:
         logger.error(f"A critical error occurred in the main research stream: {e}")
         yield format_sse({"event": "error", "data": str(e)})
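
# A minimal client sketch for consuming the SSE stream. Assumptions: the route
# is mounted at POST /deep-research on localhost:8000; the actual route
# decorator sits past the end of this diff excerpt.
async def example_client():
    async with aiohttp.ClientSession() as session:
        async with session.post("http://localhost:8000/deep-research",
                                json={"query": "history of solar power"}) as resp:
            async for raw in resp.content:
                line = raw.decode("utf-8").strip()
                if line.startswith("data: "):
                    print(json.loads(line[len("data: "):]))  # each event dict as it arrives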