# """
# fetchers_async.py – Orchestrates multiple specialised fetchers **without changing
# its public surface** (`async def fetch_url(url: str) -> str`).
# Order of strategies (after specialised handlers):
# 1. **Jina AI** – fast & cheap full‑text extraction
# 2. **Crawl4AI** – browser‑based heavy‑weight fallback
# 3. **Legacy HTML** – trafilatura / readability last‑chance scrape
# Specialised fetchers (PDF, YouTube, Reddit) remain unchanged.
# """
# from __future__ import annotations
# import asyncio, logging
# from typing import Callable
# from web_helpers import retry
# from fetchers.pdf_fetcher import fetch_pdf
# from fetchers.youtube_fetcher import fetch_youtube
# from fetchers.reddit_fetcher import fetch_reddit
# from fetchers.github_fetcher import fetch_github
# from fetchers.jina_fetcher import fetch_jina
# from fetchers.crawl4ai_fetcher import fetch_crawl4ai
# from fetchers.basic_fetcher import fetch_html
# _ERR_PREFIXES = ("[error", "[failed", "[unable")
# def _looks_error(txt: str | None) -> bool:
# return not txt or txt.strip().lower().startswith(_ERR_PREFIXES)
# async def _thread_wrapper(fn: Callable[[str], str], url: str) -> str | None:
# try:
# return await asyncio.to_thread(fn, url)
# except Exception as exc:
# logging.debug("%s threw in thread: %s", fn.__name__, exc)
# @retry
# async def fetch_url(url: str) -> str:
# url_l = url.lower()
# # 1 – Jina AI ------------------------------------------------------------
# if (out := await _thread_wrapper(fetch_jina, url)) and not _looks_error(out):
# return out
# # if (out := await _thread_wrapper(fetch_html, url)) and not _looks_error(out):
# # return out
# # 2 – Crawl4AI -----------------------------------------------------------
# try:
# md = await fetch_crawl4ai(url)
# if not _looks_error(md):
# return md
# except Exception as e:
# logging.debug("Crawl4AI failed: %s", e)
# if "pdf" in url_l:
# if (out := await _thread_wrapper(fetch_pdf, url)) and not _looks_error(out):
# return out
# if "reddit" in url_l:
# if (out := await _thread_wrapper(fetch_reddit, url)) and not _looks_error(out):
# return out
# if "youtube" in url_l:
# if (out := await _thread_wrapper(fetch_youtube, url)) and not _looks_error(out):
# return out
# if "github" in url_l:
# if (out := await _thread_wrapper(fetch_github, url)) and not _looks_error(out):
# return out
# # 3 – Basic HTML --------------------------------------------------------
# if (out := await _thread_wrapper(fetch_html, url)) and not _looks_error(out):
# return out
# return "[error fetch_url exhausted all methods]"
from __future__ import annotations

import asyncio, logging, time
from fetchers.pdf_fetcher import fetch_pdf
from fetchers.youtube_fetcher import fetch_youtube
from fetchers.reddit_fetcher import fetch_reddit
from fetchers.github_fetcher import fetch_github
from fetchers.jina_fetcher import fetch_jina
from fetchers.crawl4ai_fetcher import fetch_crawl4ai
from fetchers.basic_fetcher import fetch_html

# prefixes that mark a fetcher reply as an error sentinel rather than real content
_ERR_PREFIXES = ("[error", "[failed", "[unable")


def _looks_error(txt: str | None) -> bool:
    return not txt or txt.strip().lower().startswith(_ERR_PREFIXES)
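# Example: _looks_error(None) → True; _looks_error("[ERROR 404] page missing") → True;
# _looks_error("Readable article text") → False.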


# per-fetcher hard caps (seconds)
_FETCHER_TIMEOUTS = {
    "fetch_jina": 20.0,
    "fetch_github": 10.0,
    "fetch_crawl4ai": 40.0,
    "fetch_html": 20.0,
    "fetch_pdf": 30.0,
    "fetch_youtube": 30.0,
    "fetch_reddit": 10.0,
}
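# Any fetcher not listed above falls back to the 60 s default used in timed_fetch.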


async def fetch_url(url: str) -> str:
    url_l = url.lower()
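
    # timed_fetch runs one fetcher under its hard cap: async fetchers are awaited
    # directly, sync ones are pushed to a worker thread via asyncio.to_thread.
    # Note that wait_for can only abandon a timed-out thread, not cancel it, so
    # a slow sync fetcher may keep running in the background after a timeout.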
    async def timed_fetch(fn) -> str | None:
        name = fn.__name__
        timeout = _FETCHER_TIMEOUTS.get(name, 60.0)
        start_ts = time.perf_counter()
        try:
            # choose sync or async execution path
            coro = fn(url) if asyncio.iscoroutinefunction(fn) else asyncio.to_thread(fn, url)
            result = await asyncio.wait_for(coro, timeout=timeout)
            elapsed = (time.perf_counter() - start_ts) * 1000
            if result and not _looks_error(result):
                logging.info(f"[{name}] ✅ success in {elapsed:.1f} ms")
                return result
            logging.warning(f"[{name}] ❌ error response in {elapsed:.1f} ms")
        except asyncio.TimeoutError:
            logging.warning(f"[{name}] ⏱️ timed-out after {timeout}s")
        except Exception as e:
            elapsed = (time.perf_counter() - start_ts) * 1000
            logging.warning(f"[{name}] 💥 exception in {elapsed:.1f} ms → {e}")
        return None
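
    # try_chain walks the given fetchers in order and returns the first usable text.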
    async def try_chain(*fetchers) -> str | None:
        for fn in fetchers:
            if result := await timed_fetch(fn):
                return result
        return None

    # -------------- domain-specific chains ---------------
    if "github.com" in url_l:
        result = await try_chain(fetch_jina, fetch_github, fetch_crawl4ai)
    elif "wikipedia.org" in url_l:
        result = await try_chain(fetch_html, fetch_jina, fetch_crawl4ai)
    elif "reddit.com" in url_l:
        result = await try_chain(fetch_jina, fetch_reddit, fetch_html)
    elif "quora.com" in url_l:
        result = await try_chain(fetch_crawl4ai, fetch_jina, fetch_html)
    elif "youtube.com" in url_l or "youtu.be" in url_l:
        result = await try_chain(fetch_jina, fetch_youtube)
    elif url_l.endswith(".pdf") or "pdf" in url_l:
        result = await try_chain(fetch_jina, fetch_pdf, fetch_html, fetch_crawl4ai)
    else:
        # -------------- generic fallback ---------------------
        result = await try_chain(fetch_jina, fetch_crawl4ai, fetch_html)

    # every chain can come up empty, so always hand the caller a string
    return result or "[error fetch_url exhausted all methods]"