Spaces:
Paused
Paused
| import os | |
| import logging | |
| from pathlib import Path | |
| from typing import Optional | |
| from open_webui.models.functions import ( | |
| FunctionForm, | |
| FunctionModel, | |
| FunctionResponse, | |
| Functions, | |
| ) | |
| from open_webui.utils.plugin import load_function_module_by_id, replace_imports | |
| from open_webui.config import CACHE_DIR | |
| from open_webui.constants import ERROR_MESSAGES | |
| from fastapi import APIRouter, Depends, HTTPException, Request, status | |
| from open_webui.utils.auth import get_admin_user, get_verified_user | |
| from open_webui.env import SRC_LOG_LEVELS | |
| log = logging.getLogger(__name__) | |
| log.setLevel(SRC_LOG_LEVELS["MAIN"]) | |
| router = APIRouter() | |
| ############################ | |
| # GetFunctions | |
| ############################ | |
| async def get_functions(user=Depends(get_verified_user)): | |
| return Functions.get_functions() | |
| ############################ | |
| # ExportFunctions | |
| ############################ | |
| async def get_functions(user=Depends(get_admin_user)): | |
| return Functions.get_functions() | |
| ############################ | |
| # CreateNewFunction | |
| ############################ | |
| async def create_new_function( | |
| request: Request, form_data: FunctionForm, user=Depends(get_admin_user) | |
| ): | |
| if not form_data.id.isidentifier(): | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="Only alphanumeric characters and underscores are allowed in the id", | |
| ) | |
| form_data.id = form_data.id.lower() | |
| function = Functions.get_function_by_id(form_data.id) | |
| if function is None: | |
| try: | |
| form_data.content = replace_imports(form_data.content) | |
| function_module, function_type, frontmatter = load_function_module_by_id( | |
| form_data.id, | |
| content=form_data.content, | |
| ) | |
| form_data.meta.manifest = frontmatter | |
| FUNCTIONS = request.app.state.FUNCTIONS | |
| FUNCTIONS[form_data.id] = function_module | |
| function = Functions.insert_new_function(user.id, function_type, form_data) | |
| function_cache_dir = CACHE_DIR / "functions" / form_data.id | |
| function_cache_dir.mkdir(parents=True, exist_ok=True) | |
| if function: | |
| return function | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail=ERROR_MESSAGES.DEFAULT("Error creating function"), | |
| ) | |
| except Exception as e: | |
| log.exception(f"Failed to create a new function: {e}") | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail=ERROR_MESSAGES.DEFAULT(e), | |
| ) | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail=ERROR_MESSAGES.ID_TAKEN, | |
| ) | |
| ############################ | |
| # GetFunctionById | |
| ############################ | |
| async def get_function_by_id(id: str, user=Depends(get_admin_user)): | |
| function = Functions.get_function_by_id(id) | |
| if function: | |
| return function | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |
| ############################ | |
| # ToggleFunctionById | |
| ############################ | |
| async def toggle_function_by_id(id: str, user=Depends(get_admin_user)): | |
| function = Functions.get_function_by_id(id) | |
| if function: | |
| function = Functions.update_function_by_id( | |
| id, {"is_active": not function.is_active} | |
| ) | |
| if function: | |
| return function | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail=ERROR_MESSAGES.DEFAULT("Error updating function"), | |
| ) | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |
| ############################ | |
| # ToggleGlobalById | |
| ############################ | |
| async def toggle_global_by_id(id: str, user=Depends(get_admin_user)): | |
| function = Functions.get_function_by_id(id) | |
| if function: | |
| function = Functions.update_function_by_id( | |
| id, {"is_global": not function.is_global} | |
| ) | |
| if function: | |
| return function | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail=ERROR_MESSAGES.DEFAULT("Error updating function"), | |
| ) | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |
| ############################ | |
| # UpdateFunctionById | |
| ############################ | |
| async def update_function_by_id( | |
| request: Request, id: str, form_data: FunctionForm, user=Depends(get_admin_user) | |
| ): | |
| try: | |
| form_data.content = replace_imports(form_data.content) | |
| function_module, function_type, frontmatter = load_function_module_by_id( | |
| id, content=form_data.content | |
| ) | |
| form_data.meta.manifest = frontmatter | |
| FUNCTIONS = request.app.state.FUNCTIONS | |
| FUNCTIONS[id] = function_module | |
| updated = {**form_data.model_dump(exclude={"id"}), "type": function_type} | |
| log.debug(updated) | |
| function = Functions.update_function_by_id(id, updated) | |
| if function: | |
| return function | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail=ERROR_MESSAGES.DEFAULT("Error updating function"), | |
| ) | |
| except Exception as e: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail=ERROR_MESSAGES.DEFAULT(e), | |
| ) | |
| ############################ | |
| # DeleteFunctionById | |
| ############################ | |
| async def delete_function_by_id( | |
| request: Request, id: str, user=Depends(get_admin_user) | |
| ): | |
| result = Functions.delete_function_by_id(id) | |
| if result: | |
| FUNCTIONS = request.app.state.FUNCTIONS | |
| if id in FUNCTIONS: | |
| del FUNCTIONS[id] | |
| return result | |
| ############################ | |
| # GetFunctionValves | |
| ############################ | |
| async def get_function_valves_by_id(id: str, user=Depends(get_admin_user)): | |
| function = Functions.get_function_by_id(id) | |
| if function: | |
| try: | |
| valves = Functions.get_function_valves_by_id(id) | |
| return valves | |
| except Exception as e: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail=ERROR_MESSAGES.DEFAULT(e), | |
| ) | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |
| ############################ | |
| # GetFunctionValvesSpec | |
| ############################ | |
| async def get_function_valves_spec_by_id( | |
| request: Request, id: str, user=Depends(get_admin_user) | |
| ): | |
| function = Functions.get_function_by_id(id) | |
| if function: | |
| if id in request.app.state.FUNCTIONS: | |
| function_module = request.app.state.FUNCTIONS[id] | |
| else: | |
| function_module, function_type, frontmatter = load_function_module_by_id(id) | |
| request.app.state.FUNCTIONS[id] = function_module | |
| if hasattr(function_module, "Valves"): | |
| Valves = function_module.Valves | |
| return Valves.schema() | |
| return None | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |
| ############################ | |
| # UpdateFunctionValves | |
| ############################ | |
| async def update_function_valves_by_id( | |
| request: Request, id: str, form_data: dict, user=Depends(get_admin_user) | |
| ): | |
| function = Functions.get_function_by_id(id) | |
| if function: | |
| if id in request.app.state.FUNCTIONS: | |
| function_module = request.app.state.FUNCTIONS[id] | |
| else: | |
| function_module, function_type, frontmatter = load_function_module_by_id(id) | |
| request.app.state.FUNCTIONS[id] = function_module | |
| if hasattr(function_module, "Valves"): | |
| Valves = function_module.Valves | |
| try: | |
| form_data = {k: v for k, v in form_data.items() if v is not None} | |
| valves = Valves(**form_data) | |
| Functions.update_function_valves_by_id(id, valves.model_dump()) | |
| return valves.model_dump() | |
| except Exception as e: | |
| log.exception(f"Error updating function values by id {id}: {e}") | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail=ERROR_MESSAGES.DEFAULT(e), | |
| ) | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |
| ############################ | |
| # FunctionUserValves | |
| ############################ | |
| async def get_function_user_valves_by_id(id: str, user=Depends(get_verified_user)): | |
| function = Functions.get_function_by_id(id) | |
| if function: | |
| try: | |
| user_valves = Functions.get_user_valves_by_id_and_user_id(id, user.id) | |
| return user_valves | |
| except Exception as e: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail=ERROR_MESSAGES.DEFAULT(e), | |
| ) | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |
| async def get_function_user_valves_spec_by_id( | |
| request: Request, id: str, user=Depends(get_verified_user) | |
| ): | |
| function = Functions.get_function_by_id(id) | |
| if function: | |
| if id in request.app.state.FUNCTIONS: | |
| function_module = request.app.state.FUNCTIONS[id] | |
| else: | |
| function_module, function_type, frontmatter = load_function_module_by_id(id) | |
| request.app.state.FUNCTIONS[id] = function_module | |
| if hasattr(function_module, "UserValves"): | |
| UserValves = function_module.UserValves | |
| return UserValves.schema() | |
| return None | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |
| async def update_function_user_valves_by_id( | |
| request: Request, id: str, form_data: dict, user=Depends(get_verified_user) | |
| ): | |
| function = Functions.get_function_by_id(id) | |
| if function: | |
| if id in request.app.state.FUNCTIONS: | |
| function_module = request.app.state.FUNCTIONS[id] | |
| else: | |
| function_module, function_type, frontmatter = load_function_module_by_id(id) | |
| request.app.state.FUNCTIONS[id] = function_module | |
| if hasattr(function_module, "UserValves"): | |
| UserValves = function_module.UserValves | |
| try: | |
| form_data = {k: v for k, v in form_data.items() if v is not None} | |
| user_valves = UserValves(**form_data) | |
| Functions.update_user_valves_by_id_and_user_id( | |
| id, user.id, user_valves.model_dump() | |
| ) | |
| return user_valves.model_dump() | |
| except Exception as e: | |
| log.exception(f"Error updating function user valves by id {id}: {e}") | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail=ERROR_MESSAGES.DEFAULT(e), | |
| ) | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |