File size: 10,994 Bytes
4b17916
2a0098d
6142af3
 
46a015f
132c134
46a015f
6142af3
5b2a6b6
6142af3
ffce11c
6142af3
2a0098d
4b17916
2a0098d
4b17916
 
1e679fd
0359a50
e1111e0
2a0098d
 
 
 
 
0e14740
6ac9507
2a0098d
46a015f
4c88f38
31e12c0
ffce11c
0359a50
46a015f
3c9a1a6
46a015f
 
 
bc2abd9
46a015f
4b17916
46a015f
1e679fd
6142af3
 
 
ffce11c
 
0359a50
 
ffce11c
 
43aeff7
ffce11c
132c134
 
 
6ac9507
 
132c134
 
0359a50
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0eacd1e
0359a50
0eacd1e
0359a50
 
d38cf69
0359a50
 
 
 
 
d38cf69
0359a50
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2a0098d
0eacd1e
46a015f
4906187
 
0359a50
6ac9507
4906187
6ac9507
4906187
 
6ac9507
4906187
6ac9507
4906187
 
0359a50
4906187
2a0098d
6142af3
6ac9507
6142af3
 
 
6ac9507
5b2a6b6
bc2abd9
6ac9507
46a015f
6ac9507
5b2a6b6
6ac9507
5b2a6b6
6142af3
a38a28a
0359a50
 
46a015f
64f616b
a38a28a
46a015f
0359a50
a38a28a
64f616b
43aeff7
a38a28a
64f616b
43aeff7
6ac9507
46a015f
 
0eacd1e
46a015f
 
 
1e679fd
0359a50
6142af3
46a015f
ffce11c
43aeff7
6142af3
46a015f
132c134
6142af3
bc2abd9
6ac9507
 
bc2abd9
 
64f616b
 
 
6ac9507
 
64f616b
46a015f
6142af3
6ac9507
43aeff7
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
import asyncio
import json
import logging
import os
import random
import re
from typing import AsyncGenerator, Optional, Tuple, List
from urllib.parse import parse_qs, urlsplit

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from dotenv import load_dotenv
import aiohttp
from bs4 import BeautifulSoup

# --- Configuration ---
# Configure logging once at import time and create the module-level logger
# that the rest of this file logs through.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Load settings via python-dotenv, then read the LLM key from the environment.
load_dotenv()
LLM_API_KEY = os.getenv("LLM_API_KEY")

# Fail fast at import time: the key is embedded in the Authorization header
# used for every LLM request, so the service cannot work without it.
if not LLM_API_KEY:
    raise RuntimeError("LLM_API_KEY must be set in a .env file.")
logger.info("LLM API Key loaded successfully.")

# --- Constants & Headers ---
# Chat-completions endpoint and model name sent with every LLM request.
LLM_API_URL = "https://api.typegpt.net/v1/chat/completions"
LLM_MODEL = "meta-llama/Llama-4-Maverick-17B-128E-Instruct-FP8"
# Upper bound on how many de-duplicated search results are scraped per request.
MAX_SOURCES_TO_PROCESS = 15
SEARCH_PAGES_TO_FETCH = 2 # Fetch first 2 pages of results for each query

# Real Browser User Agents for SCRAPING
# One entry is chosen at random for each search and for each page scrape.
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:129.0) Gecko/20100101 Firefox/129.0"
]

# Headers shared by both LLM calls (plan generation and report synthesis);
# carries the bearer token built from LLM_API_KEY.
LLM_HEADERS = {"Authorization": f"Bearer {LLM_API_KEY}", "Content-Type": "application/json", "Accept": "application/json"}

class DeepResearchRequest(BaseModel):
    """Request body accepted by the POST /deep-research endpoint."""
    # Free-text research topic; handed unchanged to run_deep_research_stream.
    query: str

# FastAPI application object; routes below are registered on it and the
# __main__ guard at the bottom of the file serves it with uvicorn.
app = FastAPI(
    title="AI Deep Research API",
    description="Provides robust, long-form, streaming deep research completions using a live, multi-page DuckDuckGo search.",
    version="11.0.0"  # Implemented robust, multi-page live web search
)

# CORS: every origin, method and header is accepted.
# NOTE(review): a wildcard origin together with allow_credentials=True is very
# permissive — confirm this is intended before exposing the service publicly.
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"])

