# File size: 18,286 Bytes (file-viewer metadata; the commit-hash and line-number gutters that followed here were extraction residue, not source code).
import os
import asyncio
import json
import logging
import random
import re
import time
from typing import AsyncGenerator, Optional, Tuple, List, Dict
from urllib.parse import quote_plus, urlparse, unquote
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from dotenv import load_dotenv
import aiohttp
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from collections import defaultdict
# --- Configuration ---
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Load variables from a .env file into the process environment before they
# are read below.
load_dotenv()

LLM_API_KEY = os.getenv("LLM_API_KEY")
if not LLM_API_KEY:
    # Fail at import time: every LLM request in this module needs the key.
    raise RuntimeError("LLM_API_KEY must be set in a .env file.")

# Log through the module logger (rather than the root ``logging`` module) so
# this message is attributed consistently with the rest of the file.
logger.info("LLM API Key loaded successfully.")
# --- Constants & Headers ---
# Chat-completions endpoint and model used for both the plan and the report.
LLM_API_URL = "https://api.typegpt.net/v1/chat/completions"
LLM_MODEL = "meta-llama/Llama-4-Maverick-17B-128E-Instruct-FP8"

# Time budgets, in seconds.
SEARCH_TIMEOUT = 3 * 60   # 3 minutes (not referenced in the visible code)
TOTAL_TIMEOUT = 10 * 60   # 10 minutes total budget; used for the report request

# Fan-out limits for source processing.
MAX_SOURCES_TO_PROCESS = 15   # how many ranked sources are kept for fetching
MAX_CONCURRENT_REQUESTS = 3   # size of the semaphore guarding page fetches

# Retry / pacing settings for web searches.
RETRY_ATTEMPTS = 3    # tries per search query
RETRY_DELAY = 3.0     # pause between failed search attempts
REQUEST_DELAY = 1.0   # not referenced in the visible code
# Initialize fake user agent generator
try:
    ua = UserAgent()
except Exception:
    # Best-effort: if the fake_useragent provider cannot be constructed for
    # any reason, fall back to a small built-in pool instead of failing import.
    class SimpleUA:
        """Minimal stand-in user-agent provider.

        Call sites in this module read ``ua.random`` as an *attribute* and
        expect a string, so ``random`` must be a property here; as a plain
        method the attribute access would yield a bound method object.
        """

        _AGENTS = (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36",
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:129.0) Gecko/20100101 Firefox/129.0",
        )

        @property
        def random(self) -> str:
            """Return one user-agent string chosen at random from the pool."""
            # Inside the method body ``random`` resolves to the stdlib module
            # (class attributes are not in a method's lookup scope).
            return random.choice(self._AGENTS)

    ua = SimpleUA()
# Headers sent with every request to the LLM API: bearer authentication plus
# JSON request/response content negotiation.
_LLM_AUTHORIZATION = f"Bearer {LLM_API_KEY}"
LLM_HEADERS = {
    "Authorization": _LLM_AUTHORIZATION,
    "Content-Type": "application/json",
    "Accept": "application/json",
}
class DeepResearchRequest(BaseModel):
    """Request body accepted by the ``/deep-research`` endpoint."""

    # Topic to research; the endpoint strips it and rejects fewer than 5 characters.
    query: str
    # Requested search budget in seconds (defaults to 3 minutes); the endpoint
    # clamps it to the 60-300 range.
    search_time: int = 3 * 60
# --- FastAPI App Initialization ---
app = FastAPI(
    version="4.1.0",
    title="AI Deep Research API",
    description="Provides comprehensive, long-form research reports from real-time web searches.",
)

