| # ------------------------------------------------------------------- | |
| # This source file is available under the terms of the | |
| # Pimcore Open Core License (POCL) | |
| # Full copyright and license information is available in | |
| # LICENSE.md which is distributed with this source code. | |
| # | |
| # @copyright Copyright (c) Pimcore GmbH (https://www.pimcore.com) | |
| # @license Pimcore Open Core License (POCL) | |
| # ------------------------------------------------------------------- | |
| import os | |
| import torch | |
| #from .training_status import Status | |
| #from .environment_variable_checker import EnvironmentVariableChecker | |
| #from .training_manager import TrainingManager | |
| #from .image_classification.image_classification_trainer import ImageClassificationTrainer | |
| #from .image_classification.image_classification_parameters import ImageClassificationParameters, map_image_classification_training_parameters, ImageClassificationTrainingParameters | |
| #from .text_classification.text_classification_trainer import TextClassificationTrainer | |
| #from .text_classification.text_classification_parameters import TextClassificationParameters, map_text_classification_training_parameters, TextClassificationTrainingParameters | |
| from fastapi import FastAPI, Depends, HTTPException, UploadFile, Form, File, status | |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| from pydantic import BaseModel | |
| from typing import Annotated | |
| import logging | |
| from pathlib import Path | |
| import tempfile | |
| import sys | |
| from transformers import pipeline | |
# Application object of the service. Title, description and version are the
# metadata handed to the FastAPI constructor.
app = FastAPI(
    title="Pimcore Local Inference Service",
    description=(
        "This services allows HF inference provider compatible inference "
        "to models which are not available at HF inference providers."
    ),
    version="1.0.0",
)
| #environmentVariableChecker = EnvironmentVariableChecker() | |
| #environmentVariableChecker.validate_environment_variables() | |
# Configure the log line layout (timestamp, left-aligned level padded to eight
# characters, message), then create this module's logger and let it pass
# everything from DEBUG upwards.
logging.basicConfig(format="%(asctime)s %(levelname)-8s %(message)s")
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
class StreamToLogger:
    """File-like object that forwards written text to a logger, line by line.

    Instances expose the small part of the text-stream interface that is
    needed to stand in for ``sys.stdout`` / ``sys.stderr``: ``write``,
    ``flush`` and ``isatty``.

    Args:
        logger: Logger that receives every forwarded line.
        log_level: Numeric logging level used for all forwarded lines.
    """

    def __init__(self, logger: logging.Logger, log_level: int) -> None:
        self.logger = logger
        self.log_level = log_level
        # Not read anywhere in this class; kept so that existing attribute
        # access on instances keeps working.
        self.linebuf = ''

    def write(self, buf: str) -> int:
        """Log each line of *buf* and return the number of characters accepted.

        Trailing whitespace of the whole buffer and of every individual line
        is stripped before logging, so a write consisting only of a newline
        (as ``print`` issues after its text) produces no log record.

        Returning the length follows the text-stream ``write`` contract, which
        callers of a replaced ``sys.stdout`` may rely on.
        """
        for line in buf.rstrip().splitlines():
            self.logger.log(self.log_level, line.rstrip())
        return len(buf)

    def flush(self) -> None:
        """Do nothing; lines are handed to the logger as soon as they are written."""

    def isatty(self) -> bool:
        """Report that this stream is not an interactive terminal."""
        return False
# From here on, text written to stdout is logged at INFO and text written to
# stderr is logged at ERROR, both through the module logger.
sys.stdout, sys.stderr = (
    StreamToLogger(logger, logging.INFO),
    StreamToLogger(logger, logging.ERROR),
)
| #classification_trainer: TrainingManager = TrainingManager() | |
class ResponseModel(BaseModel):
    """ Default response model for endpoints. """

    # Outcome text of the operation; required.
    message: str
    # Success flag; True unless the caller sets it otherwise.
    success: bool = True
