rkihacker committed
Commit 43aeff7 · verified · 1 Parent(s): a38a28a

Update main.py

Files changed (1)
  1. main.py +61 -54
main.py CHANGED
@@ -1,4 +1,3 @@
-
 import os
 import asyncio
 import json
@@ -6,6 +5,7 @@ import logging
 import random
 import re
 from typing import AsyncGenerator, Optional, Tuple, List
+ from urllib.parse import unquote
 
 from fastapi import FastAPI
 from fastapi.responses import StreamingResponse
@@ -14,7 +14,6 @@ from pydantic import BaseModel
 from dotenv import load_dotenv
 import aiohttp
 from bs4 import BeautifulSoup
- from ddgs import DDGS
 
 # --- Configuration ---
 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
@@ -48,17 +47,11 @@ class DeepResearchRequest(BaseModel):
 app = FastAPI(
     title="AI Deep Research API",
     description="Provides robust, long-form, streaming deep research completions using the DuckDuckGo Search API.",
-     version="9.2.0" # Robust async client handling
+     version="9.3.0" # Using direct DuckDuckGo HTML API
 )
 
 # Enable CORS for all origins
- app.add_middleware(
-     CORSMiddleware,
-     allow_origins=["*"],
-     allow_credentials=True,
-     allow_methods=["*"],
-     allow_headers=["*"]
- )
+ app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"])
 
 # --- Helper Functions ---
 def extract_json_from_llm_response(text: str) -> Optional[list]:
@@ -72,18 +65,48 @@ def extract_json_from_llm_response(text: str) -> Optional[list]:
 
 # --- Core Service Functions ---
 async def call_duckduckgo_search(session: aiohttp.ClientSession, query: str, max_results: int = 10) -> List[dict]:
-     """Performs a search using the DDGS API with an existing aiohttp session."""
-     logger.info(f"Searching DuckDuckGo API for: '{query}'")
+     """Performs a search by directly scraping the DuckDuckGo HTML interface."""
+     logger.info(f"Searching DuckDuckGo for: '{query}'")
+     search_url = "https://html.duckduckgo.com/html/"
+     params = {"q": query}
+     headers = {"User-Agent": random.choice(USER_AGENTS)}
+
     try:
-         ddgs = DDGS(session=session)
-         raw_results = [r async for r in ddgs.atext(query, max_results=max_results)]
-
-         results = [
-             {'title': r.get('title'), 'link': r.get('href'), 'snippet': r.get('body')}
-             for r in raw_results if r.get('href') and r.get('title') and r.get('body')
-         ]
-         logger.info(f"Found {len(results)} sources from DuckDuckGo for: '{query}'")
-         return results
+         async with session.post(search_url, data=params, headers=headers, ssl=False) as response:
+             if response.status != 200:
+                 logger.error(f"DuckDuckGo search failed with status {response.status} for query '{query}'")
+                 return []
+
+             html = await response.text()
+             soup = BeautifulSoup(html, "html.parser")
+             results = []
+
+             for result in soup.find_all('div', class_='result'):
+                 title_elem = result.find('a', class_='result__a')
+                 snippet_elem = result.find('a', class_='result__snippet')
+                 link_elem = result.find('a', class_='result__url')
+
+                 if title_elem and snippet_elem and link_elem:
+                     # Extract the raw href which is a redirect
+                     raw_href = link_elem.get('href', '')
+
+                     # The actual URL is in a query parameter 'uddg'
+                     parsed_url_match = re.search(r'uddg=([^&]+)', raw_href)
+                     if parsed_url_match:
+                         # URL decode the extracted URL
+                         link = unquote(parsed_url_match.group(1))
+                     else:
+                         continue  # Skip if we can't find the clean URL
+
+                     title = title_elem.get_text(strip=True)
+                     snippet = snippet_elem.get_text(strip=True)
+
+                     results.append({'title': title, 'link': link, 'snippet': snippet})
+                     if len(results) >= max_results:
+                         break
+
+             logger.info(f"Found {len(results)} sources from DuckDuckGo for: '{query}'")
+             return results
     except Exception as e:
         logger.error(f"DuckDuckGo search failed for query '{query}': {e}", exc_info=True)
         return []
@@ -94,24 +117,17 @@ async def research_and_process_source(session: aiohttp.ClientSession, source: di
         logger.info(f"Scraping: {source['link']}")
         if source['link'].lower().endswith('.pdf'):
             raise ValueError("PDF content")
-
         async with session.get(source['link'], headers=headers, timeout=10, ssl=False) as response:
             if response.status != 200:
                 raise ValueError(f"HTTP status {response.status}")
-
             html = await response.text()
             soup = BeautifulSoup(html, "html.parser")
-
-             # Remove unnecessary tags
             for tag in soup(['script', 'style', 'nav', 'footer', 'header', 'aside']):
                 tag.decompose()
-
             content = " ".join(soup.stripped_strings)
             if not content.strip():
                 raise ValueError("Parsed content is empty.")
-
             return content, source
-
     except Exception as e:
         logger.warning(f"Scraping failed for {source['link']} ({e}). Falling back to snippet.")
         return source.get('snippet', ''), source
@@ -122,10 +138,8 @@ async def run_deep_research_stream(query: str) -> AsyncGenerator[str, None]:
         return f"data: {json.dumps(data)}\n\n"
 
     try:
-         # Create a single session for all HTTP requests in this stream
         async with aiohttp.ClientSession() as session:
             yield format_sse({"event": "status", "data": "Generating research plan..."})
-
             plan_prompt = {
                 "model": LLM_MODEL,
                 "messages": [{
@@ -133,7 +147,6 @@ async def run_deep_research_stream(query: str) -> AsyncGenerator[str, None]:
                     "content": f"Generate 3-4 key sub-questions for a research report on '{query}'. Your response MUST be ONLY the raw JSON array. Example: [\"Question 1?\"]"
                 }]
             }
-
             try:
                 async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=plan_prompt, timeout=25) as response:
                     response.raise_for_status()
@@ -146,13 +159,10 @@ async def run_deep_research_stream(query: str) -> AsyncGenerator[str, None]:
                 return
 
             yield format_sse({"event": "plan", "data": sub_questions})
-             yield format_sse({"event": "status", "data": f"Searching sources for {len(sub_questions)} topics..."})
 
-             # Pass the single session to each search task
+             yield format_sse({"event": "status", "data": f"Searching sources for {len(sub_questions)} topics..."})
             search_tasks = [call_duckduckgo_search(session, sq) for sq in sub_questions]
             all_search_results = await asyncio.gather(*search_tasks)
-
-             # Flatten and deduplicate sources by link
             unique_sources = list({source['link']: source for results in all_search_results for source in results}.values())
 
             if not unique_sources:
@@ -160,14 +170,10 @@ async def run_deep_research_stream(query: str) -> AsyncGenerator[str, None]:
                 return
 
             sources_to_process = unique_sources[:MAX_SOURCES_TO_PROCESS]
-             yield format_sse({
-                 "event": "status",
-                 "data": f"Found {len(unique_sources)} unique sources. Processing the top {len(sources_to_process)}..."
-             })
+             yield format_sse({"event": "status", "data": f"Found {len(unique_sources)} unique sources. Processing the top {len(sources_to_process)}..."})
 
             processing_tasks = [research_and_process_source(session, source) for source in sources_to_process]
-             consolidated_context = ""
-             all_sources_used = []
+             consolidated_context, all_sources_used = "", []
 
             for task in asyncio.as_completed(processing_tasks):
                 content, source_info = await task
@@ -180,30 +186,20 @@ async def run_deep_research_stream(query: str) -> AsyncGenerator[str, None]:
                 return
 
             yield format_sse({"event": "status", "data": "Synthesizing final report..."})
-
             report_prompt = f'Synthesize the provided context into a long-form, comprehensive, multi-page report on "{query}". Use markdown. Elaborate extensively on each point. Base your entire report ONLY on the provided context.\n\n## Research Context ##\n{consolidated_context}'
-             report_payload = {
-                 "model": LLM_MODEL,
-                 "messages": [{"role": "user", "content": report_prompt}],
-                 "stream": True
-             }
+             report_payload = {"model": LLM_MODEL, "messages": [{"role": "user", "content": report_prompt}], "stream": True}
 
             async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=report_payload) as response:
                 response.raise_for_status()
-
                 async for line in response.content:
                     line_str = line.decode('utf-8').strip()
-
                     if line_str.startswith('data:'):
                         line_str = line_str[5:].strip()
-
                     if line_str == "[DONE]":
                         break
-
                     try:
                         chunk = json.loads(line_str)
                         choices = chunk.get("choices")
-
                         if choices and isinstance(choices, list) and len(choices) > 0:
                             content = choices[0].get("delta", {}).get("content")
                             if content:
@@ -212,7 +208,18 @@ async def run_deep_research_stream(query: str) -> AsyncGenerator[str, None]:
                         continue
 
             yield format_sse({"event": "sources", "data": all_sources_used})
-
     except Exception as e:
         logger.error(f"A critical error occurred: {e}", exc_info=True)
-         yield format_sse({"event": "error", "data": str(e)})
+         yield format_sse({"event": "error", "data": f"An unexpected error occurred: {str(e)}"})
+
+ @app.post("/deep-research", response_class=StreamingResponse)
+ async def deep_research_endpoint(request: DeepResearchRequest):
+     """
+     Accepts a query and streams back a detailed research report.
+     Events: status, plan, chunk, sources, error
+     """
+     return StreamingResponse(run_deep_research_stream(request.query), media_type="text/event-stream")
+
+ if __name__ == "__main__":
+     import uvicorn
+     uvicorn.run(app, host="0.0.0.0", port=8000)
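
Usage sketch (not part of the commit): a minimal client for the new /deep-research endpoint, assuming the server is running locally on port 8000 as in the __main__ block above and that the requests library is available on the client side; it simply prints each SSE event emitted by run_deep_research_stream. The example query string is illustrative only.

import json
import requests  # assumed client-side dependency; not part of main.py

# Request body matches DeepResearchRequest (a single "query" field).
with requests.post(
    "http://localhost:8000/deep-research",
    json={"query": "state of quantum-resistant cryptography"},
    stream=True,
) as resp:
    resp.raise_for_status()
    for raw_line in resp.iter_lines():
        if not raw_line:
            continue  # skip SSE keep-alive blank lines
        line = raw_line.decode("utf-8")
        if line.startswith("data:"):
            event = json.loads(line[len("data:"):].strip())
            # Possible events per the endpoint docstring: status, plan, chunk, sources, error
            print(event["event"], event["data"])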