# NOTE: Hugging Face Space status header (extraction residue) — this Space
# previously reported "Runtime error" at startup.
| import fastapi | |
| from typing import Optional, Dict, Any | |
| import copy | |
| import requests | |
| import json | |
| import os | |
| import sys | |
| from io import StringIO | |
| import ctypes | |
| import subprocess | |
| import logging | |
| from pathlib import Path | |
| from llama_cpp import Llama | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| import random | |
| import time | |
| import inspect | |
| # Load model directly | |
| from transformers import AutoTokenizer, AutoModelForCausalLM | |
# Tokenizer for the functionary GGUF repository (used alongside llama_cpp).
tokenizer = AutoTokenizer.from_pretrained(
    "meetkai/functionary-small-v3.2-GGUF", trust_remote_code=True
)
# BUG FIX: the original code bound a transformers model *object*
# (AutoModelForCausalLM of an embedding model, no less) to `model_path`,
# but `initialize_llm` below passes it to Llama(model_path=...), which
# requires a filesystem path string to a GGUF file — that caused a
# runtime error at startup.  `model_path` is now a path string,
# overridable via the MODEL_PATH environment variable.
model_path = os.environ.get("MODEL_PATH", "functionary-small-v3.2.Q4_0.gguf")
# Configure logging
# Root logger emits timestamped, level-tagged messages at INFO and above;
# the module-level `logger` is used below (e.g. for LLM init failures).
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class Tool(fastapi.FastAPI):
    """A FastAPI application describing a single plugin-style tool.

    Stores an OpenAI-plugin-compatible manifest in ``self.api_info`` and
    serves it at ``/.well-known/ai-plugin.json`` with the OpenAPI url
    rewritten relative to the incoming request.
    """

    def __init__(
        self,
        tool_name: str,
        description: str,
        name_for_human: Optional[str] = None,
        name_for_model: Optional[str] = None,
        description_for_human: Optional[str] = None,
        description_for_model: Optional[str] = None,
        logo_url: Optional[str] = None,
        author_github: Optional[str] = None,
        contact_email: str = "",
        legal_info_url: str = "",
        version: str = "0.1.0",
    ):
        super().__init__(
            title=tool_name,
            description=description,
            version=version,
        )
        # Cascade defaults: tool_name -> name_for_human -> name_for_model,
        # and description -> description_for_human -> description_for_model,
        # so callers may supply only the basics.
        if name_for_human is None:
            name_for_human = tool_name
        if name_for_model is None:
            name_for_model = name_for_human
        if description_for_human is None:
            description_for_human = description
        if description_for_model is None:
            description_for_model = description_for_human
        self.api_info = {
            "schema_version": "v1",
            "name_for_human": name_for_human,
            "name_for_model": name_for_model,
            "description_for_human": description_for_human,
            "description_for_model": description_for_model,
            "auth": {
                "type": "none",
            },
            "api": {
                "type": "openapi",
                "url": "/openapi.json",
                "is_user_authenticated": False,
            },
            "author_github": author_github,
            "logo_url": logo_url,
            "contact_email": contact_email,
            "legal_info_url": legal_info_url,
        }

        def get_api_info(request: fastapi.Request):
            # Rewrite the manifest's OpenAPI url so it is absolute with
            # respect to however this request reached us (host/port/proxy).
            openapi_path = str(request.url).replace("/.well-known/ai-plugin.json", "/openapi.json")
            # Deep-copy so concurrent requests never mutate the shared manifest.
            info = copy.deepcopy(self.api_info)
            info["api"]["url"] = str(openapi_path)
            return info

        # BUG FIX: the handler above was defined but never registered, so
        # the plugin manifest endpoint did not exist on the app.
        self.get("/.well-known/ai-plugin.json")(get_api_info)
class SelfLearningTool:
    """A registry of named callables that can be added, invoked, and removed."""

    def __init__(self):
        # Maps tool name -> the callable implementing it.
        self.tools = {}

    def add_tool(self, name: str, func: callable):
        """Register *func* under *name*, replacing any existing entry."""
        self.tools[name] = func

    def use_tool(self, name: str, *args, **kwargs):
        """Invoke the named tool; return an error message if it is unknown."""
        if name not in self.tools:
            return f"Tool '{name}' not found."
        return self.tools[name](*args, **kwargs)

    def list_tools(self):
        """Return the registered tool names as a list."""
        return [*self.tools]

    def remove_tool(self, name: str):
        """Unregister the named tool; report success or absence as a string."""
        if name not in self.tools:
            return f"Tool '{name}' not found."
        del self.tools[name]
        return f"Tool '{name}' removed successfully."
