|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import os |
|
|
from typing import Any, Dict, List, Optional, Type, Union |
|
|
|
|
|
from openai import AsyncOpenAI, AsyncStream, OpenAI, Stream |
|
|
from pydantic import BaseModel |
|
|
|
|
|
from camel.messages import OpenAIMessage |
|
|
from camel.models.base_model import BaseModelBackend |
|
|
from camel.types import ( |
|
|
ChatCompletion, |
|
|
ChatCompletionChunk, |
|
|
ModelType, |
|
|
) |
|
|
from camel.utils import ( |
|
|
BaseTokenCounter, |
|
|
OpenAITokenCounter, |
|
|
) |
|
|
|
|
|
|
|
|
class OpenAICompatibleModelV2(BaseModelBackend):
    r"""Model backend for services exposing an OpenAI-compatible API.

    Args:
        model_type (Union[ModelType, str]): Model for which a backend is
            created.
        model_config_dict (Optional[Dict[str, Any]], optional): A dictionary
            that will be fed into :obj:`openai.ChatCompletion.create()`. If
            :obj:`None`, :obj:`{}` will be used. (default: :obj:`None`)
        api_key (Optional[str], optional): The API key for authenticating
            with the model service. If not provided, the
            ``OPENAI_COMPATIBILITY_API_KEY`` environment variable is used.
            (default: :obj:`None`)
        url (Optional[str], optional): The url to the model service. If not
            provided, the ``OPENAI_COMPATIBILITY_API_BASE_URL`` environment
            variable is used. (default: :obj:`None`)
        token_counter (Optional[BaseTokenCounter], optional): Token counter to
            use for the model. If not provided, :obj:`OpenAITokenCounter(
            ModelType.GPT_4O_MINI)` will be used.
            (default: :obj:`None`)
        timeout (Optional[float], optional): The timeout value in seconds for
            API calls. If not provided (or falsy, e.g. ``0``), will fall back
            to the MODEL_TIMEOUT environment variable or default to 180
            seconds. (default: :obj:`None`)
    """

    def __init__(
        self,
        model_type: Union[ModelType, str],
        model_config_dict: Optional[Dict[str, Any]] = None,
        api_key: Optional[str] = None,
        url: Optional[str] = None,
        token_counter: Optional[BaseTokenCounter] = None,
        timeout: Optional[float] = None,
    ) -> None:
        # Explicit arguments win; environment variables are the fallback.
        api_key = api_key or os.environ.get("OPENAI_COMPATIBILITY_API_KEY")
        url = url or os.environ.get("OPENAI_COMPATIBILITY_API_BASE_URL")
        timeout = timeout or float(os.environ.get("MODEL_TIMEOUT", 180))
        super().__init__(
            model_type, model_config_dict, api_key, url, token_counter
        )
        self._timeout = timeout

        # The sync and async clients share the same connection settings.
        self._client = OpenAI(
            timeout=self._timeout,
            max_retries=3,
            api_key=self._api_key,
            base_url=self._url,
        )
        self._async_client = AsyncOpenAI(
            timeout=self._timeout,
            max_retries=3,
            api_key=self._api_key,
            base_url=self._url,
        )

    def _run(
        self,
        messages: List[OpenAIMessage],
        response_format: Optional[Type[BaseModel]] = None,
        tools: Optional[List[Dict[str, Any]]] = None,
    ) -> Union[ChatCompletion, Stream[ChatCompletionChunk]]:
        r"""Runs inference of OpenAI chat completion.

        Args:
            messages (List[OpenAIMessage]): Message list with the chat history
                in OpenAI API format.
            response_format (Optional[Type[BaseModel]]): The format of the
                response. If not given, the ``response_format`` entry of the
                model config dict (if any) is used.
            tools (Optional[List[Dict[str, Any]]]): The schema of the tools to
                use for the request.

        Returns:
            Union[ChatCompletion, Stream[ChatCompletionChunk]]:
                `ChatCompletion` in the non-stream mode, or
                `Stream[ChatCompletionChunk]` in the stream mode.
        """
        response_format = response_format or self.model_config_dict.get(
            "response_format", None
        )
        if response_format:
            return self._request_parse(messages, response_format, tools)
        return self._request_chat_completion(messages, tools)

    async def _arun(
        self,
        messages: List[OpenAIMessage],
        response_format: Optional[Type[BaseModel]] = None,
        tools: Optional[List[Dict[str, Any]]] = None,
    ) -> Union[ChatCompletion, AsyncStream[ChatCompletionChunk]]:
        r"""Runs inference of OpenAI chat completion in async mode.

        Args:
            messages (List[OpenAIMessage]): Message list with the chat history
                in OpenAI API format.
            response_format (Optional[Type[BaseModel]]): The format of the
                response. If not given, the ``response_format`` entry of the
                model config dict (if any) is used.
            tools (Optional[List[Dict[str, Any]]]): The schema of the tools to
                use for the request.

        Returns:
            Union[ChatCompletion, AsyncStream[ChatCompletionChunk]]:
                `ChatCompletion` in the non-stream mode, or
                `AsyncStream[ChatCompletionChunk]` in the stream mode.
        """
        response_format = response_format or self.model_config_dict.get(
            "response_format", None
        )
        if response_format:
            return await self._arequest_parse(
                messages, response_format, tools
            )
        return await self._arequest_chat_completion(messages, tools)

    def _build_chat_config(
        self,
        tools: Optional[List[Dict[str, Any]]] = None,
    ) -> Dict[str, Any]:
        r"""Builds the keyword arguments for a plain chat completion call.

        Args:
            tools (Optional[List[Dict[str, Any]]]): The schema of the tools to
                use for the request.

        Returns:
            Dict[str, Any]: A shallow copy of the model config dict, with
                ``tools`` set when a non-empty tool list is given.
        """
        request_config = self.model_config_dict.copy()
        if tools:
            request_config["tools"] = tools
        return request_config

    def _build_parse_config(
        self,
        response_format: Type[BaseModel],
        tools: Optional[List[Dict[str, Any]]] = None,
    ) -> Dict[str, Any]:
        r"""Builds the keyword arguments for a structured-output parse call.

        Args:
            response_format (Type[BaseModel]): The format of the response.
            tools (Optional[List[Dict[str, Any]]]): The schema of the tools to
                use for the request.

        Returns:
            Dict[str, Any]: A deep copy of the model config dict with
                ``response_format`` set, ``stream`` removed, and ``tools``
                set when a non-empty tool list is given.
        """
        import copy

        request_config = copy.deepcopy(self.model_config_dict)
        request_config["response_format"] = response_format
        # The parse requests are annotated to return a complete
        # `ChatCompletion`, so any `stream` flag from the config is dropped.
        request_config.pop("stream", None)
        # Same guard as the plain chat completion path: an empty tool list
        # is not forwarded as `tools=[]`.
        if tools:
            request_config["tools"] = tools
        return request_config

    def _request_chat_completion(
        self,
        messages: List[OpenAIMessage],
        tools: Optional[List[Dict[str, Any]]] = None,
    ) -> Union[ChatCompletion, Stream[ChatCompletionChunk]]:
        r"""Sends a synchronous chat completion request.

        Args:
            messages (List[OpenAIMessage]): Message list with the chat history
                in OpenAI API format.
            tools (Optional[List[Dict[str, Any]]]): The schema of the tools to
                use for the request.

        Returns:
            Union[ChatCompletion, Stream[ChatCompletionChunk]]: The result of
                the sync client's ``chat.completions.create`` call.
        """
        return self._client.chat.completions.create(
            messages=messages,
            model=self.model_type,
            **self._build_chat_config(tools),
        )

    async def _arequest_chat_completion(
        self,
        messages: List[OpenAIMessage],
        tools: Optional[List[Dict[str, Any]]] = None,
    ) -> Union[ChatCompletion, AsyncStream[ChatCompletionChunk]]:
        r"""Sends an asynchronous chat completion request.

        Args:
            messages (List[OpenAIMessage]): Message list with the chat history
                in OpenAI API format.
            tools (Optional[List[Dict[str, Any]]]): The schema of the tools to
                use for the request.

        Returns:
            Union[ChatCompletion, AsyncStream[ChatCompletionChunk]]: The
                result of the async client's ``chat.completions.create``
                call.
        """
        return await self._async_client.chat.completions.create(
            messages=messages,
            model=self.model_type,
            **self._build_chat_config(tools),
        )

    def _request_parse(
        self,
        messages: List[OpenAIMessage],
        response_format: Type[BaseModel],
        tools: Optional[List[Dict[str, Any]]] = None,
    ) -> ChatCompletion:
        r"""Sends a synchronous structured-output (parse) request.

        Args:
            messages (List[OpenAIMessage]): Message list with the chat history
                in OpenAI API format.
            response_format (Type[BaseModel]): The format of the response.
            tools (Optional[List[Dict[str, Any]]]): The schema of the tools to
                use for the request.

        Returns:
            ChatCompletion: The result of the sync client's
                ``beta.chat.completions.parse`` call.
        """
        return self._client.beta.chat.completions.parse(
            messages=messages,
            model=self.model_type,
            **self._build_parse_config(response_format, tools),
        )

    async def _arequest_parse(
        self,
        messages: List[OpenAIMessage],
        response_format: Type[BaseModel],
        tools: Optional[List[Dict[str, Any]]] = None,
    ) -> ChatCompletion:
        r"""Sends an asynchronous structured-output (parse) request.

        Args:
            messages (List[OpenAIMessage]): Message list with the chat history
                in OpenAI API format.
            response_format (Type[BaseModel]): The format of the response.
            tools (Optional[List[Dict[str, Any]]]): The schema of the tools to
                use for the request.

        Returns:
            ChatCompletion: The result of the async client's
                ``beta.chat.completions.parse`` call.
        """
        return await self._async_client.beta.chat.completions.parse(
            messages=messages,
            model=self.model_type,
            **self._build_parse_config(response_format, tools),
        )

    @property
    def token_counter(self) -> BaseTokenCounter:
        r"""Initialize the token counter for the model backend.

        Returns:
            BaseTokenCounter: The configured token counter, or a lazily
                created :obj:`OpenAITokenCounter(ModelType.GPT_4O_MINI)` when
                none was provided.
        """
        if not self._token_counter:
            self._token_counter = OpenAITokenCounter(ModelType.GPT_4O_MINI)
        return self._token_counter

    @property
    def stream(self) -> bool:
        r"""Returns whether the model is in stream mode, which sends partial
        results each time.

        Returns:
            bool: Whether the model is in stream mode.
        """
        return self.model_config_dict.get('stream', False)

    def check_model_config(self):
        r"""Performs no validation; the model config dict is forwarded to the
        client calls as-is.
        """
        pass
|
|
|