from __future__ import annotations

import asyncio
import json

from ...typing import AsyncResult, Messages, Cookies
from ..base_provider import AsyncGeneratorProvider, ProviderModelMixin, AuthFileMixin, get_running_loop
from ...requests import Browser, get_nodriver, has_nodriver
from ...errors import MissingRequirementsError, ModelNotFoundError
from ... import debug
from ..helper import get_last_user_message


class GoogleSearch(AsyncGeneratorProvider, AuthFileMixin):
    """Provider that submits a query to Google through a nodriver browser tab.

    The provider opens ``url`` in a browser, types the text returned by
    ``get_last_user_message(messages)`` into the first ``textarea`` it finds
    and clicks the element found by the text "Google Suche".
    """

    label = "Google Search"
    url = "https://google.com"
    # The provider is only marked as working when nodriver is available.
    working = has_nodriver
    use_nodriver = True

    @classmethod
    async def create_async_generator(
        cls,
        model: str,
        messages: Messages,
        browser: Browser = None,
        proxy: str = None,
        timeout: int = 300,
        **kwargs
    ) -> AsyncResult:
        """Open Google in a browser tab and submit the last user message.

        Args:
            model: Only used in the ``ModelNotFoundError`` message.
            messages: Conversation passed to ``get_last_user_message`` to
                obtain the text typed into the search box.
            browser: Existing browser to reuse. When ``None`` a new one is
                obtained from ``get_nodriver`` and stopped again on exit.
            proxy: Forwarded to ``get_nodriver`` when a browser is created.
            timeout: Forwarded to ``get_nodriver`` and used as the maximum
                wait for the modal dialog to become hidden.
            **kwargs: Accepted for interface compatibility; not used here.

        Raises:
            MissingRequirementsError: If ``has_nodriver`` is falsy.
            ModelNotFoundError: If ``cls.working`` is falsy.

        NOTE(review): the body contains no ``yield``, so despite its name and
        return annotation this is a plain coroutine function, not an async
        generator; nothing is ever produced for the caller. What should be
        yielded (presumably the search results) cannot be determined from
        this file -- confirm and implement.
        """
        if not has_nodriver:
            raise MissingRequirementsError("Google requires a browser to be installed.")
        if not cls.working:
            raise ModelNotFoundError(f"Model {model} not found.")

        # Stays None when the caller supplied the browser, so a browser that
        # this call did not create is never stopped here.
        stop_browser = None
        try:
            if browser is None:
                browser, stop_browser = await get_nodriver(proxy=proxy, timeout=timeout)
            tab = await browser.get(cls.url)
            await asyncio.sleep(3)

            # Best effort: if a modal dialog (presumably the cookie-consent
            # prompt -- TODO confirm) shows up within 10 seconds, wait up to
            # `timeout` for it to be hidden. Any failure, including the modal
            # never appearing, is deliberately ignored and the flow continues.
            try:
                await tab.wait_for('[aria-modal="true"]', timeout=10)
                await tab.wait_for('[aria-modal="true"][style*="display: none"]', timeout=timeout)
            except Exception:
                pass

            element = await tab.wait_for('textarea')
            await element.send_keys(get_last_user_message(messages))
            # NOTE(review): "Google Suche" is the German button label, so this
            # lookup presumably only matches on a German-localized page.
            button = await tab.find("Google Suche")
            await button.click()
            # NOTE(review): hard-coded 1000 second pause that ignores
            # `timeout`; looks like a debugging placeholder -- confirm.
            await asyncio.sleep(1000)
        finally:
            if stop_browser is not None:
                stop_browser()