# g4f/tools/web_search.py
from __future__ import annotations
from aiohttp import ClientSession, ClientTimeout, ClientError
import json
import hashlib
from pathlib import Path
from urllib.parse import urlparse, quote_plus
from datetime import date
import asyncio
# Optional dependencies using the new 'ddgs' package name
try:
    from ddgs import DDGS
    from ddgs.exceptions import DDGSException
    from bs4 import BeautifulSoup
    has_requirements = True
except ImportError:
    has_requirements = False

    class DDGSException(Exception):
        """Fallback so get_search_message() can still reference DDGSException when 'ddgs' is missing."""
try:
import spacy
has_spacy = True
except ImportError:
has_spacy = False
from typing import Iterator, List, Optional
from ..cookies import get_cookies_dir
from ..providers.response import format_link, JsonMixin, Sources
from ..errors import MissingRequirementsError
from .. import debug
DEFAULT_INSTRUCTIONS = """
Using the provided web search results, write a comprehensive reply to the user request.
Make sure to cite the sources using [[Number]](Url) notation after each reference. Example: [[0]](http://google.com)
"""
class SearchResults(JsonMixin):
"""
Represents a collection of search result entries along with the count of used words.
"""
def __init__(self, results: List[SearchResultEntry], used_words: int):
self.results = results
self.used_words = used_words
@classmethod
def from_dict(cls, data: dict) -> SearchResults:
return cls(
[SearchResultEntry(**item) for item in data["results"]],
data["used_words"]
)
def __iter__(self) -> Iterator[SearchResultEntry]:
yield from self.results
def __str__(self) -> str:
# Build a string representation of the search results with markdown formatting.
output = []
for idx, result in enumerate(self.results):
parts = [
f"Title: {result.title}",
"",
result.text if result.text else result.snippet,
"",
f"Source: [[{idx}]]({result.url})"
]
output.append("\n".join(parts))
return "\n\n\n".join(output)
def __len__(self) -> int:
return len(self.results)
def get_sources(self) -> Sources:
return Sources([{"url": result.url, "title": result.title} for result in self.results])
def get_dict(self) -> dict:
return {
"results": [result.get_dict() for result in self.results],
"used_words": self.used_words
}
class SearchResultEntry(JsonMixin):
"""
Represents a single search result entry.
"""
def __init__(self, title: str, url: str, snippet: str, text: Optional[str] = None):
self.title = title
self.url = url
self.snippet = snippet
self.text = text
def set_text(self, text: str) -> None:
self.text = text
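
# A minimal usage sketch (illustrative only, not executed on import): the entry values
# below are made-up placeholders showing how SearchResults renders into the markdown
# block that do_search() later prepends to the prompt.
#
#     entries = [SearchResultEntry("Example Domain", "https://example.com", "An example page.")]
#     results = SearchResults(entries, used_words=4)
#     print(str(results))              # "Title: Example Domain\n\nAn example page.\n\nSource: [[0]](https://example.com)"
#     sources = results.get_sources()  # Sources wrapping [{"url": ..., "title": ...}]
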
def scrape_text(html: str, max_words: Optional[int] = None, add_source: bool = True, count_images: int = 2) -> Iterator[str]:
    """
    Parses the provided HTML, narrows it to the main content container when one is found,
    and yields markdown-formatted text fragments (headings, paragraphs, tables, lists and
    up to `count_images` images), stopping once roughly `max_words` words have been yielded.
    """
    soup = BeautifulSoup(html, "html.parser")
    # Resolve the canonical link before the soup is narrowed below, since <head> is lost then.
    canonical_link = soup.find("link", rel="canonical")
for selector in [
"main", ".main-content-wrapper", ".main-content", ".emt-container-inner",
".content-wrapper", "#content", "#mainContent",
]:
selected = soup.select_one(selector)
if selected:
soup = selected
break
for remove_selector in [".c-globalDisclosure"]:
unwanted = soup.select_one(remove_selector)
if unwanted:
unwanted.extract()
image_selector = "img[alt][src^=http]:not([alt='']):not(.avatar):not([width])"
image_link_selector = f"a:has({image_selector})"
seen_texts = []
for element in soup.select(f"h1, h2, h3, h4, h5, h6, p, pre, table:not(:has(p)), ul:not(:has(p)), {image_link_selector}"):
if count_images > 0:
image = element.select_one(image_selector)
if image:
title = str(element.get("title", element.text))
if title:
yield f"!{format_link(image['src'], title)}\n"
if max_words is not None:
max_words -= 10
count_images -= 1
continue
for line in element.get_text(" ").splitlines():
words = [word for word in line.split() if word]
if not words:
continue
joined_line = " ".join(words)
if joined_line in seen_texts:
continue
if max_words is not None:
max_words -= len(words)
if max_words <= 0:
break
yield joined_line + "\n"
seen_texts.append(joined_line)
if add_source:
        # canonical_link was resolved above, before the soup was narrowed to the main content.
if canonical_link and "href" in canonical_link.attrs:
link = canonical_link["href"]
domain = urlparse(link).netloc
yield f"\nSource: [{domain}]({link})"
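
# A small sketch (not executed on import) of feeding scrape_text() a hand-written HTML
# snippet; the markup below is invented purely for illustration.
#
#     html = "<main><h1>Hello</h1><p>First paragraph.</p><p>First paragraph.</p></main>"
#     print("".join(scrape_text(html, max_words=50, add_source=False)))
#     # -> "Hello\nFirst paragraph.\n"  (the duplicate paragraph is skipped)
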
async def fetch_and_scrape(session: ClientSession, url: str, max_words: Optional[int] = None, add_source: bool = False) -> str:
"""
Fetches a URL and returns the scraped text, using caching to avoid redundant downloads.
"""
try:
cache_dir: Path = Path(get_cookies_dir()) / ".scrape_cache" / "fetch_and_scrape"
cache_dir.mkdir(parents=True, exist_ok=True)
md5_hash = hashlib.md5(url.encode(errors="ignore")).hexdigest()
cache_file = cache_dir / f"{quote_plus(url.split('?')[0].split('//')[1].replace('/', ' ')[:48])}.{date.today()}.{md5_hash[:16]}.cache"
        if cache_file.exists():
            # The cache is written as UTF-8 bytes below, so read it back explicitly as UTF-8.
            return cache_file.read_text(encoding="utf-8", errors="replace")
async with session.get(url) as response:
if response.status == 200:
html = await response.text(errors="replace")
scraped_text = "".join(scrape_text(html, max_words, add_source))
with open(cache_file, "wb") as f:
f.write(scraped_text.encode(errors="replace"))
return scraped_text
except (ClientError, asyncio.TimeoutError):
return ""
return ""
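
# A sketch (assumed URL, not executed on import) of calling fetch_and_scrape() inside an
# aiohttp session, the same way search() does below:
#
#     async def demo_fetch():
#         async with ClientSession(timeout=ClientTimeout(10)) as session:
#             text = await fetch_and_scrape(session, "https://example.com", max_words=500)
#             print(text[:200])
#
#     # asyncio.run(demo_fetch())
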
async def search(
query: str,
max_results: int = 5,
max_words: int = 2500,
backend: str = "auto",
add_text: bool = True,
timeout: int = 5,
region: str = "us-en",
provider: str = "DDG"
) -> SearchResults:
    """
    Performs a web search, either through DuckDuckGo (via the ddgs package) or a local
    SearXNG instance, and returns the collected results, optionally fetching the full
    page text for each hit.
    """
if provider == "SearXNG":
from ..Provider.SearXNG import SearXNG
debug.log(f"[SearXNG] Using local container for query: {query}")
results_texts = []
async for chunk in SearXNG.create_async_generator(
"SearXNG",
[{"role": "user", "content": query}],
max_results=max_results,
max_words=max_words,
add_text=add_text
):
if isinstance(chunk, str):
results_texts.append(chunk)
used_words = sum(text.count(" ") for text in results_texts)
return SearchResults([
SearchResultEntry(
title=f"Result {i + 1}",
url="",
snippet=text,
text=text
) for i, text in enumerate(results_texts)
], used_words=used_words)
    debug.log(f"[DuckDuckGo] Searching via ddgs for query: {query}")
if not has_requirements:
raise MissingRequirementsError('Install "ddgs" and "beautifulsoup4" | pip install -U g4f[search]')
results: List[SearchResultEntry] = []
    # Use the new DDGS() context manager style (these calls run synchronously in this coroutine)
with DDGS() as ddgs:
for result in ddgs.text(
query,
region=region,
safesearch="moderate",
timelimit="y",
max_results=max_results,
backend=backend,
):
if ".google." in result["href"]:
continue
results.append(SearchResultEntry(
title=result["title"],
url=result["href"],
snippet=result["body"]
))
if add_text:
tasks = []
async with ClientSession(timeout=ClientTimeout(timeout)) as session:
for entry in results:
                # Split the word budget across the fetched pages; guard against max_results == 1.
                tasks.append(fetch_and_scrape(session, entry.url, int(max_words / max(max_results - 1, 1)), False))
texts = await asyncio.gather(*tasks)
formatted_results: List[SearchResultEntry] = []
used_words = 0
left_words = max_words
for i, entry in enumerate(results):
if add_text:
entry.text = texts[i]
left_words -= entry.title.count(" ") + 5
if entry.text:
left_words -= entry.text.count(" ")
else:
left_words -= entry.snippet.count(" ")
if left_words < 0:
break
used_words = max_words - left_words
formatted_results.append(entry)
return SearchResults(formatted_results, used_words)
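
# A sketch of calling search() directly (not executed on import; the query is an arbitrary
# example and requires the optional ddgs/beautifulsoup4 dependencies plus network access):
#
#     async def demo_search():
#         results = await search("python asyncio tutorial", max_results=3, max_words=1000)
#         for entry in results:
#             print(entry.title, entry.url)
#         print(results.used_words, "words used")
#
#     # asyncio.run(demo_search())
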
async def do_search(
prompt: str,
query: Optional[str] = None,
instructions: str = DEFAULT_INSTRUCTIONS,
**kwargs
) -> tuple[str, Optional[Sources]]:
"""
Combines search results with the user prompt, using caching for improved efficiency.
"""
if not isinstance(prompt, str):
return prompt, None
if instructions and instructions in prompt:
return prompt, None
if prompt.startswith("##") and query is None:
return prompt, None
if query is None:
query = prompt.strip().splitlines()[0]
json_bytes = json.dumps({"query": query, **kwargs}, sort_keys=True).encode(errors="ignore")
md5_hash = hashlib.md5(json_bytes).hexdigest()
cache_dir: Path = Path(get_cookies_dir()) / ".scrape_cache" / "web_search" / f"{date.today()}"
cache_dir.mkdir(parents=True, exist_ok=True)
cache_file = cache_dir / f"{quote_plus(query[:20])}.{md5_hash}.cache"
search_results: Optional[SearchResults] = None
if cache_file.exists():
with cache_file.open("r") as f:
try:
search_results = SearchResults.from_dict(json.loads(f.read()))
except json.JSONDecodeError:
search_results = None
if search_results is None:
search_results = await search(query, **kwargs)
if search_results.results:
with cache_file.open("w") as f:
f.write(json.dumps(search_results.get_dict()))
if instructions:
new_prompt = f"{search_results}\n\nInstruction: {instructions}\n\nUser request:\n{prompt}"
else:
new_prompt = f"{search_results}\n\n{prompt}"
debug.log(f"Web search: '{query.strip()[:50]}...'")
    debug.log(f"with {len(search_results.results)} results and {search_results.used_words} words")
return new_prompt.strip(), search_results.get_sources()
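
# A sketch of how a provider might call do_search() to augment a prompt (not executed on
# import; the prompt text is an invented example):
#
#     async def demo_do_search():
#         prompt, sources = await do_search("What is the latest stable Python release?")
#         # `prompt` now starts with the formatted search results, followed by
#         # DEFAULT_INSTRUCTIONS and the original request; `sources` is None whenever
#         # the prompt was returned unchanged.
#         print(prompt[:300])
#
#     # asyncio.run(demo_do_search())
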
def get_search_message(prompt: str, raise_search_exceptions: bool = False, **kwargs) -> str:
"""
Synchronously obtains the search message by wrapping the async search call.
"""
try:
result, _ = asyncio.run(do_search(prompt, **kwargs))
return result
    # DDGSException comes from the ddgs package; a fallback class is defined at the top of
    # this module so this clause also works when the optional dependency is missing.
    except (DDGSException, MissingRequirementsError) as e:
        if raise_search_exceptions:
            raise e
        debug.error("Couldn't do web search:", e)
return prompt
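
# A minimal command-line smoke test (a sketch: the prompt below is an arbitrary example,
# and running it needs network access plus the optional ddgs/beautifulsoup4 extras).
if __name__ == "__main__":
    demo_prompt = "What is the current LTS version of Ubuntu?"
    print(get_search_message(demo_prompt, max_results=3, max_words=1000))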