rkihacker committed (verified)
Commit a38a28a · 1 Parent(s): 5366574

Update main.py

Files changed (1):
  main.py +82 -35
main.py CHANGED
@@ -1,3 +1,4 @@
+
 import os
 import asyncio
 import json
@@ -51,14 +52,22 @@ app = FastAPI(
 )
 
 # Enable CORS for all origins
-app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"])
+app.add_middleware(
+    CORSMiddleware,
+    allow_origins=["*"],
+    allow_credentials=True,
+    allow_methods=["*"],
+    allow_headers=["*"]
+)
 
 # --- Helper Functions ---
 def extract_json_from_llm_response(text: str) -> Optional[list]:
     match = re.search(r'\[.*\]', text, re.DOTALL)
     if match:
-        try: return json.loads(match.group(0))
-        except json.JSONDecodeError: return None
+        try:
+            return json.loads(match.group(0))
+        except json.JSONDecodeError:
+            return None
     return None
 
 # --- Core Service Functions ---
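Two review notes on this hunk. The wildcard CORS config is kept as-is, but per the CORS spec browsers refuse credentialed responses carrying Access-Control-Allow-Origin: *, so allow_credentials=True only takes effect once allow_origins lists explicit origins. And extract_json_from_llm_response relies on a greedy \[.*\] match, grabbing everything from the first '[' to the last ']' in the reply. A self-contained sketch of that behavior (the sample strings are made up):

import json
import re
from typing import Optional

def extract_json_from_llm_response(text: str) -> Optional[list]:
    # Greedy match: spans from the first '[' to the last ']' (re.DOTALL crosses newlines)
    match = re.search(r'\[.*\]', text, re.DOTALL)
    if match:
        try:
            return json.loads(match.group(0))
        except json.JSONDecodeError:
            return None
    return None

print(extract_json_from_llm_response('Sure! Here is the plan:\n["Q1?", "Q2?"]'))
# -> ['Q1?', 'Q2?']
print(extract_json_from_llm_response('["Q1?"] and also ["Q2?"]'))
# -> None: the greedy match spans both arrays and fails to parse
print(extract_json_from_llm_response('No JSON here.'))
# -> None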
@@ -66,10 +75,9 @@ async def call_duckduckgo_search(session: aiohttp.ClientSession, query: str, max
     """Performs a search using the DDGS API with an existing aiohttp session."""
     logger.info(f"Searching DuckDuckGo API for: '{query}'")
     try:
-        # Initialize DDGS with the provided session, no 'async with' needed here
        ddgs = DDGS(session=session)
        raw_results = [r async for r in ddgs.atext(query, max_results=max_results)]
-
+
        results = [
            {'title': r.get('title'), 'link': r.get('href'), 'snippet': r.get('body')}
            for r in raw_results if r.get('href') and r.get('title') and r.get('body')
@@ -77,7 +85,6 @@ async def call_duckduckgo_search(session: aiohttp.ClientSession, query: str, max
        logger.info(f"Found {len(results)} sources from DuckDuckGo for: '{query}'")
        return results
    except Exception as e:
-        # Log the full traceback for detailed debugging
        logger.error(f"DuckDuckGo search failed for query '{query}': {e}", exc_info=True)
        return []
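A note on the except branch above: call_duckduckgo_search deliberately swallows its own exceptions and returns [], because the caller fans these coroutines out through asyncio.gather, where a single raising task would make the whole gather raise and discard the other results. A minimal, self-contained sketch of that error-isolation pattern (the failure is simulated):

import asyncio

async def safe_search(query: str) -> list:
    try:
        if query == "boom":
            raise RuntimeError("simulated search failure")
        return [{"link": f"https://example.com/{query}"}]
    except Exception:
        return []  # contain the failure; gather() still returns every result

async def main():
    results = await asyncio.gather(*(safe_search(q) for q in ["a", "boom", "b"]))
    print(results)
    # -> [[{'link': 'https://example.com/a'}], [], [{'link': 'https://example.com/b'}]]

asyncio.run(main())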
 
@@ -85,52 +92,83 @@ async def research_and_process_source(session: aiohttp.ClientSession, source: di
     headers = {'User-Agent': random.choice(USER_AGENTS)}
     try:
         logger.info(f"Scraping: {source['link']}")
-        if source['link'].lower().endswith('.pdf'): raise ValueError("PDF content")
+        if source['link'].lower().endswith('.pdf'):
+            raise ValueError("PDF content")
+
         async with session.get(source['link'], headers=headers, timeout=10, ssl=False) as response:
-            if response.status != 200: raise ValueError(f"HTTP status {response.status}")
+            if response.status != 200:
+                raise ValueError(f"HTTP status {response.status}")
+
             html = await response.text()
             soup = BeautifulSoup(html, "html.parser")
-            for tag in soup(['script', 'style', 'nav', 'footer', 'header', 'aside']): tag.decompose()
+
+            # Remove unnecessary tags
+            for tag in soup(['script', 'style', 'nav', 'footer', 'header', 'aside']):
+                tag.decompose()
+
             content = " ".join(soup.stripped_strings)
-            if not content.strip(): raise ValueError("Parsed content is empty.")
+            if not content.strip():
+                raise ValueError("Parsed content is empty.")
+
             return content, source
+
     except Exception as e:
         logger.warning(f"Scraping failed for {source['link']} ({e}). Falling back to snippet.")
         return source.get('snippet', ''), source
 
 # --- Streaming Deep Research Logic ---
 async def run_deep_research_stream(query: str) -> AsyncGenerator[str, None]:
-    def format_sse(data: dict) -> str: return f"data: {json.dumps(data)}\n\n"
+    def format_sse(data: dict) -> str:
+        return f"data: {json.dumps(data)}\n\n"
+
     try:
         # Create a single session for all HTTP requests in this stream
         async with aiohttp.ClientSession() as session:
             yield format_sse({"event": "status", "data": "Generating research plan..."})
-            plan_prompt = {"model": LLM_MODEL, "messages": [{"role": "user", "content": f"Generate 3-4 key sub-questions for a research report on '{query}'. Your response MUST be ONLY the raw JSON array. Example: [\"Question 1?\"]"}]}
+
+            plan_prompt = {
+                "model": LLM_MODEL,
+                "messages": [{
+                    "role": "user",
+                    "content": f"Generate 3-4 key sub-questions for a research report on '{query}'. Your response MUST be ONLY the raw JSON array. Example: [\"Question 1?\"]"
+                }]
+            }
+
             try:
                 async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=plan_prompt, timeout=25) as response:
-                    response.raise_for_status(); result = await response.json()
+                    response.raise_for_status()
+                    result = await response.json()
                     sub_questions = result if isinstance(result, list) else extract_json_from_llm_response(result['choices'][0]['message']['content'])
-                    if not isinstance(sub_questions, list): raise ValueError(f"Invalid plan from LLM: {result}")
+                    if not isinstance(sub_questions, list):
+                        raise ValueError(f"Invalid plan from LLM: {result}")
             except Exception as e:
-                yield format_sse({"event": "error", "data": f"Could not generate research plan. Reason: {e}"}); return
+                yield format_sse({"event": "error", "data": f"Could not generate research plan. Reason: {e}"})
+                return
 
             yield format_sse({"event": "plan", "data": sub_questions})
-
             yield format_sse({"event": "status", "data": f"Searching sources for {len(sub_questions)} topics..."})
+
             # Pass the single session to each search task
             search_tasks = [call_duckduckgo_search(session, sq) for sq in sub_questions]
             all_search_results = await asyncio.gather(*search_tasks)
+
+            # Flatten and deduplicate sources by link
             unique_sources = list({source['link']: source for results in all_search_results for source in results}.values())
-
+
             if not unique_sources:
-                yield format_sse({"event": "error", "data": "All search queries returned zero usable sources."}); return
-
+                yield format_sse({"event": "error", "data": "All search queries returned zero usable sources."})
+                return
+
             sources_to_process = unique_sources[:MAX_SOURCES_TO_PROCESS]
-            yield format_sse({"event": "status", "data": f"Found {len(unique_sources)} unique sources. Processing the top {len(sources_to_process)}..."})
-
+            yield format_sse({
+                "event": "status",
+                "data": f"Found {len(unique_sources)} unique sources. Processing the top {len(sources_to_process)}..."
+            })
+
             processing_tasks = [research_and_process_source(session, source) for source in sources_to_process]
-            consolidated_context, all_sources_used = "", []
-
+            consolidated_context = ""
+            all_sources_used = []
+
             for task in asyncio.as_completed(processing_tasks):
                 content, source_info = await task
                 if content:
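The deduplication one-liner kept in this hunk is easy to misread: it keys a dict by each source's 'link', so a later duplicate overwrites an earlier one while the link's original position is preserved. A tiny sketch with made-up data:

all_search_results = [
    [{"link": "https://a.com", "title": "A1"}, {"link": "https://b.com", "title": "B"}],
    [{"link": "https://a.com", "title": "A2"}],
]
# Later duplicates win; insertion order of first appearance is kept
unique_sources = list({s["link"]: s for results in all_search_results for s in results}.values())
print(unique_sources)
# -> [{'link': 'https://a.com', 'title': 'A2'}, {'link': 'https://b.com', 'title': 'B'}]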
@@ -138,34 +176,43 @@ async def run_deep_research_stream(query: str) -> AsyncGenerator[str, None]:
                    all_sources_used.append(source_info)
 
            if not consolidated_context.strip():
-                yield format_sse({"event": "error", "data": "Failed to gather any research context."}); return
+                yield format_sse({"event": "error", "data": "Failed to gather any research context."})
+                return
 
            yield format_sse({"event": "status", "data": "Synthesizing final report..."})
+
            report_prompt = f'Synthesize the provided context into a long-form, comprehensive, multi-page report on "{query}". Use markdown. Elaborate extensively on each point. Base your entire report ONLY on the provided context.\n\n## Research Context ##\n{consolidated_context}'
-            report_payload = {"model": LLM_MODEL, "messages": [{"role": "user", "content": report_prompt}], "stream": True}
+            report_payload = {
+                "model": LLM_MODEL,
+                "messages": [{"role": "user", "content": report_prompt}],
+                "stream": True
+            }
 
            async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=report_payload) as response:
                response.raise_for_status()
+
                async for line in response.content:
                    line_str = line.decode('utf-8').strip()
-                    if line_str.startswith('data:'): line_str = line_str[5:].strip()
-                    if line_str == "[DONE]": break
+
+                    if line_str.startswith('data:'):
+                        line_str = line_str[5:].strip()
+
+                    if line_str == "[DONE]":
+                        break
+
                    try:
                        chunk = json.loads(line_str)
                        choices = chunk.get("choices")
+
                        if choices and isinstance(choices, list) and len(choices) > 0:
                            content = choices[0].get("delta", {}).get("content")
                            if content:
                                yield format_sse({"event": "chunk", "data": content})
-                    except json.JSONDecodeError: continue
+                    except json.JSONDecodeError:
+                        continue
 
            yield format_sse({"event": "sources", "data": all_sources_used})
+
    except Exception as e:
        logger.error(f"A critical error occurred: {e}", exc_info=True)
-        yield format_sse({"event": "error", "data": str(e)})
-    finally:
-        yield format_sse({"event": "done", "data": "Deep research complete."})
-
-@app.post("/v1/deepresearch/completions")
-async def deep_research_endpoint(request: DeepResearchRequest):
-    return StreamingResponse(run_deep_research_stream(request.query), media_type="text/event-stream")
+        yield format_sse({"event": "error", "data": str(e)})
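For reference, format_sse frames every event identically: one JSON object on a single 'data:' line, closed by a blank line, which is the minimal server-sent-events wire format the streaming loop above both emits and parses. A quick demonstration:

import json

def format_sse(data: dict) -> str:
    return f"data: {json.dumps(data)}\n\n"

print(repr(format_sse({"event": "status", "data": "Synthesizing final report..."})))
# -> 'data: {"event": "status", "data": "Synthesizing final report..."}\n\n'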
 
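Finally, a sketch of a client for this stream. Note that the last hunk removes the finally block (which emitted the terminal "done" event) and the /v1/deepresearch/completions route registration without re-adding them in the diff shown, so this sketch assumes the route still exists elsewhere in the file and that the app listens on localhost:8000; both are assumptions, not facts from this commit.

import asyncio
import json

import aiohttp

async def consume(query: str) -> None:
    # ASSUMPTION: URL and port are illustrative, not confirmed by this diff
    url = "http://localhost:8000/v1/deepresearch/completions"
    async with aiohttp.ClientSession() as session:
        async with session.post(url, json={"query": query}) as response:
            response.raise_for_status()
            async for raw in response.content:
                line = raw.decode("utf-8").strip()
                if not line.startswith("data:"):
                    continue  # skip blank separator lines between frames
                event = json.loads(line[5:].strip())
                print(f"[{event['event']}] {str(event['data'])[:80]}")

asyncio.run(consume("impact of solar power on grid stability"))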