import os
import asyncio
import json
import logging
import random
import re
from typing import AsyncGenerator, Optional, Tuple, List

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from dotenv import load_dotenv
import aiohttp
from bs4 import BeautifulSoup

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

load_dotenv()
LLM_API_KEY = os.getenv("LLM_API_KEY")

if not LLM_API_KEY:
    raise RuntimeError("LLM_API_KEY must be set in a .env file.")
logger.info("LLM API Key loaded successfully.")

LLM_API_URL = "https://api.typegpt.net/v1/chat/completions"
LLM_MODEL = "meta-llama/Llama-4-Maverick-17B-128E-Instruct-FP8"
MAX_SOURCES_TO_PROCESS = 15
SEARCH_PAGES_TO_FETCH = 2
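
# A small pool of real desktop user agents, rotated per request so traffic
# looks like an ordinary browser and is less likely to be bot-blocked.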
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:129.0) Gecko/20100101 Firefox/129.0"
]

LLM_HEADERS = {"Authorization": f"Bearer {LLM_API_KEY}", "Content-Type": "application/json", "Accept": "application/json"}


class DeepResearchRequest(BaseModel):
    query: str


app = FastAPI(
    title="AI Deep Research API",
    description="Provides robust, long-form, streaming deep research completions using a live, multi-page DuckDuckGo search.",
    version="11.0.0"
)

app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"])


def extract_json_from_llm_response(text: str) -> Optional[list]:
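    """Extract the first JSON array embedded in an LLM response, or None if parsing fails."""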
    match = re.search(r'\[.*\]', text, re.DOTALL)
    if match:
        try:
            return json.loads(match.group(0))
        except json.JSONDecodeError:
            return None
    return None


def parse_search_results(soup: BeautifulSoup) -> List[dict]:
    """Helper to parse results from a BeautifulSoup object."""
    results = []
    for result_div in soup.find_all('div', class_='result'):
        title_elem = result_div.find('a', class_='result__a')
        snippet_elem = result_div.find('a', class_='result__snippet')
        if title_elem and snippet_elem:
            link = title_elem.get('href')
            title = title_elem.get_text(strip=True)
            snippet = snippet_elem.get_text(strip=True)
            if link and title and snippet:
                results.append({'title': title, 'link': link, 'snippet': snippet})
    return results


async def call_duckduckgo_search(session: aiohttp.ClientSession, query: str, max_results: int = 15) -> List[dict]:
    """Performs a robust, multi-page search on DuckDuckGo's HTML interface."""
    logger.info(f"Starting multi-page search for: '{query}'")
    search_url = "https://html.duckduckgo.com/html/"

    headers = {
        'Content-Type': 'application/x-www-form-urlencoded',
        'User-Agent': random.choice(USER_AGENTS),
        'Referer': 'https://html.duckduckgo.com/'
    }

    all_results = []
    payload = {'q': query}

    try:
        for page in range(SEARCH_PAGES_TO_FETCH):
            logger.info(f"Searching page {page + 1} for '{query}'...")
            async with session.post(search_url, data=payload, headers=headers, timeout=15) as response:
                if response.status != 200:
                    logger.warning(f"Search for '{query}' page {page + 1} returned status {response.status}. Stopping search for this query.")
                    break

                html = await response.text()
                soup = BeautifulSoup(html, "html.parser")
                page_results = parse_search_results(soup)
                all_results.extend(page_results)

            # Find the "Next" pagination form. bs4's string= filter cannot match it
            # because the button's label is an <input value="Next">, and input values
            # are not part of a form's text, so inspect each form's submit input directly.
            next_form = None
            for form in soup.find_all('form', action='/html/'):
                submit = form.find('input', attrs={'type': 'submit'})
                if submit and 'next' in (submit.get('value') or '').lower():
                    next_form = form
                    break
            if not next_form:
                logger.info(f"No 'Next' page found for '{query}'. Ending search.")
                break

            # Carry the form's named (hidden) inputs forward as the next request's payload.
            payload = {inp['name']: inp.get('value', '') for inp in next_form.find_all('input') if inp.get('name')}
            if not payload:
                logger.info("Could not find parameters for next page. Ending search.")
                break

            # Brief randomized delay between pages to avoid rate limiting.
            await asyncio.sleep(random.uniform(0.5, 1.5))

    except Exception as e:
        logger.error(f"An error occurred during multi-page search for '{query}': {e}", exc_info=True)

    logger.info(f"Found a total of {len(all_results)} sources from up to {SEARCH_PAGES_TO_FETCH} pages for: '{query}'")
    return all_results[:max_results]


async def research_and_process_source(session: aiohttp.ClientSession, source: dict) -> Tuple[str, dict]:
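    """Scrape a source URL for its visible text, falling back to the search snippet on any failure."""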
    headers = {'User-Agent': random.choice(USER_AGENTS)}
    try:
        logger.info(f"Scraping: {source['link']}")
        if source['link'].lower().endswith('.pdf'):
            raise ValueError("PDF content")
        async with session.get(source['link'], headers=headers, timeout=10, ssl=False) as response:
            if response.status != 200:
                raise ValueError(f"HTTP status {response.status}")
            html = await response.text()
        soup = BeautifulSoup(html, "html.parser")
        # Strip non-content tags before extracting visible text.
        for tag in soup(['script', 'style', 'nav', 'footer', 'header', 'aside']):
            tag.decompose()
        content = " ".join(soup.stripped_strings)
        if not content.strip():
            raise ValueError("Parsed content is empty.")
        return content, source
    except Exception as e:
        logger.warning(f"Scraping failed for {source['link']} ({e}). Falling back to snippet.")
        return source.get('snippet', ''), source


async def run_deep_research_stream(query: str) -> AsyncGenerator[str, None]:
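    """Run the full pipeline (plan -> search -> scrape -> synthesize), yielding SSE-formatted events."""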
    def format_sse(data: dict) -> str:
        return f"data: {json.dumps(data)}\n\n"

    try:
        async with aiohttp.ClientSession() as session:
            yield format_sse({"event": "status", "data": "Generating research plan..."})
            plan_prompt = {"model": LLM_MODEL, "messages": [{"role": "user", "content": f"Generate 3-4 key sub-questions for a research report on '{query}'. Your response MUST be ONLY the raw JSON array. Example: [\"Question 1?\"]"}]}
            try:
                async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=plan_prompt, timeout=25) as response:
                    response.raise_for_status()
                    result = await response.json()
                # Some endpoints return the array directly; otherwise extract it from the chat message content.
                sub_questions = result if isinstance(result, list) else extract_json_from_llm_response(result['choices'][0]['message']['content'])
                if not isinstance(sub_questions, list) or not sub_questions:
                    raise ValueError(f"Invalid plan from LLM: {result}")
            except Exception as e:
                yield format_sse({"event": "error", "data": f"Could not generate research plan. Reason: {e}"})
                return

            yield format_sse({"event": "plan", "data": sub_questions})

            yield format_sse({"event": "status", "data": f"Performing deep search for {len(sub_questions)} topics..."})
            search_tasks = [call_duckduckgo_search(session, sq) for sq in sub_questions]
            all_search_results = await asyncio.gather(*search_tasks)
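            # De-duplicate sources by URL; dict keys preserve first-seen order.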
            unique_sources = list({source['link']: source for results in all_search_results for source in results}.values())

            if not unique_sources:
                yield format_sse({"event": "error", "data": f"The live multi-page search could not find any relevant sources for '{query}'. The topic might be too obscure."})
                return

            sources_to_process = unique_sources[:MAX_SOURCES_TO_PROCESS]
            yield format_sse({"event": "status", "data": f"Found {len(unique_sources)} unique sources. Processing the top {len(sources_to_process)}..."})

            processing_tasks = [research_and_process_source(session, source) for source in sources_to_process]
            consolidated_context, all_sources_used = "", []
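
            # Consume scrape results as they complete rather than waiting for the slowest source.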
            for task in asyncio.as_completed(processing_tasks):
                content, source_info = await task
                if content and content.strip():
                    consolidated_context += f"Source: {source_info['link']}\nContent: {content}\n\n---\n\n"
                    all_sources_used.append(source_info)

            if not consolidated_context.strip():
                yield format_sse({"event": "error", "data": "Found sources, but failed to scrape meaningful content from any of them."})
                return

            yield format_sse({"event": "status", "data": "Synthesizing final report..."})
            report_prompt = f'Synthesize the provided context into a long-form, comprehensive, multi-page report on "{query}". Use markdown. Elaborate extensively on each point. Base your entire report ONLY on the provided context.\n\n## Research Context ##\n{consolidated_context}'
            report_payload = {"model": LLM_MODEL, "messages": [{"role": "user", "content": report_prompt}], "stream": True}

            async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=report_payload) as response:
                response.raise_for_status()
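                # Relay the OpenAI-style SSE stream: each line is "data: {json}", and "data: [DONE]" terminates it.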
                async for line in response.content:
                    line_str = line.decode('utf-8').strip()
                    if line_str.startswith('data:'):
                        line_str = line_str[5:].strip()
                    if line_str == "[DONE]":
                        break
                    try:
                        chunk = json.loads(line_str)
                        choices = chunk.get("choices")
                        if choices and isinstance(choices, list) and len(choices) > 0:
                            content = choices[0].get("delta", {}).get("content")
                            if content:
                                yield format_sse({"event": "chunk", "data": content})
                    except json.JSONDecodeError:
                        continue

            yield format_sse({"event": "sources", "data": all_sources_used})
    except Exception as e:
        logger.error(f"A critical error occurred: {e}", exc_info=True)
        yield format_sse({"event": "error", "data": f"An unexpected error occurred: {str(e)}"})


@app.post("/deep-research", response_class=StreamingResponse)
async def deep_research_endpoint(request: DeepResearchRequest):
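    """Stream a deep-research report for the submitted query as server-sent events."""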
    return StreamingResponse(run_deep_research_stream(request.query), media_type="text/event-stream")


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
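
# Example client usage (assumes the server is running locally on port 8000;
# -N disables curl's buffering so SSE events print as they arrive):
#   curl -N -X POST http://localhost:8000/deep-research \
#        -H "Content-Type: application/json" \
#        -d '{"query": "impact of quantum computing on cryptography"}'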