moneychatbot / src /engine /browser_engine.py
hadadrjt's picture
SearchGPT: Production.
bc87248
#
# SPDX-FileCopyrightText: Hadad <hadad@linuxmail.org>
# SPDX-License-Identifier: Apache-2.0
#
import aiohttp
import asyncio
from urllib.parse import quote
from config import (
REMINDERS,
TCP_CONNECTOR_ENABLE_DNS_CACHE,
TCP_CONNECTOR_TTL_DNS_CACHE,
TCP_CONNECTOR_LIMIT,
TCP_CONNECTOR_LIMIT_PER_HOST,
TCP_CONNECTOR_FORCE_CLOSE,
TCP_CONNECTOR_ENABLE_CLEANUP,
ENABLE_TRUST_ENV,
ENABLE_CONNECTOR_OWNER,
USER_AGENT
)
class BrowserEngine:
    """Synchronous helper for web search and page reading over aiohttp.

    Every request is sent through the reader service configured as
    ``configuration.content_reader_api``. The two public entry points,
    ``extract_page_content`` and ``perform_search``, never raise for
    request failures; they return a human-readable error string instead.
    """

    def __init__(self, configuration):
        """Keep a reference to the configuration object.

        The methods of this class read ``content_reader_api``,
        ``searxng_endpoint`` and ``request_timeout`` from it.
        """
        self.config = configuration

    def generate_headers(self):
        """Return a new dict holding the default request headers."""
        return {"User-Agent": USER_AGENT}

    def web_selector(self, search_query: str, search_provider: str = "google"):
        """Build the search URL and the selector that goes with it.

        Args:
            search_query: Free-text query; it is percent-encoded together
                with the provider prefix.
            search_provider: Accepted for interface compatibility. It is
                currently ignored: the ``!go`` prefix is always used.

        Returns:
            A ``(url, selector)`` tuple where ``url`` is the reader API
            address followed by the SearXNG endpoint and the encoded
            query, and ``selector`` is always ``"#urls"``.
        """
        provider_prefix = "!go"
        encoded_query = quote(f"{provider_prefix} {search_query}")
        return (
            f"{self.config.content_reader_api}{self.config.searxng_endpoint}?q={encoded_query}",
            "#urls",
        )

    async def web_request(self, method: str, url: str, headers: dict, data: dict = None):
        """Send one HTTP request and return the response body as text.

        A new ``aiohttp.ClientSession`` (with its own ``TCPConnector``
        built from the ``TCP_CONNECTOR_*`` settings) is created for every
        call and closed when the call finishes.

        Args:
            method: HTTP method name, e.g. ``"GET"`` or ``"POST"``.
            url: Target URL.
            headers: Headers sent with the request.
            data: Optional payload handed to aiohttp as the request
                ``data``.

        Returns:
            The decoded response body.

        Raises:
            aiohttp.ClientResponseError: Raised by ``raise_for_status()``
                when the server answers with an error status.
            Exception: Any connection or timeout error raised by aiohttp
                propagates unchanged.
        """
        # NOTE(review): when ENABLE_CONNECTOR_OWNER is false the session
        # does not close the connector created here - confirm that this
        # setting is intended for a per-call connector.
        connector = aiohttp.TCPConnector(
            use_dns_cache=TCP_CONNECTOR_ENABLE_DNS_CACHE,
            ttl_dns_cache=TCP_CONNECTOR_TTL_DNS_CACHE,
            limit=TCP_CONNECTOR_LIMIT,
            limit_per_host=TCP_CONNECTOR_LIMIT_PER_HOST,
            force_close=TCP_CONNECTOR_FORCE_CLOSE,
            enable_cleanup_closed=TCP_CONNECTOR_ENABLE_CLEANUP,
        )
        timeout = aiohttp.ClientTimeout(total=self.config.request_timeout)
        async with aiohttp.ClientSession(
            timeout=timeout,
            connector=connector,
            trust_env=ENABLE_TRUST_ENV,
            connector_owner=ENABLE_CONNECTOR_OWNER,
        ) as session:
            async with session.request(method, url, headers=headers, data=data) as response:
                # The body is read before the status check, as the
                # original implementation did.
                text = await response.text()
                response.raise_for_status()
                return text

    async def _post(self, url: str, data: dict, headers: dict):
        """Send a POST request with ``data`` and return the body text."""
        return await self.web_request("POST", url, headers, data)

    async def _get(self, url: str, headers: dict):
        """Send a GET request and return the body text."""
        return await self.web_request("GET", url, headers)

    @staticmethod
    def _describe_error(error: BaseException) -> str:
        """Return a non-empty description of ``error``.

        Some exceptions (for example ``asyncio.TimeoutError`` raised on a
        request timeout) carry no message, so ``str(error)`` is empty and
        the caller would see a blank reason. Fall back to the exception
        class name in that case.
        """
        return str(error) or type(error).__name__

    def extract_page_content(self, target_url: str) -> str:
        """Fetch the readable content of ``target_url`` via the reader API.

        The URL is POSTed as ``{"url": target_url}`` to
        ``config.content_reader_api``. ``asyncio.run`` drives the request,
        so this method is meant to be called from synchronous code.

        Returns:
            The extracted content followed by ``REMINDERS``, or a string
            starting with ``"Error reading URL: "`` when anything fails.
        """
        headers = self.generate_headers()
        payload = {"url": target_url}
        try:
            extracted_content = asyncio.run(
                self._post(self.config.content_reader_api, payload, headers)
            )
        except Exception as error:
            # Deliberate best-effort boundary: failures are reported to
            # the caller as text rather than raised.
            return f"Error reading URL: {self._describe_error(error)}"
        return f"{extracted_content}\n\n\n{REMINDERS}\n\n\n"

    def perform_search(self, search_query: str, search_provider: str = "google") -> str:
        """Run a web search and return the raw result text.

        The URL and selector come from ``web_selector``; the selector is
        sent in the ``X-Target-Selector`` request header. ``asyncio.run``
        drives the request, so this method is meant to be called from
        synchronous code.

        Returns:
            The search results followed by ``REMINDERS``, or a string
            starting with ``"Error during search: "`` when the request
            fails.
        """
        headers = self.generate_headers()
        full_url, selector = self.web_selector(search_query, search_provider)
        headers["X-Target-Selector"] = selector
        try:
            search_results = asyncio.run(self._get(full_url, headers))
        except Exception as error:
            # Deliberate best-effort boundary: failures are reported to
            # the caller as text rather than raised.
            return f"Error during search: {self._describe_error(error)}"
        return f"{search_results}\n\n\n{REMINDERS}\n\n\n"