# compressor.py
from __future__ import annotations
import functools, json, logging, re
from difflib import SequenceMatcher
from io import StringIO
from typing import Dict, List, Tuple
import pandas as pd
import regex # needed by tiktoken
import tiktoken
from bs4 import BeautifulSoup
from config import CFG
from web_helpers import retry
# ────────────────────────────────────────────────────────────────────────
# 0. shared helpers
# ------------------------------------------------------------------------
enc = tiktoken.get_encoding("cl100k_base")
_tok = lambda s: len(enc.encode(s)) # fast inline counter
@functools.lru_cache(maxsize=1)
def _nlp():
    import spacy
    return spacy.load("en_core_web_sm")

def _openai_client():
    """Import OpenAI lazily to avoid overhead when not needed."""
    import importlib
    mod = importlib.import_module("openai")
    return getattr(mod, "OpenAI", None)() if hasattr(mod, "OpenAI") else mod
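# With the v1 SDK installed this yields an `OpenAI()` client instance; otherwise
# it falls back to the legacy module object. The callers below assume the
# v1-style `client.chat.completions.create(...)` interface.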
# ────────────────────────────────────────────────────────────────────────
# 1. regex patterns (compiled once)
# ------------------------------------------------------------------------
DATE_PATS = [re.compile(p, re.I) for p in [
    r"\d{4}-\d{2}-\d{2}",
    r"(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\s+\d{1,2},\s+\d{4}",
    r"(?:January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{1,2},\s+\d{4}",
    r"\d{1,2}\s+(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\s+\d{4}",
    r"\b\d{4}/\d{2}\b",
    r"\b\d{4}\b(?!\s*(?:%|million|billion|thousand))",
]]
EMAIL_PAT = re.compile(r"[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+")
URL_PAT = re.compile(r"https?://[^\s\)]+")
PHONE_PAT = re.compile(r"\+?\d[\d\s\-().]{7,}\d")
CURR_PAT = re.compile(r"(\$\s?\d+(?:,\d{3})*(?:\.\d+)?|\d+(?:,\d{3})*(?:\.\d+)?\s*(USD|EUR|GBP|INR|¥|₩|₹|€))", re.I)
DEF_PAT = re.compile(r"([A-Z][A-Za-z0-9\s]+?)\s+(is|are|refers to|means)\s+(.*?)(?:[\.\n])")
MD_TABLE_PAT = re.compile(
    r"(?:^\|.*?\|\n?)+(?:^\|[-:\s|]+\|\n?)?(?:^\|.*?\|\n?)+", re.M)
CSV_PAT = re.compile(r"((?:^.*?,.*?\n){2,})", re.M)
TSV_PAT = re.compile(r"((?:^.*?\t.*?\n){2,})", re.M)
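# Illustrative matches (sanity-check values, not part of the extraction logic):
#   DATE_PATS : "2024-07-01", "Jan 5, 2024", "5 Jan 2024"
#   CURR_PAT  : "$1,234.56", "100 USD"
#   DEF_PAT   : "Photosynthesis is the process by which plants make food."
#   MD_TABLE_PAT matches runs of consecutive `| ... |` lines, with an optional
#   `|---|---|` separator row between header and body.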
# ────────────────────────────────────────────────────────────────────────
# 2. core utilities
# ------------------------------------------------------------------------
def deduplicate_items(items: List[str], *, similarity=0.5,
                      other: List[str] | None = None) -> List[str]:
    """Drop near-duplicates; prefer the longest variant."""
    if not items:
        return []
    other = other or []

    def _clean(x: str) -> str:
        x = re.sub(r'\[edit\]|\[\d+\]', '', x)
        return re.sub(r'\s+', ' ', x).strip()

    out, out_clean = [], []
    for orig in items:
        clean = _clean(orig)
        dup = False
        for ref in out_clean + list(map(_clean, other)):
            sim = SequenceMatcher(None, clean, ref).ratio()
            if sim >= similarity or clean in ref or ref in clean:
                dup = True
                # if current is longer than the stored near-duplicate, replace it;
                # only refs that actually live in out_clean can be replaced
                # (refs drawn from `other` are not ours to swap out)
                if ref in out_clean and clean not in out_clean and len(clean) > len(ref):
                    idx = out_clean.index(ref)
                    out[idx], out_clean[idx] = orig, clean
                break
        if not dup:
            out.append(orig)
            out_clean.append(clean)
    return out
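# Illustrative behaviour (doctest-style sketch; the cut-off is the `similarity` kwarg):
#   >>> deduplicate_items(["Paris is the capital", "Paris is the capital of France"])
#   ['Paris is the capital of France']
# The shorter variant is treated as a near-duplicate and replaced by the longer one.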
# ────────────────────────────────────────────────────────────────────────
# 3. fact & table extractor
# ------------------------------------------------------------------------
def extract_facts_and_tables(text: str) -> Tuple[str, List[str], List[str]]:
    facts, spans = [], []

    def _add(match):
        facts.append(match.group())
        spans.append(match.span())

    for pat in DATE_PATS:
        for m in pat.finditer(text):
            _add(m)
    for m in EMAIL_PAT.finditer(text):
        _add(m)
    for m in URL_PAT.finditer(text):
        _add(m)
    for m in PHONE_PAT.finditer(text):
        _add(m)
    for m in CURR_PAT.finditer(text):
        _add(m)
    for m in DEF_PAT.finditer(text):
        _add(m)

    # contextual sentences around facts
    doc = _nlp()(text)
    ctx = [s.text.strip() for s in doc.sents
           if any(s.start_char <= s_ <= s.end_char for s_, _ in spans)]
    facts.extend(ctx)
    facts = sorted(set(facts))

    # ── tables
    tables = []
    for tbl in MD_TABLE_PAT.findall(text):
        cleaned = "\n".join(l for l in tbl.splitlines()
                            if l.strip() and not re.match(r"^\|[-:\s|]+\|$", l))
        if len(cleaned.splitlines()) < 2:
            continue
        try:
            df = pd.read_csv(StringIO(cleaned), sep="|").dropna(how="all", axis=1)
            tables.append(df.to_markdown(index=False))
        except Exception:
            tables.append(cleaned)
    soup = BeautifulSoup(text, "lxml")
    for html_tbl in soup.find_all("table"):
        try:
            df = pd.read_html(str(html_tbl))[0]
            tables.append(df.to_markdown(index=False))
        except Exception:
            tables.append(str(html_tbl))
    for m in CSV_PAT.finditer(text):
        try:
            df = pd.read_csv(StringIO(m.group(1)))
            if not df.empty:
                tables.append(df.to_markdown(index=False))
        except Exception:
            pass
    for m in TSV_PAT.finditer(text):
        try:
            df = pd.read_csv(StringIO(m.group(1)), sep="\t")
            if not df.empty:
                tables.append(df.to_markdown(index=False))
        except Exception:
            pass

    # ── clean narrative (remove facts & tables)
    narrative = text
    for tbl in tables:
        narrative = narrative.replace(tbl, " ")
    for s, e in sorted(spans, reverse=True):
        narrative = narrative[:s] + narrative[e:]
    narrative = re.sub(r"\s{2,}", " ", narrative).strip()
    return narrative, facts, tables
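# Illustrative call (any page text works; spaCy's `en_core_web_sm` model must be
# installed for the sentence pass):
#   narrative, facts, tables = extract_facts_and_tables(page_text)
#   - facts    : regex hits (dates, emails, URLs, phones, amounts, definitions)
#                plus the full sentences containing them, deduped and sorted
#   - tables   : Markdown/HTML/CSV/TSV tables re-rendered as markdown where possible
#   - narrative: the remaining prose with the fact spans cut out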
# ────────────────────────────────────────────────────────────────────────
# 4. OpenAI summariser helpers
# ------------------------------------------------------------------------
def _summarise(text: str, pct: float, model: str) -> str:
    target_tokens = int(_tok(text) * pct)
    sys_prompt = (
        "You are an expert abstractor. Summarize the text below to "
        f"approximately {pct*100:.0f}% of its original length (≈{target_tokens} tokens), "
        "while **retaining all key facts, figures, names, dates, places, and events**. "
        "Ensure the summary remains accurate, informative, and faithful to the original content."
    )
    client = _openai_client()
    rsp = client.chat.completions.create(
        model=model, temperature=0.2,
        messages=[{"role": "system", "content": sys_prompt},
                  {"role": "user", "content": text}],
        max_tokens=CFG.output_limit_per_link,
    )
    return rsp.choices[0].message.content
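# Note: the percentage target is advisory only; the response is additionally
# capped at CFG.output_limit_per_link completion tokens by `max_tokens` above.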
# ────────────────────────────────────────────────────────────────────────
# 5. compress_text (public)
# ------------------------------------------------------------------------
def compress_text(text: str, *, pct: float = 0.3,
                  model: str = "gpt-4o-mini") -> str:
    # budget reserved for facts/tables (computed but not yet folded into the output)
    FACTS_TABLES_LIMIT = CFG.output_limit_per_link - CFG.disable_narrative_compress_thresh
    narrative, facts, tables = extract_facts_and_tables(text)
    # narrative compression
    if _tok(narrative) > CFG.disable_narrative_compress_thresh:
        narrative_txt = _summarise(narrative, pct, model)
    else:
        narrative_txt = narrative
    return narrative_txt
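# Minimal usage sketch (assumes a configured OpenAI API key; `page_text` is any
# long webpage dump):
#   short = compress_text(page_text, pct=0.3)
# Pages whose narrative already fits under CFG.disable_narrative_compress_thresh
# tokens are returned uncompressed.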
# ────────────────────────────────────────────────────────────────────────
# 6. query_text (goal-oriented extraction)
# ------------------------------------------------------------------------
EXTRACTOR_SYS_PROMPT = (
    "You are a highly skilled information extraction agent. Your job is to analyze long, complex webpages "
    "in the context of a specific user goal. You excel at identifying relevant sections, capturing supporting evidence "
    "in full original context, and providing logically structured summaries. Always ensure precision, completeness, "
    "and alignment with the user's intent."
)
EXTRACTOR_PROMPT_TEMPLATE = """You are a highly skilled information extraction agent. Your task is to analyze the following webpage content in light of a specific user goal, and extract accurate, well-structured information using plain text format.

## Webpage Content
{webpage_content}

## User Goal
{goal}

## Task Guidelines
1. **Rationale**: Briefly explain why this content is relevant to the user's goal.
2. **Evidence**: Quote the most relevant parts of the webpage that directly support or address the goal. Use bullet points or numbered lines separated by newlines.
3. **Summary**: Provide a clear, logically structured summary of the extracted evidence that addresses the user's goal.

## Output Format
Your response must follow **exactly this format** with the three sections:

Rationale: <one paragraph>
Evidence: <first point>\n<second point>...
Summary: <concise paragraph summarizing the evidence>
"""
def extract_regex(text: str) -> Dict[str, str]:
    def extract_section(header: str) -> str:
        # Match the section starting with `Header:` until the next capitalized line followed by `:` or end
        pattern = rf"{header}:\s*(.*?)(?=\n[A-Z][a-z]+:|\Z)"
        match = re.search(pattern, text, re.DOTALL | re.IGNORECASE)
        return match.group(1).strip() if match else "(not found)"

    return {
        "rationale": extract_section("Rationale"),
        "evidence": extract_section("Evidence"),
        "summary": extract_section("Summary"),
    }
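# Doctest-style sketch of the section parser (hypothetical model reply):
#   >>> reply = "Rationale: Relevant.\nEvidence: - quote one\n- quote two\nSummary: Short answer."
#   >>> extract_regex(reply)["evidence"]
#   '- quote one\n- quote two'
# Missing sections come back as the placeholder "(not found)".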
def query_text(
    url: str,
    text: str,
    goal: str,
    *,
    model: str = "gpt-4.1-mini",
    max_attempts: int = 3,
) -> Dict[str, str]:
    """Goal-oriented extractor with retries → compress fallback → token trim fallback."""
    prompt = EXTRACTOR_PROMPT_TEMPLATE.format(
        webpage_content=text[:15_000],  # clip for safety
        goal=goal,
    )
    client = _openai_client()
    for attempt in range(1, max_attempts + 1):
        try:
            rsp = client.chat.completions.create(
                model=model,
                temperature=0.0,
                messages=[
                    {"role": "system", "content": EXTRACTOR_SYS_PROMPT},
                    {"role": "user", "content": prompt},
                ],
                max_tokens=1024,
            ).choices[0].message.content
            extracted = extract_regex(rsp)
            evidence = extracted.get("evidence", "")
            summary = extracted.get("summary", "")
            # Sanity check: both sections must be present (the parser returns
            # "(not found)" for missing ones) and jointly longer than 20 characters
            if "(not found)" not in (evidence, summary) and len(evidence) + len(summary) > 20:
                return {
                    "extracted_info": (
                        f"The useful information in {url} for goal “{goal}”:\n\n"
                        f"Rationale:\n{extracted.get('rationale')}\n\n"
                        f"Evidence:\n{evidence}\n\n"
                        f"Summary:\n{summary}"
                    )
                }
            raise ValueError("LLM returned empty or malformed extraction")
        except Exception as e:
            logging.warning("Attempt %d/%d failed for query-based extraction: %s",
                            attempt, max_attempts, e)
    # ── Retry fallback: compress text ─────────────────────────────────────
    try:
        compressed = compress_text(text, model=model)
        return {
            "extracted_info": (
                f"Goal-based extraction failed after {max_attempts} attempts; "
                f"returning compressed webpage:\n\n{compressed}"
            )
        }
    except Exception as ce:
        logging.error("compress_text also failed: %s", ce)
    # ── Final fallback: hard truncate to token budget ────────────────────
    # trim_to_budget works on a list of chunks, so feed it the page line by line
    truncated, _ = trim_to_budget(text.splitlines(), CFG.output_limit_per_link,
                                  is_table=False)
    return {
        "extracted_info": (
            "Goal-based extraction and compression both failed; "
            "returning truncated webpage content:\n\n" + truncated
        )
    }
# ────────────────────────────────────────────────────────────────────────
# 7. helper: trim long lists to token budget
# ------------------------------------------------------------------------
def trim_to_budget(items: List[str], budget: int, *,
                   is_table: bool) -> Tuple[str, int]:
    build, used = [], 0
    for it in items:
        toks = _tok(it)
        if used + toks > budget:
            break
        build.append(it)
        used += toks
    if len(build) < len(items):
        build.append(f"[{len(items)-len(build)} {'tables' if is_table else 'facts'} omitted]")
    joined = "\n\n".join(build) if is_table else "\n".join(build)
    return joined, _tok(joined)
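if __name__ == "__main__":
    # Offline smoke check (illustrative only; exercises the helpers that need
    # neither an OpenAI API key nor a spaCy model).
    sample_facts = ["Founded in 2015.", "Revenue was $1,234.56 on Jan 5, 2024."]
    merged = deduplicate_items(sample_facts)
    trimmed, n_tokens = trim_to_budget(merged, budget=50, is_table=False)
    print(f"{len(merged)} facts kept, {n_tokens} tokens after trimming:")
    print(trimmed)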