from __future__ import annotations
import asyncio, hashlib, json, logging, os, re, time, tiktoken
from typing import Dict, List
from config import CFG, _SESS
from fetchers_async import fetch_url
from web_helpers import retry
from urllib.parse import urlparse

enc = tiktoken.get_encoding("cl100k_base")

# ── Google / Serper search ──────────────────────────────────────────────

# def google_search(query: str, top_k: int = 10) -> List[Dict[str,str]]:
#     if not CFG.serper_key:
#         raise EnvironmentError("SERPER_API_KEY not set")
#     r = _SESS.post(
#         CFG.serper_ep,
#         headers={"X-API-KEY": CFG.serper_key, "Content-Type": "application/json"},
#         json={"q": query}, timeout=20)
#     r.raise_for_status()
#     hits = []
#     for it in r.json().get("organic", []):
#         hits.append({"title": it.get("title",""),
#                      "link":  it.get("link",""),
#                      "snippet": it.get("snippet","")})
#         if len(hits) == top_k: break
#     return hits

def _canon_query(q: str) -> str:
    # Normalize whitespace to avoid duplicate keys for e.g. "foo  bar"
    return " ".join((q or "").strip().split())


def _search_cache_key(query: str, top_k: int) -> str:
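    """Derive a stable cache filename from the normalized query and ``top_k``."""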
    cq = _canon_query(query)
    raw = f"{top_k}|{cq}"
    return hashlib.sha256(raw.encode("utf-8")).hexdigest() + ".json"

def _search_cache_paths(query: str, top_k: int) -> str:
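    """Return the on-disk cache path for (query, top_k), creating the cache dir if needed."""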
    root = CFG.serper_cache_dir
    os.makedirs(root, exist_ok=True)
    return os.path.join(root, _search_cache_key(query, top_k))

def _ttl_seconds() -> int:
    # 0 or missing → no expiry
    try:
        return int(getattr(CFG, "search_cache_ttl", 0) or int(os.environ.get("SEARCH_CACHE_TTL", "0")))
    except Exception:
        return 0

def _load_search_cache(path: str) -> List[Dict[str, str]] | None:
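    """Return cached hits from ``path`` if present, non-empty, and unexpired; otherwise None."""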
    try:
        if not os.path.exists(path) or os.path.getsize(path) <= 2:
            return None
        ttl = _ttl_seconds()
        if ttl > 0:
            age = time.time() - os.path.getmtime(path)
            if age > ttl:
                return None
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
        # Basic shape check: the cache should hold a list of hit dicts
        if isinstance(data, list) and all(isinstance(h, dict) for h in data):
            return data
    except Exception as e:
        logging.debug("Serper cache read failed (%s): %s", path, e)
    return None

def _save_search_cache(path: str, hits: List[Dict[str, str]]) -> None:
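    """Persist ``hits`` to ``path`` atomically (write a temp file, then ``os.replace``)."""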
    try:
        tmp = f"{path}.tmp.{os.getpid()}"
        with open(tmp, "w", encoding="utf-8") as f:
            json.dump(hits, f, ensure_ascii=False)
        os.replace(tmp, path)  # atomic on same FS
    except Exception as e:
        logging.debug("Serper cache write failed (%s): %s", path, e)

@retry
def google_search(query: str, top_k: int = 10) -> List[Dict[str,str]]:
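    """Return up to ``top_k`` organic Serper hits as {title, link, snippet} dicts.

    Results are cached on disk, keyed by the normalized query and ``top_k``;
    a valid cache entry short-circuits the network call.
    """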
    if not CFG.serper_key:
        raise EnvironmentError("SERPER_API_KEY not set")

    cpath = _search_cache_paths(query, top_k)
    cached = _load_search_cache(cpath)
    if cached is not None:
        logging.info("Serper search (cache hit) ← %r (top_k=%d)", _canon_query(query), top_k)
        return cached

    r = _SESS.post(
        CFG.serper_ep,
        headers={"X-API-KEY": CFG.serper_key, "Content-Type": "application/json"},
        json={"q": query},
        timeout=20
    )
    r.raise_for_status()
    hits: List[Dict[str, str]] = []
    for it in r.json().get("organic", []):
        hits.append({
            "title": it.get("title", ""),
            "link": it.get("link", ""),
            "snippet": it.get("snippet", ""),
        })
        if len(hits) == top_k:
            break

    _save_search_cache(cpath, hits)
    return hits


# ── async extract per hit ───────────────────────────────────────────────
async def async_search_and_extract(query: str, top_k: int = 5) -> List[Dict]:
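    """Run a Serper search, then fetch each hit's page body concurrently.

    Note that ``google_search`` itself is synchronous, so the initial search
    blocks the event loop; only the per-hit fetches run concurrently.
    """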
    hits = google_search(query, top_k)
    async def enrich(h):
        return {**h, "body": await fetch_url(h["link"])}
    return await asyncio.gather(*(enrich(h) for h in hits))

# ── markdown helpers ────────────────────────────────────────────────────
def url_hits_to_markdown(hits: List[Dict[str,str]]) -> str:
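    """Format raw search hits as numbered markdown blocks (title, URL, snippet)."""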
    out = []
    for i, h in enumerate(hits, 1):
        out.append(f"### {i}. {h['title']}\n**URL**: {h['link']}\n\n**Snippet**: {h['snippet']}\n")
    return "\n---\n\n".join(out)

def search_result_to_markdown(blocks: List[Dict]) -> str:
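    """Format enriched hits (including fetched page bodies) as numbered markdown blocks."""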
    out = []
    for i, b in enumerate(blocks, 1):
        out.append(f"### {i}. **Title**: {b['title']}\n**URL**: {b['link']}\n\n"
                   f"**Snippet**: {b['snippet']}\n\n**Content**:\n{b['body']}\n")
    return "\n---\n\n".join(out)

def trim_to_tokens(text: str, limit: int, model: str = "gpt-3.5-turbo") -> str:
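    """Trim ``text`` to roughly ``limit`` tokens, keeping the first and last halves.

    ``model`` is currently unused; the module-level cl100k_base encoding is
    applied regardless of the value passed.
    """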
    ids = enc.encode(text)
    if len(ids) <= limit: return text
    keep = limit // 2
    return enc.decode(ids[:keep] + ids[-keep:])

def _bad(url: str) -> str|None:
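    """Return an error message if ``url`` is not a well-formed http(s) URL, else None."""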
    p = urlparse(url)
    if p.scheme not in ("http","https") or not p.netloc:
        return "[error: invalid URL – must start with http:// or https://]"
    return None
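
# ── usage sketch (illustrative only) ────────────────────────────────────
# A minimal, hedged example of how these helpers compose. It assumes a valid
# SERPER_API_KEY in CFG and that fetch_url returns page text; the query and
# token limit below are placeholders, not values required by this module.
if __name__ == "__main__":
    async def _demo() -> None:
        blocks = await async_search_and_extract("python tiktoken cl100k_base", top_k=3)
        print(trim_to_tokens(search_result_to_markdown(blocks), limit=2000))

    asyncio.run(_demo())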