#
# SPDX-FileCopyrightText: Hadad <hadad@linuxmail.org>
# SPDX-License-Identifier: Apache-2.0
#
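"""Asynchronous HTTP helpers for the web tools.

BrowserEngine wraps aiohttp to fetch page content through the configured
content-reader API and to run SearXNG-backed web searches, exposing the
synchronous entry points extract_page_content and perform_search for callers
that are not already running inside an event loop.
"""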

import aiohttp
import asyncio
from urllib.parse import quote
from config import (
    REMINDERS,
    TCP_CONNECTOR_ENABLE_DNS_CACHE,
    TCP_CONNECTOR_TTL_DNS_CACHE,
    TCP_CONNECTOR_LIMIT,
    TCP_CONNECTOR_LIMIT_PER_HOST,
    TCP_CONNECTOR_FORCE_CLOSE,
    TCP_CONNECTOR_ENABLE_CLEANUP,
    ENABLE_TRUST_ENV,
    ENABLE_CONNECTOR_OWNER,
    USER_AGENT
)

class BrowserEngine:
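    """Thin aiohttp wrapper driven by the shared configuration object.

    The configuration is expected to expose content_reader_api,
    searxng_endpoint and request_timeout; everything else comes from the
    TCP_CONNECTOR_* and session constants imported from config.
    """
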
    def __init__(self, configuration):
        self.config = configuration

    def generate_headers(self):
        return {
            "User-Agent": USER_AGENT
        }

    def web_selector(self, search_query: str, search_provider: str = "google"):
        # "!go" is the SearXNG bang for the Google engine; search_provider is
        # accepted for interface compatibility, but only Google is wired up here.
        provider_prefix = "!go"
        return (
            f"{self.config.content_reader_api}{self.config.searxng_endpoint}"
            f"?q={quote(f'{provider_prefix} {search_query}')}",
            "#urls"
        )

    async def web_request(self, method: str, url: str, headers: dict, data: dict = None):
        # One short-lived session per request; connector and session behaviour
        # are driven entirely by the values imported from config.
        async with aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=self.config.request_timeout),
            connector=aiohttp.TCPConnector(
                use_dns_cache=TCP_CONNECTOR_ENABLE_DNS_CACHE,
                ttl_dns_cache=TCP_CONNECTOR_TTL_DNS_CACHE,
                limit=TCP_CONNECTOR_LIMIT,
                limit_per_host=TCP_CONNECTOR_LIMIT_PER_HOST,
                force_close=TCP_CONNECTOR_FORCE_CLOSE,
                enable_cleanup_closed=TCP_CONNECTOR_ENABLE_CLEANUP
            ),
            trust_env=ENABLE_TRUST_ENV,
            connector_owner=ENABLE_CONNECTOR_OWNER
        ) as session:
            async with session.request(method, url, headers=headers, data=data) as response:
                # Fail fast on HTTP errors before reading the body.
                response.raise_for_status()
                return await response.text()

    async def _post(self, url: str, data: dict, headers: dict):
        return await self.web_request("POST", url, headers, data)

    async def _get(self, url: str, headers: dict):
        return await self.web_request("GET", url, headers)

    def extract_page_content(self, target_url: str) -> str:
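        """Fetch target_url through the content-reader API and return its text,
        or an error string if the request fails."""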
        headers = self.generate_headers()
        payload = {"url": target_url}
        try:
            extracted_content = asyncio.run(self._post(self.config.content_reader_api, payload, headers))
            return f"{extracted_content}\n\n\n{REMINDERS}\n\n\n"
        except Exception as error:
            return f"Error reading URL: {str(error)}"

    def perform_search(self, search_query: str, search_provider: str = "google") -> str:
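        """Run a SearXNG search for search_query and return the rendered
        results block, or an error string if the request fails."""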
        headers = self.generate_headers()
        full_url, selector = self.web_selector(search_query, search_provider)
        headers["X-Target-Selector"] = selector
        try:
            search_results = asyncio.run(self._get(full_url, headers))
            return f"{search_results}\n\n\n{REMINDERS}\n\n\n"
        except Exception as error:
            return f"Error during search: {str(error)}"