# CORS is left wide open: every origin, method and header is accepted.
# NOTE(review): wildcard origins together with allow_credentials=True is a very
# permissive combination - confirm it is intended before exposing this publicly.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
# --- Helper Functions ---
def extract_json_from_llm_response(text: str) -> Optional[list]:
    """Extract the first JSON array embedded in an LLM's free-form reply.

    Each ``[`` in ``text`` is tried in turn as the start of a JSON value and
    the first one that decodes to a list is returned. Unlike a greedy
    ``\\[.*\\]`` regex, this tolerates extra brackets in surrounding prose,
    e.g. ``'Here you go: ["a", "b"] (see note [1])'``.

    Args:
        text: Raw text returned by the model; may be empty or ``None``.

    Returns:
        The decoded list, or ``None`` when no JSON array can be decoded.
    """
    if not text:
        return None

    decoder = json.JSONDecoder()
    saw_candidate = False
    start = text.find('[')
    while start != -1:
        saw_candidate = True
        try:
            # raw_decode parses one JSON value starting at ``start`` and
            # ignores whatever follows it.
            value, _end = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            pass
        else:
            if isinstance(value, list):
                return value
        start = text.find('[', start + 1)

    # Only warn when something array-like was present but undecodable;
    # text without any bracket is simply "no JSON here".
    if saw_candidate:
        logger.warning("Failed to decode JSON from LLM response.")
    return None
async def get_real_user_agent() -> str:
    """Return a user-agent string read from the module-level ``ua`` provider."""
    agent = ua.random
    return agent
def clean_url(url: str) -> str:
    """Clean and normalize a URL, unwrapping DuckDuckGo redirect links.

    * ``//duckduckgo.com/l/?uddg=<encoded>&...`` yields the decoded ``uddg``
      target, or ``""`` when the redirect carries no ``uddg`` parameter.
    * Protocol-relative URLs (``//host/path``) get an ``https:`` scheme.
    * Scheme-less URLs get ``https://`` prepended.
    * Anything already starting with ``http://`` or ``https://`` is returned
      unchanged.

    Args:
        url: The raw ``href`` value; may be empty or ``None``.

    Returns:
        The normalized URL, or ``""`` when there is nothing usable.
    """
    if not url:
        return ""

    if url.startswith('//duckduckgo.com/l/'):
        try:
            redirect_query = urlparse(f"https:{url}").query
        except ValueError:
            # Unparseable redirect: fall through to the generic handling below.
            redirect_query = None
        if redirect_query is not None:
            params = {}
            for pair in redirect_query.split('&'):
                # partition() copes with segments that have no '=' (bare
                # flags, trailing '&') or several '=' (padded tokens), which a
                # plain split('=') fed into dict() would choke on.
                key, separator, value = pair.partition('=')
                if separator:
                    params[key] = value
            return unquote(params.get('uddg', ''))

    if url.startswith('//'):
        return 'https:' + url
    if not url.startswith(('http://', 'https://')):
        return 'https://' + url
    return url
def _parse_duckduckgo_results(html: str, max_results: int) -> List[dict]:
    """Pull up to ``max_results`` title/link/snippet dicts out of a DuckDuckGo HTML page."""
    soup = BeautifulSoup(html, 'html.parser')
    results: List[dict] = []
    for result in soup.select('.result'):
        if len(results) >= max_results:
            break
        title_elem = result.select_one('.result__title a')
        snippet_elem = result.select_one('.result__snippet')
        if not (title_elem and snippet_elem and title_elem.has_attr('href')):
            continue
        link = clean_url(title_elem['href'])
        if link:
            results.append({
                'title': title_elem.get_text(strip=True),
                'link': link,
                'snippet': snippet_elem.get_text(strip=True)
            })
    return results


