import asyncio
import json
import logging
import os
import random
import re
import time
from typing import AsyncGenerator, List, Optional, Tuple
from urllib.parse import quote_plus, unquote, urlparse

import aiohttp
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from fake_useragent import UserAgent
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

load_dotenv()

LLM_API_KEY = os.getenv("LLM_API_KEY")
if not LLM_API_KEY:
    raise RuntimeError("LLM_API_KEY must be set in a .env file.")
logger.info("LLM API Key loaded successfully.")

LLM_API_URL = "https://api.typegpt.net/v1/chat/completions"
LLM_MODEL = "meta-llama/Llama-4-Maverick-17B-128E-Instruct-FP8"
MAX_SOURCES_TO_PROCESS = 15
MAX_CONCURRENT_REQUESTS = 3
SEARCH_TIMEOUT = 180
TOTAL_TIMEOUT = 600
REQUEST_DELAY = 1.0
RETRY_ATTEMPTS = 3
RETRY_DELAY = 3.0
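# Tuning notes: MAX_CONCURRENT_REQUESTS caps simultaneous page fetches,
# RETRY_ATTEMPTS/RETRY_DELAY govern search retries, and TOTAL_TIMEOUT bounds
# the report-generation request. SEARCH_TIMEOUT and REQUEST_DELAY are not yet
# referenced below.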

try:
    ua = UserAgent()
except Exception:
    class SimpleUA:
        # fake_useragent exposes `random` as a property, so the fallback
        # mirrors that interface rather than defining a plain method.
        @property
        def random(self):
            return random.choice([
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36",
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36",
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:129.0) Gecko/20100101 Firefox/129.0"
            ])
    ua = SimpleUA()
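# NOTE: UserAgent() can raise at construction time (e.g. when its browser
# data cannot be loaded), which is why the static fallback above exists.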

LLM_HEADERS = {
    "Authorization": f"Bearer {LLM_API_KEY}",
    "Content-Type": "application/json",
    "Accept": "application/json"
}


class DeepResearchRequest(BaseModel):
    query: str
    search_time: int = 180


app = FastAPI(
    title="AI Deep Research API",
    description="Provides comprehensive, long-form research reports from real-time web searches.",
    version="4.1.0"
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
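# NOTE: wildcard origins combined with allow_credentials=True is a permissive,
# development-only configuration; restrict allow_origins before deploying.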


def extract_json_from_llm_response(text: str) -> Optional[list]:
    """Extracts a JSON array from the LLM's potentially messy string response."""
    match = re.search(r'\[.*\]', text, re.DOTALL)
    if match:
        try:
            return json.loads(match.group(0))
        except json.JSONDecodeError:
            logger.warning("Failed to decode JSON from LLM response.")
    return None
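# Example: the bracket regex tolerates prose or code fences around the array,
# e.g. extract_json_from_llm_response('Here:\n```json\n["a", "b"]\n```')
# returns ['a', 'b'].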


async def get_real_user_agent() -> str:
    """Provides a realistic, randomly rotated user agent string."""
    return ua.random


def clean_url(url: str) -> str:
    """Cleans and normalizes URLs, especially from search engine redirects."""
    if not url:
        return ""
    if url.startswith('//duckduckgo.com/l/'):
        try:
            parsed_url = urlparse(f"https:{url}")
            # maxsplit=1 keeps '=' characters inside parameter values intact
            params = dict(p.split('=', 1) for p in parsed_url.query.split('&'))
            return unquote(params.get('uddg', ''))
        except Exception:
            pass
    if url.startswith('//'):
        return 'https:' + url
    if not url.startswith(('http://', 'https://')):
        return 'https://' + url
    return url
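# Example: clean_url('//duckduckgo.com/l/?uddg=https%3A%2F%2Fexample.com%2Fpage')
# unwraps DuckDuckGo's redirect and returns 'https://example.com/page'.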


async def fetch_search_results(query: str, session: aiohttp.ClientSession, max_results: int = 10) -> List[dict]:
    """Performs a web search using DuckDuckGo's HTML interface with robust retry logic."""
    search_url = f"https://html.duckduckgo.com/html/?q={quote_plus(query)}"
    headers = {'User-Agent': await get_real_user_agent(), 'Accept-Language': 'en-US,en;q=0.5'}

    for attempt in range(RETRY_ATTEMPTS):
        try:
            async with session.get(search_url, headers=headers, timeout=10) as response:
                if response.status == 200:
                    html = await response.text()
                    soup = BeautifulSoup(html, 'html.parser')
                    results = []
                    for result in soup.select('.result'):
                        title_elem = result.select_one('.result__title a')
                        snippet_elem = result.select_one('.result__snippet')
                        if title_elem and snippet_elem and title_elem.has_attr('href'):
                            link = clean_url(title_elem['href'])
                            if link:
                                results.append({
                                    'title': title_elem.get_text(strip=True),
                                    'link': link,
                                    'snippet': snippet_elem.get_text(strip=True)
                                })
                                if len(results) >= max_results:
                                    break
                    logger.info(f"Found {len(results)} search results for query: '{query}'")
                    return results
                logger.warning(f"Search attempt {attempt + 1} for '{query}' failed with status: {response.status}")
        except Exception as e:
            logger.error(f"Search attempt {attempt + 1} for '{query}' failed with error: {e}")
        if attempt < RETRY_ATTEMPTS - 1:
            await asyncio.sleep(RETRY_DELAY)
    return []
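# NOTE: the '.result' / '.result__title' / '.result__snippet' selectors match
# html.duckduckgo.com's markup at the time of writing; they are brittle and
# will need updating if DuckDuckGo changes its HTML.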


async def process_web_source(session: aiohttp.ClientSession, source: dict, timeout: int = 15) -> Tuple[Optional[str], dict]:
    """Fetches and extracts the main textual content from a web page."""
    url = source.get('link')
    if not url:
        return None, source

    headers = {'User-Agent': await get_real_user_agent()}
    source_info = source.copy()
    start_time = time.time()

    try:
        if any(url.lower().endswith(ext) for ext in ['.pdf', '.jpg', '.png', '.zip', '.mp3', '.mp4']):
            return None, source
        async with session.get(url, headers=headers, timeout=timeout, ssl=False) as response:
            if response.status != 200 or 'text/html' not in response.headers.get('Content-Type', ''):
                return None, source
            html = await response.text()
            soup = BeautifulSoup(html, "html.parser")
            for tag in soup(['script', 'style', 'nav', 'footer', 'header', 'aside', 'form']):
                tag.decompose()
            main_content = soup.select_one('main, article, #main, #content, .main, .post-content, .entry-content')
            if not main_content:
                main_content = soup.body
            if main_content:
                content = " ".join(main_content.stripped_strings)
                content = re.sub(r'\s{2,}', ' ', content).strip()
                if len(content.split()) < 50:
                    return None, source
                source_info['word_count'] = len(content.split())
                source_info['processing_time'] = time.time() - start_time
                return content, source_info
    except Exception as e:
        logger.warning(f"Error processing source {url}: {str(e)}")
    return None, source
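# NOTE: the CSS-selector heuristic above (plus the 50-word floor) is a
# lightweight stand-in for a full readability pass; a library such as
# trafilatura or readability-lxml could be swapped in for higher-fidelity
# extraction.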


async def generate_research_plan(query: str, session: aiohttp.ClientSession) -> List[str]:
    """Generates a list of sub-questions to guide the research process."""
    plan_prompt = {
        "model": LLM_MODEL,
        "messages": [{
            "role": "user",
            "content": f"""You are a research strategist. Your task is to generate 5 distinct, insightful sub-questions to form a comprehensive research plan for the topic: '{query}'.
These questions will guide an AI in searching the web. Focus on different facets of the topic, such as its background, current state, key components, challenges, and future trends.
Your response MUST be ONLY a raw JSON array of strings, with no other text or explanation.
Example: ["What is the history of X?", "How does X compare to its main competitors?", "What are the primary use cases for X in 2025?"]"""
        }],
        "temperature": 0.7,
        "max_tokens": 500
    }
    fallback_plan = [
        f"What is the foundational definition and history of {query}?",
        f"What are the core components and key features of {query}?",
        f"Who are the major competitors or alternatives to {query}?",
        f"What are the primary challenges and limitations associated with {query}?",
        f"What are the latest trends and future predictions for {query}?"
    ]
    try:
        async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=plan_prompt, timeout=30) as response:
            if response.status == 200:
                result = await response.json()
                content = result['choices'][0]['message']['content']
                sub_questions = extract_json_from_llm_response(content)
                if sub_questions and isinstance(sub_questions, list) and len(sub_questions) > 2:
                    return [str(q) for q in sub_questions]
    except Exception as e:
        logger.error(f"Failed to generate research plan: {e}")
    return fallback_plan


async def run_deep_research_stream(query: str, search_time: int) -> AsyncGenerator[str, None]:
    """The main orchestrator for the deep research process, yielding SSE events."""

    def format_sse(data: dict) -> str:
        """Formats a dictionary into an SSE message string."""
        return f"data: {json.dumps(data)}\n\n"
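
    # Server-Sent Events framing: each event is a single 'data: <json>' line
    # terminated by a blank line; clients dispatch on the "event" field inside
    # the JSON payload.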

    start_time = time.time()

    try:
        yield format_sse({"event": "status", "data": "Initializing deep research..."})

        async with aiohttp.ClientSession() as session:
            yield format_sse({"event": "status", "data": "Step 1: Generating research plan..."})
            plan = await generate_research_plan(query, session)
            yield format_sse({"event": "plan", "data": plan})

            yield format_sse({"event": "status", "data": f"Step 2: Searching web for up to {search_time} seconds..."})
            search_tasks = [fetch_search_results(q, session) for q in [query] + plan]
            search_results_lists = await asyncio.gather(*search_tasks)

            seen_urls = set()
            all_sources = []
            for result_list in search_results_lists:
                for result in result_list:
                    if result['link'] not in seen_urls:
                        seen_urls.add(result['link'])
                        all_sources.append(result)

            query_terms = query.lower().split()
            all_sources.sort(key=lambda s: sum(1 for term in query_terms if term in s['title'].lower()), reverse=True)
            selected_sources = all_sources[:MAX_SOURCES_TO_PROCESS]

            yield format_sse({"event": "found_sources", "data": selected_sources})
            if not selected_sources:
                yield format_sse({"event": "error", "data": "Could not find any relevant web sources. Please try a different query."})
                return
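
            # NOTE: the ranking above is a naive keyword-overlap count against
            # source titles; snippet-based scoring or an embedding model would
            # likely order sources better.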

            yield format_sse({"event": "status", "data": f"Step 3: Processing {len(selected_sources)} sources..."})

            semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)

            async def process_source_and_yield(source):
                """Helper that yields SSE status strings, then the (content, source_info) result."""
                yield format_sse({"event": "processing_source", "data": {"link": source.get('link'), "title": source.get('title')}})
                content, source_info = await process_web_source(session, source)
                if content:
                    yield format_sse({"event": "processed_source_success", "data": source_info})
                else:
                    yield format_sse({"event": "processed_source_failure", "data": source_info})
                # Async generators cannot `return` a value, so the final result
                # is yielded as the last item and unpacked by the caller.
                yield content, source_info

            async def process_with_semaphore(source):
                async with semaphore:
                    results = []
                    async for result in process_source_and_yield(source):
                        results.append(result)
                    return results
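
            # Fan-out/fan-in: each buffered result list holds per-source SSE
            # strings followed by the (content, source_info) tuple, letting
            # asyncio.as_completed below forward events as soon as each source
            # finishes.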

            tasks = [asyncio.create_task(process_with_semaphore(source)) for source in selected_sources]

            consolidated_context = ""
            successful_sources = []

            for task in asyncio.as_completed(tasks):
                results = await task
                for res in results:
                    if isinstance(res, str):
                        yield res

                content, source_info = results[-1]
                if content:
                    consolidated_context += f"Source URL: {source_info['link']}\nSource Title: {source_info['title']}\n\nContent:\n{content}\n\n---\n\n"
                    successful_sources.append(source_info)

            if not successful_sources:
                yield format_sse({"event": "error", "data": "Failed to extract content from any of the selected sources."})
                return

            yield format_sse({"event": "status", "data": f"Step 4: Synthesizing report from {len(successful_sources)} sources..."})

            report_prompt = f"""You are an expert research analyst. Your task is to write a comprehensive, in-depth, and exceptionally long-form report on the topic: "{query}".
You have been provided with detailed content from {len(successful_sources)} web sources. You must use this information as the primary basis for your report.

**Report Requirements:**
1. **Length:** The final report must be extremely detailed and thorough, aiming for a word count between 4,000 and 8,000 words. Do not write a short summary.
2. **Structure:** Organize the report into the following sections. Elaborate extensively within each one.
   - **Executive Summary:** A concise, high-level overview of the entire report.
   - **Introduction:** Set the context and background for '{query}'.
   - **Deep Dive Analysis:** This should be the largest part of the report. Break it down into multiple sub-sections based on the research plan: {', '.join(plan)}. Analyze, synthesize, and connect information from the provided sources. Do not just list facts; provide deep insights.
   - **Challenges and Limitations:** A dedicated analysis of the problems, criticisms, and hurdles related to the topic.
   - **Future Outlook and Predictions:** Analyze the future trends, potential developments, and expert predictions.
   - **Conclusion:** Summarize the key findings and provide a final, conclusive statement.
3. **Citations:** While you write, you MUST cite the information you use. After a sentence or paragraph that uses a source, add a citation like `[Source: example.com/article]`. Use the actual URLs provided in the context.
4. **Formatting:** Use Markdown for clear headings, subheadings, bullet points, and bold text to improve readability.

**SOURCE MATERIAL:**
{consolidated_context[:1000000]}

Begin writing the full, comprehensive report now. Adhere strictly to all instructions.
"""

            report_payload = {
                "model": LLM_MODEL,
                "messages": [{"role": "user", "content": report_prompt}],
                "stream": True,
            }

            async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=report_payload, timeout=TOTAL_TIMEOUT) as response:
                if response.status != 200:
                    error_text = await response.text()
                    yield format_sse({"event": "error", "data": f"Report generation failed with status {response.status}: {error_text}"})
                    return
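
                # The parsing loop below assumes an OpenAI-compatible SSE
                # stream: 'data: {json chunk}' lines with a literal
                # 'data: [DONE]' sentinel marking the end of generation.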

                async for line in response.content:
                    line_str = line.decode('utf-8').strip()
                    if line_str.startswith('data:'):
                        content = line_str[5:].strip()
                        if content == "[DONE]":
                            break
                        try:
                            chunk = json.loads(content)
                            delta = chunk.get("choices", [{}])[0].get("delta", {}).get("content")
                            if delta:
                                yield format_sse({"event": "chunk", "data": delta})
                        except (json.JSONDecodeError, IndexError):
                            continue

            duration = time.time() - start_time
            stats = {
                "duration_seconds": round(duration, 1),
                "sources_found": len(all_sources),
                "sources_processed": len(selected_sources),
                "sources_successful": len(successful_sources)
            }
            yield format_sse({"event": "stats", "data": stats})
            yield format_sse({"event": "sources", "data": successful_sources})

    except Exception as e:
        logger.critical(f"A critical error occurred in the research stream: {e}", exc_info=True)
        yield format_sse({"event": "error", "data": f"An unexpected critical error occurred: {str(e)}"})
    finally:
        yield format_sse({"event": "complete", "data": "Research process finished."})


@app.post("/deep-research", response_class=StreamingResponse)
async def deep_research_endpoint(request: DeepResearchRequest):
    """The main API endpoint for initiating a deep research task."""
    query = request.query.strip()
    if len(query) < 5:
        raise HTTPException(status_code=400, detail="Query is too short. Please provide a more detailed query.")
    # Clamp the requested search budget to a 60-300 second window.
    search_time = max(60, min(request.search_time, 300))

    return StreamingResponse(
        run_deep_research_stream(query, search_time),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "Connection": "keep-alive"}
    )


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)
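
# Example usage (assuming the server is running locally on port 8000):
#   curl -N -X POST http://localhost:8000/deep-research \
#     -H "Content-Type: application/json" \
#     -d '{"query": "state of solid-state battery research", "search_time": 180}'
# curl's -N flag disables output buffering so SSE events print as they arrive.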