Spaces:
Sleeping
Sleeping
| import os | |
| import random | |
| import requests | |
| import tempfile | |
| import re | |
| from typing import Dict | |
| from pathlib import Path | |
| #from markitdown import MarkItDown | |
| from urllib.parse import urlparse | |
| from langchain_core.tools import tool | |
| from langchain_core.messages import ToolMessage | |
| from langchain_tavily import TavilySearch | |
| from langchain_community.utilities import GoogleSerperAPIWrapper | |
| from langchain_community.document_loaders import WikipediaLoader | |
| from langchain_community.document_loaders import ArxivLoader | |
def web_search(query: str) -> ToolMessage:
    """Run a Tavily web search for a query, capped at 5 results.

    Args:
        query: The search query.

    Returns:
        Whatever ``TavilySearch.invoke`` returns for the query (image
        results are disabled).
    """
    tavily = TavilySearch(max_results=5, include_images=False)
    payload = {"query": query}
    return tavily.invoke(payload)
def search_tool(query: str) -> str:
    """Search Google (via Serper) and format the top 10 organic results.

    The search is attempted up to three times. Each attempt builds its
    output from scratch, so a failure halfway through formatting never
    leaves duplicated rows in the final answer.

    Args:
        query: The search query.

    Returns:
        One "Title / Snippet / URL" paragraph per organic result, or an
        error message describing the last failure if every attempt failed.
    """
    searcher = GoogleSerperAPIWrapper(k=10)
    # Python unbinds the `except ... as` name when the clause ends, so the
    # last error has to be stored explicitly to be reported after the loop.
    last_error = None
    for _ in range(3):
        try:
            rows = searcher.results(query)["organic"]
            return "".join(
                f"Title: {row.get('title', '')}\n"
                f"Snippet: {row.get('snippet', '')}\n"
                f"URL: {row.get('link', '')}\n\n"
                for row in rows
            )
        except Exception as e:
            # Tool boundary: any failure is reported back as text, not raised.
            last_error = e
    return f"There was an error with Google search: {last_error}"
def wikipedia_search(query: str) -> Dict[str, str]:
    """Search Wikipedia for a query and return up to 10 formatted documents.

    Args:
        query: The search term or topic.

    Returns:
        A dictionary with a single key, ``"wiki_results"``, whose value is
        one string containing every loaded document wrapped in a
        ``<Document ...>`` header and separated by ``---`` lines.
    """
    search_docs = WikipediaLoader(query=query, load_max_docs=10).load()
    formatted_search_docs = "\n\n---\n\n".join(
        # .get() keeps one document without a "source" entry from failing
        # the whole search.
        f'<Document source="{doc.metadata.get("source", "")}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content}\n</Document>'
        for doc in search_docs
    )
    return {"wiki_results": formatted_search_docs}
| #Mathematical tools | |
def multiply(a: float, b: float) -> float:
    """Return the product of two numbers.

    Args:
        a: first number
        b: second number

    Returns:
        The value of ``a`` times ``b``.
    """
    product = a * b
    return product
def add(a: float, b: float) -> float:
    """Return the sum of two numbers.

    Args:
        a: first number
        b: second number

    Returns:
        The value of ``a`` plus ``b``.
    """
    total = a + b
    return total
def subtract(a: float, b: float) -> float:
    """Return the difference of two numbers.

    Args:
        a: first number (the minuend)
        b: second number (the subtrahend)

    Returns:
        The value of ``a`` minus ``b``.
    """
    difference = a - b
    return difference
def divide(a: float, b: float) -> float:
    """Return the quotient of two numbers.

    Args:
        a: first number (the dividend)
        b: second number (the divisor)

    Returns:
        The value of ``a`` divided by ``b``.

    Raises:
        ValueError: If ``b`` is zero.
    """
    if b != 0:
        quotient = a / b
        return quotient
    raise ValueError("Cannot divide by zero.")
def modulus(a: int, b: int) -> int:
    """Return the remainder of dividing one number by another.

    Args:
        a: first number (the dividend)
        b: second number (the divisor)

    Returns:
        The value of ``a % b``.
    """
    remainder = a % b
    return remainder
