Spaces:
Runtime error
Runtime error
| import logging | |
| from typing import Annotated | |
| from fastapi import APIRouter, BackgroundTasks, HTTPException, Response, UploadFile, Depends | |
| from components.services.dataset import DatasetService | |
| from schemas.dataset import (Dataset, DatasetExpanded, DatasetProcessing, | |
| SortQuery, SortQueryList) | |
| import common.dependencies as DI | |
# Router for the dataset endpoints; every route registered on it is served
# under the '/datasets' URL prefix.
router = APIRouter(prefix='/datasets')
# Module-level logger named after this module, used by the handlers below.
logger = logging.getLogger(__name__)
async def get_datasets(
    dataset_service: Annotated[DatasetService, Depends(DI.get_dataset_service)],
) -> list[Dataset]:
    """Return whatever ``dataset_service.get_datasets()`` yields.

    Any exception raised while fetching or counting the datasets is logged
    and then re-raised unchanged.
    """
    logger.info("Handling GET request to /datasets")
    try:
        datasets = dataset_service.get_datasets()
        count = len(datasets)
        logger.info(f"Successfully retrieved {count} datasets")
    except Exception as exc:
        reason = str(exc)
        logger.error(f"Error retrieving datasets: {reason}")
        raise
    return datasets
async def get_processing(
    dataset_service: Annotated[DatasetService, Depends(DI.get_dataset_service)],
) -> DatasetProcessing:
    """Return the object produced by ``dataset_service.get_processing()``.

    The ``status`` attribute of that object is written to the log. Any
    exception raised along the way is logged and re-raised unchanged.
    """
    logger.info("Handling GET request to /datasets/processing")
    try:
        processing = dataset_service.get_processing()
        logger.info(f"Successfully retrieved processing status: {processing.status}")
    except Exception as exc:
        reason = str(exc)
        logger.error(f"Error retrieving processing status: {reason}")
        raise
    return processing
def try_create_default_dataset(dataset_service: DatasetService) -> None:
    """Create the default dataset if the service does not report one yet.

    Does nothing when ``dataset_service.get_default_dataset()`` returns a
    truthy value. Otherwise the ``empty_start`` flag under
    ``dataset_service.config.db_config.files`` selects how it is created:

    * truthy -- ``create_empty_dataset(is_default=True)``;
    * falsy  -- ``create_dataset_from_directory(...)`` using the configured
      ``xmls_path_default`` and ``start_path`` directories.

    Args:
        dataset_service: Service used both to look up and to create the
            default dataset.
    """
    if dataset_service.get_default_dataset():
        return

    # Use the module logger instead of print() so this message follows the
    # same logging configuration as every other message in this module.
    logger.info('creating default dataset')

    files_config = dataset_service.config.db_config.files
    if files_config.empty_start:
        dataset_service.create_empty_dataset(is_default=True)
        return

    dataset_service.create_dataset_from_directory(
        is_default=True,
        directory_with_xmls=files_config.xmls_path_default,
        directory_with_ready_dataset=files_config.start_path,
    )
async def try_init_default_dataset(
    dataset_service: Annotated[DatasetService, Depends(DI.get_dataset_service)],
):
    """Ensure the default dataset exists and report success.

    Delegates to ``try_create_default_dataset``. The call is placed inside
    the ``try`` block so that a failure is actually logged before it
    propagates; previously the ``try`` wrapped only the ``return`` statement
    and the ``except`` branch could never run.

    Returns:
        ``{"ok": True}`` once ``try_create_default_dataset`` has returned.

    Raises:
        Exception: whatever ``try_create_default_dataset`` raises, re-raised
            unchanged after being logged.
    """
    logger.info("Handling GET request try_init_default_dataset")
    try:
        try_create_default_dataset(dataset_service)
    except Exception as e:
        logger.error(f"Error creating default dataset: {str(e)}")
        raise
    return {"ok": True}
def _parse_sort_param(sort: str) -> SortQueryList:
    """Turn a ``'field:direction,field:direction'`` string into a SortQueryList.

    Raises:
        HTTPException: status 400 when an item does not split into exactly
            two parts on ``':'``, or when building ``SortQuery`` /
            ``SortQueryList`` raises ``ValueError``.
    """
    try:
        sorts = [
            SortQuery(field=field, direction=direction)
            # Unpacking fails with ValueError unless there is exactly one ':'.
            for field, direction in (item.split(':') for item in sort.split(','))
        ]
        return SortQueryList(sorts=sorts)
    except ValueError as err:
        # Chain explicitly so the original parsing error stays attached.
        raise HTTPException(
            status_code=400,
            detail="Invalid sort format. Expected format: 'field:direction,field:direction'",
        ) from err