class PythonREPL:
    """A persistent Python execution environment with a tool registry.

    State (variables, functions) survives across ``run`` calls, and a
    ``SelfLearningTool`` instance lets callers register and invoke named
    helper callables.
    """

    def __init__(self):
        # BUG FIX: a single shared namespace is used for both globals and
        # locals.  exec() with two *distinct* dicts runs code with
        # class-body scoping: a function defined by one command could not
        # see names bound by another, because function bodies resolve free
        # names in globals only.  Aliasing the two attributes fixes that
        # while keeping both `.globals` and `.locals` available to
        # external callers.
        self.globals = {}
        self.locals = self.globals
        self.output_buffer = StringIO()
        self.self_learning_tool = SelfLearningTool()

    def run(self, command: str) -> str:
        """Execute *command* and return captured stdout, or an error string."""
        old_stdout = sys.stdout
        sys.stdout = self.output_buffer
        try:
            exec(command, self.globals, self.locals)
            output = self.output_buffer.getvalue()
        except Exception as e:
            output = f"Error: {repr(e)}"
        finally:
            # Always restore stdout and reset the capture buffer, even on error.
            sys.stdout = old_stdout
            self.output_buffer.truncate(0)
            self.output_buffer.seek(0)
        return output

    def add_tool(self, name: str, func: callable):
        """Register a callable with the underlying tool registry."""
        self.self_learning_tool.add_tool(name, func)

    def use_tool(self, name: str, *args, **kwargs):
        """Invoke a registered tool by name."""
        return self.self_learning_tool.use_tool(name, *args, **kwargs)

    def list_tools(self):
        """Return the names of all registered tools."""
        return self.self_learning_tool.list_tools()

    def remove_tool(self, name: str):
        """Unregister a tool by name."""
        return self.self_learning_tool.remove_tool(name)

    def self_reflect(self):
        """Summarize current state: variable count and registered tools."""
        # Dunder entries (e.g. __builtins__, injected into the namespace by
        # exec) are not user-defined variables; exclude them from the count.
        user_vars = [k for k in self.locals if not k.startswith("__")]
        reflection = "Self-reflection:\n"
        reflection += f"Number of defined variables: {len(user_vars)}\n"
        reflection += f"Number of available tools: {len(self.list_tools())}\n"
        reflection += "Available tools:\n"
        for tool in self.list_tools():
            reflection += f"- {tool}\n"
        return reflection

    def self_inspect(self):
        """Describe each user-defined name and, for callables, its signature."""
        inspection = "Self-inspection:\n"
        for name, value in self.locals.items():
            if name.startswith("__"):
                continue  # skip interpreter-injected entries such as __builtins__
            inspection += f"{name}: {type(value)}\n"
            if callable(value):
                try:
                    signature = inspect.signature(value)
                    inspection += f" Signature: {signature}\n"
                except ValueError:
                    # Some builtins/extension callables expose no signature.
                    inspection += " Signature: Unable to inspect\n"
        return inspection
def initialize_llm(model_path: str, n_ctx: int, n_threads: int = 4, n_batch: int = 512) -> Llama:
    """Construct a llama_cpp ``Llama`` instance, logging and re-raising on failure.

    Args:
        model_path: Filesystem path to the GGUF model file.
        n_ctx: Context window size in tokens.
        n_threads: Worker threads for inference.
        n_batch: Prompt-processing batch size.
    """
    try:
        llm_instance = Llama(
            model_path=model_path,
            n_ctx=n_ctx,
            n_threads=n_threads,
            n_batch=n_batch,
            verbose=True,
        )
    except Exception as exc:
        # Surface the failure in the logs, then let the caller decide.
        logger.error(f"Failed to initialize LLM: {exc}")
        raise
    return llm_instance