def extract_json_from_llm_response(text: str) -> Optional[list]:
    """Extract the first valid JSON array embedded in an LLM reply.

    LLM replies often wrap the requested array in prose or code fences, and
    that prose may itself contain square brackets. Instead of grabbing
    everything between the first ``[`` and the last ``]``, each ``[`` is tried
    in order as the start of a JSON value and the first one that decodes wins.

    Args:
        text: Raw text returned by the LLM.

    Returns:
        The decoded list, or ``None`` when ``text`` is not a string or contains
        no decodable JSON array.
    """
    if not isinstance(text, str):
        return None

    decoder = json.JSONDecoder()
    start = text.find('[')
    while start != -1:
        try:
            # raw_decode parses exactly one JSON value beginning at `start`
            # and ignores whatever follows it.
            parsed, _ = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            # This bracket was ordinary prose; try the next candidate.
            start = text.find('[', start + 1)
            continue
        return parsed
    return None

def _resolve_result_link(href: str) -> str:
    """Turn a search-result href into a directly fetchable URL.

    DuckDuckGo's HTML endpoint may wrap result links in a redirect such as
    ``//duckduckgo.com/l/?uddg=<percent-encoded target>&rut=...``. For those,
    the decoded ``uddg`` target is returned. Other protocol-relative hrefs get
    an ``https:`` scheme; anything else is returned unchanged.
    """
    try:
        parts = urlsplit(href)
    except ValueError:
        # Malformed URL: leave it for the caller to deal with.
        return href

    host = parts.hostname or ""
    is_ddg_host = host == "duckduckgo.com" or host.endswith(".duckduckgo.com")
    if is_ddg_host and parts.path.rstrip("/") == "/l":
        targets = parse_qs(parts.query).get("uddg")
        if targets:
            return targets[0]
    if href.startswith("//"):
        return f"https:{href}"
    return href


def parse_search_results(soup: BeautifulSoup) -> List[dict]:
    """Collect search hits from a parsed DuckDuckGo HTML results page.

    Args:
        soup: Parsed results page.

    Returns:
        One ``{'title', 'link', 'snippet'}`` dict per ``div.result`` that has
        a title anchor (``a.result__a``) and a snippet anchor
        (``a.result__snippet``) with non-empty title, link and snippet.
        ``link`` is the resolved destination URL rather than DuckDuckGo's
        redirect wrapper, so it can be fetched and de-duplicated directly.
    """
    results = []
    for result_div in soup.find_all('div', class_='result'):
        title_elem = result_div.find('a', class_='result__a')
        snippet_elem = result_div.find('a', class_='result__snippet')
        if title_elem is None or snippet_elem is None:
            continue

        href = title_elem.get('href')
        link = _resolve_result_link(href) if href else href
        title = title_elem.get_text(strip=True)
        snippet = snippet_elem.get_text(strip=True)
        if link and title and snippet:
            results.append({'title': title, 'link': link, 'snippet': snippet})
    return results

def _next_page_payload(soup: BeautifulSoup) -> Optional[dict]:
    """Return the form fields that request the next results page.

    The pagination form is located through its ``Next`` submit button. The
    label lives in the button's ``value`` attribute, not in the form's text,
    so a text filter on the ``<form>`` itself would never match.

    Returns:
        ``None`` when the page has no "Next" form; otherwise a dict of the
        form's named inputs (possibly empty). Unnamed inputs such as the
        submit button are skipped, as a browser would when submitting.
    """
    next_button = soup.find('input', attrs={'type': 'submit', 'value': 'Next'})
    next_form = next_button.find_parent('form') if next_button is not None else None
    if next_form is None:
        return None
    return {
        inp.get('name'): inp.get('value') or ''
        for inp in next_form.find_all('input')
        if inp.get('name')
    }


