Spaces:
Runtime error
Runtime error
| #!/usr/bin/env python | |
| """ | |
| Скрипт для визуального тестирования процесса чанкинга и сборки документа. | |
| Этот скрипт: | |
| 1. Считывает test_input/test.docx с помощью UniversalParser | |
| 2. Чанкит документ через Destructurer с fixed_size-стратегией | |
| 3. Сохраняет результат чанкинга в test_output/test.csv | |
| 4. Выбирает 20-30 случайных чанков из CSV | |
| 5. Создает InjectionBuilder с InMemoryEntityRepository | |
| 6. Собирает текст из выбранных чанков | |
| 7. Сохраняет результат в test_output/test_builded.txt | |
| """ | |
| import json | |
| import logging | |
| import os | |
| import random | |
| from pathlib import Path | |
| from typing import List | |
| from uuid import UUID | |
| import pandas as pd | |
| from ntr_fileparser import UniversalParser | |
| from ntr_text_fragmentation import (DocumentAsEntity, EntitiesExtractor, | |
| InjectionBuilder, InMemoryEntityRepository, | |
| LinkerEntity) | |
| def setup_logging() -> None: | |
| """Настройка логгирования.""" | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format="%(asctime)s - %(levelname)s - [%(pathname)s:%(lineno)d] - %(message)s", | |
| ) | |
| def ensure_directories() -> None: | |
| """Проверка наличия необходимых директорий.""" | |
| for directory in ["test_input", "test_output"]: | |
| Path(directory).mkdir(parents=True, exist_ok=True) | |
| def save_entities_to_csv(entities: List[LinkerEntity], csv_path: str) -> None: | |
| """ | |
| Сохраняет сущности в CSV файл. | |
| Args: | |
| entities: Список сущностей | |
| csv_path: Путь для сохранения CSV файла | |
| """ | |
| data = [] | |
| for entity in entities: | |
| # Базовые поля для всех типов сущностей | |
| entity_dict = { | |
| "id": str(entity.id), | |
| "type": entity.type, | |
| "name": entity.name, | |
| "text": entity.text, | |
| "metadata": json.dumps(entity.metadata or {}, ensure_ascii=False), | |
| "in_search_text": entity.in_search_text, | |
| "source_id": str(entity.source_id) if entity.source_id else None, | |
| "target_id": str(entity.target_id) if entity.target_id else None, | |
| "number_in_relation": entity.number_in_relation, | |
| "groupper": entity.groupper, | |
| "type": entity.type, | |
| } | |
| # Дополнительные поля специфичные для подклассов (если они есть в __dict__) | |
| # Это не самый надежный способ, но для скрипта визуализации может подойти | |
| # Сериализация LinkerEntity теперь должна сама класть доп поля в metadata | |
| # for key, value in entity.__dict__.items(): | |
| # if key not in entity_dict and not key.startswith('_'): | |
| # entity_dict[key] = value | |
| data.append(entity_dict) | |
| df = pd.DataFrame(data) | |
| # Указываем кодировку UTF-8 при записи CSV | |
| df.to_csv(csv_path, index=False, encoding='utf-8') | |
| logging.info(f"Сохранено {len(entities)} сущностей в {csv_path}") | |
| def load_entities_from_csv(csv_path: str) -> List[LinkerEntity]: | |
| """ | |
| Загружает сущности из CSV файла. | |
| Args: | |
| csv_path: Путь к CSV файлу | |
| Returns: | |
| Список сущностей | |
| """ | |
| df = pd.read_csv(csv_path) | |
| entities = [] | |
| for _, row in df.iterrows(): | |
| # Обработка метаданных | |
| metadata_str = row.get("metadata", "{}") | |
| try: | |
| # Используем json.loads для парсинга JSON строки | |
| metadata = ( | |
| json.loads(metadata_str) | |
| if pd.notna(metadata_str) and metadata_str | |
| else {} | |
| ) | |
| except json.JSONDecodeError: # Ловим ошибку JSON | |
| logging.warning( | |
| f"Не удалось распарсить метаданные JSON: {metadata_str}. Используется пустой словарь." | |
| ) | |
| metadata = {} | |
| # Общие поля для всех типов сущностей | |
| # Преобразуем ID обратно в UUID | |
| entity_id = row['id'] | |
| if isinstance(entity_id, str): | |
| try: | |
| entity_id = UUID(entity_id) | |
| except ValueError: | |
| logging.warning( | |
| f"Неверный формат UUID для id: {entity_id}. Пропускаем сущность." | |
| ) | |
| continue | |
| common_args = { | |
| "id": entity_id, | |
| "name": row["name"] if pd.notna(row.get("name")) else "", | |
| "text": row["text"] if pd.notna(row.get("text")) else "", | |
| "metadata": metadata, | |
| "in_search_text": ( | |
| row["in_search_text"] if pd.notna(row.get('in_search_text')) else None | |
| ), | |
| "type": ( | |
| row["type"] if pd.notna(row.get('type')) else LinkerEntity.__name__ | |
| ), # Используем базовый тип, если не указан | |
| "groupper": row["groupper"] if pd.notna(row.get("groupper")) else None, | |
| } | |
| # Добавляем поля связи, если они есть, преобразуя в UUID | |
| source_id_str = row.get("source_id") | |
| target_id_str = row.get("target_id") | |
| if pd.notna(source_id_str): | |
| try: | |
| common_args["source_id"] = UUID(source_id_str) | |
| except ValueError: | |
| logging.warning( | |
| f"Неверный формат UUID для source_id: {source_id_str}. Пропускаем поле." | |
| ) | |
| if pd.notna(target_id_str): | |
| try: | |
| common_args["target_id"] = UUID(target_id_str) | |
| except ValueError: | |
| logging.warning( | |
| f"Неверный формат UUID для target_id: {target_id_str}. Пропускаем поле." | |
| ) | |
| if pd.notna(row.get("number_in_relation")): | |
| try: | |
| common_args["number_in_relation"] = int(row["number_in_relation"]) | |
| except ValueError: | |
| logging.warning( | |
| f"Неверный формат для number_in_relation: {row['number_in_relation']}. Пропускаем поле." | |
| ) | |
| # Пытаемся десериализовать в конкретный тип, если он известен | |
| entity_class = LinkerEntity._entity_classes.get( | |
| common_args["type"], LinkerEntity | |
| ) | |
| try: | |
| # Создаем экземпляр, передавая только те аргументы, которые ожидает класс | |
| # (используя LinkerEntity._deserialize_to_me как пример, но нужно убедиться, | |
| # что он принимает все нужные поля или имеет **kwargs) | |
| # Пока создаем базовый LinkerEntity, т.к. подклассы могут требовать специфичные поля | |
| # которых нет в CSV или в common_args | |
| entity = LinkerEntity(**common_args) | |
| # Если нужно строгое восстановление типов, потребуется более сложная логика | |
| # с проверкой полей каждого подкласса | |
| except TypeError as e: | |
| logging.warning( | |
| f"Ошибка создания экземпляра {entity_class.__name__} для ID {common_args['id']}: {e}. Создан базовый LinkerEntity." | |
| ) | |
| entity = LinkerEntity(**common_args) # Откат к базовому классу | |
| entities.append(entity) | |
| logging.info(f"Загружено {len(entities)} сущностей из {csv_path}") | |
| return entities | |
| def main() -> None: | |
| """Основная функция скрипта.""" | |
| setup_logging() | |
| ensure_directories() | |
| # Пути к файлам | |
| input_doc_path = "test_input/test2.docx" | |
| output_csv_path = "test_output/test2.csv" | |
| output_text_path = "test_output/test2.md" | |
| # Проверка наличия входного файла | |
| if not os.path.exists(input_doc_path): | |
| logging.error(f"Файл {input_doc_path} не найден!") | |
| return | |
| logging.info(f"Парсинг документа {input_doc_path}") | |
| try: | |
| # Шаг 1: Парсинг документа дважды, как если бы это были два разных документа | |
| parser = UniversalParser() | |
| document1 = parser.parse_by_path(input_doc_path) | |
| document2 = parser.parse_by_path(input_doc_path) | |
| # Меняем название второго документа, чтобы отличить его | |
| document2.name = document2.name + "_copy" if document2.name else "copy_doc" | |
| # Шаг 2: Чанкинг и извлечение таблиц с использованием EntitiesExtractor | |
| all_entities = [] | |
| # Обработка первого документа | |
| logging.info("Начало процесса деструктуризации первого документа") | |
| # Инициализируем экстрактор без документа (используем дефолтные настройки или настроим позже) | |
| extractor1 = EntitiesExtractor() | |
| # Настройка чанкинга | |
| extractor1.configure_chunking( | |
| strategy_name="fixed_size", | |
| strategy_params={ | |
| "words_per_chunk": 50, | |
| "overlap_words": 25, | |
| "respect_sentence_boundaries": True, # Добавлено по запросу | |
| }, | |
| ) | |
| # Настройка извлечения таблиц | |
| extractor1.configure_tables_extraction(process_tables=True) | |
| # Выполнение деструктуризации | |
| entities1 = extractor1.extract(document1) | |
| # Находим ID документа 1 | |
| doc1_entity = next((e for e in entities1 if e.type == DocumentAsEntity.__name__), None) | |
| if not doc1_entity: | |
| logging.error("Не удалось найти DocumentAsEntity для первого документа!") | |
| return | |
| doc1_id = doc1_entity.id | |
| logging.info(f"ID первого документа: {doc1_id}") | |
| logging.info(f"Получено {len(entities1)} сущностей из первого документа") | |
| all_entities.extend(entities1) | |
| # Обработка второго документа | |
| logging.info("Начало процесса деструктуризации второго документа") | |
| # Инициализируем экстрактор без документа | |
| extractor2 = EntitiesExtractor() | |
| # Настройка чанкинга (те же параметры) | |
| extractor2.configure_chunking( | |
| strategy_name="fixed_size", | |
| strategy_params={ | |
| "words_per_chunk": 50, | |
| "overlap_words": 25, | |
| "respect_sentence_boundaries": True, | |
| }, | |
| ) | |
| # Настройка извлечения таблиц | |
| extractor2.configure_tables_extraction(process_tables=True) | |
| # Выполнение деструктуризации | |
| entities2 = extractor2.extract(document2) | |
| # Находим ID документа 2 | |
| doc2_entity = next((e for e in entities2 if e.type == DocumentAsEntity.__name__), None) | |
| if not doc2_entity: | |
| logging.error("Не удалось найти DocumentAsEntity для второго документа!") | |
| return | |
| doc2_id = doc2_entity.id | |
| logging.info(f"ID второго документа: {doc2_id}") | |
| logging.info(f"Получено {len(entities2)} сущностей из второго документа") | |
| all_entities.extend(entities2) | |
| logging.info( | |
| f"Всего получено {len(all_entities)} сущностей из обоих документов" | |
| ) | |
| # Шаг 3: Сохранение результатов чанкинга в CSV | |
| save_entities_to_csv(all_entities, output_csv_path) | |
| # Шаг 4: Загрузка сущностей из CSV и выбор случайных чанков | |
| loaded_entities = load_entities_from_csv(output_csv_path) | |
| # Шаг 5: Создание InjectionBuilder с InMemoryEntityRepository | |
| # Сначала создаем репозиторий со ВСЕМИ загруженными сущностями | |
| repository = InMemoryEntityRepository(loaded_entities) | |
| builder = InjectionBuilder(repository=repository) | |
| # Фильтрация только чанков (сущностей с in_search_text) | |
| # Убедимся, что работаем с десериализованными сущностями из репозитория | |
| # (Репозиторий уже десериализует при инициализации, если нужно) | |
| all_entities_from_repo = repository.get_entities_by_ids( | |
| [e.id for e in loaded_entities] | |
| ) | |
| # Выбираем все сущности с in_search_text | |
| selectable_entities = [ | |
| e for e in all_entities_from_repo if e.in_search_text is not None | |
| ] | |
| # Выбор случайных сущностей (от 20 до 30, но не более доступных) | |
| num_entities_to_select = min(random.randint(100, 500), len(selectable_entities)) | |
| if num_entities_to_select > 0: | |
| selected_entities = random.sample( | |
| selectable_entities, num_entities_to_select | |
| ) | |
| selected_ids = [entity.id for entity in selected_entities] | |
| logging.info( | |
| f"Выбрано {len(selected_ids)} случайных ID сущностей (с in_search_text) для сборки" | |
| ) | |
| # Дополнительная статистика по документам | |
| # Используем репозиторий для получения информации о владельцах | |
| selected_entities_details = repository.get_entities_by_ids(selected_ids) | |
| # Считаем на основе owner_id | |
| doc1_entities_count = sum(1 for e in selected_entities_details if e.owner_id == doc1_id) | |
| doc2_entities_count = sum(1 for e in selected_entities_details if e.owner_id == doc2_id) | |
| other_owner_count = len(selected_entities_details) - (doc1_entities_count + doc2_entities_count) | |
| logging.info( | |
| f"Из них {doc1_entities_count} принадлежат первому документу (ID: {doc1_id}), " | |
| f"{doc2_entities_count} второму (ID: {doc2_id}) (на основе owner_id). " | |
| f"{other_owner_count} имеют другого владельца (вероятно, таблицы/строки)." | |
| ) | |
| else: | |
| logging.warning("Не найдено сущностей с in_search_text для выбора.") | |
| selected_ids = [] | |
| selected_entities = [] # Добавлено для ясности | |
| # Шаг 6: Сборка текста из выбранных ID | |
| logging.info("Начало сборки текста из выбранных ID") | |
| # Передаем ID, а не сущности, т.к. builder сам их получит из репозитория | |
| assembled_text = builder.build( | |
| selected_ids, include_tables=True | |
| ) # Включаем таблицы | |
| # Шаг 7: Сохранение результата в файл | |
| with open(output_text_path, "w", encoding="utf-8") as f: | |
| f.write(assembled_text.replace('\n', '\n\n')) | |
| logging.info(f"Результат сборки сохранен в {output_text_path}") | |
| except Exception as e: | |
| logging.error(f"Произошла ошибка: {e}", exc_info=True) | |
| if __name__ == "__main__": | |
| main() | |