Spaces:
Runtime error
Runtime error
| """ | |
| BingChat API Interaction Module | |
| This module provides functionality to interact with the Bing Chat Copilot, | |
| allowing for dynamic conversations and response streaming. | |
| Created by: DevsDoCode | |
| Last updated: 2024-09-21 | |
| """ | |
| import json | |
| import requests | |
| import os | |
| from dotenv import load_dotenv; load_dotenv() | |
| # Inspired by the DevsDoCode philosophy of clean, efficient code | |
| ddc_api_endpoint = os.environ.get("api_url") | |
| def ddc_bing_converse(conv_history=[], convo_tone="", md_format=False, realtime=True): | |
| """ | |
| Initiates a conversation with the Bing Chat API and retrieves the response. | |
| Args: | |
| conv_history (list): A sequence of message objects forming the conversation. | |
| convo_tone (str): The desired conversational style or tone. | |
| md_format (bool): Flag to enable Markdown formatting in the response. | |
| realtime (bool): Indicator for streaming the API response in real-time. | |
| Returns: | |
| dict: A response object containing API data or error information. | |
| Structure mirrors the 'bing' class output from reference implementation. | |
| Raises: | |
| requests.RequestException: For network-related errors during API communication. | |
| json.JSONDecodeError: If the API response cannot be parsed as JSON. | |
| """ | |
| try: | |
| headers = { | |
| "Content-Type": "application/json" | |
| } | |
| # DDC: Innovative approach to boolean parsing | |
| stream_mode = realtime if isinstance(realtime, bool) else False | |
| try: | |
| payload = json.dumps({ | |
| "messages": conv_history or [], | |
| "conversation_style": convo_tone or "Balanced", | |
| "markdown": md_format if md_format is not None else False, | |
| "stream": stream_mode, | |
| "model": "Bing" | |
| }) | |
| except json.JSONDecodeError: | |
| payload = json.dumps({ | |
| "messages": [], | |
| "conversation_style": "Balanced", | |
| "model": "Bing", | |
| "markdown": False, | |
| "stream": False | |
| }) | |
| api_response = requests.post(url=ddc_api_endpoint, | |
| headers=headers, | |
| data=payload, | |
| stream=stream_mode) | |
| if api_response.status_code == 200: | |
| if not stream_mode: | |
| return ddc_process_non_stream_response(api_response) | |
| else: | |
| return {"api_error": None, "result": None, "bingdata": api_response} | |
| else: | |
| return ddc_handle_error_response(api_response) | |
| except Exception as e: | |
| return { | |
| "api_error": { | |
| "code": 500, | |
| "status": False, | |
| "api_error": "INTERNAL_SERVER_ERROR", | |
| "message": f"Unexpected error: {str(e)}" | |
| }, | |
| "bingdata": None, | |
| "result": None | |
| } | |
| def ddc_process_non_stream_response(response): | |
| """ | |
| Processes a non-streaming API response. | |
| Args: | |
| response (requests.Response): The API response object. | |
| Returns: | |
| dict: Processed response data or error information. | |
| """ | |
| try: | |
| json_start = response.text.index("{") | |
| json_data = json.loads(response.text[json_start:]) | |
| if json_data.get("code") == 200 and json_data.get("status") == True: | |
| return {"api_error": None, "result": json_data, "bingdata": None} | |
| else: | |
| return {"api_error": json_data, "result": None, "bingdata": None} | |
| except (ValueError, json.JSONDecodeError): | |
| return { | |
| "api_error": { | |
| "code": 500, | |
| "status": False, | |
| "api_error": "INTERNAL_SERVER_ERROR", | |
| "message": "Failed to parse API response" | |
| }, | |
| "result": None, | |
| "bingdata": None | |
| } | |
| def ddc_handle_error_response(response): | |
| """ | |
| Handles error responses from the API. | |
| Args: | |
| response (requests.Response): The error response from the API. | |
| Returns: | |
| dict: Formatted error information. | |
| """ | |
| try: | |
| error_data = response.json() | |
| except json.JSONDecodeError: | |
| error_data = { | |
| "code": 500, | |
| "status": False, | |
| "api_error": "INTERNAL_SERVER_ERROR", | |
| "message": "Unable to parse error response" | |
| } | |
| return {"api_error": error_data, "result": None, "bingdata": None} | |
| if __name__ == "__main__": | |
| # DevsDoCode: Example usage demonstration | |
| sample_query = [{"role": "user", "content": "Describe India in 10 lines"}] | |
| api_result = ddc_bing_converse(conv_history=sample_query, realtime=False, md_format=True) | |
| if api_result["api_error"]: | |
| print("API Error Encountered:", api_result["api_error"]) | |
| else: | |
| if api_result["bingdata"]: | |
| for data_chunk in api_result["bingdata"].iter_content(chunk_size=1024): | |
| print(data_chunk.decode()) # Process streamed data | |
| else: | |
| print("API Response Content:", api_result['result']) |