| from langchain_core.tools import tool | |
def convert_units(value: float, from_unit: str, to_unit: str) -> float:
    """Convert a value from one unit to another.

    Unit names are matched case-insensitively, ignoring surrounding
    whitespace. Common abbreviations and singular forms are accepted.

    Supported conversions:
        - miles (mile, mi) <-> kilometers (kilometer, kilometre(s), km)
        - kilograms (kilogram, kg, kgs) <-> pounds (pound, lb, lbs)
        - celsius (c) <-> fahrenheit (f)

    Args:
        value: The numerical value to convert.
        from_unit: The original unit (e.g. 'miles', 'kg', 'celsius').
        to_unit: The target unit (e.g. 'kilometers', 'lb', 'fahrenheit').

    Returns:
        The converted value. When both units name the same supported unit
        the value is returned unchanged.

    Raises:
        ValueError: If the requested conversion is not supported.
    """
    # The documented examples use short forms such as 'kg' and 'lb', so map
    # them onto the canonical names used as keys in the conversion table.
    aliases = {
        "mile": "miles",
        "mi": "miles",
        "kilometer": "kilometers",
        "kilometre": "kilometers",
        "kilometres": "kilometers",
        "km": "kilometers",
        "kilogram": "kilograms",
        "kg": "kilograms",
        "kgs": "kilograms",
        "pound": "pounds",
        "lb": "pounds",
        "lbs": "pounds",
        "c": "celsius",
        "f": "fahrenheit",
    }
    conversions = {
        ("miles", "kilometers"): lambda v: v * 1.60934,
        ("kilometers", "miles"): lambda v: v / 1.60934,
        ("kilograms", "pounds"): lambda v: v * 2.20462,
        ("pounds", "kilograms"): lambda v: v / 2.20462,
        ("celsius", "fahrenheit"): lambda v: (v * 9/5) + 32,
        ("fahrenheit", "celsius"): lambda v: (v - 32) * 5/9,
    }

    def canonical(unit: str) -> str:
        name = unit.strip().lower()
        return aliases.get(name, name)

    source, target = canonical(from_unit), canonical(to_unit)
    known_units = {unit for pair in conversions for unit in pair}
    if source == target and source in known_units:
        return value

    key = (source, target)
    if key not in conversions:
        raise ValueError(f"Conversion from {from_unit} to {to_unit} not supported.")
    return conversions[key](value)
# Natural-language comparison phrases and their pandas operators. Longer
# phrases come first so that "less than or equal to" and "not equal to" are
# not shadowed by the shorter "equal to".
_TABLE_QUERY_OPERATORS = (
    ("greater than or equal to", ">="),
    ("less than or equal to", "<="),
    ("not equal to", "!="),
    ("greater than", ">"),
    ("more than", ">"),
    ("less than", "<"),
    ("equal to", "=="),
    ("is not", "!="),
    ("is", "=="),
)
_TABLE_QUERY_SYMBOLS = dict(_TABLE_QUERY_OPERATORS)
_TABLE_QUERY_NUMBER = re.compile(r"-?\d+(?:\.\d+)?")
# One left-to-right pass over the query with three alternatives:
#   1. "<column> <phrase> <value>" comparisons, rewritten to pandas syntax;
#   2. quoted strings / backticked names, copied through untouched so text
#      inside them is never rewritten;
#   3. bare and/or in any letter case, normalised to lowercase.
_TABLE_QUERY_PATTERN = re.compile(
    r"(?P<column>`[^`]+`|\b\w+)\s+(?P<op>"
    + "|".join(r"\s+".join(phrase.split()) for phrase, _ in _TABLE_QUERY_OPERATORS)
    + r")\s+(?P<value>'[^']*'|\"[^\"]*\"|[\w.\-]+(?:\s+(?!(?:and|or)\b)[\w.\-]+)*)"
    + r"|(?P<passthrough>'[^']*'|\"[^\"]*\"|`[^`]*`)"
    + r"|(?P<logic>\b(?:and|or)\b)",
    re.IGNORECASE,
)


def _rewrite_table_query_match(match) -> str:
    """Return the pandas-syntax replacement for one pattern match."""
    if match.group("passthrough") is not None:
        return match.group("passthrough")
    if match.group("logic") is not None:
        return match.group("logic").lower()

    phrase = " ".join(match.group("op").lower().split())
    value = match.group("value")
    if value[0] in "'\"":
        # Explicitly quoted by the caller: always a string literal.
        literal = repr(value[1:-1])
    elif _TABLE_QUERY_NUMBER.fullmatch(value):
        # Bare numbers stay numeric so they compare against numeric columns.
        literal = value
    else:
        literal = repr(value)
    return f"{match.group('column')} {_TABLE_QUERY_SYMBOLS[phrase]} {literal}"


def _translate_table_query(query: str) -> str:
    """Rewrite natural-language comparisons in ``query`` into pandas syntax."""
    return _TABLE_QUERY_PATTERN.sub(_rewrite_table_query_match, query.strip())


