rkihacker committed (verified)
Commit 6142af3 · 1 Parent(s): e1111e0

Update main.py

Files changed (1)
  1. main.py +151 -110
main.py CHANGED
@@ -1,13 +1,17 @@
 import os
 import asyncio
+import json
+import logging
+from typing import AsyncGenerator
+
 from fastapi import FastAPI, HTTPException, Query
+from fastapi.responses import StreamingResponse
+from pydantic import BaseModel
 from dotenv import load_dotenv
 import aiohttp
 from bs4 import BeautifulSoup
-import logging
 
 # --- Configuration ---
-# Configure logging to see what's happening
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
 
@@ -17,45 +21,28 @@ LLM_API_KEY = os.getenv("LLM_API_KEY")
 if not LLM_API_KEY:
     raise RuntimeError("LLM_API_KEY must be set in a .env file.")
 
-# Snapzion Search API Configuration
+# API URLs and Models
 SNAPZION_API_URL = "https://search.snapzion.com/get-snippets"
-SNAPZION_HEADERS = {
-    'accept': '*/*',
-    'accept-language': 'en-US,en;q=0.9',
-    'content-type': 'application/json',
-    'origin': 'https://search.snapzion.com',
-    'priority': 'u=1, i',
-    'referer': 'https://search.snapzion.com/docs',
-    'sec-ch-ua': '"Chromium";v="140", "Not=A?Brand";v="24", "Google Chrome";v="140"',
-    'sec-ch-ua-mobile': '?0',
-    'sec-ch-ua-platform': '"Windows"',
-    'sec-fetch-dest': 'empty',
-    'sec-fetch-mode': 'cors',
-    'sec-fetch-site': 'same-origin',
-    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36',
-}
-
-# ***** CHANGE 1: Add general-purpose browser headers for scraping *****
-SCRAPING_HEADERS = {
-    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36',
-    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8',
-    'Accept-Language': 'en-US,en;q=0.9',
-    'Connection': 'keep-alive',
-    'Upgrade-Insecure-Requests': '1',
-}
-
-# LLM Configuration
 LLM_API_URL = "https://api.inference.net/v1/chat/completions"
 LLM_MODEL = "meta-llama/llama-3.1-8b-instruct/fp-8"
 
+# Headers for external services
+SNAPZION_HEADERS = { 'Content-Type': 'application/json', 'User-Agent': 'AI-Deep-Research-Agent/1.0' }
+SCRAPING_HEADERS = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36' }
+LLM_HEADERS = { "Authorization": f"Bearer {LLM_API_KEY}", "Content-Type": "application/json" }
+
+# --- Pydantic Models for Request Body ---
+class DeepResearchRequest(BaseModel):
+    query: str
+
 # --- FastAPI App Initialization ---
 app = FastAPI(
-    title="AI Search Snippets API (Snapzion)",
-    description="Provides AI-generated summaries from Snapzion search results.",
-    version="1.1.0" # Version bump for new resilience feature
+    title="AI Deep Research API",
+    description="Provides single-shot AI search and streaming deep research completions.",
+    version="2.0.0"
 )
 
-# --- Core Asynchronous Functions ---
+# --- Core Service Functions (Reused and New) ---
 
 async def call_snapzion_search(session: aiohttp.ClientSession, query: str) -> list:
     try:
@@ -64,98 +51,152 @@ async def call_snapzion_search(session: aiohttp.ClientSession, query: str) -> list:
             data = await response.json()
             return data.get("organic_results", [])
     except Exception as e:
-        logger.error(f"Snapzion API call failed: {e}")
-        raise HTTPException(status_code=503, detail=f"Search service (Snapzion) failed: {e}")
+        logger.error(f"Snapzion search failed for query '{query}': {e}")
+        return [] # Return empty list on failure instead of crashing
 
-# ***** CHANGE 2: Improve the scraping function *****
 async def scrape_url(session: aiohttp.ClientSession, url: str) -> str:
-    """Asynchronously scrapes text from a URL, now with browser headers."""
-    if url.lower().endswith('.pdf'):
-        return "Error: Content is a PDF, which cannot be scraped."
+    if url.lower().endswith('.pdf'): return "Error: PDF content cannot be scraped."
     try:
-        # Use the new scraping headers to look like a real browser
         async with session.get(url, headers=SCRAPING_HEADERS, timeout=10, ssl=False) as response:
-            if response.status != 200:
-                logger.warning(f"Failed to fetch {url}, status code: {response.status}")
-                return f"Error: Failed to fetch with status {response.status}"
+            if response.status != 200: return f"Error: HTTP status {response.status}"
             html = await response.text()
             soup = BeautifulSoup(html, "html.parser")
            for tag in soup(['script', 'style', 'nav', 'footer', 'header', 'aside']):
                tag.decompose()
            return " ".join(soup.stripped_strings)
     except Exception as e:
-        logger.warning(f"Could not scrape {url}. Reason: {e}")
-        return f"Error: Could not scrape. Reason: {e}"
+        logger.warning(f"Scraping failed for {url}: {e}")
+        return f"Error: {e}"
 
-async def get_ai_snippet(query: str, context: str, sources: list) -> str:
-    headers = {"Authorization": f"Bearer {LLM_API_KEY}", "Content-Type": "application/json"}
-    source_list_str = "\n".join([f"[{i+1}] {source['title']}: {source['link']}" for i, source in enumerate(sources)])
-    prompt = f"""
-    Based *only* on the provided context, provide a concise, factual answer to the user's query. Cite every sentence with the corresponding source number(s), like `[1]` or `[2, 3]`.
-
-    Sources:
-    {source_list_str}
-
-    Context:
-    ---
-    {context}
-    ---
-
-    User Query: "{query}"
-
-    Answer with citations:
-    """
-    data = {"model": LLM_MODEL, "messages": [{"role": "user", "content": prompt}], "max_tokens": 500}
-    async with aiohttp.ClientSession() as session:
-        try:
-            async with session.post(LLM_API_URL, headers=headers, json=data, timeout=45) as response:
+async def search_and_scrape(session: aiohttp.ClientSession, query: str) -> tuple[str, list]:
+    """Performs the search and scrape pipeline for a given query."""
+    search_results = await call_snapzion_search(session, query)
+    sources = search_results[:4] # Use top 4 sources per sub-query
+    if not sources: return "", []
+
+    scrape_tasks = [scrape_url(session, source["link"]) for source in sources]
+    scraped_contents = await asyncio.gather(*scrape_tasks)
+
+    context = "\n\n".join(
+        f"Source [{i+1}] (from {sources[i]['link']}):\n{content}"
+        for i, content in enumerate(scraped_contents) if not content.startswith("Error:")
+    )
+    return context, sources
+
+# --- Streaming Deep Research Logic ---
+
+async def run_deep_research_stream(query: str) -> AsyncGenerator[str, None]:
+    """The main async generator for the deep research process."""
+
+    def format_sse(data: dict) -> str:
+        """Formats a dictionary as a Server-Sent Event string."""
+        return f"data: {json.dumps(data)}\n\n"
+
+    try:
+        async with aiohttp.ClientSession() as session:
+            # Step 1: Generate Sub-Questions
+            yield format_sse({"event": "status", "data": "Generating research plan..."})
+            sub_question_prompt = {
+                "model": LLM_MODEL,
+                "messages": [{
+                    "role": "user",
+                    "content": f"You are a research planner. Based on the user's query '{query}', generate a list of 3 to 4 crucial sub-questions that would form the basis of a comprehensive research report. Respond with ONLY a JSON array of strings. Example: [\"Question 1?\", \"Question 2?\"]"
+                }]
+            }
+            async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=sub_question_prompt) as response:
                 response.raise_for_status()
                 result = await response.json()
