Spaces:
Runtime error
Runtime error
| import logging | |
| from typing import Callable, Optional | |
| from uuid import UUID | |
| import numpy as np | |
| from ntr_fileparser import ParsedDocument | |
| from ntr_text_fragmentation import Destructurer, InjectionBuilder, LinkerEntity | |
| from common.configuration import Configuration | |
| from components.dbo.chunk_repository import ChunkRepository | |
| from components.embedding_extraction import EmbeddingExtractor | |
| from components.nmd.faiss_vector_search import FaissVectorSearch | |
| logger = logging.getLogger(__name__) | |
| class EntityService: | |
| """ | |
| Сервис для работы с сущностями. | |
| Объединяет функциональность chunk_repository, destructurer, injection_builder и faiss_vector_search. | |
| """ | |
| def __init__( | |
| self, | |
| vectorizer: EmbeddingExtractor, | |
| chunk_repository: ChunkRepository, | |
| config: Configuration, | |
| ) -> None: | |
| """ | |
| Инициализация сервиса. | |
| Args: | |
| vectorizer: Модель для извлечения эмбеддингов | |
| chunk_repository: Репозиторий для работы с чанками | |
| config: Конфигурация приложения | |
| """ | |
| self.vectorizer = vectorizer | |
| self.config = config | |
| self.chunk_repository = chunk_repository | |
| self.faiss_search = None # Инициализируется при необходимости | |
| self.current_dataset_id = None # Текущий dataset_id | |
| def _ensure_faiss_initialized(self, dataset_id: int) -> None: | |
| """ | |
| Проверяет и при необходимости инициализирует или обновляет FAISS индекс. | |
| Args: | |
| dataset_id: ID датасета для инициализации | |
| """ | |
| # Если индекс не инициализирован или датасет изменился | |
| if self.faiss_search is None or self.current_dataset_id != dataset_id: | |
| logger.info(f'Initializing FAISS for dataset {dataset_id}') | |
| entities, embeddings = self.chunk_repository.get_searching_entities(dataset_id) | |
| if entities: | |
| # Создаем словарь только из не-None эмбеддингов | |
| embeddings_dict = { | |
| str(entity.id): embedding # Преобразуем UUID в строку для ключа | |
| for entity, embedding in zip(entities, embeddings) | |
| if embedding is not None | |
| } | |
| if embeddings_dict: # Проверяем, что есть хотя бы один эмбеддинг | |
| self.faiss_search = FaissVectorSearch( | |
| self.vectorizer, | |
| embeddings_dict, | |
| self.config.db_config, | |
| ) | |
| self.current_dataset_id = dataset_id | |
| logger.info(f'FAISS initialized for dataset {dataset_id} with {len(embeddings_dict)} embeddings') | |
| else: | |
| logger.warning(f'No valid embeddings found for dataset {dataset_id}') | |
| self.faiss_search = None | |
| self.current_dataset_id = None | |
| else: | |
| logger.warning(f'No entities found for dataset {dataset_id}') | |
| self.faiss_search = None | |
| self.current_dataset_id = None | |
| def process_document( | |
| self, | |
| document: ParsedDocument, | |
| dataset_id: int, | |
| progress_callback: Optional[Callable] = None, | |
| **destructurer_kwargs, | |
| ) -> None: | |
| """ | |
| Обработка документа: разбиение на чанки и сохранение в базу. | |
| Args: | |
| document: Документ для обработки | |
| dataset_id: ID датасета | |
| progress_callback: Функция для отслеживания прогресса | |
| **destructurer_kwargs: Дополнительные параметры для Destructurer | |
| """ | |
| logger.info(f"Processing document {document.name} for dataset {dataset_id}") | |
| # Создаем деструктуризатор с параметрами по умолчанию | |
| destructurer = Destructurer( | |
| document, | |
| strategy_name="fixed_size", | |
| process_tables=True, | |
| **{ | |
| "words_per_chunk": 50, | |
| "overlap_words": 25, | |
| "respect_sentence_boundaries": True, | |
| **destructurer_kwargs, | |
| } | |
| ) | |
| # Получаем сущности | |
| entities = destructurer.destructure() | |
| # Фильтруем сущности для поиска | |
| filtering_entities = [entity for entity in entities if entity.in_search_text is not None] | |
| filtering_texts = [entity.in_search_text for entity in filtering_entities] | |
| # Получаем эмбеддинги с поддержкой callback | |
| embeddings = self.vectorizer.vectorize(filtering_texts, progress_callback) | |
| embeddings_dict = { | |
| str(entity.id): embedding # Преобразуем UUID в строку для ключа | |
| for entity, embedding in zip(filtering_entities, embeddings) | |
| } | |
| # Сохраняем в базу | |
| self.chunk_repository.add_entities(entities, dataset_id, embeddings_dict) | |
| # Переинициализируем FAISS индекс, если это текущий датасет | |
| if self.current_dataset_id == dataset_id: | |
| self._ensure_faiss_initialized(dataset_id) | |
| logger.info(f"Added {len(entities)} entities to dataset {dataset_id}") | |
| def build_text( | |
| self, | |
| entities: list[LinkerEntity], | |
| chunk_scores: Optional[list[float]] = None, | |
| include_tables: bool = True, | |
| max_documents: Optional[int] = None, | |
| ) -> str: | |
| """ | |
| Сборка текста из сущностей. | |
| Args: | |
| entities: Список сущностей | |
| chunk_scores: Список весов чанков | |
| include_tables: Флаг включения таблиц | |
| max_documents: Максимальное количество документов | |
| Returns: | |
| Собранный текст | |
| """ | |
| logger.info(f"Building text for {len(entities)} entities") | |
| if chunk_scores is not None: | |
| chunk_scores = {entity.id: score for entity, score in zip(entities, chunk_scores)} | |
| builder = InjectionBuilder(self.chunk_repository) | |
| return builder.build( | |
| [entity.id for entity in entities], # Передаем UUID напрямую | |
| chunk_scores=chunk_scores, | |
| include_tables=include_tables, | |
| max_documents=max_documents, | |
| ) | |
| def search_similar( | |
| self, | |
| query: str, | |
| dataset_id: int, | |
| ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: | |
| """ | |
| Поиск похожих сущностей. | |
| Args: | |
| query: Текст запроса | |
| dataset_id: ID датасета | |
| Returns: | |
| tuple[np.ndarray, np.ndarray, np.ndarray]: | |
| - Вектор запроса | |
| - Оценки сходства | |
| - Идентификаторы найденных сущностей | |
| """ | |
| # Убеждаемся, что FAISS инициализирован для текущего датасета | |
| self._ensure_faiss_initialized(dataset_id) | |
| if self.faiss_search is None: | |
| return np.array([]), np.array([]), np.array([]) | |
| # Выполняем поиск | |
| return self.faiss_search.search_vectors(query) | |
| def add_neighboring_chunks( | |
| self, | |
| entities: list[LinkerEntity], | |
| max_distance: int = 1, | |
| ) -> list[LinkerEntity]: | |
| """ | |
| Добавление соседних чанков. | |
| Args: | |
| entities: Список сущностей | |
| max_distance: Максимальное расстояние для поиска соседей | |
| Returns: | |
| Расширенный список сущностей | |
| """ | |
| # Убедимся, что все ID представлены в UUID формате | |
| for entity in entities: | |
| if not isinstance(entity.id, UUID): | |
| entity.id = UUID(str(entity.id)) | |
| builder = InjectionBuilder(self.chunk_repository) | |
| return builder.add_neighboring_chunks(entities, max_distance) |