| import json | |
| import requests | |
| from typing import Dict, Any, Generator, Optional | |
| class DeepInfraHandler: | |
| API_URL = "https://api.deepinfra.com/v1/openai/chat/completions" | |
| def __init__(self): | |
| self.headers = { | |
| "Accept": "text/event-stream", | |
| "Accept-Encoding": "gzip, deflate, br, zstd", | |
| "Content-Type": "application/json", | |
| "Connection": "keep-alive", | |
| "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36", | |
| } | |
| def _prepare_payload(self, **kwargs) -> Dict[str, Any]: | |
| """Prepare the payload for the API request""" | |
| return { | |
| "model": kwargs.get("model"), | |
| "messages": kwargs.get("messages"), | |
| "temperature": kwargs.get("temperature", 0.7), | |
| "max_tokens": kwargs.get("max_tokens", 4096), | |
| "top_p": kwargs.get("top_p", 1.0), | |
| "frequency_penalty": kwargs.get("frequency_penalty", 0.0), | |
| "presence_penalty": kwargs.get("presence_penalty", 0.0), | |
| "stop": kwargs.get("stop", []), | |
| "stream": kwargs.get("stream", False) | |
| } | |
| def generate_completion(self, **kwargs) -> Any: | |
| """Generate completion based on streaming preference""" | |
| payload = self._prepare_payload(**kwargs) | |
| response = requests.post( | |
| self.API_URL, | |
| headers=self.headers, | |
| json=payload, | |
| stream=payload["stream"] | |
| ) | |
| if payload["stream"]: | |
| return self._handle_streaming_response(response) | |
| return self._handle_regular_response(response) | |
| def _handle_streaming_response(self, response) -> Generator[str, None, None]: | |
| """Handle streaming response from the API""" | |
| for line in response.iter_lines(decode_unicode=True): | |
| if line.startswith("data:"): | |
| try: | |
| content = json.loads(line[5:]) | |
| if content == "[DONE]": | |
| continue | |
| delta_content = content.get("choices", [{}])[0].get("delta", {}).get("content") | |
| if delta_content: | |
| yield delta_content | |
| except: | |
| continue | |
| def _handle_regular_response(self, response) -> Dict[str, Any]: | |
| """Handle regular (non-streaming) response from the API""" | |
| try: | |
| return response.json() | |
| except Exception as e: | |
| raise Exception(f"Error processing response: {str(e)}") |