Spaces:
Paused
Paused
| from typing import Optional | |
| from open_webui.models.models import ( | |
| ModelForm, | |
| ModelModel, | |
| ModelResponse, | |
| ModelUserResponse, | |
| Models, | |
| ) | |
| from open_webui.constants import ERROR_MESSAGES | |
| from fastapi import APIRouter, Depends, HTTPException, Request, status | |
| from open_webui.utils.auth import get_admin_user, get_verified_user | |
| from open_webui.utils.access_control import has_access, has_permission | |
| router = APIRouter() | |
| ########################### | |
| # GetModels | |
| ########################### | |
| async def get_models(id: Optional[str] = None, user=Depends(get_verified_user)): | |
| if user.role == "admin": | |
| return Models.get_models() | |
| else: | |
| return Models.get_models_by_user_id(user.id) | |
| ########################### | |
| # GetBaseModels | |
| ########################### | |
| async def get_base_models(user=Depends(get_admin_user)): | |
| return Models.get_base_models() | |
| ############################ | |
| # CreateNewModel | |
| ############################ | |
| async def create_new_model( | |
| request: Request, | |
| form_data: ModelForm, | |
| user=Depends(get_verified_user), | |
| ): | |
| if user.role != "admin" and not has_permission( | |
| user.id, "workspace.models", request.app.state.config.USER_PERMISSIONS | |
| ): | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail=ERROR_MESSAGES.UNAUTHORIZED, | |
| ) | |
| model = Models.get_model_by_id(form_data.id) | |
| if model: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail=ERROR_MESSAGES.MODEL_ID_TAKEN, | |
| ) | |
| else: | |
| model = Models.insert_new_model(form_data, user.id) | |
| if model: | |
| return model | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail=ERROR_MESSAGES.DEFAULT(), | |
| ) | |
| ########################### | |
| # GetModelById | |
| ########################### | |
| # Note: We're not using the typical url path param here, but instead using a query parameter to allow '/' in the id | |
| async def get_model_by_id(id: str, user=Depends(get_verified_user)): | |
| model = Models.get_model_by_id(id) | |
| if model: | |
| if ( | |
| user.role == "admin" | |
| or model.user_id == user.id | |
| or has_access(user.id, "read", model.access_control) | |
| ): | |
| return model | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |
| ############################ | |
| # ToggelModelById | |
| ############################ | |
| async def toggle_model_by_id(id: str, user=Depends(get_verified_user)): | |
| model = Models.get_model_by_id(id) | |
| if model: | |
| if ( | |
| user.role == "admin" | |
| or model.user_id == user.id | |
| or has_access(user.id, "write", model.access_control) | |
| ): | |
| model = Models.toggle_model_by_id(id) | |
| if model: | |
| return model | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail=ERROR_MESSAGES.DEFAULT("Error updating function"), | |
| ) | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail=ERROR_MESSAGES.UNAUTHORIZED, | |
| ) | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |
| ############################ | |
| # UpdateModelById | |
| ############################ | |
| async def update_model_by_id( | |
| id: str, | |
| form_data: ModelForm, | |
| user=Depends(get_verified_user), | |
| ): | |
| model = Models.get_model_by_id(id) | |
| if not model: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |
| if ( | |
| model.user_id != user.id | |
| and not has_access(user.id, "write", model.access_control) | |
| and user.role != "admin" | |
| ): | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail=ERROR_MESSAGES.ACCESS_PROHIBITED, | |
| ) | |
| model = Models.update_model_by_id(id, form_data) | |
| return model | |
| ############################ | |
| # DeleteModelById | |
| ############################ | |
| async def delete_model_by_id(id: str, user=Depends(get_verified_user)): | |
| model = Models.get_model_by_id(id) | |
| if not model: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |
| if ( | |
| user.role != "admin" | |
| and model.user_id != user.id | |
| and not has_access(user.id, "write", model.access_control) | |
| ): | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail=ERROR_MESSAGES.UNAUTHORIZED, | |
| ) | |
| result = Models.delete_model_by_id(id) | |
| return result | |
| async def delete_all_models(user=Depends(get_admin_user)): | |
| result = Models.delete_all_models() | |
| return result | |