async def fetch_search_results(query: str, session: aiohttp.ClientSession, max_results: int = 10) -> List[dict]:
    """Search DuckDuckGo's HTML interface, retrying on failure.

    Up to ``RETRY_ATTEMPTS`` requests are made, sleeping ``RETRY_DELAY``
    seconds between failed attempts. Any error is logged and treated as a
    failed attempt; this function never raises for network or parsing problems.

    Args:
        query: The search phrase.
        session: Shared aiohttp session used to issue the request.
        max_results: Maximum number of results to return.

    Returns:
        A list of ``{'title', 'link', 'snippet'}`` dicts from the first
        successful attempt, or ``[]`` if every attempt failed.
    """
    search_url = f"https://html.duckduckgo.com/html/?q={quote_plus(query)}"
    # aiohttp documents ClientTimeout as the timeout type; 10 s total per attempt.
    request_timeout = aiohttp.ClientTimeout(total=10)

    for attempt in range(RETRY_ATTEMPTS):
        # Draw a fresh user agent for every attempt so a retry does not reuse
        # the identity of the request that just failed.
        headers = {'User-Agent': await get_real_user_agent(), 'Accept-Language': 'en-US,en;q=0.5'}
        try:
            async with session.get(search_url, headers=headers, timeout=request_timeout) as response:
                if response.status == 200:
                    html = await response.text()
                    results = _parse_duckduckgo_results(html, max_results)
                    logger.info(f"Found {len(results)} search results for query: '{query}'")
                    return results
                logger.warning(f"Search attempt {attempt+1} for '{query}' failed with status: {response.status}")
        except Exception as e:
            # Deliberately broad: a search failure must never abort the whole
            # research run. (Task cancellation is not an Exception subclass on
            # Python 3.8+ and still propagates.)
            logger.error(f"Search attempt {attempt+1} for '{query}' failed with error: {e}")
        if attempt < RETRY_ATTEMPTS - 1:
            await asyncio.sleep(RETRY_DELAY)
    return []
async def process_web_source(session: aiohttp.ClientSession, source: dict, timeout: int = 15) -> Tuple[Optional[str], dict]:
    """Fetch a web page and extract its main textual content.

    Args:
        session: Shared aiohttp session used to issue the request.
        source: Search-result dict; its ``'link'`` entry is the URL to fetch.
        timeout: Total time allowed for the request, in seconds.

    Returns:
        Always a 2-tuple. On success ``(content, source_info)``, where
        ``source_info`` is a copy of ``source`` extended with ``'word_count'``
        and ``'processing_time'``. On any failure (no link, binary file,
        non-200 / non-HTML response, no body, fewer than 50 words, or an
        error) ``(None, source)``.
    """
    url = source.get('link')
    if not url:
        return None, source

    # Skip obvious binary downloads. The URL path is checked in addition to
    # the raw URL so a query string or fragment ("file.pdf?download=1") cannot
    # hide the extension.
    skipped_extensions = ('.pdf', '.jpg', '.png', '.zip', '.mp3', '.mp4')
    lowered_url = url.lower()
    try:
        lowered_path = urlparse(url).path.lower()
    except ValueError:
        lowered_path = lowered_url
    if lowered_url.endswith(skipped_extensions) or lowered_path.endswith(skipped_extensions):
        return None, source

    headers = {'User-Agent': await get_real_user_agent()}
    source_info = source.copy()
    start_time = time.time()
    # aiohttp documents ClientTimeout as the timeout type; accept one as-is.
    request_timeout = timeout if isinstance(timeout, aiohttp.ClientTimeout) else aiohttp.ClientTimeout(total=timeout)

    try:
        # NOTE(review): ssl=False disables TLS certificate verification for
        # scraped pages - presumably to tolerate misconfigured sites; confirm
        # this trade-off is acceptable.
        async with session.get(url, headers=headers, timeout=request_timeout, ssl=False) as response:
            if response.status != 200 or 'text/html' not in response.headers.get('Content-Type', ''):
                return None, source
            html = await response.text()

        soup = BeautifulSoup(html, "html.parser")
        # Drop boilerplate and non-content elements before extracting text.
        for tag in soup(['script', 'style', 'nav', 'footer', 'header', 'aside', 'form']):
            tag.decompose()

        main_content = soup.select_one('main, article, #main, #content, .main, .post-content, .entry-content')
        if not main_content:
            main_content = soup.body
        if not main_content:
            # No usable container at all; report failure explicitly rather
            # than falling off the end of the function.
            return None, source

        content = " ".join(main_content.stripped_strings)
        content = re.sub(r'\s{2,}', ' ', content).strip()
        word_count = len(content.split())
        if word_count < 50:
            # Too little text to be a useful source.
            return None, source

        source_info['word_count'] = word_count
        source_info['processing_time'] = time.time() - start_time
        return content, source_info
    except Exception as e:
        # Best-effort scraping: any failure just disqualifies this source.
        logger.warning(f"Error processing source {url}: {str(e)}")
    return None, source
