Spaces:
Paused
Paused
File size: 2,839 Bytes
4efde5d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
"""
Utility functions for handling image operations.
"""
import base64
import uuid
from datetime import datetime
from utils.logger import logger
from services.supabase import DBConnection
async def upload_base64_image(base64_data: str, bucket_name: str = "browser-screenshots") -> str:
    """Upload a base64 encoded image to Supabase storage and return the URL.

    The image is always stored with a ``.png`` extension and an
    ``image/png`` content type, under a generated name of the form
    ``image_<YYYYmmdd_HHMMSS>_<8 hex chars>.png``.

    Args:
        base64_data (str): Base64 encoded image data (with or without data URL prefix)
        bucket_name (str): Name of the storage bucket to upload to

    Returns:
        str: Public URL of the uploaded image

    Raises:
        RuntimeError: If the data cannot be decoded or the upload fails. The
            underlying exception is chained as ``__cause__``.
    """
    try:
        # Remove data URL prefix if present ("data:<mime>;base64,<payload>").
        if base64_data.startswith('data:'):
            _, separator, payload = base64_data.partition(',')
            if not separator:
                # Fail with a clear message instead of an opaque IndexError.
                raise ValueError("data URL is missing the ',' separator")
            base64_data = payload

        # Decode base64 data
        image_data = base64.b64decode(base64_data)

        # Generate unique filename: the timestamp keeps names sortable, the
        # UUID fragment avoids collisions within the same second.
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        unique_id = str(uuid.uuid4())[:8]
        filename = f"image_{timestamp}_{unique_id}.png"

        # Upload to Supabase storage
        db = DBConnection()
        client = await db.client
        await client.storage.from_(bucket_name).upload(
            filename,
            image_data,
            {"content-type": "image/png"},
        )

        # Get public URL
        public_url = await client.storage.from_(bucket_name).get_public_url(filename)
        logger.debug(f"Successfully uploaded image to {public_url}")
        return public_url
    except Exception as e:
        logger.error(f"Error uploading base64 image: {e}")
        # Chain the original exception so the root cause stays in the traceback.
        raise RuntimeError(f"Failed to upload image: {str(e)}") from e
async def upload_image_bytes(image_bytes: bytes, content_type: str = "image/png", bucket_name: str = "agent-profile-images") -> str:
    """Upload raw image bytes to Supabase storage and return the URL.

    The stored object is named
    ``agent_profile_<YYYYmmdd_HHMMSS>_<8 hex chars>.<ext>``, where ``<ext>``
    is chosen from ``content_type``.

    Args:
        image_bytes (bytes): Raw image data to upload
        content_type (str): MIME type sent with the upload. ``image/jpeg`` and
            ``image/jpg`` map to ``jpg``, ``image/webp`` to ``webp``,
            ``image/gif`` to ``gif``; any other value falls back to ``png``.
        bucket_name (str): Name of the storage bucket to upload to

    Returns:
        str: Public URL of the uploaded image

    Raises:
        RuntimeError: If the upload fails. The underlying exception is chained
            as ``__cause__``.
    """
    # Exact-match lookup; unknown content types default to "png" below.
    extension_by_content_type = {
        "image/jpeg": "jpg",
        "image/jpg": "jpg",
        "image/webp": "webp",
        "image/gif": "gif",
    }
    try:
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        unique_id = str(uuid.uuid4())[:8]
        ext = extension_by_content_type.get(content_type, "png")
        filename = f"agent_profile_{timestamp}_{unique_id}.{ext}"

        db = DBConnection()
        client = await db.client
        await client.storage.from_(bucket_name).upload(
            filename,
            image_bytes,
            {"content-type": content_type},
        )

        public_url = await client.storage.from_(bucket_name).get_public_url(filename)
        logger.debug(f"Successfully uploaded agent profile image to {public_url}")
        return public_url
    except Exception as e:
        logger.error(f"Error uploading image bytes: {e}")
        # Chain the original exception so the root cause stays in the traceback.
        raise RuntimeError(f"Failed to upload image: {str(e)}") from e