File size: 18,286 Bytes
4b17916
2a0098d
6142af3
 
46a015f
132c134
6c6c904
 
ad8dd04
6c6c904
6142af3
ffce11c
6142af3
2a0098d
4b17916
2a0098d
6c6c904
4d61bdb
4b17916
 
b5390ee
6c6c904
 
 
 
 
2a0098d
 
6c6c904
2a0098d
 
0e14740
6ac9507
2a0098d
46a015f
b5390ee
4c88f38
31e12c0
b5390ee
 
 
 
 
 
 
46a015f
6c6c904
 
 
b5390ee
6c6c904
 
 
 
 
 
 
 
4b17916
6c6c904
 
 
 
 
1e679fd
6142af3
 
b5390ee
 
 
6142af3
ffce11c
 
b5390ee
 
6c6c904
 
 
 
 
 
b5390ee
ffce11c
 
b5390ee
 
132c134
b5390ee
cffab53
132c134
6c6c904
 
 
b5390ee
6c6c904
132c134
 
6c6c904
b5390ee
 
6c6c904
4d61bdb
b5390ee
4d61bdb
 
 
 
b5390ee
 
 
 
4d61bdb
 
b5390ee
 
 
4d61bdb
 
b5390ee
 
 
 
58de22e
 
 
b5390ee
 
58de22e
 
 
b5390ee
 
 
 
 
 
58de22e
 
b5390ee
 
58de22e
b5390ee
 
 
 
 
 
58de22e
b5390ee
 
 
58de22e
6c6c904
b5390ee
 
 
 
 
 
6c6c904
 
b5390ee
6c6c904
4906187
b5390ee
 
 
 
 
4906187
 
b5390ee
6c6c904
b5390ee
6c6c904
b5390ee
 
 
 
6c6c904
b5390ee
 
 
 
4906187
b5390ee
 
6c6c904
 
b5390ee
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6c6c904
 
b5390ee
 
6c6c904
 
b5390ee
 
6c6c904
b5390ee
 
4d61bdb
b5390ee
 
 
6c6c904
b5390ee
6c6c904
 
 
b5390ee
6142af3
b5390ee
 
6c6c904
b5390ee
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4d61bdb
b5390ee
6c6c904
 
b5390ee
 
6c6c904
b5390ee
 
 
 
 
 
 
 
 
 
6c6c904
 
 
b5390ee
 
 
 
cffab53
b5390ee
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cffab53
b5390ee
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cffab53
 
 
 
 
b5390ee
cffab53
 
b5390ee
ad8dd04
b5390ee
 
ad8dd04
 
b5390ee
cffab53
 
 
b5390ee
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cffab53
b5390ee
 
cffab53
b5390ee
cffab53
 
 
b5390ee
 
 
 
 
 
cffab53
b5390ee
ad8dd04
 
cffab53
 
 
 
b5390ee
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
import os
import asyncio
import json
import logging
import random
import re
import time
from typing import AsyncGenerator, Optional, Tuple, List, Dict
from urllib.parse import quote_plus, urlparse, unquote
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from dotenv import load_dotenv
import aiohttp
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from collections import defaultdict

# --- Configuration ---

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
load_dotenv()

LLM_API_KEY = os.getenv("LLM_API_KEY")
if not LLM_API_KEY:
    raise RuntimeError("LLM_API_KEY must be set in a .env file.")
else:
    logging.info("LLM API Key loaded successfully.")

# --- Constants & Headers ---

LLM_API_URL = "https://api.typegpt.net/v1/chat/completions"
LLM_MODEL = "meta-llama/Llama-4-Maverick-17B-128E-Instruct-FP8"
MAX_SOURCES_TO_PROCESS = 15
MAX_CONCURRENT_REQUESTS = 3
SEARCH_TIMEOUT = 180  # 3 minutes
TOTAL_TIMEOUT = 600  # 10 minutes total budget
REQUEST_DELAY = 1.0
RETRY_ATTEMPTS = 3
RETRY_DELAY = 3.0

# Initialize fake user agent generator
try:
    ua = UserAgent()
except Exception:
    class SimpleUA:
        def random(self):
            return random.choice([
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36",
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36",
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:129.0) Gecko/20100101 Firefox/129.0"
            ])
    ua = SimpleUA()

