from __future__ import annotations import os import base64 from typing import Iterator, Union from urllib.parse import urlparse from pathlib import Path from ..typing import Messages from ..image import is_data_an_media, to_input_audio, is_valid_media, is_valid_audio, to_data_uri from .files import get_bucket_dir, read_bucket def render_media(bucket_id: str, name: str, url: str, as_path: bool = False, as_base64: bool = False, **kwargs) -> Union[str, Path]: if (as_base64 or as_path or url.startswith("/")): file = Path(get_bucket_dir(bucket_id, "thumbnail", name)) if not file.exists(): file = Path(get_bucket_dir(bucket_id, "media", name)) if as_path: return file data = file.read_bytes() data_base64 = base64.b64encode(data).decode() if as_base64: return data_base64 return f"data:{is_data_an_media(data, name)};base64,{data_base64}" return url def render_part(part: dict) -> dict: if "type" in part: return part text = part.get("text") if text: return { "type": "text", "text": text } filename = part.get("name") if (filename is None): bucket_dir = Path(get_bucket_dir(part.get("bucket_id"))) return { "type": "text", "text": "".join(read_bucket(bucket_dir)) } if is_valid_audio(filename=filename): return { "type": "input_audio", "input_audio": { "data": render_media(**part, as_base64=True), "format": os.path.splitext(filename)[1][1:] } } if is_valid_media(filename=filename): return { "type": "image_url", "image_url": {"url": render_media(**part)} } def merge_media(media: list, messages: list) -> Iterator: buffer = [] # Read media from the last user message for message in messages: if message.get("role") == "user": content = message.get("content") if isinstance(content, list): for part in content: if "type" not in part and "name" in part and "text" not in part: path = render_media(**part, as_path=True) buffer.append((path, os.path.basename(path))) elif part.get("type") == "image_url": path: str = urlparse(part.get("image_url")).path if path.startswith("/files/"): path = get_bucket_dir(path.split(path, "/")[1:]) if os.path.exists(path): buffer.append((Path(path), os.path.basename(path))) else: buffer.append((part.get("image_url"), None)) else: buffer.append((part.get("image_url"), None)) else: buffer = [] yield from buffer if media is not None: yield from media def render_messages(messages: Messages, media: list = None) -> Iterator: last_is_assistant = False for idx, message in enumerate(messages): # Remove duplicate assistant messages if message.get("role") == "assistant": if last_is_assistant: continue last_is_assistant = True else: last_is_assistant = False # Render content parts if isinstance(message.get("content"), list): parts = [render_part(part) for part in message["content"] if part] yield { **message, "content": [part for part in parts if part] } else: # Append media to the last message if media is not None and idx == len(messages) - 1: yield { **message, "content": [ { "type": "input_audio", "input_audio": to_input_audio(media_data, filename) } if is_valid_audio(media_data, filename) else { "type": "image_url", "image_url": {"url": to_data_uri(media_data)} } for media_data, filename in media if media_data and is_valid_media(media_data, filename) ] + ([{"type": "text", "text": message["content"]}] if isinstance(message["content"], str) else message["content"]) } else: yield message