import os
import asyncio
import json
import logging
import random
import re
from typing import AsyncGenerator, Optional, Tuple, List
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from dotenv import load_dotenv
import aiohttp
from bs4 import BeautifulSoup
# --- Configuration ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
load_dotenv()
LLM_API_KEY = os.getenv("LLM_API_KEY")
if not LLM_API_KEY:
    raise RuntimeError("LLM_API_KEY must be set in a .env file.")
logger.info("LLM API Key loaded successfully.")
# --- Constants & Headers ---
LLM_API_URL = "https://api.typegpt.net/v1/chat/completions"
LLM_MODEL = "meta-llama/Llama-4-Maverick-17B-128E-Instruct-FP8"
MAX_SOURCES_TO_PROCESS = 15
SEARCH_PAGES_TO_FETCH = 2 # Fetch first 2 pages of results for each query
# Real Browser User Agents for SCRAPING
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:129.0) Gecko/20100101 Firefox/129.0",
]
LLM_HEADERS = {"Authorization": f"Bearer {LLM_API_KEY}", "Content-Type": "application/json", "Accept": "application/json"}
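# The LLM endpoint is assumed to speak the OpenAI-compatible chat-completions
# format (this is how the code below builds and parses requests); an illustrative
# request body looks roughly like:
#   {"model": LLM_MODEL,
#    "messages": [{"role": "user", "content": "..."}],
#    "stream": true}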
class DeepResearchRequest(BaseModel):
    query: str
app = FastAPI(
    title="AI Deep Research API",
    description="Provides robust, long-form, streaming deep research completions using a live, multi-page DuckDuckGo search.",
    version="11.0.0",  # Implemented robust, multi-page live web search
)
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"])
def extract_json_from_llm_response(text: str) -> Optional[list]:
    """Extract the first JSON array embedded in free-form LLM output."""
    match = re.search(r'\[.*\]', text, re.DOTALL)
    if match:
        try:
            return json.loads(match.group(0))
        except json.JSONDecodeError:
            return None
    return None
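# Illustrative behaviour (hypothetical inputs):
#   extract_json_from_llm_response('Sure! ["Q1?", "Q2?"]')  -> ["Q1?", "Q2?"]
#   extract_json_from_llm_response('No array here.')        -> None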
def parse_search_results(soup: BeautifulSoup) -> List[dict]:
    """Helper to parse results from a BeautifulSoup object."""
    results = []
    for result_div in soup.find_all('div', class_='result'):
        title_elem = result_div.find('a', class_='result__a')
        snippet_elem = result_div.find('a', class_='result__snippet')
        if title_elem and snippet_elem:
            link = title_elem.get('href')
            title = title_elem.get_text(strip=True)
            snippet = snippet_elem.get_text(strip=True)
            if link and title and snippet:
                results.append({'title': title, 'link': link, 'snippet': snippet})
    return results
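# The selectors above assume DuckDuckGo's HTML interface renders each hit roughly as
# (markup observed at the time of writing; subject to change upstream):
#   <div class="result">
#     <a class="result__a" href="...">Title</a>
#     <a class="result__snippet">Snippet text...</a>
#   </div>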
async def call_duckduckgo_search(session: aiohttp.ClientSession, query: str, max_results: int = 15) -> List[dict]:
    """Perform a robust, multi-page search against DuckDuckGo's HTML interface."""
    logger.info(f"Starting multi-page search for: '{query}'")
    search_url = "https://html.duckduckgo.com/html/"
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded',
        'User-Agent': random.choice(USER_AGENTS),
        'Referer': 'https://html.duckduckgo.com/'
    }
    all_results = []
    payload = {'q': query}
    try:
        for page in range(SEARCH_PAGES_TO_FETCH):
            logger.info(f"Searching page {page + 1} for '{query}'...")
            async with session.post(search_url, data=payload, headers=headers, timeout=15) as response:
                if response.status != 200:
                    logger.warning(f"Search for '{query}' page {page + 1} returned status {response.status}. Stopping search for this query.")
                    break
                html = await response.text()
                soup = BeautifulSoup(html, "html.parser")
                page_results = parse_search_results(soup)
                all_results.extend(page_results)
                # Find the 'Next' form to get parameters for the next page request.
                # A <form> contains child elements, so BeautifulSoup's `string` filter
                # never matches it; locate the pagination form via its submit button.
                next_form = None
                for form in soup.find_all('form', action='/html/'):
                    submit = form.find('input', type='submit')
                    if submit and 'next' in (submit.get('value') or '').lower():
                        next_form = form
                        break
                if not next_form:
                    logger.info(f"No 'Next' page found for '{query}'. Ending search.")
                    break
                # Carry the named (hidden) inputs forward as the next page's payload.
                payload = {inp['name']: inp.get('value', '') for inp in next_form.find_all('input') if inp.get('name')}
                if not payload:
                    logger.info("Could not find parameters for next page. Ending search.")
                    break
                await asyncio.sleep(random.uniform(0.5, 1.5))  # Small delay to mimic human behavior
    except Exception as e:
        logger.error(f"An error occurred during multi-page search for '{query}': {e}", exc_info=True)
    logger.info(f"Found a total of {len(all_results)} sources across up to {SEARCH_PAGES_TO_FETCH} pages for: '{query}'")
    return all_results[:max_results]
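# Usage sketch (illustrative query; must run inside an event loop):
#   async with aiohttp.ClientSession() as session:
#       results = await call_duckduckgo_search(session, "quantum error correction")
#   # -> [{'title': ..., 'link': ..., 'snippet': ...}, ...], capped at max_results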
async def research_and_process_source(session: aiohttp.ClientSession, source: dict) -> Tuple[str, dict]:
    """Scrape a single source's page text, falling back to its search snippet on failure."""
    headers = {'User-Agent': random.choice(USER_AGENTS)}
    try:
        logger.info(f"Scraping: {source['link']}")
        if source['link'].lower().endswith('.pdf'):
            raise ValueError("PDF content")
        async with session.get(source['link'], headers=headers, timeout=10, ssl=False) as response:
            if response.status != 200:
                raise ValueError(f"HTTP status {response.status}")
            html = await response.text()
            soup = BeautifulSoup(html, "html.parser")
            # Drop non-content elements before extracting visible text.
            for tag in soup(['script', 'style', 'nav', 'footer', 'header', 'aside']):
                tag.decompose()
            content = " ".join(soup.stripped_strings)
            if not content.strip():
                raise ValueError("Parsed content is empty.")
            return content, source
    except Exception as e:
        logger.warning(f"Scraping failed for {source['link']} ({e}). Falling back to snippet.")
        return source.get('snippet', ''), source
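# Note: ssl=False above trades certificate verification for scrape coverage of
# misconfigured sites; tighten this if the deployment requires verified TLS.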
async def run_deep_research_stream(query: str) -> AsyncGenerator[str, None]:
    def format_sse(data: dict) -> str:
        return f"data: {json.dumps(data)}\n\n"
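    # Every event is framed as a single SSE `data:` line, e.g. (illustrative):
    #   format_sse({"event": "status", "data": "..."})
    #   -> 'data: {"event": "status", "data": "..."}\n\n'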
    try:
        async with aiohttp.ClientSession() as session:
            yield format_sse({"event": "status", "data": "Generating research plan..."})
            plan_prompt = {"model": LLM_MODEL, "messages": [{"role": "user", "content": f"Generate 3-4 key sub-questions for a research report on '{query}'. Your response MUST be ONLY the raw JSON array. Example: [\"Question 1?\"]"}]}
            try:
                async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=plan_prompt, timeout=25) as response:
                    response.raise_for_status()
                    result = await response.json()
                    sub_questions = result if isinstance(result, list) else extract_json_from_llm_response(result['choices'][0]['message']['content'])
                    if not isinstance(sub_questions, list) or not sub_questions:
                        raise ValueError(f"Invalid plan from LLM: {result}")
            except Exception as e:
                yield format_sse({"event": "error", "data": f"Could not generate research plan. Reason: {e}"})
                return
yield format_sse({"event": "plan", "data": sub_questions})
yield format_sse({"event": "status", "data": f"Performing deep search for {len(sub_questions)} topics..."})
search_tasks = [call_duckduckgo_search(session, sq) for sq in sub_questions]
all_search_results = await asyncio.gather(*search_tasks)
unique_sources = list({source['link']: source for results in all_search_results for source in results}.values())
if not unique_sources:
yield format_sse({"event": "error", "data": f"The live multi-page search could not find any relevant sources for '{query}'. The topic might be too obscure."}); return
sources_to_process = unique_sources[:MAX_SOURCES_TO_PROCESS]
yield format_sse({"event": "status", "data": f"Found {len(unique_sources)} unique sources. Processing the top {len(sources_to_process)}..."})
processing_tasks = [research_and_process_source(session, source) for source in sources_to_process]
consolidated_context, all_sources_used = "", []
for task in asyncio.as_completed(processing_tasks):
content, source_info = await task
if content and content.strip():
consolidated_context += f"Source: {source_info['link']}\nContent: {content}\n\n---\n\n"
all_sources_used.append(source_info)
if not consolidated_context.strip():
yield format_sse({"event": "error", "data": "Found sources, but failed to scrape meaningful content from any of them."}); return
yield format_sse({"event": "status", "data": "Synthesizing final report..."})
report_prompt = f'Synthesize the provided context into a long-form, comprehensive, multi-page report on "{query}". Use markdown. Elaborate extensively on each point. Base your entire report ONLY on the provided context.\n\n## Research Context ##\n{consolidated_context}'
report_payload = {"model": LLM_MODEL, "messages": [{"role": "user", "content": report_prompt}], "stream": True}
async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=report_payload) as response:
response.raise_for_status()
async for line in response.content:
line_str = line.decode('utf-8').strip()
if line_str.startswith('data:'): line_str = line_str[5:].strip()
if line_str == "[DONE]": break
try:
chunk = json.loads(line_str)
choices = chunk.get("choices")
if choices and isinstance(choices, list) and len(choices) > 0:
content = choices[0].get("delta", {}).get("content")
if content: yield format_sse({"event": "chunk", "data": content})
except json.JSONDecodeError: continue
yield format_sse({"event": "sources", "data": all_sources_used})
    except Exception as e:
        logger.error(f"A critical error occurred: {e}", exc_info=True)
        yield format_sse({"event": "error", "data": f"An unexpected error occurred: {str(e)}"})
@app.post("/deep-research", response_class=StreamingResponse)
async def deep_research_endpoint(request: DeepResearchRequest):
    return StreamingResponse(run_deep_research_stream(request.query), media_type="text/event-stream")
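# Example client call (hypothetical local deployment; -N keeps curl streaming):
#   curl -N -X POST http://localhost:8000/deep-research \
#        -H "Content-Type: application/json" \
#        -d '{"query": "impact of quantum computing on cryptography"}'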
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)