rkihacker committed
Commit b5390ee · verified · 1 Parent(s): 8d499f8

Update main.py

Files changed (1)
  1. main.py +225 -618
main.py CHANGED
@@ -1,4 +1,3 @@
1
-
2
  import os
3
  import asyncio
4
  import json
@@ -19,6 +18,7 @@ from fake_useragent import UserAgent
19
  from collections import defaultdict
20
 
21
  # --- Configuration ---
 
22
  logging.basicConfig(
23
  level=logging.INFO,
24
  format='%(asctime)s - %(levelname)s - %(message)s'
@@ -33,23 +33,21 @@ else:
33
  logging.info("LLM API Key loaded successfully.")
34
 
35
  # --- Constants & Headers ---
 
36
  LLM_API_URL = "https://api.typegpt.net/v1/chat/completions"
37
  LLM_MODEL = "meta-llama/Llama-4-Maverick-17B-128E-Instruct-FP8"
38
- MAX_SOURCES_TO_PROCESS = 20 # Increased for more research
39
- MAX_CONCURRENT_REQUESTS = 2
40
- SEARCH_TIMEOUT = 300 # 5 minutes for longer research
41
- TOTAL_TIMEOUT = 600 # Increased to allow more time for generation
42
- REQUEST_DELAY = 3.0
43
- RETRY_ATTEMPTS = 5
44
- RETRY_DELAY = 5.0
45
- USER_AGENT_ROTATION = True
46
- CONTEXT_WINDOW_SIZE = 10000000 # 10 million tokens
47
- MAX_CONTEXT_SIZE = 2000000 # Increased practical limit for prompt
48
 
49
  # Initialize fake user agent generator
50
  try:
51
  ua = UserAgent()
52
- except:
53
  class SimpleUA:
54
  def random(self):
55
  return random.choice([
@@ -67,710 +65,319 @@ LLM_HEADERS = {
67
 
68
  class DeepResearchRequest(BaseModel):
69
  query: str
70
- search_time: int = 300 # Default to 5 minutes
 
 
71
 
72
  app = FastAPI(
73
  title="AI Deep Research API",
74
- description="Provides comprehensive research reports from real web searches within 5 minutes.",
75
- version="3.0.0"
76
  )
77
  app.add_middleware(
78
  CORSMiddleware,
79
  allow_origins=["*"],
80
  allow_credentials=True,
81
  allow_methods=["*"],
82
- allow_headers=["*"]
83
  )
84
 
 
 
85
  def extract_json_from_llm_response(text: str) -> Optional[list]:
86
- """Extract JSON array from LLM response text."""
87
  match = re.search(r'\[.*\]', text, re.DOTALL)
88
  if match:
89
  try:
90
  return json.loads(match.group(0))
91
  except json.JSONDecodeError:
 
92
  return None
93
  return None
94
 
95
  async def get_real_user_agent() -> str:
96
- """Get a realistic user agent string."""
97
- try:
98
- if isinstance(ua, UserAgent):
99
- return ua.random
100
- return ua.random()
101
- except:
102
- return "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36"
103
 
104
  def clean_url(url: str) -> str:
105
- """Clean up and normalize URLs."""
106
  if not url:
107
  return ""
108
-
109
  if url.startswith('//duckduckgo.com/l/'):
110
- url = f"https:{url}"
111
  try:
112
- parsed = urlparse(url)
113
- query_params = parsed.query
114
- if 'uddg=' in query_params:
115
- match = re.search(r'uddg=([^&]+)', query_params)
116
- if match:
117
- return unquote(match.group(1))
118
- except:
119
  pass
120
-
121
  if url.startswith('//'):
122
- url = 'https:' + url
123
- elif not url.startswith(('http://', 'https://')):
124
- url = 'https://' + url
125
-
126
  return url
127
 
128
- async def check_robots_txt(url: str) -> bool:
129
- """Check if scraping is allowed by robots.txt."""
130
- try:
131
- domain_match = re.search(r'https?://([^/]+)', url)
132
- if not domain_match:
133
- return False
134
-
135
- domain = domain_match.group(1)
136
- robots_url = f"https://{domain}/robots.txt"
137
-
138
- async with aiohttp.ClientSession() as session:
139
- headers = {'User-Agent': await get_real_user_agent()}
140
- async with session.get(robots_url, headers=headers, timeout=5) as response:
141
- if response.status == 200:
142
- robots = await response.text()
143
- if "Disallow: /" in robots:
144
- return False
145
- path = re.sub(r'https?://[^/]+', '', url)
146
- if any(f"Disallow: {p}" in robots for p in [path, path.rstrip('/') + '/']):
147
- return False
148
- return True
149
- except Exception as e:
150
- logging.warning(f"Could not check robots.txt for {url}: {e}")
151
- return False
152
-
153
- async def fetch_search_results(query: str, max_results: int = 5) -> List[dict]:
154
- """Perform a real search using DuckDuckGo's HTML interface with robust retry logic."""
155
- headers = {
156
- "User-Agent": await get_real_user_agent(),
157
- "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
158
- "Accept-Language": "en-US,en;q=0.5",
159
- "Referer": "https://duckduckgo.com/",
160
- "DNT": "1"
161
- }
162
 
163
  for attempt in range(RETRY_ATTEMPTS):
164
  try:
165
- search_url = f"https://html.duckduckgo.com/html/?q={quote_plus(query)}"
166
- async with aiohttp.ClientSession() as session:
167
- async with session.get(search_url, headers=headers, timeout=10) as response:
168
- if response.status != 200:
169
- if response.status == 202:
170
- logging.warning(f"Search attempt {attempt + 1} failed with status 202 for query '{query}'")
171
- if attempt < RETRY_ATTEMPTS - 1:
172
- await asyncio.sleep(RETRY_DELAY)
173
- continue
174
- logging.warning(f"Search failed with status {response.status} for query '{query}'")
175
- return []
176
-
177
  html = await response.text()
178
  soup = BeautifulSoup(html, 'html.parser')
179
-
180
  results = []
181
- for selector in ['.result__body', '.result__a', '.result']:
182
- if len(results) >= max_results:
183
- break
184
-
185
- for result in soup.select(selector)[:max_results]:
186
- try:
187
- title_elem = result.select_one('.result__title .result__a') or result.select_one('.result__a')
188
- if not title_elem:
189
- continue
190
-
191
- link = title_elem['href']
192
- snippet_elem = result.select_one('.result__snippet')
193
-
194
- clean_link = clean_url(link)
195
- if not clean_link or clean_link.startswith('javascript:'):
196
- continue
197
-
198
- snippet = snippet_elem.get_text(strip=True) if snippet_elem else ""
199
  results.append({
200
  'title': title_elem.get_text(strip=True),
201
- 'link': clean_link,
202
- 'snippet': snippet
203
  })
204
- except Exception as e:
205
- logging.warning(f"Error parsing search result: {e}")
206
- continue
207
-
208
- logging.info(f"Found {len(results)} real search results for '{query}'")
209
- return results[:max_results]
210
  except Exception as e:
211
- logging.error(f"Search attempt {attempt + 1} failed for '{query}': {e}")
212
- if attempt < RETRY_ATTEMPTS - 1:
213
- await asyncio.sleep(RETRY_DELAY)
214
- continue
215
- logging.error(f"All {RETRY_ATTEMPTS} search attempts failed for '{query}'")
216
  return []
217
 
218
- async def process_web_source(session: aiohttp.ClientSession, source: dict, timeout: int = 15) -> Tuple[str, dict]:
219
- """Process a real web source with improved content extraction and error handling."""
 
 
 
 
220
  headers = {'User-Agent': await get_real_user_agent()}
221
  source_info = source.copy()
222
- source_info['link'] = clean_url(source['link'])
223
-
224
- if not source_info['link'] or not source_info['link'].startswith(('http://', 'https://')):
225
- logging.warning(f"Invalid URL: {source_info['link']}")
226
- return source.get('snippet', ''), source_info
227
-
228
- if not await check_robots_txt(source_info['link']):
229
- logging.info(f"Scraping disallowed by robots.txt for {source_info['link']}")
230
- return source.get('snippet', ''), source_info
231
 
232
  try:
233
- logging.info(f"Processing source: {source_info['link']}")
234
- start_time = time.time()
235
-
236
- if any(source_info['link'].lower().endswith(ext) for ext in ['.pdf', '.doc', '.docx', '.ppt', '.pptx', '.xls', '.xlsx']):
237
- logging.info(f"Skipping non-HTML content at {source_info['link']}")
238
- return source.get('snippet', ''), source_info
239
-
240
- await asyncio.sleep(REQUEST_DELAY)
241
-
242
- async with session.get(source_info['link'], headers=headers, timeout=timeout, ssl=False) as response:
243
- if response.status != 200:
244
- logging.warning(f"HTTP {response.status} for {source_info['link']}")
245
- return source.get('snippet', ''), source_info
246
-
247
- content_type = response.headers.get('Content-Type', '').lower()
248
- if 'text/html' not in content_type:
249
- logging.info(f"Non-HTML content at {source_info['link']} (type: {content_type})")
250
- return source.get('snippet', ''), source_info
251
-
252
  html = await response.text()
253
  soup = BeautifulSoup(html, "html.parser")
254
-
255
- for tag in soup(['script', 'style', 'nav', 'footer', 'header', 'aside', 'iframe', 'noscript', 'form']):
256
  tag.decompose()
257
-
258
- selectors_to_try = [
259
- 'main',
260
- 'article',
261
- '[role="main"]',
262
- '.main-content',
263
- '.content',
264
- '.article-body',
265
- '.post-content',
266
- '.entry-content',
267
- '#content',
268
- '#main',
269
- '.main',
270
- '.article'
271
- ]
272
-
273
- main_content = None
274
- for selector in selectors_to_try:
275
- main_content = soup.select_one(selector)
276
- if main_content:
277
- break
278
-
279
  if not main_content:
280
- all_elements = soup.find_all()
281
- candidates = [el for el in all_elements if el.name not in ['script', 'style', 'nav', 'footer', 'header']]
282
- if candidates:
283
- candidates.sort(key=lambda x: len(x.get_text()), reverse=True)
284
- main_content = candidates[0] if candidates else soup
285
-
286
- if not main_content:
287
- main_content = soup.find('body') or soup
288
-
289
- content = " ".join(main_content.stripped_strings)
290
- content = re.sub(r'\s+', ' ', content).strip()
291
-
292
- if len(content.split()) < 50 and len(html) > 10000:
293
- paras = soup.find_all('p')
294
- content = " ".join([p.get_text() for p in paras if p.get_text().strip()])
295
- content = re.sub(r'\s+', ' ', content).strip()
296
-
297
  if len(content.split()) < 50:
298
- content = " ".join(soup.stripped_strings)
299
- content = re.sub(r'\s+', ' ', content).strip()
300
-
301
- if len(content.split()) < 30:
302
- for tag in ['div', 'section', 'article']:
303
- for element in soup.find_all(tag):
304
- if len(element.get_text().split()) > 200:
305
- content = " ".join(element.stripped_strings)
306
- content = re.sub(r'\s+', ' ', content).strip()
307
- if len(content.split()) >= 30:
308
- break
309
- if len(content.split()) >= 30:
310
- break
311
-
312
- if len(content.split()) < 30:
313
- logging.warning(f"Very little content extracted from {source_info['link']}")
314
- return source.get('snippet', ''), source_info
315
-
316
- source_info['word_count'] = len(content.split())
317
- source_info['processing_time'] = time.time() - start_time
318
- return content, source_info
319
-
320
- except asyncio.TimeoutError:
321
- logging.warning(f"Timeout while processing {source_info['link']}")
322
- return source.get('snippet', ''), source_info
323
  except Exception as e:
324
- logging.warning(f"Error processing {source_info['link']}: {str(e)[:200]}")
325
- return source.get('snippet', ''), source_info
326
 
327
  async def generate_research_plan(query: str, session: aiohttp.ClientSession) -> List[str]:
328
- """Generate a comprehensive research plan with sub-questions."""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
329
  try:
330
- plan_prompt = {
331
- "model": LLM_MODEL,
332
- "messages": [{
333
- "role": "user",
334
- "content": f"""Generate 4-6 comprehensive sub-questions for in-depth research on '{query}'.
335
- Focus on key aspects that would provide a complete understanding of the topic.
336
- Your response MUST be ONLY the raw JSON array with no additional text.
337
- Example: ["What is the historical background of X?", "What are the current trends in X?"]"""
338
- }],
339
- "temperature": 0.7,
340
- "max_tokens": 300
341
- }
342
-
343
  async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=plan_prompt, timeout=30) as response:
344
- response.raise_for_status()
345
- result = await response.json()
346
-
347
- if isinstance(result, list):
348
- return result
349
- elif isinstance(result, dict) and 'choices' in result:
350
  content = result['choices'][0]['message']['content']
351
  sub_questions = extract_json_from_llm_response(content)
352
- if sub_questions and isinstance(sub_questions, list):
353
- cleaned = []
354
- for q in sub_questions:
355
- if isinstance(q, str) and q.strip():
356
- cleaned_q = re.sub(r'^[^a-zA-Z0-9]*|[^a-zA-Z0-9]*$', '', q)
357
- if cleaned_q:
358
- cleaned.append(cleaned_q)
359
- return cleaned[:6]
360
-
361
- return [
362
- f"What is {query} and its key features?",
363
- f"How does {query} compare to alternatives?",
364
- f"What are the current developments in {query}?",
365
- f"What are the main challenges with {query}?",
366
- f"What does the future hold for {query}?"
367
- ]
368
  except Exception as e:
369
- logging.error(f"Failed to generate research plan: {e}")
370
- return [
371
- f"What is {query}?",
372
- f"What are the key aspects of {query}?",
373
- f"What are current trends in {query}?",
374
- f"What are the challenges with {query}?"
375
- ]
376
-
377
- async def continuous_search(query: str, search_time: int = 300) -> AsyncGenerator[Dict[str, any], None]:
378
- """Perform continuous searching with retries and diverse queries, yielding updates for each new result."""
379
- start_time = time.time()
380
- all_results = []
381
- seen_urls = set()
382
- fallback_results = []
383
-
384
- query_variations = [
385
- query,
386
- f"{query} comparison",
387
- f"{query} review",
388
- f"{query} latest developments",
389
- f"{query} features and benefits",
390
- f"{query} challenges and limitations"
391
- ]
392
 
393
- async with aiohttp.ClientSession() as session:
394
- iteration = 0
395
- result_count = 0
396
- while time.time() - start_time < search_time:
397
- iteration += 1
398
- random.shuffle(query_variations)
399
- for q in query_variations:
400
- if time.time() - start_time >= search_time:
401
- logger.info(f"Search timed out after {search_time} seconds. Found {len(all_results)} results.")
402
- break
403
-
404
- logger.info(f"Iteration {iteration}: Searching for query variation: {q}")
405
- yield {"event": "status", "data": f"Searching for '{q}'..."}
406
-
407
- try:
408
- results = await fetch_search_results(q, max_results=5)
409
- logger.info(f"Retrieved {len(results)} results for query '{q}'")
410
- for result in results:
411
- clean_link = clean_url(result['link'])
412
- if clean_link and clean_link not in seen_urls:
413
- seen_urls.add(clean_link)
414
- result['link'] = clean_link
415
- all_results.append(result)
416
- fallback_results.append(result)
417
- result_count += 1
418
- logger.info(f"Added new result: {result['title']} ({result['link']})")
419
- yield {"event": "found_result", "data": f"Found result {result_count}: {result['title']} ({result['link']})"}
420
-
421
- await asyncio.sleep(REQUEST_DELAY)
422
- if len(all_results) >= MAX_SOURCES_TO_PROCESS * 1.5:
423
- logger.info(f"Reached sufficient results: {len(all_results)}")
424
- break
425
- except Exception as e:
426
- logger.error(f"Error during search for '{q}': {e}")
427
- yield {"event": "warning", "data": f"Search error for '{q}': {str(e)[:100]}"}
428
- await asyncio.sleep(RETRY_DELAY)
429
-
430
- if len(all_results) >= MAX_SOURCES_TO_PROCESS * 1.5:
431
- break
432
-
433
- logger.info(f"Completed continuous search. Total results: {len(all_results)}")
434
-
435
- if len(all_results) < MAX_SOURCES_TO_PROCESS:
436
- logger.warning(f"Insufficient results ({len(all_results)}), using fallback results")
437
- yield {"event": "warning", "data": f"Insufficient results, using fallback results to reach minimum."}
438
- all_results.extend(fallback_results[:MAX_SOURCES_TO_PROCESS - len(all_results)])
439
-
440
- if all_results:
441
- def score_result(result):
442
- query_terms = set(query.lower().split())
443
- title = result['title'].lower()
444
- snippet = result['snippet'].lower()
445
- matches = sum(1 for term in query_terms if term in title or term in snippet)
446
- snippet_length = len(result['snippet'].split())
447
- return matches * 10 + snippet_length
448
-
449
- all_results.sort(key=score_result, reverse=True)
450
-
451
- yield {"event": "final_search_results", "data": all_results[:MAX_SOURCES_TO_PROCESS * 2]}
452
-
453
- async def filter_and_select_sources(results: List[dict]) -> List[dict]:
454
- """Filter and select the best sources from search results."""
455
- if not results:
456
- logger.warning("No search results to filter.")
457
- return []
458
-
459
- logger.info(f"Filtering {len(results)} search results...")
460
-
461
- domain_counts = defaultdict(int)
462
- domain_results = defaultdict(list)
463
- for result in results:
464
- domain = urlparse(result['link']).netloc
465
- domain_counts[domain] += 1
466
- domain_results[domain].append(result)
467
-
468
- selected = []
469
- for domain, domain_res in domain_results.items():
470
- if len(selected) >= MAX_SOURCES_TO_PROCESS:
471
- break
472
- if domain_res:
473
- selected.append(domain_res[0])
474
- logger.info(f"Selected top result from domain {domain}: {domain_res[0]['link']}")
475
-
476
- if len(selected) < MAX_SOURCES_TO_PROCESS:
477
- domain_quality = {}
478
- for domain, domain_res in domain_results.items():
479
- avg_length = sum(len(r['snippet'].split()) for r in domain_res) / len(domain_res)
480
- domain_quality[domain] = avg_length
481
-
482
- sorted_domains = sorted(domain_quality.items(), key=lambda x: x[1], reverse=True)
483
- for domain, _ in sorted_domains:
484
- if len(selected) >= MAX_SOURCES_TO_PROCESS:
485
- break
486
- for res in domain_results[domain]:
487
- if res not in selected:
488
- selected.append(res)
489
- logger.info(f"Added additional result from high-quality domain {domain}: {res['link']}")
490
- if len(selected) >= MAX_SOURCES_TO_PROCESS:
491
- break
492
-
493
- if len(selected) < MAX_SOURCES_TO_PROCESS:
494
- all_results_sorted = sorted(results, key=lambda x: len(x['snippet'].split()), reverse=True)
495
- for res in all_results_sorted:
496
- if res not in selected:
497
- selected.append(res)
498
- logger.info(f"Added fallback high-snippet result: {res['link']}")
499
- if len(selected) >= MAX_SOURCES_TO_PROCESS:
500
- break
501
-
502
- logger.info(f"Selected {len(selected)} sources after filtering.")
503
- return selected[:MAX_SOURCES_TO_PROCESS]
504
-
505
- async def run_deep_research_stream(query: str, search_time: int = 300) -> AsyncGenerator[str, None]:
506
  def format_sse(data: dict) -> str:
 
507
  return f"data: {json.dumps(data)}\n\n"
508
 
509
  start_time = time.time()
510
- processed_sources = 0
511
- successful_sources = 0
512
- total_tokens = 0
513
-
514
  try:
515
- yield format_sse({
516
- "event": "status",
517
- "data": f"Starting deep research on '{query}'. Search time limit: {search_time} seconds."
518
- })
519
-
520
  async with aiohttp.ClientSession() as session:
521
- yield format_sse({"event": "status", "data": "Generating comprehensive research plan..."})
522
- try:
523
- sub_questions = await generate_research_plan(query, session)
524
- yield format_sse({"event": "plan", "data": sub_questions})
525
- except Exception as e:
526
- yield format_sse({
527
- "event": "error",
528
- "data": f"Failed to generate research plan: {str(e)[:200]}"
529
- })
530
- sub_questions = [
531
- f"What is {query}?",
532
- f"What are the key aspects of {query}?",
533
- f"What are current trends in {query}?",
534
- f"What are the challenges with {query}?"
535
- ]
536
- yield format_sse({"event": "plan", "data": sub_questions})
537
-
538
- yield format_sse({
539
- "event": "status",
540
- "data": f"Performing continuous search for up to {search_time} seconds..."
541
- })
542
-
543
- search_results = []
544
- async for update in continuous_search(query, search_time):
545
- if update["event"] == "final_search_results":
546
- search_results = update["data"]
547
- else:
548
- yield format_sse(update)
549
-
550
- yield format_sse({
551
- "event": "status",
552
- "data": f"Found {len(search_results)} potential sources. Selecting the best ones..."
553
- })
554
- yield format_sse({
555
- "event": "found_sources",
556
- "data": search_results
557
- })
558
-
559
- if not search_results:
560
- yield format_sse({
561
- "event": "error",
562
- "data": "No search results found. Check your query and try again."
563
- })
564
- return
565
-
566
- selected_sources = await filter_and_select_sources(search_results)
567
- yield format_sse({
568
- "event": "status",
569
- "data": f"Selected {len(selected_sources)} high-quality sources to process."
570
- })
571
- yield format_sse({
572
- "event": "selected_sources",
573
- "data": selected_sources
574
- })
575
-
576
  if not selected_sources:
577
- yield format_sse({
578
- "event": "error",
579
- "data": "No valid sources found after filtering."
580
- })
581
  return
582
 
 
 
583
  semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
584
- consolidated_context = ""
585
- all_sources_used = []
586
- processing_errors = 0
 
 
 
 
 
 
 
587
 
588
  async def process_with_semaphore(source):
589
  async with semaphore:
590
- return await process_web_source(session, source, timeout=20)
591
-
592
- processing_tasks = []
593
- for i, source in enumerate(selected_sources):
594
- elapsed = time.time() - start_time
595
- if elapsed > TOTAL_TIMEOUT * 0.8:
596
- yield format_sse({
597
- "event": "status",
598
- "data": f"Approaching time limit, stopping source processing at {i}/{len(selected_sources)}"
599
- })
600
- break
601
-
602
- if i > 0:
603
- await asyncio.sleep(REQUEST_DELAY * 0.5)
604
-
605
- task = asyncio.create_task(process_with_semaphore(source))
606
- processing_tasks.append(task)
607
-
608
- if (i + 1) % 2 == 0 or (i + 1) == len(selected_sources):
609
- yield format_sse({
610
- "event": "status",
611
- "data": f"Processed {min(i+1, len(selected_sources))}/{len(selected_sources)} sources..."
612
- })
613
-
614
- for future in asyncio.as_completed(processing_tasks):
615
- processed_sources += 1
616
- content, source_info = await future
617
- if content and content.strip():
618
- consolidated_context += f"Source: {source_info['link']}\nContent: {content}\n\n---\n\n"
619
- all_sources_used.append(source_info)
620
- successful_sources += 1
621
- total_tokens += len(content.split())
622
- yield format_sse({
623
- "event": "processed_source",
624
- "data": source_info
625
- })
626
- else:
627
- processing_errors += 1
628
- yield format_sse({
629
- "event": "warning",
630
- "data": f"Failed to extract content from {source_info['link']}"
631
- })
632
-
633
- if not consolidated_context.strip():
634
- yield format_sse({
635
- "event": "error",
636
- "data": f"Failed to extract content from any sources. {processing_errors} errors occurred."
637
- })
638
- return
639
-
640
- time_remaining = max(0, TOTAL_TIMEOUT - (time.time() - start_time))
641
- yield format_sse({
642
- "event": "status",
643
- "data": f"Synthesizing comprehensive report from {successful_sources} sources..."
644
- })
645
-
646
- max_output_tokens = 16000 # Fixed to allow long response
647
-
648
- report_prompt = f"""Compose an in-depth analysis report on "{query}".
649
-
650
- Generate a very long, detailed report leveraging the large context window of 10 million tokens. Provide thorough, deep analysis with extensive details, examples, and insights in each section. Expand on each point with sub-sections, data, and comprehensive explanations to make the report as long and informative as possible, aiming for 5,000 to 10,000 words.
651
-
652
- Structure the report with these sections:
653
- 1. Introduction and Background
654
- 2. Key Features and Capabilities
655
- 3. Comparative Analysis with Alternatives
656
- 4. Current Developments and Trends
657
- 5. Challenges and Limitations
658
- 6. Future Outlook
659
- 7. Conclusion and Recommendations
660
-
661
- For each section, provide detailed analysis based on the source material.
662
- Include specific examples and data points from the sources when available.
663
- Compare and contrast different viewpoints from various sources.
664
-
665
- Use markdown formatting for headings, subheadings, lists, and emphasis.
666
- Cite sources where appropriate using inline citations like [1][2].
667
 
668
- Available information from {successful_sources} sources:
669
- {consolidated_context[:MAX_CONTEXT_SIZE]}
670
 
671
- Generate a comprehensive report of approximately 5,000 to 10,000 words.
672
- Focus on providing deep insights, analysis, and actionable information.
673
- """
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
674
 
675
  report_payload = {
676
  "model": LLM_MODEL,
677
  "messages": [{"role": "user", "content": report_prompt}],
678
  "stream": True,
679
- "max_tokens": max_output_tokens
680
  }
681
 
682
- async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=report_payload) as response:
683
  if response.status != 200:
684
- yield format_sse({
685
- "event": "error",
686
- "data": f"Failed to generate report: HTTP {response.status}"
687
- })
688
  return
689
 
690
- buffer = ""
691
  async for line in response.content:
692
- if time.time() - start_time > TOTAL_TIMEOUT:
693
- yield format_sse({
694
- "event": "warning",
695
- "data": "Time limit reached, ending report generation early."
696
- })
697
- break
698
-
699
  line_str = line.decode('utf-8').strip()
700
  if line_str.startswith('data:'):
701
- line_str = line_str[5:].strip()
702
- if line_str == "[DONE]":
703
- if buffer:
704
- yield format_sse({"event": "chunk", "data": buffer})
705
- break
706
- if not line_str:
707
- continue # Skip empty lines
708
- try:
709
- chunk = json.loads(line_str)
710
- choices = chunk.get("choices")
711
- if choices and isinstance(choices, list) and len(choices) > 0:
712
- content = choices[0].get("delta", {}).get("content")
713
- if content:
714
- buffer += content
715
- if len(buffer) > 100:
716
- yield format_sse({"event": "chunk", "data": buffer})
717
- buffer = ""
718
- except json.JSONDecodeError as e:
719
- logging.warning(f"JSON decode error for line: {line_str} - {e}")
720
- continue
721
- except Exception as e:
722
- logging.warning(f"Error processing stream chunk: {e}")
723
- continue
724
-
725
- if buffer:
726
- yield format_sse({"event": "chunk", "data": buffer})
727
-
728
- duration = time.time() - start_time
729
- stats = {
730
- "total_time_seconds": round(duration),
731
- "sources_processed": processed_sources,
732
- "sources_successful": successful_sources,
733
- "estimated_tokens": total_tokens,
734
- "sources_used": len(all_sources_used)
735
- }
736
- yield format_sse({
737
- "event": "status",
738
- "data": f"Research completed successfully in {duration:.1f} seconds."
739
- })
740
- yield format_sse({"event": "stats", "data": stats})
741
- yield format_sse({"event": "sources", "data": all_sources_used})
742
-
743
- except asyncio.TimeoutError:
744
- yield format_sse({
745
- "event": "error",
746
- "data": f"Research process timed out after {TOTAL_TIMEOUT} seconds."
747
- })
748
  except Exception as e:
749
- logging.error(f"Critical error in research process: {e}", exc_info=True)
750
- yield format_sse({
751
- "event": "error",
752
- "data": f"An unexpected error occurred: {str(e)[:200]}"
753
- })
754
  finally:
755
- duration = time.time() - start_time
756
- yield format_sse({
757
- "event": "complete",
758
- "data": f"Research process finished after {duration:.1f} seconds."
759
- })
760
 
761
  @app.post("/deep-research", response_class=StreamingResponse)
762
  async def deep_research_endpoint(request: DeepResearchRequest):
763
- """Endpoint for deep research that streams SSE responses."""
764
- if not request.query or len(request.query.strip()) < 3:
765
- raise HTTPException(status_code=400, detail="Query must be at least 3 characters long")
766
-
767
- search_time = min(max(request.search_time, 60), 300) # Clamp to 5 minutes max
 
768
  return StreamingResponse(
769
- run_deep_research_stream(request.query.strip(), search_time),
770
  media_type="text/event-stream",
771
  headers={"Cache-Control": "no-cache", "Connection": "keep-alive"}
772
  )
773
 
774
  if __name__ == "__main__":
775
  import uvicorn
776
- uvicorn.run(app, host="0.0.0.0", port=8000)
 
 
1
  import os
2
  import asyncio
3
  import json
 
18
  from collections import defaultdict
19
 
20
  # --- Configuration ---
21
+
22
  logging.basicConfig(
23
  level=logging.INFO,
24
  format='%(asctime)s - %(levelname)s - %(message)s'
 
33
  logging.info("LLM API Key loaded successfully.")
34
 
35
  # --- Constants & Headers ---
36
+
37
  LLM_API_URL = "https://api.typegpt.net/v1/chat/completions"
38
  LLM_MODEL = "meta-llama/Llama-4-Maverick-17B-128E-Instruct-FP8"
39
+ MAX_SOURCES_TO_PROCESS = 15
40
+ MAX_CONCURRENT_REQUESTS = 3
41
+ SEARCH_TIMEOUT = 180 # 3 minutes
42
+ TOTAL_TIMEOUT = 600 # 10 minutes total budget
43
+ REQUEST_DELAY = 1.0
44
+ RETRY_ATTEMPTS = 3
45
+ RETRY_DELAY = 3.0
 
 
 
46
 
47
  # Initialize fake user agent generator
48
  try:
49
  ua = UserAgent()
50
+ except Exception:
51
  class SimpleUA:
52
  def random(self):
53
  return random.choice([
 
65
 
66
  class DeepResearchRequest(BaseModel):
67
  query: str
68
+ search_time: int = 180 # Default to 3 minutes
69
+
70
+ # --- FastAPI App Initialization ---
71
 
72
  app = FastAPI(
73
  title="AI Deep Research API",
74
+ description="Provides comprehensive, long-form research reports from real-time web searches.",
75
+ version="4.1.0"
76
  )
77
  app.add_middleware(
78
  CORSMiddleware,
79
  allow_origins=["*"],
80
  allow_credentials=True,
81
  allow_methods=["*"],
82
+ allow_headers=["*"],
83
  )
84
 
85
+ # --- Helper Functions ---
86
+
87
  def extract_json_from_llm_response(text: str) -> Optional[list]:
88
+ """Extracts a JSON array from the LLM's potentially messy string response."""
89
  match = re.search(r'\[.*\]', text, re.DOTALL)
90
  if match:
91
  try:
92
  return json.loads(match.group(0))
93
  except json.JSONDecodeError:
94
+ logger.warning("Failed to decode JSON from LLM response.")
95
  return None
96
  return None
97
 
98
  async def get_real_user_agent() -> str:
99
+ """Provides a realistic, randomly rotated user agent string."""
100
+ return ua.random
 
 
 
 
 
101
 
102
  def clean_url(url: str) -> str:
103
+ """Cleans and normalizes URLs, especially from search engine redirects."""
104
  if not url:
105
  return ""
 
106
  if url.startswith('//duckduckgo.com/l/'):
 
107
  try:
108
+ parsed_url = urlparse(f"https:{url}")
109
+ params = dict(p.split('=') for p in parsed_url.query.split('&'))
110
+ return unquote(params.get('uddg', ''))
111
+ except Exception:
 
 
 
112
  pass
 
113
  if url.startswith('//'):
114
+ return 'https:' + url
115
+ if not url.startswith(('http://', 'https://')):
116
+ return 'https://' + url
 
117
  return url
118
 
119
+ async def fetch_search_results(query: str, session: aiohttp.ClientSession, max_results: int = 10) -> List[dict]:
120
+ """Performs a web search using DuckDuckGo's HTML interface with robust retry logic."""
121
+ search_url = f"https://html.duckduckgo.com/html/?q={quote_plus(query)}"
122
+ headers = {'User-Agent': await get_real_user_agent(), 'Accept-Language': 'en-US,en;q=0.5'}
123
 
124
  for attempt in range(RETRY_ATTEMPTS):
125
  try:
126
+ async with session.get(search_url, headers=headers, timeout=10) as response:
127
+ if response.status == 200:
128
  html = await response.text()
129
  soup = BeautifulSoup(html, 'html.parser')
 
130
  results = []
131
+ for result in soup.select('.result'):
132
+ title_elem = result.select_one('.result__title a')
133
+ snippet_elem = result.select_one('.result__snippet')
134
+ if title_elem and snippet_elem and title_elem.has_attr('href'):
135
+ link = clean_url(title_elem['href'])
136
+ if link:
137
  results.append({
138
  'title': title_elem.get_text(strip=True),
139
+ 'link': link,
140
+ 'snippet': snippet_elem.get_text(strip=True)
141
  })
142
+ if len(results) >= max_results:
143
+ break
144
+ logger.info(f"Found {len(results)} search results for query: '{query}'")
145
+ return results
146
+ else:
147
+ logger.warning(f"Search attempt {attempt+1} for '{query}' failed with status: {response.status}")
148
  except Exception as e:
149
+ logger.error(f"Search attempt {attempt+1} for '{query}' failed with error: {e}")
150
+ if attempt < RETRY_ATTEMPTS - 1:
151
+ await asyncio.sleep(RETRY_DELAY)
 
 
152
  return []
153
 
154
+ async def process_web_source(session: aiohttp.ClientSession, source: dict, timeout: int = 15) -> Tuple[Optional[str], dict]:
155
+ """Fetches and extracts the main textual content from a web page."""
156
+ url = source.get('link')
157
+ if not url:
158
+ return None, source
159
+
160
  headers = {'User-Agent': await get_real_user_agent()}
161
  source_info = source.copy()
162
+ start_time = time.time()
163
 
164
  try:
165
+ if any(url.lower().endswith(ext) for ext in ['.pdf', '.jpg', '.png', '.zip', '.mp3', '.mp4']):
166
+ return None, source
167
+ async with session.get(url, headers=headers, timeout=timeout, ssl=False) as response:
168
+ if response.status != 200 or 'text/html' not in response.headers.get('Content-Type', ''):
169
+ return None, source
170
  html = await response.text()
171
  soup = BeautifulSoup(html, "html.parser")
172
+ for tag in soup(['script', 'style', 'nav', 'footer', 'header', 'aside', 'form']):
 
173
  tag.decompose()
174
+ main_content = soup.select_one('main, article, #main, #content, .main, .post-content, .entry-content')
175
  if not main_content:
176
+ main_content = soup.body
177
+ if main_content:
178
+ content = " ".join(main_content.stripped_strings)
179
+ content = re.sub(r'\s{2,}', ' ', content).strip()
180
  if len(content.split()) < 50:
181
+ return None, source
182
+ source_info['word_count'] = len(content.split())
183
+ source_info['processing_time'] = time.time() - start_time
184
+ return content, source_info
185
  except Exception as e:
186
+ logger.warning(f"Error processing source {url}: {str(e)}")
187
+ return None, source
188
 
189
  async def generate_research_plan(query: str, session: aiohttp.ClientSession) -> List[str]:
190
+ """Generates a list of sub-questions to guide the research process."""
191
+ plan_prompt = {
192
+ "model": LLM_MODEL,
193
+ "messages": [{
194
+ "role": "user",
195
+ "content": f"""You are a research strategist. Your task is to generate 5 distinct, insightful sub-questions to form a comprehensive research plan for the topic: '{query}'.
196
+ These questions will guide an AI in searching the web. Focus on different facets of the topic, such as its background, current state, key components, challenges, and future trends.
197
+ Your response MUST be ONLY a raw JSON array of strings, with no other text or explanation.
198
+ Example: ["What is the history of X?", "How does X compare to its main competitors?", "What are the primary use cases for X in 2025?"]"""
199
+ }],
200
+ "temperature": 0.7,
201
+ "max_tokens": 500
202
+ }
203
+ fallback_plan = [
204
+ f"What is the foundational definition and history of {query}?",
205
+ f"What are the core components and key features of {query}?",
206
+ f"Who are the major competitors or alternatives to {query}?",
207
+ f"What are the primary challenges and limitations associated with {query}?",
208
+ f"What are the latest trends and future predictions for {query}?"
209
+ ]
210
  try:
211
  async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=plan_prompt, timeout=30) as response:
212
+ if response.status == 200:
213
+ result = await response.json()
 
 
 
 
214
  content = result['choices'][0]['message']['content']
215
  sub_questions = extract_json_from_llm_response(content)
216
+ if sub_questions and isinstance(sub_questions, list) and len(sub_questions) > 2:
217
+ return [str(q) for q in sub_questions]
218
  except Exception as e:
219
+ logger.error(f"Failed to generate research plan: {e}")
220
+ return fallback_plan
221
 
222
+ async def run_deep_research_stream(query: str, search_time: int) -> AsyncGenerator[str, None]:
223
+ """The main orchestrator for the deep research process, yielding SSE events."""
224
+
225
  def format_sse(data: dict) -> str:
226
+ """Formats a dictionary into an SSE message string."""
227
  return f"data: {json.dumps(data)}\n\n"
228
 
229
  start_time = time.time()
230
+
 
 
 
231
  try:
232
+ yield format_sse({"event": "status", "data": "Initializing deep research..."})
233
+
 
 
 
234
  async with aiohttp.ClientSession() as session:
235
+ yield format_sse({"event": "status", "data": "Step 1: Generating research plan..."})
236
+ plan = await generate_research_plan(query, session)
237
+ yield format_sse({"event": "plan", "data": plan})
238
+
239
+ yield format_sse({"event": "status", "data": f"Step 2: Searching web for up to {search_time} seconds..."})
240
+ search_tasks = [fetch_search_results(q, session) for q in [query] + plan]
241
+ search_results_lists = await asyncio.gather(*search_tasks)
242
+
243
+ seen_urls = set()
244
+ all_sources = []
245
+ for result_list in search_results_lists:
246
+ for result in result_list:
247
+ if result['link'] not in seen_urls:
248
+ seen_urls.add(result['link'])
249
+ all_sources.append(result)
250
+
251
+ query_terms = query.lower().split()
252
+ all_sources.sort(key=lambda s: sum(1 for term in query_terms if term in s['title'].lower()), reverse=True)
253
+ selected_sources = all_sources[:MAX_SOURCES_TO_PROCESS]
254
+
255
+ yield format_sse({"event": "found_sources", "data": selected_sources})
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
256
  if not selected_sources:
257
+ yield format_sse({"event": "error", "data": "Could not find any relevant web sources. Please try a different query."})
 
 
 
258
  return
259
 
260
+ yield format_sse({"event": "status", "data": f"Step 3: Processing {len(selected_sources)} sources..."})
261
+
262
  semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
263
+
264
+ async def process_source_and_yield(source):
265
+ """Helper to yield status before and after processing a source."""
266
+ yield format_sse({"event": "processing_source", "data": {"link": source.get('link'), "title": source.get('title')}})
267
+ content, source_info = await process_web_source(session, source)
268
+ if content:
269
+ yield format_sse({"event": "processed_source_success", "data": source_info})
270
+ else:
271
+ yield format_sse({"event": "processed_source_failure", "data": source_info})
272
+ return content, source_info
273
 
274
  async def process_with_semaphore(source):
275
  async with semaphore:
276
+ results = []
277
+ async for result in process_source_and_yield(source):
278
+ results.append(result)
279
+ return results
280
 
281
+ tasks = [asyncio.create_task(process_with_semaphore(source)) for source in selected_sources]
282
+
283
+ consolidated_context = ""
284
+ successful_sources = []
285
+
286
+ for task in asyncio.as_completed(tasks):
287
+ results = await task
288
+ for res in results:
289
+ if isinstance(res, str): # SSE event
290
+ yield res
291
+
292
+ # The actual return value is the last item
293
+ content, source_info = results[-1]
294
+ if content:
295
+ consolidated_context += f"Source URL: {source_info['link']}\nSource Title: {source_info['title']}\n\nContent:\n{content}\n\n---\n\n"
296
+ successful_sources.append(source_info)
297
+
298
+ if not successful_sources:
299
+ yield format_sse({"event": "error", "data": "Failed to extract content from any of the selected sources."})
300
+ return
301
 
302
+ yield format_sse({"event": "status", "data": f"Step 4: Synthesizing report from {len(successful_sources)} sources..."})
303
+
304
+ report_prompt = f"""You are an expert research analyst. Your task is to write a comprehensive, in-depth, and exceptionally long-form report on the topic: "{query}".
305
+ You have been provided with detailed content from {len(successful_sources)} web sources. You must use this information as the primary basis for your report.
306
+ **Report Requirements:**
307
+ 1. **Length:** The final report must be extremely detailed and thorough, aiming for a word count between 4,000 and 8,000 words. Do not write a short summary.
308
+ 2. **Structure:** Organize the report into the following sections. Elaborate extensively within each one.
309
+ - **Executive Summary:** A concise, high-level overview of the entire report.
310
+ - **Introduction:** Set the context and background for '{query}'.
311
+ - **Deep Dive Analysis:** This should be the largest part of the report. Break it down into multiple sub-sections based on the research plan: {', '.join(plan)}. Analyze, synthesize, and connect information from the provided sources. Do not just list facts; provide deep insights.
312
+ - **Challenges and Limitations:** A dedicated analysis of the problems, criticisms, and hurdles related to the topic.
313
+ - **Future Outlook and Predictions:** Analyze the future trends, potential developments, and expert predictions.
314
+ - **Conclusion:** Summarize the key findings and provide a final, conclusive statement.
315
+ 3. **Citations:** While you write, you MUST cite the information you use. After a sentence or paragraph that uses a source, add a citation like `[Source: example.com/article]`. Use the actual URLs provided in the context.
316
+ 4. **Formatting:** Use Markdown for clear headings, subheadings, bullet points, and bold text to improve readability.
317
+ **SOURCE MATERIAL:**
318
+ {consolidated_context[:1000000]}
319
+ Begin writing the full, comprehensive report now. Adhere strictly to all instructions.
320
+ """
321
 
322
  report_payload = {
323
  "model": LLM_MODEL,
324
  "messages": [{"role": "user", "content": report_prompt}],
325
  "stream": True,
326
+ # **CHANGE**: max_tokens parameter has been removed to allow for automatic/unlimited length.
327
  }
328
 
329
+ async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=report_payload, timeout=TOTAL_TIMEOUT) as response:
330
  if response.status != 200:
331
+ error_text = await response.text()
332
+ yield format_sse({"event": "error", "data": f"Report generation failed with status {response.status}: {error_text}"})
 
 
333
  return
334
 
335
+ # **CHANGE**: Switched to direct chunk streaming as requested.
336
  async for line in response.content:
 
 
 
 
 
 
 
337
  line_str = line.decode('utf-8').strip()
338
  if line_str.startswith('data:'):
339
+ content = line_str[5:].strip()
340
+ if content == "[DONE]":
341
+ break
342
+ try:
343
+ chunk = json.loads(content)
344
+ delta = chunk.get("choices", [{}])[0].get("delta", {}).get("content")
345
+ if delta:
346
+ # Stream chunks as they arrive without line buffering
347
+ yield format_sse({"event": "chunk", "data": delta})
348
+ except (json.JSONDecodeError, IndexError):
349
+ continue
350
+
351
+ duration = time.time() - start_time
352
+ stats = {
353
+ "duration_seconds": round(duration, 1),
354
+ "sources_found": len(all_sources),
355
+ "sources_processed": len(selected_sources),
356
+ "sources_successful": len(successful_sources)
357
+ }
358
+ yield format_sse({"event": "stats", "data": stats})
359
+ yield format_sse({"event": "sources", "data": successful_sources})
360
+
361
  except Exception as e:
362
+ logger.critical(f"A critical error occurred in the research stream: {e}", exc_info=True)
363
+ yield format_sse({"event": "error", "data": f"An unexpected critical error occurred: {str(e)}"})
 
 
 
364
  finally:
365
+ yield format_sse({"event": "complete", "data": "Research process finished."})
 
 
 
 
366
 
367
  @app.post("/deep-research", response_class=StreamingResponse)
368
  async def deep_research_endpoint(request: DeepResearchRequest):
369
+ """The main API endpoint for initiating a deep research task."""
370
+ query = request.query.strip()
371
+ if not query or len(query) < 5:
372
+ raise HTTPException(status_code=400, detail="Query is too short. Please provide a more detailed query.")
373
+ search_time = max(60, min(request.search_time, 300))
374
+
375
  return StreamingResponse(
376
+ run_deep_research_stream(query, search_time),
377
  media_type="text/event-stream",
378
  headers={"Cache-Control": "no-cache", "Connection": "keep-alive"}
379
  )
380
 
381
  if __name__ == "__main__":
382
  import uvicorn
383
+ uvicorn.run(app, host="0.0.0.0", port=8000)
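For quick testing of the endpoint added in this commit, the sketch below shows one way a client might consume the /deep-research SSE stream. It is a minimal illustration, not part of the commit: it assumes the server is running locally on port 8000 (as in the __main__ block above), it uses the third-party requests package, and the helper name stream_deep_research is invented here; the event names (status, chunk, error, complete) come from run_deep_research_stream.

# Minimal client sketch (editor's illustration, not part of the commit).
# Assumes the API runs locally on port 8000 and that `requests` is installed.
import json
import requests

def stream_deep_research(query: str, search_time: int = 180) -> None:
    """POST a research query and print SSE events as they arrive."""
    url = "http://localhost:8000/deep-research"  # host/port taken from the __main__ block
    payload = {"query": query, "search_time": search_time}
    with requests.post(url, json=payload, stream=True) as response:
        response.raise_for_status()
        for raw_line in response.iter_lines(decode_unicode=True):
            if not raw_line or not raw_line.startswith("data:"):
                continue  # skip SSE keep-alive blanks
            event = json.loads(raw_line[len("data:"):].strip())
            name, data = event.get("event"), event.get("data")
            if name == "chunk":
                print(data, end="", flush=True)  # report text streams incrementally
            elif name in ("status", "warning", "error", "complete"):
                print(f"\n[{name}] {data}")
            if name == "complete":
                break

if __name__ == "__main__":
    stream_deep_research("example research topic")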