| # =========================================== | |
| # Security Check | |
| # =========================================== | |
| # security = HTTPBearer() | |
| # def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)): | |
| # """Verify the token provided by the user.""" | |
| # token = environmentVariableChecker.get_authentication_token() | |
| # if credentials.credentials != token: | |
| # raise HTTPException( | |
| # status_code=status.HTTP_401_UNAUTHORIZED, | |
| # detail="Invalid token", | |
| # headers={"WWW-Authenticate": "Bearer"}, | |
| # ) | |
| # return {"token": credentials.credentials} | |
| # =========================================== | |
| # Training Status Endpoints | |
| # =========================================== | |
| # @app.get("/get_training_status") | |
| # async def get_task_status(token_data: dict = Depends(verify_token)): | |
| # """ Get the status of the currently running training (if any). """ | |
| # status = classification_trainer.get_task_status() | |
| # return { | |
| # "project": status.get_project_name(), | |
| # "progress": status.get_progress(), | |
| # "task": status.get_task(), | |
| # "status": status.get_status().value | |
| # } | |
| # @app.put("/stop_training") | |
| # async def stop_task(token_data: dict = Depends(verify_token)): | |
| # """ Stop the currently running training (if any). """ | |
| # try: | |
| # status = classification_trainer.get_task_status() | |
| # classification_trainer.stop_task() | |
| # return ResponseModel(message=f"Training stopped for `{ status.get_project_name() }`") | |
| # except Exception as e: | |
| # raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}") | |
async def gpu_check():
    """ Check if a GPU is available """
    # NOTE(review): no @app route decorator is visible on this coroutine in the
    # reviewed source - confirm it is registered with `app`.
    # The outcome is both printed and returned; 'success' is always True.
    if not torch.cuda.is_available():
        print("GPU is not available")
        return {'success': True, 'gpu': 'GPU not available'}

    print("GPU is available")
    return {'success': True, 'gpu': 'GPU is available'}
| from fastapi import Body | |
| from typing import Optional | |
class TranslationRequest(BaseModel):
    # Request payload consumed by the `translation` coroutine.

    # Text passed to the pipeline as its positional input.
    inputs: str
    # Optional keyword arguments forwarded to the pipeline call; when omitted
    # (None) the pipeline is called without extra arguments.
    parameters: Optional[dict] = None
async def translation(
    model_name: str,
    body: TranslationRequest = Body(
        ...,
        example={
            "inputs": "I am a car",
            "parameters": {
                "repetition_penalty": 1.6,
            }
        }
    )
):
    """
    Execute translation tasks.

    Model loading and inference are blocking calls, so both are executed in a
    worker thread to keep the event loop free for other requests.

    Args:
        model_name (str): The HuggingFace model name to use for translation.
        body (TranslationRequest): The request payload; `inputs` is the text to
            translate and `parameters` are optional keyword arguments passed on
            to the pipeline call.

    Returns:
        list: The translation result(s) as returned by the pipeline.

    Raises:
        HTTPException: With status 404 when the pipeline for `model_name`
            cannot be created, and with status 500 when the pipeline call
            itself fails. The original exception is attached as the cause.
    """
    # NOTE(review): no @app route decorator is visible on this coroutine in the
    # reviewed source - confirm it is registered with `app`.
    # NOTE(review): the pipeline is rebuilt on every call; consider caching it
    # per model name if load time becomes a problem.

    # Function-scope import: asyncio is not among this module's top-level imports.
    import asyncio

    try:
        pipe = await asyncio.to_thread(pipeline, "translation", model=model_name)
    except Exception as e:
        # logger.exception records the traceback in addition to the message.
        logger.exception("Failed to load model '%s': %s", model_name, e)
        raise HTTPException(
            status_code=404,
            detail=f"Model '{model_name}' could not be loaded: {str(e)}"
        ) from e

    # A missing or empty `parameters` field means "no extra keyword arguments".
    call_kwargs = body.parameters or {}
    try:
        result = await asyncio.to_thread(pipe, body.inputs, **call_kwargs)
    except Exception as e:
        logger.exception("Inference failed for model '%s': %s", model_name, e)
        raise HTTPException(
            status_code=500,
            detail=f"Inference failed: {str(e)}"
        ) from e

    return result