def query_table_data(file_path: str, query: str, sheet_name: str = None) -> str:
    """Load a table from CSV or Excel and filter it with a query.

    The query may be a pandas query string (e.g. "Age > 30 and
    Country == 'USA'") or use simple natural-language comparisons such as
    "Age greater than 30 and Country is USA". Recognised phrases are
    "greater than", "more than", "less than", "greater than or equal to",
    "less than or equal to", "equal to", "not equal to", "is" and "is not".
    Column names are case-sensitive.

    Args:
        file_path: Path to the table file (.csv, .xlsx, .xls).
        query: The filter expression, in pandas query syntax or using the
            natural-language phrases listed above.
        sheet_name: Optional sheet name if the file is Excel. The first
            sheet is used when omitted.

    Returns:
        A string representation of the filtered table (max 10 rows), as
        markdown when available and as plain text otherwise, or an error
        message if pandas/openpyxl cannot be imported.

    Raises:
        FileNotFoundError: If ``file_path`` does not exist.
        ValueError: If the file extension is unsupported or the query
            cannot be evaluated against the table.
    """
    try:
        import pandas as pd

        path = Path(file_path)
        if not path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        ext = path.suffix.lower()
        if ext == ".csv":
            df = pd.read_csv(path)
        elif ext in (".xlsx", ".xls"):
            # sheet_name=None would make read_excel return a dict of every
            # sheet instead of a DataFrame, so default to the first sheet.
            df = pd.read_excel(path, sheet_name=0 if sheet_name is None else sheet_name)
        else:
            raise ValueError(f"Unsupported file extension: {ext}")

        query_pandas_syntax = _translate_table_query(query)
        try:
            # NOTE: DataFrame.query evaluates the expression; do not feed it
            # text from a source you do not trust.
            filtered_df = df.query(query_pandas_syntax)
        except Exception as e:
            raise ValueError(f"Invalid query: {query_pandas_syntax}. Error: {e}") from e

        preview = filtered_df.head(10)
        try:
            return preview.to_markdown(index=False)
        except ImportError:
            # to_markdown needs the optional 'tabulate' package; fall back
            # to plain text rather than failing the whole lookup.
            return preview.to_string(index=False)
    except ImportError:
        return "Error: pandas and openpyxl are not installed. Please install them with 'pip install pandas openpyxl'."
def arvix_search(query: str) -> Dict[str, str]:
    """Search Arxiv for a query and return at most 5 formatted results.

    Args:
        query: The search query.

    Returns:
        A dictionary with a single key, ``"arvix_results"``, whose value is
        one string containing the first 1000 characters of each loaded
        document, wrapped in a ``<Document ...>`` header and separated by
        ``---`` lines.
    """
    search_docs = ArxivLoader(query=query, load_max_docs=5).load()

    def describe_source(metadata: dict) -> str:
        # ArxivLoader metadata appears to carry Title/Published/Authors/
        # Summary rather than "source" (see the loader docs), so indexing
        # "source" directly can raise KeyError. Fall back to whatever
        # identifying field is present.
        return (
            metadata.get("source")
            or metadata.get("entry_id")
            or metadata.get("Title", "")
        )

    formatted_search_docs = "\n\n---\n\n".join(
        f'<Document source="{describe_source(doc.metadata)}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content[:1000]}\n</Document>'
        for doc in search_docs
    )
    return {"arvix_results": formatted_search_docs}
def read_python_file(file_path: str) -> str:
    """Read a Python source file and return its text.

    The file is read directly as UTF-8 text (undecodable bytes are
    replaced), so no third-party converter is required.

    Args:
        file_path: Path to the Python file

    Returns:
        Python file content.

    Raises:
        FileNotFoundError: If ``file_path`` does not exist.
        ValueError: If the file does not have a ``.py`` extension.
        OSError: If the file exists but cannot be read.
    """
    path = Path(file_path)
    try:
        if not path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")
        ext = path.suffix.lower()
        if ext != ".py":
            raise ValueError(f"Unsupported file extension: {ext}")
        # errors="replace" means decoding can never raise, so only OSError
        # and the ValueError above can reach the handler below.
        return path.read_text(encoding="utf-8", errors="replace")
    except (OSError, ValueError) as err:
        # Re-raise the same exception type with a contextual prefix while
        # keeping the original error chained for debugging.
        raise type(err)(f"Could not parse python file > {err}") from err
