Spaces:
Runtime error
Runtime error
| import logging | |
| from fastapi import HTTPException | |
| from sqlalchemy.orm import Session | |
| from components.dbo.models.log import Log as LogSQL | |
| from schemas.log import LogCreateSchema, LogFilterSchema, LogSchema, PaginatedLogResponse | |
logger = logging.getLogger(__name__)


class LogService:
    """
    Service for creating, listing, reading, updating and deleting log records.

    NOTE(review): ``db`` is annotated as ``Session``, but every method calls
    ``self.db()`` and uses the result as a context manager, so it is
    presumably a session factory (e.g. a ``sessionmaker``) -- confirm with
    the caller and tighten the annotation.
    """

    def __init__(self, db: Session):
        """
        Store the database handle used by every operation.

        Args:
            db: Object that each method calls with no arguments to obtain a
                context manager yielding a session.
        """
        logger.info("LogService initializing")
        self.db = db

    @staticmethod
    def _get_or_raise(session, id: int) -> LogSQL:
        """
        Return the log row with the given id, or raise if it does not exist.

        Raises:
            HTTPException: With status 400 when no row matches ``id``.
        """
        log = session.query(LogSQL).filter(LogSQL.id == id).first()
        if log is None:
            # NOTE(review): 404 is the conventional status for a missing item;
            # 400 is kept so the HTTP contract seen by clients does not change.
            raise HTTPException(
                status_code=400, detail=f"Item with id {id} not found"
            )
        return log

    @staticmethod
    def _build_ordering(sort_params) -> list:
        """
        Translate the requested sort parameters into ORDER BY clauses.

        Only the ``date_created`` field with direction ``asc`` or ``desc`` is
        recognised; anything else is ignored. When nothing usable was
        requested, the result falls back to newest-first.
        """
        clauses = []
        for sort_param in sort_params or []:
            if sort_param.field != "date_created":
                continue
            if sort_param.direction == "asc":
                clauses.append(LogSQL.date_created.asc())
            elif sort_param.direction == "desc":
                clauses.append(LogSQL.date_created.desc())
        return clauses or [LogSQL.date_created.desc()]

    def create(self, log_schema: LogCreateSchema) -> LogSchema:
        """
        Persist a new log record.

        Args:
            log_schema: Payload whose ``model_dump()`` supplies the keyword
                arguments for the ORM model.

        Returns:
            The stored record, refreshed after commit, as a ``LogSchema``.
        """
        logger.info("Creating a new log")
        with self.db() as session:
            new_log: LogSQL = LogSQL(**log_schema.model_dump())
            session.add(new_log)
            session.commit()
            session.refresh(new_log)
            return LogSchema(**new_log.to_dict())

    def get_list(self, filters: LogFilterSchema) -> PaginatedLogResponse:
        """
        Return one page of log records matching the given filters.

        Args:
            filters: Optional ``user_name`` (exact match), ``chat_id``
                (prefix match), ``date_from`` / ``date_to`` (inclusive
                bounds on ``date_created``), ``sort`` parameters, plus
                ``page`` and ``page_size``.
                Assumes ``page >= 1`` and ``page_size >= 1`` -- presumably
                enforced by ``LogFilterSchema``; confirm.

        Returns:
            The requested page together with the total row count and the
            total number of pages.
        """
        logger.info(
            "Fetching logs with filters: %s", filters.model_dump(exclude_none=True)
        )
        with self.db() as session:
            query = session.query(LogSQL)

            # Exact match on user_name.
            if filters.user_name:
                query = query.filter(LogSQL.user_name == filters.user_name)

            # Prefix match on chat_id (startswith, not a substring search).
            if filters.chat_id:
                query = query.filter(LogSQL.chat_id.startswith(filters.chat_id))

            # Inclusive date_created range.
            if filters.date_from:
                query = query.filter(LogSQL.date_created >= filters.date_from)
            if filters.date_to:
                query = query.filter(LogSQL.date_created <= filters.date_to)

            # Count on the filtered but still unordered query: ordering does
            # not affect the total.
            total = query.count()

            # order_by() calls are additive, so the default ordering must only
            # be applied when the caller did not request one; otherwise a
            # requested "asc" would be shadowed by a leading "desc".
            query = query.order_by(*self._build_ordering(filters.sort))

            # Pagination.
            offset = (filters.page - 1) * filters.page_size
            logs = query.offset(offset).limit(filters.page_size).all()

            # Ceiling division without floats.
            total_pages = (total + filters.page_size - 1) // filters.page_size

            return PaginatedLogResponse(
                data=[LogSchema(**log.to_dict()) for log in logs],
                total=total,
                page=filters.page,
                page_size=filters.page_size,
                total_pages=total_pages,
            )

    def get_by_id(self, id: int) -> LogSchema:
        """
        Fetch a single log record.

        Args:
            id: Primary key of the record.

        Returns:
            The record as a ``LogSchema``.

        Raises:
            HTTPException: With status 400 when the record does not exist.
        """
        with self.db() as session:
            log = self._get_or_raise(session, id)
            return LogSchema(**log.to_dict())

    def update(self, id: int, new_log: LogSchema):
        """
        Apply the explicitly set fields of ``new_log`` to an existing record.

        Args:
            id: Primary key of the record to update.
            new_log: Source of new values; only fields that were set on the
                schema (``exclude_unset=True``) and that exist as attributes
                on the ORM object are copied.

        Returns:
            The refreshed ORM object.
            NOTE(review): sibling methods return ``LogSchema``; the ORM
            object is kept here so existing callers are not affected.

        Raises:
            HTTPException: With status 400 when the record does not exist.
        """
        logger.info("Updating log")
        with self.db() as session:
            log = self._get_or_raise(session, id)
            update_data = new_log.model_dump(exclude_unset=True)
            for key, value in update_data.items():
                if hasattr(log, key):
                    setattr(log, key, value)
            session.commit()
            session.refresh(log)
            return log

    def delete(self, id: int) -> None:
        """
        Delete a log record.

        Args:
            id: Primary key of the record to delete.

        Raises:
            HTTPException: With status 400 when the record does not exist,
                instead of failing inside ``session.delete(None)``.
        """
        logger.info("Deleting log: %s", id)
        with self.db() as session:
            log_to_del = self._get_or_raise(session, id)
            session.delete(log_to_del)
            session.commit()