from __future__ import annotations

import time
import json
import random
import requests
import asyncio
from urllib.parse import quote, quote_plus
from typing import Optional
from aiohttp import ClientSession, ClientTimeout

from .helper import filter_none, format_media_prompt
from .base_provider import AsyncGeneratorProvider, ProviderModelMixin
from ..typing import AsyncResult, Messages, MediaListType
from ..image import is_data_an_audio, use_aspect_ratio
from ..errors import MissingAuthError
from ..requests.raise_for_status import raise_for_status
from ..requests.aiohttp import get_connector
from ..providers.response import ImageResponse, Reasoning, TitleGeneration, SuggestedFollowups
from ..tools.media import render_messages
from ..config import STATIC_URL
from .template.OpenaiTemplate import read_response
from .. import debug

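# Browser-like default headers (including referer/origin) sent with every request.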
DEFAULT_HEADERS = {
    "accept": "*/*",
    "accept-language": "en-US,en;q=0.9",
    "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36",
    "referer": "https://pollinations.ai/",
    "origin": "https://pollinations.ai",
}

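# Tool schema used to request a conversation title and suggested followup
# messages from the model after a completed response (see _generate_text).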
FOLLOWUPS_TOOLS = [{
    "type": "function",
    "function": {
        "name": "options",
        "description": "Provides options for the conversation",
        "parameters": {
            "properties": {
                "title": {
                    "title": "Conversation title. Prefixed with one or more emojis",
                    "type": "string"
                },
                "followups": {
                    "items": {
                        "type": "string"
                    },
                    "title": "Four suggested followups (user messages only)",
                    "type": "array"
                }
            },
            "title": "Conversation",
            "type": "object"
        }
    }
}]

FOLLOWUPS_DEVELOPER_MESSAGE = [{
    "role": "developer",
    "content": "Provide conversation options.",
}]


class PollinationsAI(AsyncGeneratorProvider, ProviderModelMixin):
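    """
    Provider for the Pollinations AI API: text, image and audio generation
    via the text.pollinations.ai and image.pollinations.ai endpoints.

    Minimal usage sketch (assumes the surrounding g4f package):

        async for chunk in PollinationsAI.create_async_generator(
            model="openai",
            messages=[{"role": "user", "content": "Hello"}],
        ):
            print(chunk)
    """
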
    label = "Pollinations AI 🌸"
    url = "https://pollinations.ai"
    login_url = "https://auth.pollinations.ai"
    active_by_default = True
    working = True
    supports_system_message = True
    supports_message_history = True

    text_api_endpoint = "https://text.pollinations.ai"
    openai_endpoint = "https://text.pollinations.ai/openai"
    image_api_endpoint = "https://image.pollinations.ai/"

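    # Static defaults; the full model lists are populated lazily by get_models().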
    default_model = "openai"
    fallback_model = "deepseek"
    default_image_model = "flux"
    default_vision_model = default_model
    default_audio_model = "openai-audio"
    default_voice = "alloy"
    text_models = [default_model, "evil"]
    image_models = [default_image_model, "turbo", "kontext"]
    audio_models = {default_audio_model: []}
    vision_models = [default_vision_model]
    _models_loaded = False
    model_aliases = {
        "llama-4-scout": "llamascout",
        "deepseek-r1": "deepseek-reasoning",
        "sdxl-turbo": "turbo",
        "gpt-image": "gptimage",
        "flux-dev": "flux",
        "flux-schnell": "flux",
        "flux-pro": "flux",
        "flux": "flux",
        "flux-kontext": "kontext",
    }

    @classmethod
    def get_models(cls, **kwargs):
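        """Fetch and cache the available text, image and audio model lists.

        Runs the remote lookups once per process; on failure it falls back
        to the static defaults. Returns all known models, deduplicated.
        """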
        def get_alias(model: dict) -> str:
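            # Prefer the first published alias over the raw model name, then
            # normalize naming quirks ("-instruct" suffix, "qwen" prefix).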
            alias = model.get("name")
            if model.get("aliases"):
                alias = model.get("aliases")[0]
            return alias.replace("-instruct", "").replace("qwen-", "qwen").replace("qwen", "qwen-")

        if not cls._models_loaded:
            try:
                image_response = requests.get("https://image.pollinations.ai/models")
                if image_response.ok:
                    new_image_models = image_response.json()
                else:
                    new_image_models = []

                image_models = cls.image_models.copy()
                for model in new_image_models:
                    if model not in image_models:
                        image_models.append(model)
                cls.image_models = image_models

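                # Text models: try the g4f.dev mirror first, then fall back
                # to the Pollinations API directly.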
                text_response = requests.get("https://g4f.dev/api/pollinations.ai/models")
                if not text_response.ok:
                    text_response = requests.get("https://text.pollinations.ai/models")
                text_response.raise_for_status()
                models = text_response.json()

                cls.audio_models = {
                    model.get("name"): model.get("voices")
                    for model in models
                    if "output_modalities" in model and "audio" in model["output_modalities"]
                }
                for alias, model in cls.model_aliases.items():
                    if model in cls.audio_models and alias not in cls.audio_models:
                        cls.audio_models.update({alias: {}})

                cls.vision_models.extend([
                    get_alias(model)
                    for model in models
                    if model.get("vision") and get_alias(model) not in cls.vision_models
                ])

                for model in models:
                    alias = get_alias(model)
                    if alias not in cls.text_models:
                        cls.text_models.append(alias)
                        if alias != model.get("name"):
                            cls.model_aliases[alias] = model.get("name")
                    elif model.get("name") not in cls.text_models:
                        cls.text_models.append(model.get("name"))
                cls.live += 1

            except Exception as e:
                if not cls.text_models:
                    cls.text_models = [cls.default_model]
                if not cls.image_models:
                    cls.image_models = [cls.default_image_model]
                debug.error(f"Failed to fetch models: {e}")
            finally:
                cls._models_loaded = True

        all_models = cls.text_models.copy()
        all_models.extend(cls.image_models)
        all_models.extend(cls.audio_models.keys())
        if cls.default_audio_model in cls.audio_models:
            all_models.extend(cls.audio_models[cls.default_audio_model])
        return list(dict.fromkeys(all_models))

    @classmethod
    def get_grouped_models(cls) -> list[dict[str, str | list[str]]]:
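        """Return the available models grouped for UI display."""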
        cls.get_models()
        return [
            {"group": "Text Generation", "models": cls.text_models},
            {"group": "Image Generation", "models": cls.image_models},
            {"group": "Audio Generation", "models": list(cls.audio_models.keys())},
            {"group": "Audio Voices", "models": cls.audio_models.get(cls.default_audio_model, [])},
        ]

    @classmethod
    async def create_async_generator(
        cls,
        model: str,
        messages: Messages,
        stream: bool = True,
        proxy: str = None,
        cache: bool = None,
        referrer: str = STATIC_URL,
        api_key: str = None,
        extra_body: dict = None,
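        # Image generation parameters: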
        prompt: str = None,
        aspect_ratio: str = None,
        width: int = None,
        height: int = None,
        seed: Optional[int] = None,
        nologo: bool = True,
        private: bool = False,
        enhance: bool = None,
        safe: bool = False,
        transparent: bool = False,
        n: int = 1,
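        # Text generation and shared parameters: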
        media: MediaListType = None,
        temperature: float = None,
        presence_penalty: float = None,
        top_p: float = None,
        frequency_penalty: float = None,
        response_format: Optional[dict] = None,
        extra_parameters: list[str] = ["tools", "parallel_tool_calls", "tool_choice", "reasoning_effort", "logit_bias", "voice", "modalities", "audio"],
        **kwargs
    ) -> AsyncResult:
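        """Route the request to image or text generation based on the model.

        Falls back to the audio model when audio input/output is involved
        and no model was given; resolves known model aliases first.
        """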
        if cache is None:
            cache = kwargs.get("action") == "next"
        if extra_body is None:
            extra_body = {}
        if not model:
            has_audio = "audio" in kwargs or "audio" in kwargs.get("modalities", [])
            if not has_audio and media is not None:
                for media_data, filename in media:
                    if is_data_an_audio(media_data, filename):
                        has_audio = True
                        break
            model = cls.default_audio_model if has_audio else model
        elif cls._models_loaded or cls.get_models():
            if model in cls.model_aliases:
                model = cls.model_aliases[model]
        debug.log(f"Using model: {model}")
        if model in cls.image_models:
            async for chunk in cls._generate_image(
                model="gptimage" if model == "transparent" else model,
                prompt=format_media_prompt(messages, prompt),
                media=media,
                proxy=proxy,
                aspect_ratio=aspect_ratio,
                width=width,
                height=height,
                seed=seed,
                cache=cache,
                nologo=nologo,
                private=private,
                enhance=enhance,
                safe=safe,
                transparent=transparent or model == "transparent",
                n=n,
                referrer=referrer,
                api_key=api_key
            ):
                yield chunk
        else:
            if prompt is not None and len(messages) == 1:
                messages = [{
                    "role": "user",
                    "content": prompt
                }]
            if model and model in cls.audio_models.get(cls.default_audio_model, []):
                kwargs["audio"] = {
                    "voice": model,
                }
                model = cls.default_audio_model
            async for result in cls._generate_text(
                model=model,
                messages=messages,
                media=media,
                proxy=proxy,
                temperature=temperature,
                presence_penalty=presence_penalty,
                top_p=top_p,
                frequency_penalty=frequency_penalty,
                response_format=response_format,
                seed=seed,
                cache=cache,
                stream=stream,
                extra_parameters=extra_parameters,
                referrer=referrer,
                api_key=api_key,
                extra_body=extra_body,
                **kwargs
            ):
                yield result

    @classmethod
    async def _generate_image(
        cls,
        model: str,
        prompt: str,
        media: MediaListType,
        proxy: str,
        aspect_ratio: str,
        width: int,
        height: int,
        seed: Optional[int],
        cache: bool,
        nologo: bool,
        private: bool,
        enhance: bool,
        safe: bool,
        transparent: bool,
        n: int,
        referrer: str,
        api_key: str,
        timeout: int = 120
    ) -> AsyncResult:
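        """Generate `n` images concurrently from the image endpoint.

        Builds GET URLs of the form `{image_api_endpoint}prompt/<prompt>?<query>`
        and yields Reasoning progress updates followed by ImageResponse items.
        """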
        if enhance is None:
            enhance = model == "flux"
        params = {
            "model": model,
            "nologo": str(nologo).lower(),
            "private": str(private).lower(),
            "enhance": str(enhance).lower(),
            "safe": str(safe).lower(),
            "referrer": referrer
        }
        if transparent:
            params["transparent"] = "true"
        image = [data for data, _ in media if isinstance(data, str) and data.startswith("http")] if media else []
        if image:
            params["image"] = ",".join(image)
        if model != "gptimage":
            params = use_aspect_ratio({
                "width": width,
                "height": height,
                **params
            }, "1:1" if aspect_ratio is None else aspect_ratio)
        query = "&".join(f"{k}={quote(str(v))}" for k, v in params.items() if v is not None)
        encoded_prompt = prompt.strip(". \n")
        if model == "gptimage" and aspect_ratio is not None:
            encoded_prompt = f"{encoded_prompt} aspect-ratio: {aspect_ratio}"
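        # Keep the full URL under 4096 characters; the budget subtracts the
        # endpoint and query lengths plus 8 more characters for "prompt/" and "?".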
        encoded_prompt = quote_plus(encoded_prompt)[:4096 - len(cls.image_api_endpoint) - len(query) - 8].rstrip("%")
        url = f"{cls.image_api_endpoint}prompt/{encoded_prompt}?{query}"

        def get_url_with_seed(i: int, seed: Optional[int] = None):
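            # gptimage ignores seeds; otherwise use a fresh random seed for
            # every extra image, and for the first one when caching is off.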
            if model == "gptimage":
                return url
            if i == 0:
                if not cache and seed is None:
                    seed = random.randint(0, 2**32)
            else:
                seed = random.randint(0, 2**32)
            return f"{url}&seed={seed}" if seed else url

        headers = {"referer": referrer}
        if api_key:
            headers["authorization"] = f"Bearer {api_key}"
        async with ClientSession(
            headers=DEFAULT_HEADERS,
            connector=get_connector(proxy=proxy),
            timeout=ClientTimeout(timeout)
        ) as session:
            responses = set()
            yield Reasoning(label=f"Generating {n} {'image' if n == 1 else 'images'}")
            finished = 0
            start = time.time()

            async def get_image(responses: set, i: int, seed: Optional[int] = None):
                try:
                    async with session.get(get_url_with_seed(i, seed), allow_redirects=False, headers=headers) as response:
                        await raise_for_status(response)
                except Exception as e:
                    responses.add(e)
                    debug.error(f"Error fetching image: {e}")
                    return
                responses.add(ImageResponse(str(response.url), prompt, {"headers": headers}))

            tasks: list[asyncio.Task] = []
            for i in range(int(n)):
                tasks.append(asyncio.create_task(get_image(responses, i, seed)))
            while finished < n or len(responses) > 0:
                while len(responses) > 0:
                    item = responses.pop()
                    if isinstance(item, Exception):
                        if finished < 2:
                            yield Reasoning(status="")
                        for task in tasks:
                            task.cancel()
                        if cls.login_url in str(item):
                            raise MissingAuthError(item)
                        raise item
                    else:
                        finished += 1
                        yield Reasoning(label=f"Image {finished}/{n} generated in {time.time() - start:.2f}s")
                        yield item
                await asyncio.sleep(1)
            yield Reasoning(status="")
            await asyncio.gather(*tasks)

    @classmethod
    async def _generate_text(
        cls,
        model: str,
        messages: Messages,
        media: MediaListType,
        proxy: str,
        temperature: float,
        presence_penalty: float,
        top_p: float,
        frequency_penalty: float,
        response_format: Optional[dict],
        seed: Optional[int],
        cache: bool,
        stream: bool,
        extra_parameters: list[str],
        referrer: str,
        api_key: str,
        extra_body: dict,
        **kwargs
    ) -> AsyncResult:
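        """Stream a chat completion from the OpenAI-compatible endpoint.

        Audio models are switched to non-streaming MP3 output; after a
        completed "next" action a second request asks the model for a
        conversation title and suggested followups.
        """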
        if not cache and seed is None:
            seed = random.randint(0, 2**32)

        async with ClientSession(headers=DEFAULT_HEADERS, connector=get_connector(proxy=proxy)) as session:
            extra_body.update({param: kwargs[param] for param in extra_parameters if param in kwargs})
            if model in cls.audio_models:
                if "audio" in extra_body and extra_body.get("audio", {}).get("voice") is None:
                    extra_body["audio"]["voice"] = cls.default_voice
                elif "audio" not in extra_body:
                    extra_body["audio"] = {"voice": cls.default_voice}
                if extra_body.get("audio", {}).get("format") is None:
                    extra_body["audio"]["format"] = "mp3"
                    stream = False
                if "modalities" not in extra_body:
                    extra_body["modalities"] = ["text", "audio"]
            data = filter_none(
                messages=list(render_messages(messages, media)),
                model=model,
                temperature=temperature,
                presence_penalty=presence_penalty,
                top_p=top_p,
                frequency_penalty=frequency_penalty,
                response_format=response_format,
                stream=stream,
                seed=None if model == "grok" else seed,
                referrer=referrer,
                **extra_body
            )
            headers = {"referer": referrer}
            if api_key:
                headers["authorization"] = f"Bearer {api_key}"
            async with session.post(cls.openai_endpoint, json=data, headers=headers) as response:
                if response.status in (400, 500):
                    debug.error(f"Error: {response.status} - Bad Request: {data}")
                full_response = []
                async for chunk in read_response(response, stream, format_media_prompt(messages), cls.get_dict(), kwargs.get("download_media", True)):
                    if isinstance(chunk, str):
                        full_response.append(chunk)
                    yield chunk
            if full_response:
                full_content = "".join(full_response)
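                # Ask for a conversation title and followup suggestions via a
                # forced tool call (skipped for the "evil" model).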
                if kwargs.get("action") == "next" and model != "evil":
                    tool_messages = []
                    for message in messages:
                        if message.get("role") == "user":
                            if isinstance(message.get("content"), str):
                                tool_messages.append({"role": "user", "content": message.get("content")})
                            elif isinstance(message.get("content"), list):
                                next_value = message.get("content").pop()
                                if isinstance(next_value, dict):
                                    next_value = next_value.get("text")
                                if next_value:
                                    tool_messages.append({"role": "user", "content": next_value})
                    tool_messages.append({"role": "assistant", "content": full_content})
                    data = {
                        "model": "openai",
                        "messages": tool_messages + FOLLOWUPS_DEVELOPER_MESSAGE,
                        "tool_choice": "required",
                        "tools": FOLLOWUPS_TOOLS
                    }
                    async with session.post(cls.openai_endpoint, json=data, headers=headers) as response:
                        try:
                            await raise_for_status(response)
                            tool_calls = (await response.json()).get("choices", [{}])[0].get("message", {}).get("tool_calls", [])
                            if tool_calls:
                                arguments = json.loads(tool_calls.pop().get("function", {}).get("arguments"))
                                if arguments.get("title"):
                                    yield TitleGeneration(arguments.get("title"))
                                if arguments.get("followups"):
                                    yield SuggestedFollowups(arguments.get("followups"))
                        except Exception as e:
                            debug.error("Error generating title and followups:", e)