# Instantiate the shared llama_cpp model with a 4096-token context window.
# NOTE(review): Llama(model_path=...) requires a filesystem path string to a
# GGUF file — confirm that the `model_path` binding above is a str, not a
# transformers model object.
llm = initialize_llm(model_path, 4096)
def build_tool(config) -> Tool:
    """Construct the "Advanced Python REPL" tool and register its endpoints.

    Args:
        config: Currently unused; reserved for future configuration.

    Returns:
        A ``Tool`` (FastAPI app) exposing REPL execution, tool management,
        self-reflection, and file read/write routes.

    SECURITY NOTE(review): ``run_python``/``add_tool``/``use_tool`` pass
    request-supplied strings to ``exec``/``eval`` — arbitrary code
    execution by design.  This service must only be exposed to trusted
    callers.
    """
    tool = Tool(
        "Advanced Python REPL",
        "Execute sophisticated Python commands with self-learning capabilities",
        name_for_model="Advanced Python REPL",
        description_for_model=(
            "An advanced Python shell for executing complex Python commands. "
            "Input should be a valid Python command or script. "
            "Use print(...) to see the output of expressions. "
            "Capable of handling multi-line code, advanced Python features, "
            "and self-learning tools."
        ),
        logo_url="https://your-app-url.com/.well-known/logo.png",
        contact_email="hello@contact.com",
        legal_info_url="hello@legal.com"
    )
    # One REPL instance shared by all requests: state persists across calls.
    python_repl = PythonREPL()

    def sanitize_input(query: str) -> str:
        # Trim whitespace and stray backticks.  Note that str.strip("```")
        # removes backtick *characters* from both ends, not a fence token.
        return query.strip().strip("```").strip()

    # BUG FIX: all handlers below were defined but never registered on the
    # app, so the returned tool exposed no endpoints.  Each is now mounted
    # as a route via the FastAPI decorator API.

    @tool.get("/run_python")
    def run_python(query: str):
        """Execute a Python snippet in the persistent REPL."""
        sanitized_query = sanitize_input(query)
        result = python_repl.run(sanitized_query)
        return {"result": result, "execution_time": time.time()}

    @tool.get("/add_tool")
    def add_tool(name: str, code: str):
        """Define a new tool from source text and register it.

        NOTE(review): the exec template is ``def {name}({code})`` — the
        *code* argument is spliced after the opening parenthesis, so it
        must contain the parameter list, closing paren, colon, and body
        (e.g. ``"x): return x + 1"``).  Confirm this matches callers.
        """
        sanitized_code = sanitize_input(code)
        try:
            exec(f"def {name}({sanitized_code})", python_repl.globals, python_repl.locals)
            python_repl.add_tool(name, python_repl.locals[name])
            return f"Tool '{name}' added successfully."
        except Exception as e:
            return f"Error adding tool: {str(e)}"

    @tool.get("/use_tool")
    def use_tool(name: str, args: str):
        """Invoke a registered tool; *args* is eval'd into positional args."""
        try:
            result = python_repl.use_tool(name, *eval(args))
            return {"result": result}
        except Exception as e:
            return {"error": str(e)}

    @tool.get("/list_tools")
    def list_tools():
        """List the names of all registered tools."""
        return {"tools": python_repl.list_tools()}

    @tool.get("/remove_tool")
    def remove_tool(name: str):
        """Remove a registered tool by name."""
        return {"result": python_repl.remove_tool(name)}

    @tool.get("/self_reflect")
    def self_reflect():
        """Report a summary of the REPL's variables and tools."""
        return {"reflection": python_repl.self_reflect()}

    @tool.get("/self_inspect")
    def self_inspect():
        """Report types and signatures of the REPL's defined names."""
        return {"inspection": python_repl.self_inspect()}

    @tool.get("/write_file")
    def write_file(file_path: str, text: str) -> str:
        """Write *text* to *file_path*, creating the parent directory if needed."""
        write_path = Path(file_path)
        try:
            # parents=False: only create the immediate parent, never a chain.
            write_path.parent.mkdir(exist_ok=True, parents=False)
            with write_path.open("w", encoding="utf-8") as f:
                f.write(text)
            return f"File written successfully to {file_path}."
        except Exception as e:
            return "Error: " + str(e)

    @tool.get("/read_file")
    def read_file(file_path: str) -> str:
        """Return the UTF-8 contents of *file_path*, or an error string."""
        read_path = Path(file_path)
        try:
            with read_path.open("r", encoding="utf-8") as f:
                content = f.read()
            return content
        except Exception as e:
            return "Error: " + str(e)

    return tool
if __name__ == "__main__":
    import uvicorn

    # No runtime configuration is needed yet; build_tool accepts an empty
    # mapping for forward compatibility.
    tool_app = build_tool({})
    # Serve the FastAPI app on all interfaces, port 8000.
    uvicorn.run(tool_app, host="0.0.0.0", port=8000)