from __future__ import annotations
import asyncio, hashlib, json, logging, os, re, time, tiktoken
from typing import Dict, List
from config import CFG, _SESS
from fetchers_async import fetch_url
from web_helpers import retry
from urllib.parse import urlparse
enc = tiktoken.get_encoding("cl100k_base")
# ── Google / Serper search ──────────────────────────────────────────────
# def google_search(query: str, top_k: int = 10) -> List[Dict[str,str]]:
# if not CFG.serper_key:
# raise EnvironmentError("SERPER_API_KEY not set")
# r = _SESS.post(
# CFG.serper_ep,
# headers={"X-API-KEY": CFG.serper_key, "Content-Type": "application/json"},
# json={"q": query}, timeout=20)
# r.raise_for_status()
# hits = []
# for it in r.json().get("organic", []):
# hits.append({"title": it.get("title",""),
# "link": it.get("link",""),
# "snippet": it.get("snippet","")})
# if len(hits) == top_k: break
# return hits
def _canon_query(q: str) -> str:
    # Normalize whitespace so e.g. "foo  bar" and "foo bar" share one cache key
    return " ".join((q or "").strip().split())

def _search_cache_key(query: str, top_k: int) -> str:
    cq = _canon_query(query)
    raw = f"{top_k}|{cq}"
    return hashlib.sha256(raw.encode("utf-8")).hexdigest() + ".json"
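
# Illustrative sketch of the key scheme: queries that differ only in whitespace
# share one cache file, while a different top_k maps to a separate file, since
# both the canonical query and top_k feed the hash.
#
#   _search_cache_key("foo  bar", 10) == _search_cache_key("foo bar", 10)  # True
#   _search_cache_key("foo bar", 10) == _search_cache_key("foo bar", 5)    # False
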
def _search_cache_path(query: str, top_k: int) -> str:
    root = CFG.serper_cache_dir
    os.makedirs(root, exist_ok=True)
    return os.path.join(root, _search_cache_key(query, top_k))

def _ttl_seconds() -> int:
    # 0 or missing → no expiry
    try:
        return int(getattr(CFG, "search_cache_ttl", 0) or int(os.environ.get("SEARCH_CACHE_TTL", "0")))
    except Exception:
        return 0
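
# Illustrative configuration sketch (values are assumptions, not defaults): a
# 24-hour expiry can come from CFG.search_cache_ttl or from the environment,
# e.g. `export SEARCH_CACHE_TTL=86400` before launching the process.
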
def _load_search_cache(path: str) -> List[Dict[str, str]] | None:
    try:
        # A missing or effectively empty file ("[]" is 2 bytes) counts as a miss
        if not os.path.exists(path) or os.path.getsize(path) <= 2:
            return None
        ttl = _ttl_seconds()
        if ttl > 0:
            age = time.time() - os.path.getmtime(path)
            if age > ttl:
                return None
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
        # Basic shape check: a list of dicts, as written by _save_search_cache
        if isinstance(data, list) and all(isinstance(x, dict) for x in data):
            return data
    except Exception as e:
        logging.debug("Serper cache read failed (%s): %s", path, e)
    return None

def _save_search_cache(path: str, hits: List[Dict[str, str]]) -> None:
    try:
        tmp = f"{path}.tmp.{os.getpid()}"
        with open(tmp, "w", encoding="utf-8") as f:
            json.dump(hits, f, ensure_ascii=False)
        os.replace(tmp, path)  # atomic on same FS
    except Exception as e:
        logging.debug("Serper cache write failed (%s): %s", path, e)

@retry
def google_search(query: str, top_k: int = 10) -> List[Dict[str, str]]:
    if not CFG.serper_key:
        raise EnvironmentError("SERPER_API_KEY not set")
    cpath = _search_cache_path(query, top_k)
    cached = _load_search_cache(cpath)
    if cached is not None:
        logging.info("Serper search (cache hit) → %r (top_k=%d)", _canon_query(query), top_k)
        return cached
    r = _SESS.post(
        CFG.serper_ep,
        headers={"X-API-KEY": CFG.serper_key, "Content-Type": "application/json"},
        json={"q": query},
        timeout=20,
    )
    r.raise_for_status()
    hits: List[Dict[str, str]] = []
    for it in r.json().get("organic", []):
        hits.append({
            "title": it.get("title", ""),
            "link": it.get("link", ""),
            "snippet": it.get("snippet", ""),
        })
        if len(hits) == top_k:
            break
    _save_search_cache(cpath, hits)
    return hits
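
# Usage sketch (assumes SERPER_API_KEY is configured and the network is up;
# the query string is purely illustrative). A second identical call within the
# TTL window is answered from the on-disk cache instead of hitting Serper:
#
#   hits = google_search("site:python.org asyncio", top_k=5)
#   print(url_hits_to_markdown(hits))
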
# ── async extract per hit ───────────────────────────────────────────────
async def async_search_and_extract(query: str, top_k: int = 5) -> List[Dict]:
    # NOTE: google_search is a synchronous (blocking) call; the per-hit fetches
    # below run concurrently once the hit list is available.
    hits = google_search(query, top_k)

    async def enrich(h):
        return {**h, "body": await fetch_url(h["link"])}

    return await asyncio.gather(*(enrich(h) for h in hits))
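
# Driving the async pipeline from synchronous code (sketch; the query is
# illustrative):
#
#   blocks = asyncio.run(async_search_and_extract("python tiktoken", top_k=3))
#   print(search_result_to_markdown(blocks))
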
# ── markdown helpers ────────────────────────────────────────────────────
def url_hits_to_markdown(hits: List[Dict[str, str]]) -> str:
    out = []
    for i, h in enumerate(hits, 1):
        out.append(f"### {i}. {h['title']}\n**URL**: {h['link']}\n\n**Snippet**: {h['snippet']}\n")
    return "\n---\n\n".join(out)

def search_result_to_markdown(blocks: List[Dict]) -> str:
    out = []
    for i, b in enumerate(blocks, 1):
        out.append(f"### {i}. **Title**: {b['title']}\n**URL**: {b['link']}\n\n"
                   f"**Snippet**: {b['snippet']}\n\n**Content**:\n{b['body']}\n")
    return "\n---\n\n".join(out)

def trim_to_tokens(text: str, limit: int, model: str = "gpt-3.5-turbo") -> str:
    # `model` is kept for call-site compatibility; the encoding is fixed to
    # cl100k_base via the module-level `enc`.
    ids = enc.encode(text)
    if len(ids) <= limit:
        return text
    # Keep the first and last `limit // 2` tokens and drop the middle
    keep = limit // 2
    return enc.decode(ids[:keep] + ids[-keep:])
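
# Illustrative numbers: with limit=1000 the first 500 and last 500 tokens are
# kept and the middle is dropped, so long pages still fit a prompt while the
# intro and the conclusion are preserved.
#
#   short = trim_to_tokens(long_markdown, limit=1000)
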
def _bad(url: str) -> str | None:
    p = urlparse(url)
    if p.scheme not in ("http", "https") or not p.netloc:
        return "[error: invalid URL - must start with http:// or https://]"
    return None
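
# ── manual smoke test ───────────────────────────────────────────────────
# A minimal end-to-end sketch, not part of the library surface. It assumes
# SERPER_API_KEY is set and the Serper endpoint is reachable; the demo query
# is purely illustrative.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    demo_query = "tiktoken cl100k_base token counting"
    demo_hits = google_search(demo_query, top_k=3)
    print(trim_to_tokens(url_hits_to_markdown(demo_hits), limit=1000))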