LLM_HEADERS = {
    "Authorization": f"Bearer {LLM_API_KEY}",
    "Content-Type": "application/json",
    "Accept": "application/json"
}

class DeepResearchRequest(BaseModel):
    query: str
    search_time: int = 180  # Default to 3 minutes

# --- FastAPI App Initialization ---

app = FastAPI(
    title="AI Deep Research API",
    description="Provides comprehensive, long-form research reports from real-time web searches.",
    version="4.1.0"
)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# --- Helper Functions ---

def extract_json_from_llm_response(text: str) -> Optional[list]:
    """Extracts a JSON array from the LLM's potentially messy string response."""
    match = re.search(r'\[.*\]', text, re.DOTALL)
    if match:
        try:
            return json.loads(match.group(0))
        except json.JSONDecodeError:
            logger.warning("Failed to decode JSON from LLM response.")
            return None
    return None

async def get_real_user_agent() -> str:
    """Provides a realistic, randomly rotated user agent string."""
    return ua.random

def clean_url(url: str) -> str:
    """Cleans and normalizes URLs, especially from search engine redirects."""
    if not url:
        return ""
    if url.startswith('//duckduckgo.com/l/'):
        try:
            parsed_url = urlparse(f"https:{url}")
            params = dict(p.split('=') for p in parsed_url.query.split('&'))
            return unquote(params.get('uddg', ''))
        except Exception:
            pass
    if url.startswith('//'):
        return 'https:' + url
    if not url.startswith(('http://', 'https://')):
        return 'https://' + url
    return url

async def fetch_search_results(query: str, session: aiohttp.ClientSession, max_results: int = 10) -> List[dict]:
    """Performs a web search using DuckDuckGo's HTML interface with robust retry logic."""
    search_url = f"https://html.duckduckgo.com/html/?q={quote_plus(query)}"
    headers = {'User-Agent': await get_real_user_agent(), 'Accept-Language': 'en-US,en;q=0.5'}

    for attempt in range(RETRY_ATTEMPTS):
        try:
            async with session.get(search_url, headers=headers, timeout=10) as response:
                if response.status == 200:
                    html = await response.text()
                    soup = BeautifulSoup(html, 'html.parser')
                    results = []
                    for result in soup.select('.result'):
                        title_elem = result.select_one('.result__title a')
                        snippet_elem = result.select_one('.result__snippet')
                        if title_elem and snippet_elem and title_elem.has_attr('href'):
                            link = clean_url(title_elem['href'])
                            if link:
                                results.append({
                                    'title': title_elem.get_text(strip=True),
                                    'link': link,
                                    'snippet': snippet_elem.get_text(strip=True)
                                })
                        if len(results) >= max_results:
                            break
                    logger.info(f"Found {len(results)} search results for query: '{query}'")
                    return results
                else:
                    logger.warning(f"Search attempt {attempt+1} for '{query}' failed with status: {response.status}")
        except Exception as e:
            logger.error(f"Search attempt {attempt+1} for '{query}' failed with error: {e}")
        if attempt < RETRY_ATTEMPTS - 1:
            await asyncio.sleep(RETRY_DELAY)
    return []

async def process_web_source(session: aiohttp.ClientSession, source: dict, timeout: int = 15) -> Tuple[Optional[str], dict]:
    """Fetches and extracts the main textual content from a web page."""
    url = source.get('link')
    if not url:
        return None, source

    headers = {'User-Agent': await get_real_user_agent()}
    source_info = source.copy()
    start_time = time.time()

    try:
        if any(url.lower().endswith(ext) for ext in ['.pdf', '.jpg', '.png', '.zip', '.mp3', '.mp4']):
            return None, source
        async with session.get(url, headers=headers, timeout=timeout, ssl=False) as response:
            if response.status != 200 or 'text/html' not in response.headers.get('Content-Type', ''):
                return None, source
            html = await response.text()
            soup = BeautifulSoup(html, "html.parser")
            for tag in soup(['script', 'style', 'nav', 'footer', 'header', 'aside', 'form']):
                tag.decompose()
            main_content = soup.select_one('main, article, #main, #content, .main, .post-content, .entry-content')
            if not main_content:
                main_content = soup.body
            if main_content:
                content = " ".join(main_content.stripped_strings)
                content = re.sub(r'\s{2,}', ' ', content).strip()
                if len(content.split()) < 50:
                    return None, source
                source_info['word_count'] = len(content.split())
                source_info['processing_time'] = time.time() - start_time
                return content, source_info
    except Exception as e:
        logger.warning(f"Error processing source {url}: {str(e)}")
    return None, source