async def generate_research_plan(query: str, session: aiohttp.ClientSession) -> List[str]:
    """Ask the LLM for sub-questions that will guide the web research.

    Args:
        query: The research topic.
        session: Shared aiohttp session used to call the LLM API.

    Returns:
        The LLM's sub-questions as stripped, non-empty strings when it returns
        more than two usable ones. Otherwise a fixed five-question fallback
        plan built from ``query``. This function never raises for API
        failures; they are logged and the fallback is used.
    """
    plan_prompt = {
        "model": LLM_MODEL,
        "messages": [{
            "role": "user",
            "content": f"""You are a research strategist. Your task is to generate 5 distinct, insightful sub-questions to form a comprehensive research plan for the topic: '{query}'.
These questions will guide an AI in searching the web. Focus on different facets of the topic, such as its background, current state, key components, challenges, and future trends.
Your response MUST be ONLY a raw JSON array of strings, with no other text or explanation.
Example: ["What is the history of X?", "How does X compare to its main competitors?", "What are the primary use cases for X in 2025?"]"""
        }],
        "temperature": 0.7,
        "max_tokens": 500
    }
    fallback_plan = [
        f"What is the foundational definition and history of {query}?",
        f"What are the core components and key features of {query}?",
        f"Who are the major competitors or alternatives to {query}?",
        f"What are the primary challenges and limitations associated with {query}?",
        f"What are the latest trends and future predictions for {query}?"
    ]
    try:
        request_timeout = aiohttp.ClientTimeout(total=30)
        async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=plan_prompt, timeout=request_timeout) as response:
            if response.status != 200:
                # Make the degradation visible instead of falling back silently.
                logger.warning(f"Research plan request failed with status {response.status}; using fallback plan.")
                return fallback_plan
            result = await response.json()

        content = result['choices'][0]['message']['content']
        sub_questions = extract_json_from_llm_response(content)
        if isinstance(sub_questions, list):
            # Coerce to strings and drop blanks so no empty search is issued.
            cleaned = [str(q).strip() for q in sub_questions if q is not None]
            cleaned = [q for q in cleaned if q]
            if len(cleaned) > 2:
                return cleaned
        logger.warning("LLM did not return a usable research plan; using fallback plan.")
    except Exception as e:
        # Planning is best-effort: any API or parsing error yields the fallback.
        logger.error(f"Failed to generate research plan: {e}")
    return fallback_plan
def _format_sse(data: dict) -> str:
    """Formats a dictionary into an SSE message string."""
    return f"data: {json.dumps(data)}\n\n"


async def _search_within_budget(queries: List[str], session: "aiohttp.ClientSession", budget: float) -> List[List[dict]]:
    """Run one web search per query concurrently, abandoning stragglers after ``budget`` seconds.

    Returns the result lists of the searches that finished in time, in the
    same order as ``queries``.
    """
    tasks = [asyncio.create_task(fetch_search_results(q, session)) for q in queries]
    if not tasks:
        return []
    try:
        done, _pending = await asyncio.wait(tasks, timeout=budget)
    finally:
        # Covers both the timeout case and cancellation of this coroutine:
        # never leave searches running in the background.
        for task in tasks:
            if not task.done():
                task.cancel()
    return [
        task.result()
        for task in tasks
        if task in done and not task.cancelled() and task.exception() is None
    ]