def save_and_read_file(content: str, filename: str = None) -> str:
    """Save content to a file in the system temp directory.

    Useful for processing files from the GAIA API.

    Args:
        content: The content to save to the file
        filename: Optional filename; a random name is generated if it is
            not provided. Only the final path component is used, so the
            file always lands inside the temp directory.

    Returns:
        A message containing the path to the saved file.
    """
    # Drop any directory part so "../x" or an absolute path cannot write
    # outside the temp directory.
    safe_name = os.path.basename(filename) if filename else ""

    if safe_name in ("", ".", ".."):
        # Write through the handle we were given and close it, instead of
        # leaving it open and reopening the path.
        with tempfile.NamedTemporaryFile("w", delete=False, encoding="utf-8") as handle:
            handle.write(content)
            filepath = handle.name
    else:
        filepath = os.path.join(tempfile.gettempdir(), safe_name)
        with open(filepath, "w", encoding="utf-8") as handle:
            handle.write(content)

    return f"File saved to {filepath}. You can read this file to process its contents."
def download_file_from_url(url: str, filename: str) -> str:
    """Download a file from a URL and save it in the system temp directory.

    Args:
        url: The URL to download from
        filename: Name to save the file under. If empty, the name is taken
            from the URL path, or generated randomly when the URL has none.
            Only the final path component is used.

    Returns:
        A message containing the path to the downloaded file, or an error
        message if the download failed.
    """
    try:
        # Prefer the caller's name, then the URL path; keep only the last
        # component so the file cannot be written outside the temp directory.
        candidate = filename or urlparse(url).path
        safe_name = os.path.basename(candidate) if candidate else ""
        if safe_name in ("", ".", ".."):
            # Generate a random name if we couldn't extract one
            import uuid

            safe_name = f"downloaded_{uuid.uuid4().hex[:8]}"

        filepath = os.path.join(tempfile.gettempdir(), safe_name)

        # A timeout stops an unresponsive server from hanging the caller,
        # and the context manager releases the streamed connection.
        with requests.get(url, stream=True, timeout=30) as response:
            response.raise_for_status()
            with open(filepath, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)

        return f"File downloaded to {filepath}. You can now process this file."
    except Exception as e:
        # Tool boundary: failures are reported back as text, not raised.
        return f"Error downloading file: {str(e)}"
def extract_text_from_image(image_path: str) -> str:
    """Extract text from an image using pytesseract OCR.

    Args:
        image_path: Path to the image file.

    Returns:
        A string with the extracted text or an error message.
    """
    try:
        from PIL import Image
        import pytesseract

        # Open inside a context manager so the underlying file handle is
        # released as soon as OCR has finished.
        with Image.open(image_path) as image:
            text = pytesseract.image_to_string(image)

        return f"Extracted text from image:\n\n{text.strip()}"
    except ImportError:
        return (
            "Error: pytesseract or PIL is not installed. "
            "Install them with 'pip install pytesseract pillow' and ensure Tesseract OCR is installed."
        )
    except FileNotFoundError:
        return f"Error: File not found at '{image_path}'."
    except Exception as e:
        return f"Unexpected error during OCR: {str(e)}"
def transcribe_audio(audio_path: str) -> str:
    """Transcribe speech from an audio file with a locally loaded Whisper model.

    Args:
        audio_path: Path to the audio file (e.g., .mp3, .wav, .m4a).

    Returns:
        The transcribed text, or an error message if the library is
        missing, the file does not exist, or transcription fails.
    """
    missing_library_message = (
        "Error: 'whisper' library is not installed. "
        "Install it using 'pip install openai-whisper'."
    )
    try:
        import whisper

        if os.path.exists(audio_path):
            # "base" is the model size; "small", "medium" and "large" can
            # be used instead.
            speech_model = whisper.load_model("base")
            transcription = speech_model.transcribe(audio_path)
            return transcription["text"].strip()
        return f"Error: File not found at '{audio_path}'."
    except ImportError:
        return missing_library_message
    except Exception as e:
        return f"Error during transcription: {str(e)}"
# Tool callables collected under `level1_tools`. How the list is consumed is
# not visible in this file.
level1_tools = [
    # Arithmetic
    multiply,
    add,
    subtract,
    divide,
    modulus,
    # Search
    wikipedia_search,
    web_search,
    arvix_search,
    # Conversion and table lookup
    convert_units,
    query_table_data,
]
# Defined above but currently left out of the list: search_tool,
# download_file_from_url, save_and_read_file, read_python_file,
# extract_text_from_image, transcribe_audio.