async def generate_research_plan(query: str, session: aiohttp.ClientSession) -> List[str]:
    """Generates a list of sub-questions to guide the research process."""
    plan_prompt = {
        "model": LLM_MODEL,
        "messages": [{
            "role": "user",
            "content": f"""You are a research strategist. Your task is to generate 5 distinct, insightful sub-questions to form a comprehensive research plan for the topic: '{query}'.
These questions will guide an AI in searching the web. Focus on different facets of the topic, such as its background, current state, key components, challenges, and future trends.
Your response MUST be ONLY a raw JSON array of strings, with no other text or explanation.
Example: ["What is the history of X?", "How does X compare to its main competitors?", "What are the primary use cases for X in 2025?"]"""
        }],
        "temperature": 0.7,
        "max_tokens": 500
    }
    fallback_plan = [
        f"What is the foundational definition and history of {query}?",
        f"What are the core components and key features of {query}?",
        f"Who are the major competitors or alternatives to {query}?",
        f"What are the primary challenges and limitations associated with {query}?",
        f"What are the latest trends and future predictions for {query}?"
    ]
    try:
        async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=plan_prompt, timeout=30) as response:
            if response.status == 200:
                result = await response.json()
                content = result['choices'][0]['message']['content']
                sub_questions = extract_json_from_llm_response(content)
                if sub_questions and isinstance(sub_questions, list) and len(sub_questions) > 2:
                    return [str(q) for q in sub_questions]
    except Exception as e:
        logger.error(f"Failed to generate research plan: {e}")
    return fallback_plan