async def _research_events(query: str, search_time: int) -> AsyncGenerator[str, None]:
    """Yield the SSE events of one research run (plan, search, fetch, report, stats).

    Errors are allowed to propagate; the public wrapper turns them into an
    ``error`` event and always appends the final ``complete`` event.
    """
    start_time = time.time()
    yield _format_sse({"event": "status", "data": "Initializing deep research..."})

    async with aiohttp.ClientSession() as session:
        yield _format_sse({"event": "status", "data": "Step 1: Generating research plan..."})
        plan = await generate_research_plan(query, session)
        yield _format_sse({"event": "plan", "data": plan})

        yield _format_sse({"event": "status", "data": f"Step 2: Searching web for up to {search_time} seconds..."})
        search_results_lists = await _search_within_budget([query] + plan, session, search_time)

        # De-duplicate by link, keeping the first occurrence.
        seen_urls = set()
        all_sources = []
        for result_list in search_results_lists:
            for result in result_list:
                if result['link'] not in seen_urls:
                    seen_urls.add(result['link'])
                    all_sources.append(result)

        # Rank by how many query terms appear in the title (stable sort).
        query_terms = query.lower().split()
        all_sources.sort(key=lambda s: sum(1 for term in query_terms if term in s['title'].lower()), reverse=True)
        selected_sources = all_sources[:MAX_SOURCES_TO_PROCESS]
        yield _format_sse({"event": "found_sources", "data": selected_sources})

        if not selected_sources:
            yield _format_sse({"event": "error", "data": "Could not find any relevant web sources. Please try a different query."})
            return

        yield _format_sse({"event": "status", "data": f"Step 3: Processing {len(selected_sources)} sources..."})
        semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)

        async def process_with_semaphore(source):
            """Fetch one source under the concurrency limit; return (source, content, source_info)."""
            async with semaphore:
                outcome = await process_web_source(session, source)
            if not outcome:
                # Defensive: treat a missing result as a failed source.
                return source, None, source
            content, source_info = outcome
            return source, content, source_info

        tasks = [asyncio.create_task(process_with_semaphore(source)) for source in selected_sources]
        context_parts = []
        successful_sources = []
        try:
            # Events can only be yielded from this generator, so each source's
            # events are emitted here as soon as its task completes.
            for finished in asyncio.as_completed(tasks):
                source, content, source_info = await finished
                yield _format_sse({"event": "processing_source", "data": {"link": source.get('link'), "title": source.get('title')}})
                if content:
                    yield _format_sse({"event": "processed_source_success", "data": source_info})
                    context_parts.append(
                        f"Source URL: {source_info['link']}\nSource Title: {source_info['title']}\n\nContent:\n{content}\n\n---\n\n"
                    )
                    successful_sources.append(source_info)
                else:
                    yield _format_sse({"event": "processed_source_failure", "data": source_info})
        finally:
            # If the stream is closed or an error occurs mid-way, stop the
            # remaining fetches instead of leaking them.
            for task in tasks:
                if not task.done():
                    task.cancel()

        if not successful_sources:
            yield _format_sse({"event": "error", "data": "Failed to extract content from any of the selected sources."})
            return

        yield _format_sse({"event": "status", "data": f"Step 4: Synthesizing report from {len(successful_sources)} sources..."})
        consolidated_context = "".join(context_parts)
        report_prompt = f"""You are an expert research analyst. Your task is to write a comprehensive, in-depth, and exceptionally long-form report on the topic: "{query}".
You have been provided with detailed content from {len(successful_sources)} web sources. You must use this information as the primary basis for your report.
**Report Requirements:**
1. **Length:** The final report must be extremely detailed and thorough, aiming for a word count between 4,000 and 8,000 words. Do not write a short summary.
2. **Structure:** Organize the report into the following sections. Elaborate extensively within each one.
- **Executive Summary:** A concise, high-level overview of the entire report.
- **Introduction:** Set the context and background for '{query}'.
- **Deep Dive Analysis:** This should be the largest part of the report. Break it down into multiple sub-sections based on the research plan: {', '.join(plan)}. Analyze, synthesize, and connect information from the provided sources. Do not just list facts; provide deep insights.
- **Challenges and Limitations:** A dedicated analysis of the problems, criticisms, and hurdles related to the topic.
- **Future Outlook and Predictions:** Analyze the future trends, potential developments, and expert predictions.
- **Conclusion:** Summarize the key findings and provide a final, conclusive statement.
3. **Citations:** While you write, you MUST cite the information you use. After a sentence or paragraph that uses a source, add a citation like `[Source: example.com/article]`. Use the actual URLs provided in the context.
4. **Formatting:** Use Markdown for clear headings, subheadings, bullet points, and bold text to improve readability.
**SOURCE MATERIAL:**
{consolidated_context[:1000000]}
Begin writing the full, comprehensive report now. Adhere strictly to all instructions.
"""
        report_payload = {
            "model": LLM_MODEL,
            "messages": [{"role": "user", "content": report_prompt}],
            "stream": True,
            # max_tokens is intentionally omitted so the API does not cap the report length.
        }
        report_timeout = aiohttp.ClientTimeout(total=TOTAL_TIMEOUT)
        async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=report_payload, timeout=report_timeout) as response:
            if response.status != 200:
                error_text = await response.text()
                yield _format_sse({"event": "error", "data": f"Report generation failed with status {response.status}: {error_text}"})
                return

            # Relay the upstream SSE stream chunk by chunk.
            async for raw_line in response.content:
                line_str = raw_line.decode('utf-8', errors='replace').strip()
                if not line_str.startswith('data:'):
                    continue
                payload = line_str[5:].strip()
                if payload == "[DONE]":
                    break
                try:
                    chunk = json.loads(payload)
                    delta = chunk.get("choices", [{}])[0].get("delta", {}).get("content")
                except (json.JSONDecodeError, IndexError, KeyError, AttributeError, TypeError):
                    # Malformed or unexpected chunk shape (e.g. a null delta):
                    # skip it rather than abort the whole report.
                    continue
                if delta:
                    yield _format_sse({"event": "chunk", "data": delta})

        duration = time.time() - start_time
        stats = {
            "duration_seconds": round(duration, 1),
            "sources_found": len(all_sources),
            "sources_processed": len(selected_sources),
            "sources_successful": len(successful_sources)
        }
        yield _format_sse({"event": "stats", "data": stats})
        yield _format_sse({"event": "sources", "data": successful_sources})


