#
# SPDX-FileCopyrightText: Hadad <hadad@linuxmail.org>
# SPDX-License-Identifier: Apache-2.0
#
import aiohttp
import asyncio
from urllib.parse import quote
from config import (
    REMINDERS,
    TCP_CONNECTOR_ENABLE_DNS_CACHE,
    TCP_CONNECTOR_TTL_DNS_CACHE,
    TCP_CONNECTOR_LIMIT,
    TCP_CONNECTOR_LIMIT_PER_HOST,
    TCP_CONNECTOR_FORCE_CLOSE,
    TCP_CONNECTOR_ENABLE_CLEANUP,
    ENABLE_TRUST_ENV,
    ENABLE_CONNECTOR_OWNER,
    USER_AGENT
)

class BrowserEngine:
    """Thin HTTP client that reads pages and runs web searches through a
    content-reader API backed by a SearXNG instance."""

    def __init__(self, configuration):
        self.config = configuration

    def generate_headers(self) -> dict:
        # Every outgoing request carries the configured User-Agent.
        return {
            "User-Agent": USER_AGENT
        }

    def web_selector(self, search_query: str, search_provider: str = "google"):
        # Build the search URL and the CSS selector the reader should extract.
        # Note: only the Google bang ("!go") is currently wired up; the
        # search_provider argument has no effect on the generated URL.
        provider_prefix = "!go"
        return (
            f"{self.config.content_reader_api}{self.config.searxng_endpoint}?q={quote(f'{provider_prefix} {search_query}')}",
            "#urls"
        )

    async def web_request(self, method: str, url: str, headers: dict, data: dict | None = None) -> str:
        # A fresh session per call keeps the engine stateless; pooling and
        # DNS-cache behavior come entirely from the config constants.
        async with aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=self.config.request_timeout),
            connector=aiohttp.TCPConnector(
                use_dns_cache=TCP_CONNECTOR_ENABLE_DNS_CACHE,
                ttl_dns_cache=TCP_CONNECTOR_TTL_DNS_CACHE,
                limit=TCP_CONNECTOR_LIMIT,
                limit_per_host=TCP_CONNECTOR_LIMIT_PER_HOST,
                force_close=TCP_CONNECTOR_FORCE_CLOSE,
                enable_cleanup_closed=TCP_CONNECTOR_ENABLE_CLEANUP
            ),
            trust_env=ENABLE_TRUST_ENV,
            connector_owner=ENABLE_CONNECTOR_OWNER
        ) as session:
            async with session.request(method, url, headers=headers, data=data) as response:
                # The body is read before raise_for_status(), so on an error
                # status the text is discarded and the HTTP error propagates.
                text = await response.text()
                response.raise_for_status()
                return text

    async def _post(self, url: str, data: dict, headers: dict) -> str:
        return await self.web_request("POST", url, headers, data)

    async def _get(self, url: str, headers: dict) -> str:
        return await self.web_request("GET", url, headers)

    def extract_page_content(self, target_url: str) -> str:
        # POST the target URL to the content-reader API and return the
        # extracted text followed by the configured reminder block.
        headers = self.generate_headers()
        payload = {"url": target_url}
        try:
            extracted_content = asyncio.run(self._post(self.config.content_reader_api, payload, headers))
            return f"{extracted_content}\n\n\n{REMINDERS}\n\n\n"
        except Exception as error:
            return f"Error reading URL: {str(error)}"

    def perform_search(self, search_query: str, search_provider: str = "google") -> str:
        # The X-Target-Selector header asks the reader to return only the
        # "#urls" results block from the SearXNG page.
        headers = self.generate_headers()
        full_url, selector = self.web_selector(search_query, search_provider)
        headers["X-Target-Selector"] = selector
        try:
            search_results = asyncio.run(self._get(full_url, headers))
            return f"{search_results}\n\n\n{REMINDERS}\n\n\n"
        except Exception as error:
            return f"Error during search: {str(error)}"