# infoseeker-4b / fetchers_async.py
# """
# fetchers_async.py – Orchestrates multiple specialised fetchers **without changing
# its public surface** (`async def fetch_url(url: str) -> str`).
# Order of strategies (after specialised handlers):
# 1. **Jina AI** – fast & cheap full‑text extraction
# 2. **Crawl4AI** – browser‑based heavy‑weight fallback
# 3. **Legacy HTML** – trafilatura / readability last‑chance scrape
# Specialised fetchers (PDF, YouTube, Reddit) remain unchanged.
# """
# --------------------------------------------------------------------------
# Legacy implementation, kept commented out for reference; superseded by the
# timeout-aware fetch_url below.
# --------------------------------------------------------------------------
# from __future__ import annotations
# import asyncio, logging
# from typing import Callable
# from web_helpers import retry
# from fetchers.pdf_fetcher import fetch_pdf
# from fetchers.youtube_fetcher import fetch_youtube
# from fetchers.reddit_fetcher import fetch_reddit
# from fetchers.github_fetcher import fetch_github
# from fetchers.jina_fetcher import fetch_jina
# from fetchers.crawl4ai_fetcher import fetch_crawl4ai
# from fetchers.basic_fetcher import fetch_html
#
# _ERR_PREFIXES = ("[error", "[failed", "[unable")
#
# def _looks_error(txt: str | None) -> bool:
#     return not txt or txt.strip().lower().startswith(_ERR_PREFIXES)
#
# async def _thread_wrapper(fn: Callable[[str], str], url: str) -> str | None:
#     try:
#         return await asyncio.to_thread(fn, url)
#     except Exception as exc:
#         logging.debug("%s threw in thread: %s", fn.__name__, exc)
#
# @retry
# async def fetch_url(url: str) -> str:
#     url_l = url.lower()
#     # 1 – Jina AI ------------------------------------------------------------
#     if (out := await _thread_wrapper(fetch_jina, url)) and not _looks_error(out):
#         return out
#     # 2 – Crawl4AI -----------------------------------------------------------
#     try:
#         md = await fetch_crawl4ai(url)
#         if not _looks_error(md):
#             return md
#     except Exception as e:
#         logging.debug("Crawl4AI failed: %s", e)
#     # Specialised fetchers (PDF / Reddit / YouTube / GitHub) ------------------
#     if "pdf" in url_l:
#         if (out := await _thread_wrapper(fetch_pdf, url)) and not _looks_error(out):
#             return out
#     if "reddit" in url_l:
#         if (out := await _thread_wrapper(fetch_reddit, url)) and not _looks_error(out):
#             return out
#     if "youtube" in url_l:
#         if (out := await _thread_wrapper(fetch_youtube, url)) and not _looks_error(out):
#             return out
#     if "github" in url_l:
#         if (out := await _thread_wrapper(fetch_github, url)) and not _looks_error(out):
#             return out
#     # 3 – Basic HTML ----------------------------------------------------------
#     if (out := await _thread_wrapper(fetch_html, url)) and not _looks_error(out):
#         return out
#     return "[error fetch_url exhausted all methods]"
from __future__ import annotations

import asyncio, logging, time

from fetchers.pdf_fetcher import fetch_pdf
from fetchers.youtube_fetcher import fetch_youtube
from fetchers.reddit_fetcher import fetch_reddit
from fetchers.github_fetcher import fetch_github
from fetchers.jina_fetcher import fetch_jina
from fetchers.crawl4ai_fetcher import fetch_crawl4ai
from fetchers.basic_fetcher import fetch_html
# Responses beginning with one of these markers are treated as fetcher failures.
# Note: prefixes, not full strings – "[unable" (no closing bracket) also matches
# messages such as "[unable to fetch …]".
_ERR_PREFIXES = ("[error", "[failed", "[unable")


def _looks_error(txt: str | None) -> bool:
    return not txt or txt.strip().lower().startswith(_ERR_PREFIXES)
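
# Examples (follow directly from the definition above):
#   _looks_error(None)              -> True   (missing text)
#   _looks_error("  [Error: 404]")  -> True   (prefix match is case-insensitive)
#   _looks_error("# Page title")    -> False  (real content passes through)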
# Per-fetcher hard caps (seconds); fetchers not listed here default to 60 s.
_FETCHER_TIMEOUTS = {
    "fetch_jina": 20.0,
    "fetch_github": 10.0,
    "fetch_crawl4ai": 40.0,
    "fetch_html": 20.0,
    "fetch_pdf": 30.0,
    "fetch_youtube": 30.0,
    "fetch_reddit": 10.0,
}
_EXHAUSTED = "[error fetch_url exhausted all methods]"


async def fetch_url(url: str) -> str:
    url_l = url.lower()

    async def timed_fetch(fn) -> str | None:
        """Run a single fetcher (sync or async) under its hard timeout."""
        name = fn.__name__
        timeout = _FETCHER_TIMEOUTS.get(name, 60.0)
        start_ts = time.perf_counter()
        try:
            # Choose sync or async execution path.
            coro = fn(url) if asyncio.iscoroutinefunction(fn) else asyncio.to_thread(fn, url)
            result = await asyncio.wait_for(coro, timeout=timeout)
            elapsed = (time.perf_counter() - start_ts) * 1000
            if result and not _looks_error(result):
                logging.info(f"[{name}] ✅ success in {elapsed:.1f} ms")
                return result
            logging.warning(f"[{name}] ❌ error response in {elapsed:.1f} ms")
        except asyncio.TimeoutError:
            logging.warning(f"[{name}] ⏱️ timed out after {timeout}s")
        except Exception as e:
            elapsed = (time.perf_counter() - start_ts) * 1000
            logging.warning(f"[{name}] 💥 exception in {elapsed:.1f} ms → {e}")
        return None

    async def try_chain(*fetchers) -> str:
        # Return the first non-error result; falling back to _EXHAUSTED keeps
        # the `-> str` contract even when every fetcher in the chain fails.
        for fn in fetchers:
            if result := await timed_fetch(fn):
                return result
        return _EXHAUSTED

    # -------------- domain-specific chains ---------------
    if "github.com" in url_l:
        return await try_chain(fetch_jina, fetch_github, fetch_crawl4ai)
    if "wikipedia.org" in url_l:
        return await try_chain(fetch_html, fetch_jina, fetch_crawl4ai)
    if "reddit.com" in url_l:
        return await try_chain(fetch_jina, fetch_reddit, fetch_html)
    if "quora.com" in url_l:
        return await try_chain(fetch_crawl4ai, fetch_jina, fetch_html)
    if "youtube.com" in url_l or "youtu.be" in url_l:
        return await try_chain(fetch_jina, fetch_youtube)
    if url_l.endswith(".pdf") or "pdf" in url_l:
        return await try_chain(fetch_jina, fetch_pdf, fetch_html, fetch_crawl4ai)

    # -------------- generic fallback ----------------------
    return await try_chain(fetch_jina, fetch_crawl4ai, fetch_html)