async def run_deep_research_stream(query: str, search_time: int) -> AsyncGenerator[str, None]:
    """The main orchestrator for the deep research process, yielding SSE events.

    Every yielded item is a ``data: <json>\\n\\n`` string whose JSON object has
    an ``event`` and a ``data`` field. The events are ``status``, ``plan``,
    ``found_sources``, ``processing_source``, ``processed_source_success`` /
    ``processed_source_failure``, ``chunk``, ``stats``, ``sources``, ``error``
    and a final ``complete``.

    Args:
        query: The research topic.
        search_time: Budget in seconds for the web-search step.

    Yields:
        SSE-formatted strings. Unexpected errors are reported as an ``error``
        event rather than raised. ``complete`` is always the last event when
        the stream runs to its end.
    """
    events = _research_events(query, search_time)
    try:
        async for event in events:
            yield event
    except Exception as e:
        logger.critical(f"A critical error occurred in the research stream: {e}", exc_info=True)
        yield _format_sse({"event": "error", "data": f"An unexpected critical error occurred: {str(e)}"})
    finally:
        # Release the inner generator (HTTP session, pending tasks). No yield
        # may happen here: yielding while the generator is being closed (e.g.
        # client disconnect) raises RuntimeError.
        await events.aclose()
    # Reached after normal completion, an early return, or a handled error -
    # but not when the consumer closed or cancelled the stream.
    yield _format_sse({"event": "complete", "data": "Research process finished."})
@app.post("/deep-research", response_class=StreamingResponse)
async def deep_research_endpoint(request: DeepResearchRequest):
    """Validate the request and start streaming a deep-research run as SSE.

    Raises:
        HTTPException: 400 when the stripped query has fewer than 5 characters.
    """
    query = request.query.strip()
    # An empty query has length 0, so one length check covers both cases.
    if len(query) < 5:
        raise HTTPException(status_code=400, detail="Query is too short. Please provide a more detailed query.")

    # Clamp the requested search budget to the 60-300 second range.
    search_time = min(max(request.search_time, 60), 300)

    sse_headers = {"Cache-Control": "no-cache", "Connection": "keep-alive"}
    event_stream = run_deep_research_stream(query, search_time)
    return StreamingResponse(
        event_stream,
        media_type="text/event-stream",
        headers=sse_headers,
    )
if __name__ == "__main__":
    # Imported here so uvicorn is only needed when the module is run as a script.
    import uvicorn

    # Serve the API on all interfaces, port 8000.
    uvicorn.run(app, host="0.0.0.0", port=8000)