# Scrap / main.py
import os
import asyncio
import json
import logging
import random
import re
import time
from typing import AsyncGenerator, Optional, Tuple, List, Dict
from urllib.parse import quote_plus, urlparse, parse_qs
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from dotenv import load_dotenv
import aiohttp
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
# --- Configuration ---
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
load_dotenv()
LLM_API_KEY = os.getenv("LLM_API_KEY")
if not LLM_API_KEY:
raise RuntimeError("LLM_API_KEY must be set in a .env file.")
else:
    logger.info("LLM API Key loaded successfully.")
# --- Constants & Headers ---
LLM_API_URL = "https://api.typegpt.net/v1/chat/completions"
LLM_MODEL = "meta-llama/Llama-4-Maverick-17B-128E-Instruct-FP8"
MAX_SOURCES_TO_PROCESS = 15
MAX_CONCURRENT_REQUESTS = 3
SEARCH_TIMEOUT = 180 # 3 minutes
TOTAL_TIMEOUT = 600 # 10 minutes total budget
REQUEST_DELAY = 1.0
RETRY_ATTEMPTS = 3
RETRY_DELAY = 3.0
# Initialize fake user agent generator
try:
ua = UserAgent()
except Exception:
    # Fallback when fake_useragent cannot load its data; mimics the library's `.random` property.
    class SimpleUA:
        @property
        def random(self):
            return random.choice([
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36",
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36",
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:129.0) Gecko/20100101 Firefox/129.0"
            ])
ua = SimpleUA()
LLM_HEADERS = {
"Authorization": f"Bearer {LLM_API_KEY}",
"Content-Type": "application/json",
"Accept": "application/json"
}
class DeepResearchRequest(BaseModel):
query: str
search_time: int = 180 # Default to 3 minutes
# --- FastAPI App Initialization ---
app = FastAPI(
title="AI Deep Research API",
description="Provides comprehensive, long-form research reports from real-time web searches.",
version="4.1.0"
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# --- Helper Functions ---
def extract_json_from_llm_response(text: str) -> Optional[list]:
"""Extracts a JSON array from the LLM's potentially messy string response."""
match = re.search(r'\[.*\]', text, re.DOTALL)
if match:
try:
return json.loads(match.group(0))
except json.JSONDecodeError:
logger.warning("Failed to decode JSON from LLM response.")
return None
return None
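# Illustrative behaviour of the extractor above (the example replies are hypothetical LLM output):
#   extract_json_from_llm_response('Sure! ["q1", "q2"]')  -> ["q1", "q2"]
#   extract_json_from_llm_response('no json here')        -> None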
async def get_real_user_agent() -> str:
"""Provides a realistic, randomly rotated user agent string."""
return ua.random
def clean_url(url: str) -> str:
"""Cleans and normalizes URLs, especially from search engine redirects."""
if not url:
return ""
    if url.startswith('//duckduckgo.com/l/'):
        try:
            # DuckDuckGo wraps result links; the real destination is in the 'uddg' query parameter.
            parsed_url = urlparse(f"https:{url}")
            params = parse_qs(parsed_url.query)
            return params.get('uddg', [''])[0]
        except Exception:
            pass
if url.startswith('//'):
return 'https:' + url
if not url.startswith(('http://', 'https://')):
return 'https://' + url
return url
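# Illustrative inputs/outputs for clean_url (the redirect URL below is a made-up example):
#   clean_url('//duckduckgo.com/l/?uddg=https%3A%2F%2Fexample.com%2Fpage') -> 'https://example.com/page'
#   clean_url('example.com/page')                                          -> 'https://example.com/page'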
async def fetch_search_results(query: str, session: aiohttp.ClientSession, max_results: int = 10) -> List[dict]:
"""Performs a web search using DuckDuckGo's HTML interface with robust retry logic."""
search_url = f"https://html.duckduckgo.com/html/?q={quote_plus(query)}"
headers = {'User-Agent': await get_real_user_agent(), 'Accept-Language': 'en-US,en;q=0.5'}
for attempt in range(RETRY_ATTEMPTS):
try:
async with session.get(search_url, headers=headers, timeout=10) as response:
if response.status == 200:
html = await response.text()
soup = BeautifulSoup(html, 'html.parser')
results = []
for result in soup.select('.result'):
title_elem = result.select_one('.result__title a')
snippet_elem = result.select_one('.result__snippet')
if title_elem and snippet_elem and title_elem.has_attr('href'):
link = clean_url(title_elem['href'])
if link:
results.append({
'title': title_elem.get_text(strip=True),
'link': link,
'snippet': snippet_elem.get_text(strip=True)
})
if len(results) >= max_results:
break
logger.info(f"Found {len(results)} search results for query: '{query}'")
return results
else:
logger.warning(f"Search attempt {attempt+1} for '{query}' failed with status: {response.status}")
except Exception as e:
logger.error(f"Search attempt {attempt+1} for '{query}' failed with error: {e}")
if attempt < RETRY_ATTEMPTS - 1:
await asyncio.sleep(RETRY_DELAY)
return []
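# Shape of each search result returned above (values are placeholders):
#   {"title": "Page title", "link": "https://example.com/page", "snippet": "Short excerpt from the result"}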
async def process_web_source(session: aiohttp.ClientSession, source: dict, timeout: int = 15) -> Tuple[Optional[str], dict]:
"""Fetches and extracts the main textual content from a web page."""
url = source.get('link')
if not url:
return None, source
headers = {'User-Agent': await get_real_user_agent()}
source_info = source.copy()
start_time = time.time()
try:
if any(url.lower().endswith(ext) for ext in ['.pdf', '.jpg', '.png', '.zip', '.mp3', '.mp4']):
return None, source
async with session.get(url, headers=headers, timeout=timeout, ssl=False) as response:
if response.status != 200 or 'text/html' not in response.headers.get('Content-Type', ''):
return None, source
html = await response.text()
soup = BeautifulSoup(html, "html.parser")
for tag in soup(['script', 'style', 'nav', 'footer', 'header', 'aside', 'form']):
tag.decompose()
main_content = soup.select_one('main, article, #main, #content, .main, .post-content, .entry-content')
if not main_content:
main_content = soup.body
if main_content:
content = " ".join(main_content.stripped_strings)
content = re.sub(r'\s{2,}', ' ', content).strip()
if len(content.split()) < 50:
return None, source
source_info['word_count'] = len(content.split())
source_info['processing_time'] = time.time() - start_time
return content, source_info
except Exception as e:
logger.warning(f"Error processing source {url}: {str(e)}")
return None, source
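# Return contract of process_web_source: (extracted_text, source_dict) on success, where the dict is
# enriched with 'word_count' and 'processing_time'; (None, original_source_dict) on any failure.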
async def generate_research_plan(query: str, session: aiohttp.ClientSession) -> List[str]:
"""Generates a list of sub-questions to guide the research process."""
plan_prompt = {
"model": LLM_MODEL,
"messages": [{
"role": "user",
"content": f"""You are a research strategist. Your task is to generate 5 distinct, insightful sub-questions to form a comprehensive research plan for the topic: '{query}'.
These questions will guide an AI in searching the web. Focus on different facets of the topic, such as its background, current state, key components, challenges, and future trends.
Your response MUST be ONLY a raw JSON array of strings, with no other text or explanation.
Example: ["What is the history of X?", "How does X compare to its main competitors?", "What are the primary use cases for X in 2025?"]"""
}],
"temperature": 0.7,
"max_tokens": 500
}
fallback_plan = [
f"What is the foundational definition and history of {query}?",
f"What are the core components and key features of {query}?",
f"Who are the major competitors or alternatives to {query}?",
f"What are the primary challenges and limitations associated with {query}?",
f"What are the latest trends and future predictions for {query}?"
]
try:
async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=plan_prompt, timeout=30) as response:
if response.status == 200:
result = await response.json()
content = result['choices'][0]['message']['content']
sub_questions = extract_json_from_llm_response(content)
if sub_questions and isinstance(sub_questions, list) and len(sub_questions) > 2:
return [str(q) for q in sub_questions]
except Exception as e:
logger.error(f"Failed to generate research plan: {e}")
return fallback_plan
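# The research plan is a list of question strings; the hard-coded fallback above is used whenever the
# LLM call fails, times out, or returns fewer than three usable questions.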
async def run_deep_research_stream(query: str, search_time: int) -> AsyncGenerator[str, None]:
"""The main orchestrator for the deep research process, yielding SSE events."""
def format_sse(data: dict) -> str:
"""Formats a dictionary into an SSE message string."""
return f"data: {json.dumps(data)}\n\n"
start_time = time.time()
try:
yield format_sse({"event": "status", "data": "Initializing deep research..."})
async with aiohttp.ClientSession() as session:
yield format_sse({"event": "status", "data": "Step 1: Generating research plan..."})
plan = await generate_research_plan(query, session)
yield format_sse({"event": "plan", "data": plan})
yield format_sse({"event": "status", "data": f"Step 2: Searching web for up to {search_time} seconds..."})
search_tasks = [fetch_search_results(q, session) for q in [query] + plan]
search_results_lists = await asyncio.gather(*search_tasks)
seen_urls = set()
all_sources = []
for result_list in search_results_lists:
for result in result_list:
if result['link'] not in seen_urls:
seen_urls.add(result['link'])
all_sources.append(result)
query_terms = query.lower().split()
all_sources.sort(key=lambda s: sum(1 for term in query_terms if term in s['title'].lower()), reverse=True)
selected_sources = all_sources[:MAX_SOURCES_TO_PROCESS]
yield format_sse({"event": "found_sources", "data": selected_sources})
if not selected_sources:
yield format_sse({"event": "error", "data": "Could not find any relevant web sources. Please try a different query."})
return
yield format_sse({"event": "status", "data": f"Step 3: Processing {len(selected_sources)} sources..."})
semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
async def process_source_and_yield(source):
"""Helper to yield status before and after processing a source."""
yield format_sse({"event": "processing_source", "data": {"link": source.get('link'), "title": source.get('title')}})
content, source_info = await process_web_source(session, source)
if content:
yield format_sse({"event": "processed_source_success", "data": source_info})
else:
yield format_sse({"event": "processed_source_failure", "data": source_info})
                # An async generator cannot return a value; yield the result tuple as the final item.
                yield content, source_info
async def process_with_semaphore(source):
async with semaphore:
results = []
async for result in process_source_and_yield(source):
results.append(result)
return results
tasks = [asyncio.create_task(process_with_semaphore(source)) for source in selected_sources]
consolidated_context = ""
successful_sources = []
for task in asyncio.as_completed(tasks):
results = await task
for res in results:
if isinstance(res, str): # SSE event
yield res
                # The (content, source_info) tuple is always the last item yielded by the generator.
                content, source_info = results[-1]
if content:
consolidated_context += f"Source URL: {source_info['link']}\nSource Title: {source_info['title']}\n\nContent:\n{content}\n\n---\n\n"
successful_sources.append(source_info)
if not successful_sources:
yield format_sse({"event": "error", "data": "Failed to extract content from any of the selected sources."})
return
yield format_sse({"event": "status", "data": f"Step 4: Synthesizing report from {len(successful_sources)} sources..."})
report_prompt = f"""You are an expert research analyst. Your task is to write a comprehensive, in-depth, and exceptionally long-form report on the topic: "{query}".
You have been provided with detailed content from {len(successful_sources)} web sources. You must use this information as the primary basis for your report.
**Report Requirements:**
1. **Length:** The final report must be extremely detailed and thorough, aiming for a word count between 4,000 and 8,000 words. Do not write a short summary.
2. **Structure:** Organize the report into the following sections. Elaborate extensively within each one.
- **Executive Summary:** A concise, high-level overview of the entire report.
- **Introduction:** Set the context and background for '{query}'.
- **Deep Dive Analysis:** This should be the largest part of the report. Break it down into multiple sub-sections based on the research plan: {', '.join(plan)}. Analyze, synthesize, and connect information from the provided sources. Do not just list facts; provide deep insights.
- **Challenges and Limitations:** A dedicated analysis of the problems, criticisms, and hurdles related to the topic.
- **Future Outlook and Predictions:** Analyze the future trends, potential developments, and expert predictions.
- **Conclusion:** Summarize the key findings and provide a final, conclusive statement.
3. **Citations:** While you write, you MUST cite the information you use. After a sentence or paragraph that uses a source, add a citation like `[Source: example.com/article]`. Use the actual URLs provided in the context.
4. **Formatting:** Use Markdown for clear headings, subheadings, bullet points, and bold text to improve readability.
**SOURCE MATERIAL:**
{consolidated_context[:1000000]}
Begin writing the full, comprehensive report now. Adhere strictly to all instructions.
"""
report_payload = {
"model": LLM_MODEL,
"messages": [{"role": "user", "content": report_prompt}],
"stream": True,
                # max_tokens is intentionally omitted so the model can generate the full-length report.
}
async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=report_payload, timeout=TOTAL_TIMEOUT) as response:
if response.status != 200:
error_text = await response.text()
yield format_sse({"event": "error", "data": f"Report generation failed with status {response.status}: {error_text}"})
return
                # Parse the SSE stream from the LLM and forward each content delta to the client.
async for line in response.content:
line_str = line.decode('utf-8').strip()
if line_str.startswith('data:'):
content = line_str[5:].strip()
if content == "[DONE]":
break
try:
chunk = json.loads(content)
delta = chunk.get("choices", [{}])[0].get("delta", {}).get("content")
if delta:
# Stream chunks as they arrive without line buffering
yield format_sse({"event": "chunk", "data": delta})
except (json.JSONDecodeError, IndexError):
continue
duration = time.time() - start_time
stats = {
"duration_seconds": round(duration, 1),
"sources_found": len(all_sources),
"sources_processed": len(selected_sources),
"sources_successful": len(successful_sources)
}
yield format_sse({"event": "stats", "data": stats})
yield format_sse({"event": "sources", "data": successful_sources})
except Exception as e:
logger.critical(f"A critical error occurred in the research stream: {e}", exc_info=True)
yield format_sse({"event": "error", "data": f"An unexpected critical error occurred: {str(e)}"})
finally:
yield format_sse({"event": "complete", "data": "Research process finished."})
@app.post("/deep-research", response_class=StreamingResponse)
async def deep_research_endpoint(request: DeepResearchRequest):
"""The main API endpoint for initiating a deep research task."""
query = request.query.strip()
if not query or len(query) < 5:
raise HTTPException(status_code=400, detail="Query is too short. Please provide a more detailed query.")
search_time = max(60, min(request.search_time, 300))
return StreamingResponse(
run_deep_research_stream(query, search_time),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "Connection": "keep-alive"}
)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
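# Example of consuming the endpoint from Python (illustrative sketch; assumes the server is running
# locally on port 8000 and that the client environment has aiohttp installed):
#
#   import asyncio, aiohttp
#
#   async def demo():
#       async with aiohttp.ClientSession() as s:
#           async with s.post("http://localhost:8000/deep-research",
#                             json={"query": "state of solid-state batteries in 2025"}) as resp:
#               async for line in resp.content:
#                   if line.startswith(b"data:"):
#                       print(line.decode().strip())
#
#   asyncio.run(demo())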