async def get_dataset(
    dataset_id: int,
    dataset_service: Annotated[DatasetService, Depends(DI.get_dataset_service)],
    page: int = 1,
    page_size: int = 20,
    search: str = '',
    sort: str = ''
) -> DatasetExpanded:
    """Fetch one dataset through the dataset service.

    Args:
        dataset_id: Identifier passed to ``dataset_service.get_dataset``.
        dataset_service: Service that performs the lookup.
        page: Forwarded to the service as ``page``.
        page_size: Forwarded to the service as ``page_size``.
        search: Forwarded to the service as ``search``.
        sort: Optional ``'field:direction,field:direction'`` string. When
            non-empty it is parsed into a ``SortQueryList``; when empty it
            is forwarded to the service as-is.

    Returns:
        The value returned by ``dataset_service.get_dataset``.

    Raises:
        HTTPException: status 400 when ``sort`` is non-empty and malformed.
        Exception: any error raised by the service, re-raised unchanged
            after being logged.
    """
    logger.info(f"Handling GET request to /datasets/{dataset_id} (page={page}, size={page_size}, search='{search}')")

    # Use a separate name instead of rebinding the ``str`` parameter to a
    # different type. An empty ``sort`` is passed through untouched so the
    # service receives exactly what it received before.
    sort_query = _parse_sort_param(sort) if sort else sort

    try:
        result = dataset_service.get_dataset(
            dataset_id,
            page=page,
            page_size=page_size,
            search=search,
            sort=sort_query,
        )
        logger.info(f"Successfully retrieved dataset {dataset_id}")
        return result
    except Exception as e:
        logger.error(f"Error retrieving dataset {dataset_id}: {str(e)}")
        raise
async def create_draft(
    parent_id: int,
    dataset_service: Annotated[DatasetService, Depends(DI.get_dataset_service)],
) -> Dataset:
    """Return the result of ``dataset_service.create_draft(parent_id)``.

    Any exception raised by the service is logged and re-raised.
    """
    logger.info(f"Handling POST request to /datasets/{parent_id}/edit")
    try:
        draft = dataset_service.create_draft(parent_id)
        logger.info(f"Successfully created draft from dataset {parent_id}")
    except Exception as exc:
        reason = str(exc)
        logger.error(f"Error creating draft from dataset {parent_id}: {reason}")
        raise exc
    return draft
async def make_active(
    dataset_id: int,
    dataset_service: Annotated[DatasetService, Depends(DI.get_dataset_service)],
    background_tasks: BackgroundTasks,
) -> DatasetExpanded:
    """Return the result of ``dataset_service.activate_dataset``.

    The ``background_tasks`` object is handed to the service together with
    ``dataset_id``. Any exception raised by the service is logged and
    re-raised.
    """
    logger.info(f"Handling POST request to /datasets/{dataset_id}/activate")
    try:
        activated = dataset_service.activate_dataset(dataset_id, background_tasks)
        logger.info(f"Successfully activated dataset {dataset_id}")
    except Exception as exc:
        reason = str(exc)
        logger.error(f"Error activating dataset {dataset_id}: {reason}")
        raise exc
    return activated
async def delete_dataset(
    dataset_id: int,
    dataset_service: Annotated[DatasetService, Depends(DI.get_dataset_service)],
) -> Response:
    """Delete a dataset through the dataset service.

    The return annotation is ``Response`` because the handler returns a
    ``Response`` object; it was previously annotated ``None``.

    Args:
        dataset_id: Identifier passed to ``dataset_service.delete_dataset``.
        dataset_service: Service that performs the deletion.

    Returns:
        An empty ``Response`` with status code 200.

    Raises:
        Exception: any error raised by the service, re-raised unchanged
            after being logged.
    """
    logger.info(f"Handling DELETE request to /datasets/{dataset_id}")
    try:
        dataset_service.delete_dataset(dataset_id)
        logger.info(f"Successfully deleted dataset {dataset_id}")
        return Response(status_code=200)
    except Exception as e:
        logger.error(f"Error deleting dataset {dataset_id}: {str(e)}")
        raise
async def upload_zip(
    file: UploadFile,
    dataset_service: Annotated[DatasetService, Depends(DI.get_dataset_service)],
) -> DatasetExpanded:
    """Return the result of ``dataset_service.upload_zip(file)``.

    Any exception raised by the service is logged and re-raised.
    """
    logger.info(f"Handling POST request to /datasets/upload with file {file.filename}")
    try:
        # NOTE(review): the service is called without ``await`` inside an
        # ``async def``; if it does blocking work this would hold up the
        # event loop -- confirm against the service implementation.
        uploaded = dataset_service.upload_zip(file)
        logger.info("Successfully uploaded and processed dataset")
    except Exception as exc:
        reason = str(exc)
        logger.error(f"Error uploading dataset: {reason}")
        raise exc
    return uploaded