-                return result['choices'][0]['message']['content']
-        except Exception as e:
-            logger.error(f"LLM API call failed: {e}")
-            raise HTTPException(status_code=502, detail=f"Failed to get response from LLM: {e}")
-
-# --- API Endpoint ---
-
-@app.get("/search")
-async def ai_search(q: str = Query(..., min_length=3, description="The search query.")):
-    async with aiohttp.ClientSession() as session:
-        search_results = await call_snapzion_search(session, q)
-        if not search_results:
-            raise HTTPException(status_code=404, detail="Could not find any relevant sources for the query.")
-
-        sources = search_results[:5] # Use top 5 sources
-        scrape_tasks = [scrape_url(session, source["link"]) for source in sources]
-        scraped_contents = await asyncio.gather(*scrape_tasks)
-
-        # ***** CHANGE 3: Implement the robust fallback logic *****
-        successful_scrapes = [content for content in scraped_contents if not content.startswith("Error:")]
-
-        full_context = ""
-        if successful_scrapes:
-            logger.info(f"Successfully scraped {len(successful_scrapes)} out of {len(sources)} sources.")
-            # Build context from successfully scraped content
-            full_context = "\n\n".join(
-                f"Source [{i+1}] ({sources[i]['link']}):\n{scraped_contents[i]}"
-                for i in range(len(sources)) if not scraped_contents[i].startswith("Error:")
-            )
-        else:
-            # If ALL scrapes failed, fall back to using the snippets from the search API
-            logger.warning("All scraping attempts failed. Falling back to using API snippets for context.")
-            full_context = "\n\n".join(
-                f"Source [{i+1}] ({source['link']}):\n{source['snippet']}"
-                for i, source in enumerate(sources)
-            )
-
-        if not full_context.strip():
-            # This is a final safety net, should rarely be hit now
-            raise HTTPException(status_code=500, detail="Could not construct any context from sources or snippets.")
-
-        ai_summary = await get_ai_snippet(q, full_context, sources)
-
-        return {"ai_summary": ai_summary, "sources": sources}
-
-@app.get("/")
+                try:
+                    sub_questions = json.loads(result['choices'][0]['message']['content'])
+                except (json.JSONDecodeError, IndexError):
+                    yield format_sse({"event": "error", "data": "Failed to parse sub-questions from LLM."})
+                    return
+
+            yield format_sse({"event": "plan", "data": sub_questions})
+
+            # Step 2: Concurrently research all sub-questions
+            research_tasks = [search_and_scrape(session, sq) for sq in sub_questions]
+            all_research_results = []
+
+            for i, task in enumerate(asyncio.as_completed(research_tasks)):
+                yield format_sse({"event": "status", "data": f"Researching: \"{sub_questions[i]}\""})
+                result = await task
+                all_research_results.append(result)
+
+            # Step 3: Consolidate all context and sources
+            yield format_sse({"event": "status", "data": "Consolidating research..."})
+            full_context = "\n\n---\n\n".join(res[0] for res in all_research_results if res[0])
+            all_sources = [source for res in all_research_results for source in res[1]]
+            unique_sources = list({s['link']: s for s in all_sources}.values()) # Deduplicate sources
+
+            if not full_context.strip():
+                yield format_sse({"event": "error", "data": "Failed to gather any research context."})
+                return
+
+            # Step 4: Generate the final report with streaming
+            yield format_sse({"event": "status", "data": "Generating final report..."})
+
+            final_report_prompt = f"""
+            You are a research analyst. Your task is to synthesize the provided context into a comprehensive, well-structured report on the topic: "{query}".
+            Use the context below exclusively. Do not use outside knowledge. Structure the report with markdown headings.
+
+            ## Research Context ##
+            {full_context}
+            """
+
+            final_report_payload = {
+                "model": LLM_MODEL,
+                "messages": [{"role": "user", "content": final_report_prompt}],
+                "stream": True # Enable streaming from the LLM
+            }
+
+            async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=final_report_payload) as response:
+                response.raise_for_status()
+                async for line in response.content:
+                    if line.strip():
+                        # The inference API might wrap its stream chunks in a 'data: ' prefix
+                        line_str = line.decode('utf-8').strip()
+                        if line_str.startswith('data:'):
+                            line_str = line_str[5:].strip()
+                        if line_str == "[DONE]":
+                            break
+                        try:
+                            chunk = json.loads(line_str)
+                            content = chunk.get("choices", [{}])[0].get("delta", {}).get("content")
+                            if content:
+                                yield format_sse({"event": "chunk", "data": content})
+                        except json.JSONDecodeError:
+                            continue # Ignore empty or malformed lines
+
+            yield format_sse({"event": "sources", "data": unique_sources})
+
+    except Exception as e:
+        logger.error(f"An error occurred during deep research: {e}")
+        yield format_sse({"event": "error", "data": str(e)})
+    finally:
+        yield format_sse({"event": "done", "data": "Deep research complete."})
+
+
+# --- API Endpoints ---
+
+@app.get("/", include_in_schema=False)
 def root():
-    return {"message": "AI Search API is active. Use the /docs endpoint to test."}
+    return {"message": "AI Deep Research API is active. See /docs for details."}
+
+@app.post("/v1/deepresearch/completions")
+async def deep_research_endpoint(request: DeepResearchRequest):
+    """
+    Performs a multi-step, streaming deep research task.
+
+    **Events Streamed:**
+    - `status`: Provides updates on the current stage of the process.
+    - `plan`: The list of sub-questions that will be researched.
+    - `chunk`: A piece of the final generated report.
+    - `sources`: The list of web sources used for the report.
+    - `error`: Indicates a fatal error occurred.
+    - `done`: Signals the end of the stream.
+    """
+    return StreamingResponse(
+        run_deep_research_stream(request.query),
+        media_type="text/event-stream"
+    )