|
|
from __future__ import annotations |
|
|
|
|
|
from aiohttp import ClientSession |
|
|
import json |
|
|
import time |
|
|
import hashlib |
|
|
|
|
|
from ..typing import AsyncResult, Messages, MediaListType |
|
|
from .base_provider import AsyncGeneratorProvider, ProviderModelMixin |
|
|
from .helper import format_prompt |
|
|
from ..tools.media import merge_media |
|
|
from ..image import to_data_uri |
|
|
from ..providers.response import FinishReason |
|
|
|
|
|
|
|
|
class Startnest(AsyncGeneratorProvider, ProviderModelMixin): |
|
|
label = "Startnest" |
|
|
url = "https://play.google.com/store/apps/details?id=starnest.aitype.aikeyboard.chatbot.chatgpt" |
|
|
api_endpoint = "https://api.startnest.uk/api/completions/stream" |
|
|
|
|
|
working = False |
|
|
needs_auth = False |
|
|
supports_stream = True |
|
|
supports_system_message = True |
|
|
supports_message_history = True |
|
|
|
|
|
default_model = 'gpt-4o-mini' |
|
|
models = [default_model] |
|
|
vision_models = models |
|
|
|
|
|
@classmethod |
|
|
def generate_signature(cls, timestamp: int) -> str: |
|
|
""" |
|
|
Generate signature for authorization header |
|
|
You may need to adjust this based on the actual signature algorithm |
|
|
""" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
kid = "36ccfe00-78fc-4cab-9c5b-5460b0c78513" |
|
|
algorithm = "sha256" |
|
|
validity = 90 |
|
|
user_id = "" |
|
|
|
|
|
|
|
|
|
|
|
signature_input = f"{kid}{timestamp}{validity}".encode() |
|
|
signature_value = hashlib.sha256(signature_input).hexdigest() |
|
|
|
|
|
return f"Signature kid={kid}&algorithm={algorithm}×tamp={timestamp}&validity={validity}&userId={user_id}&value={signature_value}" |
|
|
|
|
|
@classmethod |
|
|
async def create_async_generator( |
|
|
cls, |
|
|
model: str, |
|
|
messages: Messages, |
|
|
proxy: str = None, |
|
|
media: MediaListType = None, |
|
|
stream: bool = True, |
|
|
max_tokens: int = None, |
|
|
**kwargs |
|
|
) -> AsyncResult: |
|
|
model = cls.get_model(model) |
|
|
|
|
|
|
|
|
timestamp = int(time.time()) |
|
|
|
|
|
headers = { |
|
|
"Accept-Encoding": "gzip", |
|
|
"app_name": "AIKEYBOARD", |
|
|
"Authorization": cls.generate_signature(timestamp), |
|
|
"Connection": "Keep-Alive", |
|
|
"Content-Type": "application/json; charset=UTF-8", |
|
|
"Host": "api.startnest.uk", |
|
|
"User-Agent": "okhttp/4.9.0", |
|
|
} |
|
|
|
|
|
async with ClientSession() as session: |
|
|
|
|
|
media = list(merge_media(media, messages)) |
|
|
|
|
|
|
|
|
formatted_messages = [] |
|
|
for i, msg in enumerate(messages): |
|
|
if isinstance(msg, dict): |
|
|
role = msg.get("role", "user") |
|
|
content = msg.get("content", "") |
|
|
|
|
|
|
|
|
content_array = [] |
|
|
|
|
|
|
|
|
if media and role == "user" and i == len(messages) - 1: |
|
|
for image, _ in media: |
|
|
image_data_uri = to_data_uri(image) |
|
|
content_array.append({ |
|
|
"image_url": { |
|
|
"url": image_data_uri |
|
|
}, |
|
|
"type": "image_url" |
|
|
}) |
|
|
|
|
|
|
|
|
if content: |
|
|
content_array.append({ |
|
|
"text": content, |
|
|
"type": "text" |
|
|
}) |
|
|
|
|
|
formatted_messages.append({ |
|
|
"role": role, |
|
|
"content": content_array |
|
|
}) |
|
|
|
|
|
|
|
|
if len(messages) == 1 and not media: |
|
|
prompt_text = format_prompt(messages) |
|
|
formatted_messages = [{ |
|
|
"role": "user", |
|
|
"content": [{"text": prompt_text, "type": "text"}] |
|
|
}] |
|
|
|
|
|
data = { |
|
|
"isVip": True, |
|
|
"max_tokens": max_tokens, |
|
|
"messages": formatted_messages, |
|
|
"stream": stream |
|
|
} |
|
|
|
|
|
|
|
|
if media: |
|
|
data["advanceToolType"] = "upload_and_ask" |
|
|
|
|
|
async with session.post(cls.api_endpoint, json=data, headers=headers, proxy=proxy) as response: |
|
|
response.raise_for_status() |
|
|
|
|
|
if stream: |
|
|
|
|
|
async for line in response.content: |
|
|
if line: |
|
|
line = line.decode('utf-8').strip() |
|
|
if line.startswith("data: "): |
|
|
data_str = line[6:] |
|
|
if data_str == "[DONE]": |
|
|
break |
|
|
try: |
|
|
json_data = json.loads(data_str) |
|
|
if "choices" in json_data and len(json_data["choices"]) > 0: |
|
|
choice = json_data["choices"][0] |
|
|
|
|
|
|
|
|
delta = choice.get("delta", {}) |
|
|
content = delta.get("content", "") |
|
|
if content: |
|
|
yield content |
|
|
|
|
|
|
|
|
if "finish_reason" in choice and choice["finish_reason"] is not None: |
|
|
yield FinishReason(choice["finish_reason"]) |
|
|
break |
|
|
|
|
|
except json.JSONDecodeError: |
|
|
continue |
|
|
else: |
|
|
|
|
|
response_text = await response.text() |
|
|
try: |
|
|
json_data = json.loads(response_text) |
|
|
if "choices" in json_data and len(json_data["choices"]) > 0: |
|
|
choice = json_data["choices"][0] |
|
|
if "message" in choice and "content" in choice["message"]: |
|
|
content = choice["message"]["content"] |
|
|
if content: |
|
|
yield content.strip() |
|
|
|
|
|
|
|
|
if "finish_reason" in choice and choice["finish_reason"] is not None: |
|
|
yield FinishReason(choice["finish_reason"]) |
|
|
return |
|
|
|
|
|
except json.JSONDecodeError: |
|
|
|
|
|
lines = response_text.strip().split('\n') |
|
|
full_content = [] |
|
|
finish_reason_value = None |
|
|
|
|
|
for line in lines: |
|
|
if line.startswith("data: "): |
|
|
data_str = line[6:] |
|
|
if data_str == "[DONE]": |
|
|
break |
|
|
try: |
|
|
json_data = json.loads(data_str) |
|
|
if "choices" in json_data and len(json_data["choices"]) > 0: |
|
|
choice = json_data["choices"][0] |
|
|
delta = choice.get("delta", {}) |
|
|
content = delta.get("content", "") |
|
|
if content: |
|
|
full_content.append(content) |
|
|
|
|
|
|
|
|
if "finish_reason" in choice and choice["finish_reason"] is not None: |
|
|
finish_reason_value = choice["finish_reason"] |
|
|
|
|
|
except json.JSONDecodeError: |
|
|
continue |
|
|
|
|
|
if full_content: |
|
|
yield ''.join(full_content) |
|
|
|
|
|
if finish_reason_value: |
|
|
yield FinishReason(finish_reason_value) |
|
|
|