async def run_deep_research_stream(query: str, search_time: int) -> AsyncGenerator[str, None]:
    """The main orchestrator for the deep research process, yielding SSE events."""
    
    def format_sse(data: dict) -> str:
        """Formats a dictionary into an SSE message string."""
        return f"data: {json.dumps(data)}\n\n"

    start_time = time.time()
    
    try:
        yield format_sse({"event": "status", "data": "Initializing deep research..."})
        
        async with aiohttp.ClientSession() as session:
            yield format_sse({"event": "status", "data": "Step 1: Generating research plan..."})
            plan = await generate_research_plan(query, session)
            yield format_sse({"event": "plan", "data": plan})

            yield format_sse({"event": "status", "data": f"Step 2: Searching web for up to {search_time} seconds..."})
            search_tasks = [fetch_search_results(q, session) for q in [query] + plan]
            search_results_lists = await asyncio.gather(*search_tasks)
            
            seen_urls = set()
            all_sources = []
            for result_list in search_results_lists:
                for result in result_list:
                    if result['link'] not in seen_urls:
                        seen_urls.add(result['link'])
                        all_sources.append(result)
            
            query_terms = query.lower().split()
            all_sources.sort(key=lambda s: sum(1 for term in query_terms if term in s['title'].lower()), reverse=True)
            selected_sources = all_sources[:MAX_SOURCES_TO_PROCESS]
            
            yield format_sse({"event": "found_sources", "data": selected_sources})
            if not selected_sources:
                yield format_sse({"event": "error", "data": "Could not find any relevant web sources. Please try a different query."})
                return

            yield format_sse({"event": "status", "data": f"Step 3: Processing {len(selected_sources)} sources..."})
            
            semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
            
            async def process_source_and_yield(source):
                """Helper to yield status before and after processing a source."""
                yield format_sse({"event": "processing_source", "data": {"link": source.get('link'), "title": source.get('title')}})
                content, source_info = await process_web_source(session, source)
                if content:
                    yield format_sse({"event": "processed_source_success", "data": source_info})
                else:
                    yield format_sse({"event": "processed_source_failure", "data": source_info})
                return content, source_info

            async def process_with_semaphore(source):
                async with semaphore:
                    results = []
                    async for result in process_source_and_yield(source):
                        results.append(result)
                    return results

            tasks = [asyncio.create_task(process_with_semaphore(source)) for source in selected_sources]
            
            consolidated_context = ""
            successful_sources = []

            for task in asyncio.as_completed(tasks):
                results = await task
                for res in results:
                    if isinstance(res, str): # SSE event
                        yield res
                
                # The actual return value is the last item
                content, source_info = results[-1]
                if content:
                    consolidated_context += f"Source URL: {source_info['link']}\nSource Title: {source_info['title']}\n\nContent:\n{content}\n\n---\n\n"
                    successful_sources.append(source_info)

            if not successful_sources:
                yield format_sse({"event": "error", "data": "Failed to extract content from any of the selected sources."})
                return

            yield format_sse({"event": "status", "data": f"Step 4: Synthesizing report from {len(successful_sources)} sources..."})

            report_prompt = f"""You are an expert research analyst. Your task is to write a comprehensive, in-depth, and exceptionally long-form report on the topic: "{query}".
You have been provided with detailed content from {len(successful_sources)} web sources. You must use this information as the primary basis for your report.
**Report Requirements:**
1.  **Length:** The final report must be extremely detailed and thorough, aiming for a word count between 4,000 and 8,000 words. Do not write a short summary.
2.  **Structure:** Organize the report into the following sections. Elaborate extensively within each one.
    - **Executive Summary:** A concise, high-level overview of the entire report.
    - **Introduction:** Set the context and background for '{query}'.
    - **Deep Dive Analysis:** This should be the largest part of the report. Break it down into multiple sub-sections based on the research plan: {', '.join(plan)}. Analyze, synthesize, and connect information from the provided sources. Do not just list facts; provide deep insights.
    - **Challenges and Limitations:** A dedicated analysis of the problems, criticisms, and hurdles related to the topic.
    - **Future Outlook and Predictions:** Analyze the future trends, potential developments, and expert predictions.
    - **Conclusion:** Summarize the key findings and provide a final, conclusive statement.
3.  **Citations:** While you write, you MUST cite the information you use. After a sentence or paragraph that uses a source, add a citation like `[Source: example.com/article]`. Use the actual URLs provided in the context.
4.  **Formatting:** Use Markdown for clear headings, subheadings, bullet points, and bold text to improve readability.
**SOURCE MATERIAL:**
{consolidated_context[:1000000]}
Begin writing the full, comprehensive report now. Adhere strictly to all instructions.
"""

            report_payload = {
                "model": LLM_MODEL,
                "messages": [{"role": "user", "content": report_prompt}],
                "stream": True,
                # **CHANGE**: max_tokens parameter has been removed to allow for automatic/unlimited length.
            }

            async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=report_payload, timeout=TOTAL_TIMEOUT) as response:
                if response.status != 200:
                    error_text = await response.text()
                    yield format_sse({"event": "error", "data": f"Report generation failed with status {response.status}: {error_text}"})
                    return

                # **CHANGE**: Switched to direct chunk streaming as requested.
                async for line in response.content:
                    line_str = line.decode('utf-8').strip()
                    if line_str.startswith('data:'):
                        content = line_str[5:].strip()
                        if content == "[DONE]":
                            break
                        try:
                            chunk = json.loads(content)
                            delta = chunk.get("choices", [{}])[0].get("delta", {}).get("content")
                            if delta:
                                # Stream chunks as they arrive without line buffering
                                yield format_sse({"event": "chunk", "data": delta})
                        except (json.JSONDecodeError, IndexError):
                            continue

        duration = time.time() - start_time
        stats = {
            "duration_seconds": round(duration, 1),
            "sources_found": len(all_sources),
            "sources_processed": len(selected_sources),
            "sources_successful": len(successful_sources)
        }
        yield format_sse({"event": "stats", "data": stats})
        yield format_sse({"event": "sources", "data": successful_sources})

    except Exception as e:
        logger.critical(f"A critical error occurred in the research stream: {e}", exc_info=True)
        yield format_sse({"event": "error", "data": f"An unexpected critical error occurred: {str(e)}"})
    finally:
        yield format_sse({"event": "complete", "data": "Research process finished."})

@app.post("/deep-research", response_class=StreamingResponse)
async def deep_research_endpoint(request: DeepResearchRequest):
    """The main API endpoint for initiating a deep research task."""
    query = request.query.strip()
    if not query or len(query) < 5:
        raise HTTPException(status_code=400, detail="Query is too short. Please provide a more detailed query.")
    search_time = max(60, min(request.search_time, 300))
    
    return StreamingResponse(
        run_deep_research_stream(query, search_time),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "Connection": "keep-alive"}
    )

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)