| from __future__ import annotations | |
| from typing import Union | |
| from aiohttp import ClientResponse | |
| from requests import Response as RequestsResponse | |
| from ..errors import ResponseStatusError, RateLimitError, MissingAuthError, CloudflareError | |
| from . import Response, StreamResponse | |
| def is_cloudflare(text: str) -> bool: | |
| if "Generated by cloudfront" in text or '<p id="cf-spinner-please-wait">' in text: | |
| return True | |
| elif "<title>Attention Required! | Cloudflare</title>" in text or 'id="cf-cloudflare-status"' in text: | |
| return True | |
| return '<div id="cf-please-wait">' in text or "<title>Just a moment...</title>" in text | |
| def is_openai(text: str) -> bool: | |
| return "<p>Unable to load site</p>" in text or 'id="challenge-error-text"' in text | |
| async def raise_for_status_async(response: Union[StreamResponse, ClientResponse], message: str = None): | |
| if response.ok: | |
| return | |
| is_html = False | |
| if message is None: | |
| content_type = response.headers.get("content-type", "") | |
| if content_type.startswith("application/json"): | |
| message = await response.json() | |
| error = message.get("error") | |
| if isinstance(error, dict): | |
| message = error.get("message") | |
| else: | |
| message = message.get("message", message) | |
| if isinstance(error, str): | |
| message = f"{error}: {message}" | |
| else: | |
| message = (await response.text()).strip() | |
| is_html = content_type.startswith("text/html") or message.startswith("<!DOCTYPE") | |
| if message is None or is_html: | |
| if response.status == 520: | |
| message = "Unknown error (Cloudflare)" | |
| if response.status in (429, 402): | |
| raise RateLimitError(f"Response {response.status}: {message}") | |
| if response.status == 401: | |
| raise MissingAuthError(f"Response {response.status}: {message}") | |
| if response.status == 403 and is_cloudflare(message): | |
| raise CloudflareError(f"Response {response.status}: Cloudflare detected") | |
| elif response.status == 403 and is_openai(message): | |
| raise MissingAuthError(f"Response {response.status}: OpenAI Bot detected") | |
| elif response.status == 502: | |
| raise ResponseStatusError(f"Response {response.status}: Bad Gateway") | |
| elif response.status == 504: | |
| raise RateLimitError(f"Response {response.status}: Gateway Timeout ") | |
| else: | |
| raise ResponseStatusError(f"Response {response.status}: {'HTML content' if is_html else message}") | |
| def raise_for_status(response: Union[Response, StreamResponse, ClientResponse, RequestsResponse], message: str = None): | |
| if hasattr(response, "status"): | |
| return raise_for_status_async(response, message) | |
| if response.ok: | |
| return | |
| is_html = False | |
| if message is None: | |
| is_html = response.headers.get("content-type", "").startswith("text/html") or response.text.startswith("<!DOCTYPE") | |
| message = response.text | |
| if message is None or is_html: | |
| if response.status_code == 520: | |
| message = "Unknown error (Cloudflare)" | |
| if response.status_code in (429, 402): | |
| raise RateLimitError(f"Response {response.status_code}: {message}") | |
| if response.status_code == 401: | |
| raise MissingAuthError(f"Response {response.status_code}: {message}") | |
| if response.status_code == 403 and is_cloudflare(response.text): | |
| raise CloudflareError(f"Response {response.status_code}: Cloudflare detected") | |
| elif response.status_code == 403 and is_openai(response.text): | |
| raise MissingAuthError(f"Response {response.status_code}: OpenAI Bot detected") | |
| elif response.status_code == 502: | |
| raise ResponseStatusError(f"Response {response.status_code}: Bad Gateway") | |
| elif response.status_code == 504: | |
| raise RateLimitError(f"Response {response.status_code}: Gateway Timeout ") | |
| else: | |
| raise ResponseStatusError(f"Response {response.status_code}: {'HTML content' if is_html else message}") |