import base64
import os

from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain.tools import Tool
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
provider = os.getenv("PROVIDER", "GOOGLE")

if provider == "GOOGLE":
    api_key = os.getenv("GEMINI_API_KEY")
    vision_llm = ChatGoogleGenerativeAI(
        model="gemini-2.5-pro-preview-05-06",
        temperature=0,
        max_retries=2,
        google_api_key=api_key,
        thinking_budget=0,
    )
elif provider == "OPENAI":
    vision_llm = ChatOpenAI(
        model="gpt-4o",
        temperature=0,
        max_retries=2,
    )
else:
    raise ValueError(f"Invalid provider: {provider}")
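
# Usage note (a sketch, not part of the original file): the backend is picked
# via environment variables before launch, e.g.
#   export PROVIDER=GOOGLE     # or OPENAI
#   export GEMINI_API_KEY=...  # the OPENAI branch lets ChatOpenAI read
#                              # OPENAI_API_KEY from the environment itself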
def extract_text(img_path: str) -> str:
    """
    Extract text from an image file using a multimodal model.

    Args:
        img_path: The path to the image file from which to extract text.

    Returns:
        The extracted text from the image, or an empty string if an error occurs.
    """
    all_text = ""
    try:
        # Read the image and encode it as base64
        with open(img_path, "rb") as image_file:
            image_bytes = image_file.read()
        image_base64 = base64.b64encode(image_bytes).decode("utf-8")

        # Build a multimodal prompt embedding the base64 image data
        message = [
            HumanMessage(
                content=[
                    {
                        "type": "text",
                        "text": (
                            "Extract all the text from this image. "
                            "Return only the extracted text, no explanations."
                        ),
                    },
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/png;base64,{image_base64}"
                        },
                    },
                ]
            )
        ]

        # Call the vision-capable model
        response = vision_llm.invoke(message)

        # Append the extracted text
        all_text += response.content + "\n\n"
        return all_text.strip()
    except Exception as e:
        # Fail gracefully: log the error and return an empty string
        error_msg = f"Error extracting text: {str(e)}"
        print(error_msg)
        return ""
def analyze_image_tool(user_query: str, img_path: str) -> str:
    """
    Answer the question by reasoning on the image.

    Args:
        user_query: The question to be answered based on the image.
        img_path: Path to the image file to be analyzed.

    Returns:
        The answer to the query based on image content, or an empty string if an error occurs.
    """
    all_text = ""
    try:
        # Read the image and encode it as base64
        with open(img_path, "rb") as image_file:
            image_bytes = image_file.read()
        image_base64 = base64.b64encode(image_bytes).decode("utf-8")

        # Build a multimodal prompt embedding the base64 image data
        message = [
            HumanMessage(
                content=[
                    {
                        "type": "text",
                        "text": f"User query: {user_query}",
                    },
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/png;base64,{image_base64}"
                        },
                    },
                ]
            )
        ]

        # Call the vision-capable model
        response = vision_llm.invoke(message)

        # Append the model's answer
        all_text += response.content + "\n\n"
        return all_text.strip()
    except Exception as e:
        # Fail gracefully: log the error and return an empty string
        error_msg = f"Error analyzing image: {str(e)}"
        print(error_msg)
        return ""
def analyze_audio_tool(user_query: str, audio_path: str) -> str:
    """Answer the question by reasoning on the provided audio file.

    Args:
        user_query: The question to be answered based on the audio content.
        audio_path: Path to the audio file (e.g., .mp3, .wav, .flac, .aac, .ogg).

    Returns:
        The answer to the query based on audio content, or an error message/empty string if an error occurs.
    """
    try:
        # Determine the MIME type from the file extension
        _filename, file_extension = os.path.splitext(audio_path)
        file_extension = file_extension.lower()
        supported_formats = {
            ".mp3": "audio/mp3", ".wav": "audio/wav", ".flac": "audio/flac",
            ".aac": "audio/aac", ".ogg": "audio/ogg"
        }
        if file_extension not in supported_formats:
            return (f"Error: Unsupported audio file format '{file_extension}'. "
                    f"Supported extensions: {', '.join(supported_formats.keys())}.")
        mime_type = supported_formats[file_extension]

        # Read the audio file and encode it as base64
        with open(audio_path, "rb") as audio_file:
            audio_bytes = audio_file.read()
        audio_base64 = base64.b64encode(audio_bytes).decode("utf-8")

        # Build a multimodal prompt embedding the base64 audio data
        message = [
            HumanMessage(
                content=[
                    {
                        "type": "text",
                        "text": f"User query: {user_query}",
                    },
                    {
                        "type": "audio",
                        "source_type": "base64",
                        "mime_type": mime_type,
                        "data": audio_base64,
                    },
                ]
            )
        ]

        # Call the multimodal model
        response = vision_llm.invoke(message)
        return response.content.strip()
    except Exception as e:
        error_msg = f"Error analyzing audio: {str(e)}"
        print(error_msg)
        return ""