import asyncio
import logging
import re
from typing import Any, AsyncIterator, Dict, Iterator, List, Optional, Sequence, Union

import requests
from bs4 import BeautifulSoup
from langchain.chains import RetrievalQA
from langchain_community.document_loaders import WebBaseLoader
from langchain_core.documents import Document
from markdownify import markdownify as md
from playwright.async_api import async_playwright

logger = logging.getLogger(__name__)

UNWANTED_SECTIONS = {
    "references",
    "external links",
    "further reading",
    "see also",
    "notes",
}


def build_metadata(soup: Any, url: str) -> dict:
    """Build metadata from BeautifulSoup output."""
    metadata = {"source": url}
    if title := soup.find("title"):
        metadata["title"] = title.get_text()
    if description := soup.find("meta", attrs={"name": "description"}):
        metadata["description"] = description.get("content", "No description found.")
    if html := soup.find("html"):
        metadata["language"] = html.get("lang", "No language found.")
    return metadata
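
# For a typical HTML page, build_metadata returns a dict along these lines
# (illustrative values only, not output from a real fetch):
#
#     {"source": "https://example.com/article",
#      "title": "Example article",
#      "description": "A short summary.",
#      "language": "en"}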


class MarkdownWebBaseLoader(WebBaseLoader):
    """
    A WebBaseLoader subclass that falls back to Playwright to render
    JavaScript-heavy pages when a plain requests fetch looks incomplete,
    then strips boilerplate and converts the remaining content to Markdown.
    """

    def __init__(
        self,
        web_path: Union[str, Sequence[str]] = "",
        header_template: Optional[dict] = None,
        verify_ssl: bool = True,
        proxies: Optional[dict] = None,
        continue_on_failure: bool = False,
        autoset_encoding: bool = True,
        encoding: Optional[str] = None,
        web_paths: Sequence[str] = (),
        requests_per_second: int = 2,
        default_parser: str = "html.parser",
        requests_kwargs: Optional[Dict[str, Any]] = None,
        raise_for_status: bool = False,
        bs_get_text_kwargs: Optional[Dict[str, Any]] = None,
        bs_kwargs: Optional[Dict[str, Any]] = None,
        session: Any = None,
        markdown_kwargs: Optional[Dict[str, Any]] = None,
        unwanted_css: Optional[List[str]] = None,
        unwanted_headings: Optional[List[str]] = None,
        render_wait: float = 1.0,
        *,
        show_progress: bool = True,
        trust_env: bool = False,
    ) -> None:
        """Initialize loader.

        Args:
            markdown_kwargs: Optional[Dict[str, Any]]: Keyword arguments passed to markdownify.
            unwanted_css: Optional[List[str]]: CSS selectors whose matching elements are removed from the page.
            unwanted_headings: Optional[List[str]]: Headings of sections to drop (defaults to UNWANTED_SECTIONS).
            render_wait: float: Time to wait for JS rendering (default: 1.0 second).

        All other arguments are passed through to WebBaseLoader unchanged.
        """
        super().__init__(
            web_path=web_path,
            header_template=header_template,
            verify_ssl=verify_ssl,
            proxies=proxies,
            continue_on_failure=continue_on_failure,
            autoset_encoding=autoset_encoding,
            encoding=encoding,
            web_paths=web_paths,
            requests_per_second=requests_per_second,
            default_parser=default_parser,
            requests_kwargs=requests_kwargs,
            raise_for_status=raise_for_status,
            bs_get_text_kwargs=bs_get_text_kwargs,
            bs_kwargs=bs_kwargs,
            session=session,
            show_progress=show_progress,
            trust_env=trust_env,
        )
        self.markdown_kwargs = markdown_kwargs or {
            "heading_style": "ATX",
            "bullets": "*+-",
            "strip": ["a", "span"],
            "table_infer_header": True,
        }
        self.unwanted_css = unwanted_css or [
            ".toc", ".navbox", ".sidebar", ".advertisement", ".cookie-banner", ".vertical-navbox",
            ".hatnote", ".reflist", ".mw-references-wrap",
        ]
        self.unwanted_headings = [h.lower() for h in (unwanted_headings or UNWANTED_SECTIONS)]
        self.render_wait = render_wait

    @staticmethod
    def _should_render(html: str, soup: Any) -> bool:
        """Heuristically decide whether the fetched HTML likely needs JavaScript rendering."""
        low_text = len(soup.get_text(strip=True)) < 100
        has_noscript = bool(soup.find("noscript"))
        cf_challenge = "just a moment" in html.lower() or "enable javascript" in html.lower()
        many_scripts = len(soup.find_all("script")) > 20
        return has_noscript or cf_challenge or low_text or many_scripts

    async def _fetch_with_playwright(self, url: str) -> str:
        """Fetch the fully rendered HTML for ``url`` with headless Chromium."""
        async with async_playwright() as pw:
            browser = await pw.chromium.launch(headless=True)
            page = await browser.new_page()
            await page.goto(url)
            # Give client-side rendering a moment to finish before capturing the DOM.
            await asyncio.sleep(self.render_wait)
            content = await page.content()
            await browser.close()
            return content

    def _scrape(
        self,
        url: str,
        parser: Union[str, None] = None,
        bs_kwargs: Optional[dict] = None,
    ) -> Any:
        """Fetch ``url`` with requests, falling back to Playwright when the page looks JS-rendered."""
        if parser is None:
            parser = "xml" if url.endswith(".xml") else self.default_parser
        self._check_parser(parser)

        resp = self.session.get(url, **self.requests_kwargs)
        if self.raise_for_status:
            resp.raise_for_status()
        if self.encoding is not None:
            resp.encoding = self.encoding
        elif self.autoset_encoding:
            resp.encoding = resp.apparent_encoding
        html = resp.text

        soup = BeautifulSoup(html, parser, **(bs_kwargs or {}))

        # Re-fetch with Playwright only when the plain response looks incomplete.
        if not url.endswith(".xml") and self._should_render(html, soup):
            try:
                rendered = asyncio.run(self._fetch_with_playwright(url))
                soup = BeautifulSoup(rendered, parser, **(bs_kwargs or {}))
            except Exception as e:
                logger.warning("Playwright rendering failed for %s: %s. Falling back to requests.", url, e)

        return soup

    @staticmethod
    def normalize_whitespace(text: str) -> str:
        """
        Collapse runs of spaces, tabs, etc. down to single spaces, but leave
        fenced code blocks ```…``` and inline code `…` untouched.
        """
        # Replace non-breaking spaces and strip zero-width characters.
        text = text.replace("\u00A0", " ")
        text = re.sub(r"[\u200B\u200C\u200D\uFEFF]", "", text)

        # Split on fenced code blocks so only the prose parts are normalized.
        parts = re.split(r'(```.*?```)', text, flags=re.S)
        for i, part in enumerate(parts):
            if not part.startswith("```"):
                # Within prose, also skip inline code spans.
                subparts = re.split(r'(`[^`\n]+`)', part)
                for j, sp in enumerate(subparts):
                    if not sp.startswith("`"):
                        subparts[j] = re.sub(r'[ \t\r\f\v]+', ' ', sp).strip()
                parts[i] = "".join(subparts)

        # Re-join the non-empty parts, separated by blank lines.
        normalized = "\n\n".join(p for p in parts if p.strip() != "")
        return normalized
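
    # Illustrative behaviour of normalize_whitespace (hand-worked example, not a doctest):
    #
    #     MarkdownWebBaseLoader.normalize_whitespace("a  b\tc\n\n```x   y```")
    #     -> "a b c\n\n```x   y```"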

    def _convert_soup_to_text(self, soup: Any) -> str:
        """Strip boilerplate from the parsed page and convert it to Markdown."""
        # Drop scripts and styles outright.
        for tag in soup(["script", "style"]):
            tag.decompose()

        # Remove whole sections whose heading matches an unwanted heading.
        for sec in soup.find_all(["section", "div", "aside"]):
            h = sec.find(["h1", "h2", "h3", "h4", "h5", "h6"])
            if h and any(h.get_text(strip=True).lower().startswith(u) for u in self.unwanted_headings):
                sec.decompose()

        # Remove elements matching the unwanted CSS selectors.
        for sel in self.unwanted_css:
            for el in soup.select(sel):
                el.decompose()

        # Narrow to the main content container when one is present.
        soup = soup.find("div", class_="mw-parser-output") or soup.find("main") or soup.find("article") or soup

        markdown = md(str(soup), **self.markdown_kwargs)
        markdown = self.normalize_whitespace(markdown)
        return markdown

    def lazy_load(self) -> Iterator[Document]:
        """Lazy load text from the url(s) in web_path."""
        for path in self.web_paths:
            soup = self._scrape(path, bs_kwargs=self.bs_kwargs)
            text = self._convert_soup_to_text(soup)
            metadata = build_metadata(soup, path)
            yield Document(page_content=text, metadata=metadata)

    async def alazy_load(self) -> AsyncIterator[Document]:
        """Async lazy load text from the url(s) in web_path."""
        results = await self.ascrape_all(self.web_paths)
        for path, soup in zip(self.web_paths, results):
            text = self._convert_soup_to_text(soup)
            metadata = build_metadata(soup, path)
            yield Document(page_content=text, metadata=metadata)
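

# Usage sketch for the loader (the URL is a placeholder chosen for illustration,
# not something this module depends on):
#
#     loader = MarkdownWebBaseLoader("https://en.wikipedia.org/wiki/Web_scraping")
#     docs = loader.load()
#     print(docs[0].metadata.get("title"), len(docs[0].page_content))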


def fetch_wikipedia_page(page_key: str, lang: str = "en") -> Dict[str, str]:
    """Fetches a Wikipedia page by its key and returns its content in Markdown format.

    Args:
        page_key (str): The unique key of the Wikipedia page.
        lang (str): The language code for the Wikipedia edition to fetch (default: "en").
    """
    page_key = page_key.replace(" ", "_")
    page_url = f"https://api.wikimedia.org/core/v1/wikipedia/{lang}/page/{page_key}/html"
    visit_website_tool = MarkdownWebBaseLoader(page_url)
    markdown = visit_website_tool.load()[0].page_content
    return {
        "page_key": page_key,
        "markdown": markdown,
    }
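
# Shape of the returned dict (values are illustrative assumptions, not output
# from a real request):
#
#     fetch_wikipedia_page("Alan Turing")
#     -> {"page_key": "Alan_Turing", "markdown": "# Alan Turing\n..."}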


def get_wikipedia_article(query: str, lang: str = "en") -> Dict[str, str]:
    """Searches and fetches a Wikipedia article for a given query and returns its content in Markdown format.

    Args:
        query (str): The search query.
        lang (str): The language code for the Wikipedia edition to search (default: "en").
    """
    headers = {
        'User-Agent': 'MyLLMAgent (llm_agent@example.com)'
    }

    # Search in the requested language edition rather than hard-coding "en".
    search_url = f"https://api.wikimedia.org/core/v1/wikipedia/{lang}/search/page"
    search_params = {'q': query, 'limit': 1}
    search_response = requests.get(search_url, headers=headers, params=search_params, timeout=15)

    if search_response.status_code != 200:
        raise Exception(f"Search error: {search_response.status_code} - {search_response.text}")

    results = search_response.json().get("pages", [])
    if not results:
        raise Exception(f"No results found for query: {query}")

    page = results[0]
    page_key = page["key"]

    return fetch_wikipedia_page(page_key, lang)


def parse_sections(markdown_text: str) -> Dict[str, Dict]:
    """
    Parses markdown into a nested dict:
        { section_title: {
            "full": full_section_md,
            "subsections": { sub_title: sub_md, ... }
          }, ... }
    """
    # Top-level sections are introduced by "## " headings.
    top_pat = re.compile(r"^##\s+(.*)$", re.MULTILINE)
    top_matches = list(top_pat.finditer(markdown_text))
    sections: Dict[str, Dict] = {}
    for i, m in enumerate(top_matches):
        sec_title = m.group(1).strip()
        start = m.start()
        end = top_matches[i + 1].start() if i + 1 < len(top_matches) else len(markdown_text)
        sec_md = markdown_text[start:end].strip()

        # Subsections are introduced by "### " headings within the section.
        sub_pat = re.compile(r"^###\s+(.*)$", re.MULTILINE)
        subs: Dict[str, str] = {}
        sub_matches = list(sub_pat.finditer(sec_md))
        for j, sm in enumerate(sub_matches):
            sub_title = sm.group(1).strip()
            sub_start = sm.start()
            sub_end = sub_matches[j + 1].start() if j + 1 < len(sub_matches) else len(sec_md)
            subs[sub_title] = sec_md[sub_start:sub_end].strip()

        sections[sec_title] = {"full": sec_md, "subsections": subs}
    return sections
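

# Minimal end-to-end sketch tying the helpers together. The query string is an
# arbitrary example and the calls perform live HTTP requests, so this is only a
# manual smoke test, not part of the module's API.
if __name__ == "__main__":
    article = get_wikipedia_article("Web scraping")
    outline = parse_sections(article["markdown"])
    print(article["page_key"])
    for section_title, section in outline.items():
        print(section_title, "->", list(section["subsections"]))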