| # =========================================== | |
| # Fine-Tuning Image Classification | |
| # =========================================== | |
| # @app.post( | |
| # "/training/image_classification", | |
| # response_model=ResponseModel | |
| # ) | |
| # async def image_classification( | |
| # training_params: Annotated[ImageClassificationTrainingParameters, Depends(map_image_classification_training_parameters)], | |
| # training_data_zip: Annotated[UploadFile, File(description="The ZIP file containing the training data, with a folder per class which contains images belonging to that class.")], | |
| # token_data: dict = Depends(verify_token), | |
| # project_name: str = Form(description="The name of the project. Will also be used as name of resulting model that will be created after fine tuning and as the name of the repository at huggingface."), | |
| # source_model_name: str = Form('google/vit-base-patch16-224-in21k', description="The source model to be used as basis for fine tuning."), | |
| # ): | |
| # """ | |
| # Start fine tuning an image classification model with the provided data. | |
| # """ | |
| # # check if training is running, if so then exit | |
| # status = classification_trainer.get_task_status() | |
| # if status.get_status() == Status.IN_PROGRESS or status.get_status() == Status.CANCELLING: | |
| # raise HTTPException(status_code=405, detail="Training is already in progress.") | |
| # # Ensure the uploaded file is a ZIP file | |
| # if not training_data_zip.filename.endswith(".zip"): | |
| # raise HTTPException(status_code=422, detail="Uploaded file is not a zip file.") | |
| # try: | |
| # # Create a temporary directory to extract the contents | |
| # tmp_path = os.path.join(tempfile.gettempdir(), 'training_data') | |
| # path = Path(tmp_path) | |
| # path.mkdir(parents=True, exist_ok=True) | |
| # contents = await training_data_zip.read() | |
| # zip_path = os.path.join(tmp_path, 'image_classification_data.zip') | |
| # with open(zip_path, 'wb') as temp_file: | |
| # temp_file.write(contents) | |
| # # prepare parameters | |
| # parameters = ImageClassificationParameters( | |
| # training_files_path=tmp_path, | |
| # training_zip_file_path=zip_path, | |
| # project_name=project_name, | |
| # source_model_name=source_model_name, | |
| # training_parameters=training_params | |
| # ) | |
| # # start training | |
| # await classification_trainer.start_training(ImageClassificationTrainer(), parameters) | |
| # return ResponseModel(message="Training started.") | |
| # except Exception as e: | |
| # raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}") | |
| # =========================================== | |
| # Fine-Tuning Text Classification | |
| # =========================================== | |
| # @app.post( | |
| # "/training/text_classification", | |
| # response_model=ResponseModel | |
| # ) | |
| # async def text_classificaiton( | |
| # training_params: Annotated[TextClassificationTrainingParameters, Depends(map_text_classification_training_parameters)], | |
| # training_data_csv: Annotated[UploadFile, File(description="The CSV file containing the training data, necessary columns `value` (text data) and `target` (classification).")], | |
| # token_data: dict = Depends(verify_token), | |
| # project_name: str = Form(description="The name of the project. Will also be used as name of resulting model that will be created after fine tuning and as the name of the repository at huggingface."), | |
| # training_csv_limiter: str = Form(';', description="The delimiter used in the CSV file."), | |
| # source_model_name: str = Form('distilbert/distilbert-base-uncased'), | |
| # ): | |
| # """Start fine tuning a text classification model with the provided data.""" | |
| # # check if training is running, if so then exit | |
| # status = classification_trainer.get_task_status() | |
| # if status.get_status() == Status.IN_PROGRESS or status.get_status() == Status.CANCELLING: | |
| # raise HTTPException(status_code=405, detail="Training is already in progress") | |
| # # Ensure the uploaded file is a CSV file | |
| # if not training_data_csv.filename.endswith(".csv"): | |
| # raise HTTPException(status_code=422, detail="Uploaded file is not a csv file.") | |
| # try: | |
| # # Create a temporary directory to extract the contents | |
| # tmp_path = os.path.join(tempfile.gettempdir(), 'training_data') | |
| # path = Path(tmp_path) | |
| # path.mkdir(parents=True, exist_ok=True) | |
| # contents = await training_data_csv.read() | |
| # csv_path = os.path.join(tmp_path, 'data.csv') | |
| # with open(csv_path, 'wb') as temp_file: | |
| # temp_file.write(contents) | |
| # # prepare parameters | |
| # parameters = TextClassificationParameters( | |
| # training_csv_file_path=csv_path, | |
| # training_csv_limiter=training_csv_limiter, | |
| # project_name=project_name, | |
| # source_model_name=source_model_name, | |
| # training_parameters=training_params | |
| # ) | |
| # # start training | |
| # await classification_trainer.start_training(TextClassificationTrainer(), parameters) | |
| # return ResponseModel(message="Training started.") | |
| # except Exception as e: | |
| # raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}") | |