async def call_duckduckgo_search(session: aiohttp.ClientSession, query: str, max_results: int = 15) -> List[dict]:
    """
    Performs a robust, multi-page search on DuckDuckGo's HTML interface.

    Up to SEARCH_PAGES_TO_FETCH pages are requested. Paging stops early on a
    non-200 response, when there is no next page, or as soon as
    ``max_results`` hits have been collected.

    Args:
        session: Shared aiohttp session used for the POST requests.
        query: Search terms.
        max_results: Maximum number of results to return.

    Returns:
        At most ``max_results`` result dicts as produced by
        ``parse_search_results``. Errors are logged rather than raised, so
        whatever was collected before a failure is still returned.
    """
    logger.info(f"Starting multi-page search for: '{query}'")
    search_url = "https://html.duckduckgo.com/html/"

    headers = {
        'Content-Type': 'application/x-www-form-urlencoded',
        'User-Agent': random.choice(USER_AGENTS),
        'Referer': 'https://html.duckduckgo.com/'
    }

    all_results = []
    payload = {'q': query}
    pages_fetched = 0

    try:
        for page in range(SEARCH_PAGES_TO_FETCH):
            logger.info(f"Searching page {page + 1} for '{query}'...")
            async with session.post(search_url, data=payload, headers=headers, timeout=15) as response:
                if response.status != 200:
                    logger.warning(f"Search for '{query}' page {page+1} returned status {response.status}. Stopping search for this query.")
                    break
                html = await response.text()

            pages_fetched += 1
            soup = BeautifulSoup(html, "html.parser")
            all_results.extend(parse_search_results(soup))

            # Nothing more to gain: enough results, or this was the last
            # page we are allowed to fetch.
            if len(all_results) >= max_results or page + 1 >= SEARCH_PAGES_TO_FETCH:
                break

            # The "Next" form carries the hidden parameters for the next page.
            next_payload = _next_page_payload(soup)
            if next_payload is None:
                logger.info(f"No 'Next' page found for '{query}'. Ending search.")
                break
            if not next_payload:
                logger.info("Could not find parameters for next page. Ending search.")
                break
            payload = next_payload

            await asyncio.sleep(random.uniform(0.5, 1.5)) # Small delay to mimic human behavior

    except Exception as e:
        # Best effort: a failed search must not abort the whole research run.
        logger.error(f"An error occurred during multi-page search for '{query}': {e}", exc_info=True)

    logger.info(f"Found a total of {len(all_results)} sources from {pages_fetched} page(s) for: '{query}'")
    return all_results[:max_results]


async def research_and_process_source(session: aiohttp.ClientSession, source: dict) -> Tuple[str, dict]:
    """Fetch a source page and reduce it to plain text.

    Any failure (PDF target, non-200 status, empty page, network or parse
    error) is logged and the search snippet is used instead.

    Args:
        session: Shared aiohttp session used for the GET request.
        source: Search result dict; ``link`` is required, ``snippet`` is the
            fallback text.

    Returns:
        ``(content, source)`` where ``content`` is the page's visible text with
        script/style/navigation elements removed, or the snippet (possibly an
        empty string) when scraping failed.
    """
    headers = {'User-Agent': random.choice(USER_AGENTS)}
    link = source['link']
    try:
        logger.info(f"Scraping: {link}")
        # Check the URL path as well as the raw link so that
        # "report.pdf?download=1" or "paper.pdf#page=2" are also skipped.
        if link.lower().endswith('.pdf') or urlsplit(link).path.lower().endswith('.pdf'):
            raise ValueError("PDF content")
        # NOTE(review): ssl=False disables TLS certificate verification for
        # scraped pages — confirm this trade-off is acceptable.
        async with session.get(link, headers=headers, timeout=10, ssl=False) as response:
            if response.status != 200:
                raise ValueError(f"HTTP status {response.status}")
            # Some PDFs are served from URLs without a .pdf suffix.
            if response.content_type == 'application/pdf':
                raise ValueError("PDF content")
            html = await response.text()
            soup = BeautifulSoup(html, "html.parser")
            # Drop non-content elements before extracting the text.
            for tag in soup(['script', 'style', 'nav', 'footer', 'header', 'aside']):
                tag.decompose()
            content = " ".join(soup.stripped_strings)
            if not content.strip():
                raise ValueError("Parsed content is empty.")
            return content, source
    except Exception as e:
        logger.warning(f"Scraping failed for {link} ({e}). Falling back to snippet.")
        return source.get('snippet', ''), source

