File size: 14,109 Bytes
a4b70d9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 |
from __future__ import annotations
import re
import base64
import html
from typing import Union, Dict, List, Optional
from abc import abstractmethod
from urllib.parse import quote, unquote
def is_content(chunk):
if isinstance(chunk, Reasoning):
if chunk.is_thinking is None and chunk.token is None:
return False
return True
return isinstance(chunk, (str, MediaResponse, AudioResponse, ToolCalls))
def quote_url(url: str) -> str:
"""
Quote parts of a URL while preserving the domain structure.
Args:
url: The URL to quote
Returns:
str: The properly quoted URL
"""
# Only unquote if needed to avoid double-unquoting
if '%' in url:
url = unquote(url)
url_parts = url.split("//", maxsplit=1)
# If there is no "//" in the URL, then it is a relative URL
if len(url_parts) == 1:
return quote(url_parts[0], '/?&=#')
protocol, rest = url_parts
domain_parts = rest.split("/", maxsplit=1)
# If there is no "/" after the domain, then it is a domain URL
if len(domain_parts) == 1:
return f"{protocol}//{domain_parts[0]}"
domain, path = domain_parts
return f"{protocol}//{domain}/{quote(path, '/?&=#')}"
def quote_title(title: str) -> str:
"""
Normalize whitespace in a title.
Args:
title: The title to normalize
Returns:
str: The title with normalized whitespace
"""
return " ".join(title.split()) if title else ""
def format_link(url: str, title: Optional[str] = None) -> str:
"""
Format a URL and title as a markdown link.
Args:
url: The URL to link to
title: The title to display. If None, extracts from URL
Returns:
str: The formatted markdown link
"""
if title is None or not title.strip():
try:
title = unquote(url.split("//", maxsplit=1)[1].split("?")[0].replace("www.", ""))
except IndexError:
title = url
return f"[{quote_title(title)}]({quote_url(url)})"
def format_image(image: str, alt: str, preview: Optional[str] = None) -> str:
"""
Formats the given image as a markdown string.
Args:
image: The image to format.
alt: The alt text for the image.
preview: The preview URL format. Defaults to the original image.
Returns:
str: The formatted markdown string.
"""
preview_url = preview.replace('{image}', image) if preview else image
# if preview_url.startswith("/media/"):
# preview_url = "/thumbnail" + preview_url[6:]
return f"[})]({quote_url(image)})"
def format_images_markdown(images: Union[str, List[str]], alt: str,
preview: Union[str, List[str]] = None) -> str:
"""
Formats the given images as a markdown string.
Args:
images: The image or list of images to format.
alt: The alt text for the images.
preview: The preview URL format or list of preview URLs.
If not provided, original images are used.
Returns:
str: The formatted markdown string.
"""
if isinstance(images, list) and len(images) == 1:
images = images[0]
if isinstance(images, str):
result = format_image(images, alt, preview)
else:
result = "\n".join(
format_image(
image,
f"#{idx+1} {alt}",
preview[idx] if isinstance(preview, list) and idx < len(preview) else preview
)
for idx, image in enumerate(images)
)
return result
class ResponseType:
@abstractmethod
def __str__(self) -> str:
"""Convert the response to a string representation."""
raise NotImplementedError
class JsonMixin:
def __init__(self, **kwargs) -> None:
"""Initialize with keyword arguments as attributes."""
for key, value in kwargs.items():
setattr(self, key, value)
def get_dict(self) -> Dict:
"""Return a dictionary of non-private attributes."""
return {
key: value
for key, value in self.__dict__.items()
if not key.startswith("__")
}
@classmethod
def from_dict(cls, data: Dict) -> JsonMixin:
"""Create an instance from a dictionary."""
return cls(**data)
def reset(self) -> None:
"""Reset all attributes."""
self.__dict__ = {}
class RawResponse(ResponseType, JsonMixin):
pass
class HiddenResponse(ResponseType):
def __str__(self) -> str:
"""Hidden responses return an empty string."""
return ""
class FinishReason(JsonMixin, HiddenResponse):
def __init__(self, reason: str) -> None:
"""Initialize with a reason."""
self.reason = reason
class ToolCalls(HiddenResponse):
def __init__(self, list: List) -> None:
"""Initialize with a list of tool calls."""
self.list = list
def get_list(self) -> List:
"""Return the list of tool calls."""
return self.list
class Usage(JsonMixin, HiddenResponse):
def __init__(
self,
promptTokens: int = None,
completionTokens: int = None,
input_tokens: int = None,
output_tokens: int = None,
**kwargs
):
if promptTokens is not None:
kwargs["prompt_tokens"] = promptTokens
if completionTokens is not None:
kwargs["completion_tokens"] = completionTokens
if input_tokens is not None:
kwargs["prompt_tokens"] = input_tokens
if output_tokens is not None:
kwargs["completion_tokens"] = output_tokens
if "total_tokens" not in kwargs and "prompt_tokens" in kwargs and "completion_tokens" in kwargs:
kwargs["total_tokens"] = kwargs["prompt_tokens"] + kwargs["completion_tokens"]
return super().__init__(**kwargs)
class AuthResult(JsonMixin, HiddenResponse):
pass
class TitleGeneration(HiddenResponse):
def __init__(self, title: str) -> None:
"""Initialize with a title."""
self.title = title
class DebugResponse(HiddenResponse):
def __init__(self, log: str) -> None:
"""Initialize with a log message."""
self.log = log
class ContinueResponse(HiddenResponse):
def __init__(self, log: str) -> None:
"""Initialize with a log message."""
self.log = log
class Reasoning(ResponseType):
def __init__(
self,
token: Optional[str] = None,
label: Optional[str] = None,
status: Optional[str] = None,
is_thinking: Optional[str] = None
) -> None:
"""Initialize with token, status, and thinking state."""
self.token = token
self.label = label
self.status = status
self.is_thinking = is_thinking
def __str__(self) -> str:
"""Return string representation based on available attributes."""
if self.is_thinking is not None:
return self.is_thinking
if self.token is not None:
return self.token
if self.status is not None:
if self.label is not None:
return f"{self.label}: {self.status}\n"
return f"{self.status}\n"
return ""
def get_dict(self) -> Dict:
"""Return a dictionary representation of the reasoning."""
if self.label is not None:
return {"label": self.label, "status": self.status}
if self.is_thinking is None:
if self.status is None:
return {"token": self.token}
return {"token": self.token, "status": self.status}
return {"token": self.token, "status": self.status, "is_thinking": self.is_thinking}
class Sources(ResponseType):
def __init__(self, sources: List[Dict[str, str]]) -> None:
"""Initialize with a list of source dictionaries."""
self.list = []
for source in sources:
self.add_source(source)
def add_source(self, source: Union[Dict[str, str], str]) -> None:
"""Add a source to the list, cleaning the URL if necessary."""
source = source if isinstance(source, dict) else {"url": source}
url = source.get("url", source.get("link", None))
if url is not None:
url = re.sub(r"[&?]utm_source=.+", "", url)
source["url"] = url
self.list.append(source)
def __str__(self) -> str:
"""Return formatted sources as a string."""
if not self.list:
return ""
return "\n\n\n\n" + ("\n>\n".join([
f"> [{idx}] {format_link(link['url'], link.get('title', None))}"
for idx, link in enumerate(self.list)
]))
class SourceLink(ResponseType):
def __init__(self, title: str, url: str) -> None:
self.title = title
self.url = url
def __str__(self) -> str:
title = f"[{self.title}]"
return f" {format_link(self.url, title)}"
class YouTubeResponse(HiddenResponse):
def __init__(self, ids: List[str], add_links: bool = False) -> None:
"""Initialize with a list of YouTube IDs."""
self.ids = ids
self.add_links = add_links
def to_string(self) -> str:
"""Return YouTube embeds as a string."""
if not self.ids:
return ""
template = '<iframe type="text/html" src="https://www.youtube.com/embed/{id}"></iframe>'
if self.add_links:
template += '\n\n<a href="https://www.youtube.com/watch?v={id}">Watch on YouTube</a>'
return "\n\n" + ("\n".join([
template.format(id=id)
for id in self.ids
]))
class AudioResponse(ResponseType):
def __init__(self, data: str, transcript: str = None, **kwargs) -> None:
"""Initialize with audio data bytes."""
self.data = data
self.transcript = transcript
self.options = kwargs
def to_uri(self) -> str:
if isinstance(self.data, str):
if self.data.startswith("/media/"):
return quote(self.data, '/?&=')
return self.data
"""Return audio data as a base64-encoded data URI."""
data_base64 = base64.b64encode(self.data).decode()
return f"data:audio/mpeg;base64,{data_base64}"
def __str__(self) -> str:
"""Return audio as html element."""
if isinstance(self.data, str) and self.data.startswith("data:"):
return f'<audio controls></audio>' + (f"\n\n{self.transcript}" if self.transcript else "")
return f'<audio controls src="{self.to_uri()}"></audio>' + (f"\n\n{self.transcript}" if self.transcript else "")
class BaseConversation(ResponseType):
def __str__(self) -> str:
"""Return an empty string by default."""
return ""
class JsonConversation(BaseConversation, JsonMixin):
pass
class SynthesizeData(HiddenResponse, JsonMixin):
def __init__(self, provider: str, data: Dict) -> None:
"""Initialize with provider and data."""
self.provider = provider
self.data = data
class SuggestedFollowups(HiddenResponse):
def __init__(self, suggestions: list[str]):
self.suggestions = suggestions
class RequestLogin(HiddenResponse):
def __init__(self, label: str, login_url: str) -> None:
"""Initialize with label and login URL."""
self.label = label
self.login_url = login_url
def to_string(self) -> str:
"""Return formatted login link as a string."""
return format_link(self.login_url, f"[Login to {self.label}]") + "\n\n"
class MediaResponse(ResponseType):
def __init__(
self,
urls: Union[str, List[str]],
alt: str,
options: Dict = {},
**kwargs
) -> None:
"""Initialize with images, alt text, and options."""
self.urls = kwargs.get("images", urls)
self.alt = alt
self.options = options
def get(self, key: str, default: any = None) -> any:
"""Get an option value by key."""
return self.options.get(key, default)
def get_list(self) -> List[str]:
"""Return images as a list."""
return [self.urls] if isinstance(self.urls, str) else self.urls
class ImageResponse(MediaResponse):
def __str__(self) -> str:
return self.to_string()
def to_string(self) -> str:
"""Return images as markdown."""
if self.get("width") and self.get("height"):
return "\n".join([
f'<a href="{html.escape(url)}" data-width="{self.get("width")}" data-height="{self.get("height")}" data-source="{html.escape(self.get("source_url", ""))}">'
+ f'<img src="{url.replace("/media/", "/thumbnail/")}" alt="{html.escape(" ".join(self.alt.split()))}"></a>'
for url in self.get_list()
])
return format_images_markdown(self.urls, self.alt, self.get("preview"))
class VideoResponse(MediaResponse):
def __str__(self) -> str:
"""Return videos as html elements."""
if self.get("preview"):
result = []
for idx, video in enumerate(self.get_list()):
image = self.get("preview")
if isinstance(image, list) and len(image) > idx:
image = image[idx]
result.append(f'<video src="{quote_url(video)}" poster="{quote_url(image)}"></video>')
return "\n".join(result)
return "\n".join([f'<video src="{quote_url(video)}"></video>' for video in self.get_list()])
class ImagePreview(ImageResponse, HiddenResponse):
pass
class PreviewResponse(HiddenResponse):
def __init__(self, data: str) -> None:
"""Initialize with data."""
self.data = data
def to_string(self) -> str:
"""Return data as a string."""
return self.data
class Parameters(ResponseType, JsonMixin):
def __str__(self) -> str:
"""Return an empty string."""
return ""
class ProviderInfo(JsonMixin, HiddenResponse):
pass
|