File size: 9,526 Bytes
a4b70d9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
from __future__ import annotations

from aiohttp import ClientSession
import json
import time
import hashlib

from ..typing import AsyncResult, Messages, MediaListType
from .base_provider import AsyncGeneratorProvider, ProviderModelMixin
from .helper import format_prompt
from ..tools.media import merge_media
from ..image import to_data_uri
from ..providers.response import FinishReason


class Startnest(AsyncGeneratorProvider, ProviderModelMixin):
    label = "Startnest"
    url = "https://play.google.com/store/apps/details?id=starnest.aitype.aikeyboard.chatbot.chatgpt"
    api_endpoint = "https://api.startnest.uk/api/completions/stream"
    
    working = False
    needs_auth = False
    supports_stream = True
    supports_system_message = True
    supports_message_history = True
    
    default_model = 'gpt-4o-mini'
    models = [default_model]
    vision_models = models

    @classmethod
    def generate_signature(cls, timestamp: int) -> str:
        """
        Generate signature for authorization header
        You may need to adjust this based on the actual signature algorithm
        """
        # This is a placeholder - the actual signature generation might involve:
        # - A secret key
        # - Specific string formatting
        # - Different hash input
        
        # Example implementation (adjust as needed):
        kid = "36ccfe00-78fc-4cab-9c5b-5460b0c78513"
        algorithm = "sha256"
        validity = 90
        user_id = ""
        
        # The actual signature generation logic needs to be determined
        # This is just a placeholder that creates a hash from timestamp
        signature_input = f"{kid}{timestamp}{validity}".encode()
        signature_value = hashlib.sha256(signature_input).hexdigest()
        
        return f"Signature kid={kid}&algorithm={algorithm}&timestamp={timestamp}&validity={validity}&userId={user_id}&value={signature_value}"

    @classmethod
    async def create_async_generator(
        cls,
        model: str,
        messages: Messages,
        proxy: str = None,
        media: MediaListType = None,
        stream: bool = True,
        max_tokens: int = None,
        **kwargs
    ) -> AsyncResult:
        model = cls.get_model(model)
        
        # Generate current timestamp
        timestamp = int(time.time())
        
        headers = {
            "Accept-Encoding": "gzip",
            "app_name": "AIKEYBOARD",
            "Authorization": cls.generate_signature(timestamp),
            "Connection": "Keep-Alive",
            "Content-Type": "application/json; charset=UTF-8",
            "Host": "api.startnest.uk",
            "User-Agent": "okhttp/4.9.0",
        }
        
        async with ClientSession() as session:
            # Merge media with messages
            media = list(merge_media(media, messages))
            
            # Convert messages to the required format
            formatted_messages = []
            for i, msg in enumerate(messages):
                if isinstance(msg, dict):
                    role = msg.get("role", "user")
                    content = msg.get("content", "")
                    
                    # Create content array
                    content_array = []
                    
                    # Add images if this is the last user message and media exists
                    if media and role == "user" and i == len(messages) - 1:
                        for image, _ in media:
                            image_data_uri = to_data_uri(image)
                            content_array.append({
                                "image_url": {
                                    "url": image_data_uri
                                },
                                "type": "image_url"
                            })
                    
                    # Add text content
                    if content:
                        content_array.append({
                            "text": content,
                            "type": "text"
                        })
                    
                    formatted_messages.append({
                        "role": role,
                        "content": content_array
                    })
            
            # If only one message and no media, use format_prompt as requested
            if len(messages) == 1 and not media:
                prompt_text = format_prompt(messages)
                formatted_messages = [{
                    "role": "user",
                    "content": [{"text": prompt_text, "type": "text"}]
                }]
            
            data = {
                "isVip": True,
                "max_tokens": max_tokens,
                "messages": formatted_messages,
                "stream": stream
            }
            
            # Add advanceToolType if media is present
            if media:
                data["advanceToolType"] = "upload_and_ask"
            
            async with session.post(cls.api_endpoint, json=data, headers=headers, proxy=proxy) as response:
                response.raise_for_status()
                
                if stream:
                    # Handle streaming response (SSE format)
                    async for line in response.content:
                        if line:
                            line = line.decode('utf-8').strip()
                            if line.startswith("data: "):
                                data_str = line[6:]
                                if data_str == "[DONE]":
                                    break
                                try:
                                    json_data = json.loads(data_str)
                                    if "choices" in json_data and len(json_data["choices"]) > 0:
                                        choice = json_data["choices"][0]
                                        
                                        # Handle content
                                        delta = choice.get("delta", {})
                                        content = delta.get("content", "")
                                        if content:
                                            yield content
                                        
                                        # Handle finish_reason
                                        if "finish_reason" in choice and choice["finish_reason"] is not None:
                                            yield FinishReason(choice["finish_reason"])
                                            break
                                            
                                except json.JSONDecodeError:
                                    continue
                else:
                    # Handle non-streaming response (regular JSON)
                    response_text = await response.text()
                    try:
                        json_data = json.loads(response_text)
                        if "choices" in json_data and len(json_data["choices"]) > 0:
                            choice = json_data["choices"][0]
                            if "message" in choice and "content" in choice["message"]:
                                content = choice["message"]["content"]
                                if content:
                                    yield content.strip()
                            
                            # Handle finish_reason for non-streaming
                            if "finish_reason" in choice and choice["finish_reason"] is not None:
                                yield FinishReason(choice["finish_reason"])
                                return
                                
                    except json.JSONDecodeError:
                        # If it's still SSE format even when stream=False, handle it
                        lines = response_text.strip().split('\n')
                        full_content = []
                        finish_reason_value = None
                        
                        for line in lines:
                            if line.startswith("data: "):
                                data_str = line[6:]
                                if data_str == "[DONE]":
                                    break
                                try:
                                    json_data = json.loads(data_str)
                                    if "choices" in json_data and len(json_data["choices"]) > 0:
                                        choice = json_data["choices"][0]
                                        delta = choice.get("delta", {})
                                        content = delta.get("content", "")
                                        if content:
                                            full_content.append(content)
                                        
                                        # Store finish_reason
                                        if "finish_reason" in choice and choice["finish_reason"] is not None:
                                            finish_reason_value = choice["finish_reason"]
                                            
                                except json.JSONDecodeError:
                                    continue
                        
                        if full_content:
                            yield ''.join(full_content)
                        
                        if finish_reason_value:
                            yield FinishReason(finish_reason_value)