async def run_deep_research_stream(query: str) -> AsyncGenerator[str, None]:
    """Run the research pipeline and stream progress as server-sent events.

    Steps: ask the LLM for sub-questions, search each one, scrape the top
    unique sources, then stream an LLM-written report built from the scraped
    text.

    Each yielded string is one SSE frame, ``data: <json>\\n\\n``, whose JSON
    object has an ``event`` field (``status``, ``plan``, ``chunk``,
    ``sources`` or ``error``) and a ``data`` field. An ``error`` event is
    always the last frame of the stream.

    Args:
        query: The research topic supplied by the client.
    """
    def format_sse(data: dict) -> str:
        return f"data: {json.dumps(data)}\n\n"

    try:
        async with aiohttp.ClientSession() as session:
            # --- 1. Ask the LLM for a research plan (list of sub-questions) ---
            yield format_sse({"event": "status", "data": "Generating research plan..."})
            plan_prompt = {
                "model": LLM_MODEL,
                "messages": [{
                    "role": "user",
                    "content": f"Generate 3-4 key sub-questions for a research report on '{query}'. Your response MUST be ONLY the raw JSON array. Example: [\"Question 1?\"]",
                }],
            }
            try:
                async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=plan_prompt, timeout=25) as response:
                    response.raise_for_status()
                    result = await response.json()
                    # Accept either a bare JSON array or a chat-completion
                    # envelope whose message text contains the array.
                    if isinstance(result, list):
                        sub_questions = result
                    else:
                        sub_questions = extract_json_from_llm_response(result['choices'][0]['message']['content'])
                    if not isinstance(sub_questions, list) or not sub_questions:
                        raise ValueError(f"Invalid plan from LLM: {result}")
            except Exception as e:
                yield format_sse({"event": "error", "data": f"Could not generate research plan. Reason: {e}"})
                return

            yield format_sse({"event": "plan", "data": sub_questions})

            # --- 2. Search every sub-question concurrently, de-duplicate by link ---
            yield format_sse({"event": "status", "data": f"Performing deep search for {len(sub_questions)} topics..."})
            search_tasks = [call_duckduckgo_search(session, sq) for sq in sub_questions]
            all_search_results = await asyncio.gather(*search_tasks)
            unique_sources = list({source['link']: source for results in all_search_results for source in results}.values())

            if not unique_sources:
                yield format_sse({"event": "error", "data": f"The live multi-page search could not find any relevant sources for '{query}'. The topic might be too obscure."})
                return

            sources_to_process = unique_sources[:MAX_SOURCES_TO_PROCESS]
            yield format_sse({"event": "status", "data": f"Found {len(unique_sources)} unique sources. Processing the top {len(sources_to_process)}..."})

            # --- 3. Scrape sources concurrently, collecting text as each finishes ---
            processing_tasks = [research_and_process_source(session, source) for source in sources_to_process]
            context_parts, all_sources_used = [], []

            for task in asyncio.as_completed(processing_tasks):
                content, source_info = await task
                if content and content.strip():
                    context_parts.append(f"Source: {source_info['link']}\nContent: {content}\n\n---\n\n")
                    all_sources_used.append(source_info)

            # Join once instead of growing one large string piece by piece.
            consolidated_context = "".join(context_parts)
            if not consolidated_context.strip():
                yield format_sse({"event": "error", "data": "Found sources, but failed to scrape meaningful content from any of them."})
                return

            # --- 4. Stream the synthesized report ---
            yield format_sse({"event": "status", "data": "Synthesizing final report..."})
            report_prompt = f'Synthesize the provided context into a long-form, comprehensive, multi-page report on "{query}". Use markdown. Elaborate extensively on each point. Base your entire report ONLY on the provided context.\n\n## Research Context ##\n{consolidated_context}'
            report_payload = {"model": LLM_MODEL, "messages": [{"role": "user", "content": report_prompt}], "stream": True}

            async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=report_payload) as response:
                response.raise_for_status()
                async for line in response.content:
                    # Tolerate stray bytes so one bad character cannot abort the report.
                    line_str = line.decode('utf-8', errors='replace').strip()
                    if line_str.startswith('data:'):
                        line_str = line_str[5:].strip()
                    if not line_str:
                        continue  # blank separator between SSE frames
                    if line_str == "[DONE]":
                        break
                    try:
                        chunk = json.loads(line_str)
                    except json.JSONDecodeError:
                        continue  # comments / keep-alives / partial frames
                    if not isinstance(chunk, dict):
                        continue
                    choices = chunk.get("choices")
                    if not (choices and isinstance(choices, list)):
                        continue
                    first_choice = choices[0]
                    # "delta" may be missing or null on some frames.
                    delta = first_choice.get("delta") if isinstance(first_choice, dict) else None
                    content = delta.get("content") if isinstance(delta, dict) else None
                    if content:
                        yield format_sse({"event": "chunk", "data": content})

            # --- 5. Finish with the list of sources that contributed text ---
            yield format_sse({"event": "sources", "data": all_sources_used})
    except Exception as e:
        logger.error(f"A critical error occurred: {e}", exc_info=True)
        yield format_sse({"event": "error", "data": f"An unexpected error occurred: {str(e)}"})

@app.post("/deep-research", response_class=StreamingResponse)
async def deep_research_endpoint(request: DeepResearchRequest):
    """Start a research run for ``request.query`` and stream it back as server-sent events."""
    event_stream = run_deep_research_stream(request.query)
    return StreamingResponse(event_stream, media_type="text/event-stream")

# Script entry point: when the file is executed directly, serve `app` with
# uvicorn on host 0.0.0.0, port 8000.
if __name__ == "__main__":
    # Imported here so uvicorn is only required when running as a script.
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)