gpt / gpt4free /g4f /Provider /Blackbox.py
LoRDxdd's picture
Add gpt4free API for Hugging Face
a4b70d9
from __future__ import annotations
from aiohttp import ClientSession
import os
import re
import json
import random
import string
from pathlib import Path
from typing import Optional
from datetime import datetime, timedelta
from ..typing import AsyncResult, Messages, MediaListType
from ..requests.raise_for_status import raise_for_status
from .base_provider import AsyncGeneratorProvider, ProviderModelMixin
from ..image import to_data_uri
from .helper import render_messages
from ..providers.response import JsonConversation
from ..tools.media import merge_media
from .. import debug
class Conversation(JsonConversation):
validated_value: str = None
chat_id: str = None
message_history: Messages = []
def __init__(self, model: str):
self.model = model
class Blackbox(AsyncGeneratorProvider, ProviderModelMixin):
label = "Blackbox AI"
url = "https://www.blackbox.ai"
api_endpoint = "https://www.blackbox.ai/api/chat"
working = True
active_by_default = True
supports_stream = True
supports_system_message = True
supports_message_history = True
default_model = "blackboxai"
default_vision_model = default_model
models = [
default_model,
"gpt-4.1-mini",
"gpt-4.1-nano",
"gpt-4",
"gpt-4o",
"gpt-4o-mini",
# Trending agent modes
'Python Agent',
'HTML Agent',
'Builder Agent',
'Java Agent',
'JavaScript Agent',
'React Agent',
'Android Agent',
'Flutter Agent',
'Next.js Agent',
'AngularJS Agent',
'Swift Agent',
'MongoDB Agent',
'PyTorch Agent',
'Xcode Agent',
'Azure Agent',
'Bitbucket Agent',
'DigitalOcean Agent',
'Docker Agent',
'Electron Agent',
'Erlang Agent',
'FastAPI Agent',
'Firebase Agent',
'Flask Agent',
'Git Agent',
'Gitlab Agent',
'Go Agent',
'Godot Agent',
'Google Cloud Agent',
'Heroku Agent'
]
vision_models = [default_vision_model]
# Trending agent modes
trendingAgentMode = {
'Python Agent': {'mode': True, 'id': "python"},
'HTML Agent': {'mode': True, 'id': "html"},
'Builder Agent': {'mode': True, 'id': "builder"},
'Java Agent': {'mode': True, 'id': "java"},
'JavaScript Agent': {'mode': True, 'id': "javascript"},
'React Agent': {'mode': True, 'id': "react"},
'Android Agent': {'mode': True, 'id': "android"},
'Flutter Agent': {'mode': True, 'id': "flutter"},
'Next.js Agent': {'mode': True, 'id': "next.js"},
'AngularJS Agent': {'mode': True, 'id': "angularjs"},
'Swift Agent': {'mode': True, 'id': "swift"},
'MongoDB Agent': {'mode': True, 'id': "mongodb"},
'PyTorch Agent': {'mode': True, 'id': "pytorch"},
'Xcode Agent': {'mode': True, 'id': "xcode"},
'Azure Agent': {'mode': True, 'id': "azure"},
'Bitbucket Agent': {'mode': True, 'id': "bitbucket"},
'DigitalOcean Agent': {'mode': True, 'id': "digitalocean"},
'Docker Agent': {'mode': True, 'id': "docker"},
'Electron Agent': {'mode': True, 'id': "electron"},
'Erlang Agent': {'mode': True, 'id': "erlang"},
'FastAPI Agent': {'mode': True, 'id': "fastapi"},
'Firebase Agent': {'mode': True, 'id': "firebase"},
'Flask Agent': {'mode': True, 'id': "flask"},
'Git Agent': {'mode': True, 'id': "git"},
'Gitlab Agent': {'mode': True, 'id': "gitlab"},
'Go Agent': {'mode': True, 'id': "go"},
'Godot Agent': {'mode': True, 'id': "godot"},
'Google Cloud Agent': {'mode': True, 'id': "googlecloud"},
'Heroku Agent': {'mode': True, 'id': "heroku"},
}
# Complete list of all models (for authorized users)
_all_models = list(dict.fromkeys([
*models,
*list(trendingAgentMode.keys())
]))
@classmethod
async def fetch_validated(cls, url: str = "https://www.blackbox.ai", force_refresh: bool = False) -> Optional[str]:
cache_path = Path(os.path.expanduser("~")) / ".g4f" / "cache"
cache_file = cache_path / 'blackbox.json'
if not force_refresh and cache_file.exists():
try:
with open(cache_file, 'r') as f:
data = json.load(f)
if data.get('validated_value'):
return data['validated_value']
except Exception as e:
debug.log(f"Blackbox: Error reading cache: {e}")
js_file_pattern = r'static/chunks/\d{4}-[a-fA-F0-9]+\.js'
uuid_pattern = r'["\']([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})["\']'
def is_valid_context(text: str) -> bool:
return any(char + '=' in text for char in 'abcdefghijklmnopqrstuvwxyz')
async with ClientSession() as session:
try:
async with session.get(url) as response:
if response.status != 200:
return None
page_content = await response.text()
js_files = re.findall(js_file_pattern, page_content)
for js_file in js_files:
js_url = f"{url}/_next/{js_file}"
async with session.get(js_url) as js_response:
if js_response.status == 200:
js_content = await js_response.text()
for match in re.finditer(uuid_pattern, js_content):
start = max(0, match.start() - 10)
end = min(len(js_content), match.end() + 10)
context = js_content[start:end]
if is_valid_context(context):
validated_value = match.group(1)
cache_file.parent.mkdir(exist_ok=True, parents=True)
try:
with open(cache_file, 'w') as f:
json.dump({'validated_value': validated_value}, f)
except Exception as e:
debug.log(f"Blackbox: Error writing cache: {e}")
return validated_value
except Exception as e:
debug.log(f"Blackbox: Error retrieving validated_value: {e}")
return None
@classmethod
def generate_id(cls, length: int = 7) -> str:
chars = string.ascii_letters + string.digits
return ''.join(random.choice(chars) for _ in range(length))
@classmethod
async def create_async_generator(
cls,
model: str,
messages: Messages,
prompt: str = None,
proxy: str = None,
media: MediaListType = None,
top_p: float = None,
temperature: float = None,
max_tokens: int = None,
conversation: Conversation = None,
return_conversation: bool = True,
**kwargs
) -> AsyncResult:
model = cls.get_model(model)
headers = {
'accept': '*/*',
'accept-language': 'en-US,en;q=0.9',
'content-type': 'application/json',
'origin': 'https://www.blackbox.ai',
'referer': 'https://www.blackbox.ai/',
'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36'
}
async with ClientSession(headers=headers) as session:
if conversation is None or not hasattr(conversation, "chat_id"):
conversation = Conversation(model)
conversation.validated_value = await cls.fetch_validated()
conversation.chat_id = cls.generate_id()
conversation.message_history = []
current_messages = []
for i, msg in enumerate(render_messages(messages)):
msg_id = conversation.chat_id if i == 0 and msg["role"] == "user" else cls.generate_id()
current_msg = {
"id": msg_id,
"content": msg["content"],
"role": msg["role"]
}
current_messages.append(current_msg)
media = list(merge_media(media, messages))
if media:
current_messages[-1]['data'] = {
"imagesData": [
{
"filePath": f"/{image_name}",
"contents": to_data_uri(image)
}
for image, image_name in media
],
"fileText": "",
"title": ""
}
data = {
"messages": current_messages,
"agentMode": {},
"id": conversation.chat_id,
"previewToken": None,
"userId": None,
"codeModelMode": True,
"trendingAgentMode": cls.trendingAgentMode.get(model, {}) if model in cls.trendingAgentMode else {},
"isMicMode": False,
"userSystemPrompt": None,
"maxTokens": max_tokens,
"playgroundTopP": top_p,
"playgroundTemperature": temperature,
"isChromeExt": False,
"githubToken": "",
"clickedAnswer2": False,
"clickedAnswer3": False,
"clickedForceWebSearch": False,
"visitFromDelta": False,
"isMemoryEnabled": False,
"mobileClient": False,
"userSelectedModel": None,
"validated": conversation.validated_value,
"imageGenerationMode": False,
"webSearchModePrompt": False,
"deepSearchMode": False,
"designerMode": False,
"domains": None,
"vscodeClient": False,
"codeInterpreterMode": False,
"customProfile": {
"additionalInfo": "",
"enableNewChats": False,
"name": "",
"occupation": "",
"traits": []
},
"webSearchModeOption": {
"autoMode": False,
"webMode": False,
"offlineMode": False
},
"session": None,
"isPremium": True,
"subscriptionCache": None,
"beastMode": False,
"reasoningMode": False,
"workspaceId": "",
"asyncMode": False,
"webSearchMode": False
}
# Continue with the API request and async generator behavior
async with session.post(cls.api_endpoint, json=data, proxy=proxy) as response:
await raise_for_status(response)
# Collect the full response
full_response = []
async for chunk in response.content.iter_any():
if chunk:
chunk_text = chunk.decode()
if chunk_text != "Login to continue using":
full_response.append(chunk_text)
yield chunk_text
full_response_text = ''.join(full_response)
# Handle conversation history
if return_conversation:
conversation.message_history.append({"role": "assistant", "content": full_response_text})
yield conversation