Spaces:
Runtime error
Runtime error
| #!/usr/bin/env python | |
| """ | |
| Скрипт для подготовки датасета с вопросами и текстами пунктов/приложений. | |
| Преобразует исходный датасет, содержащий списки пунктов, в расширенный датасет, | |
| где каждому пункту/приложению соответствует отдельная строка. | |
| """ | |
| import argparse | |
| import sys | |
| from pathlib import Path | |
| from typing import Any, Dict | |
| import pandas as pd | |
| from tqdm import tqdm | |
| from ntr_text_fragmentation import Destructurer | |
| sys.path.insert(0, str(Path(__file__).parent.parent)) | |
| from ntr_fileparser import UniversalParser | |
| def parse_args(): | |
| """ | |
| Парсит аргументы командной строки. | |
| Returns: | |
| Аргументы командной строки | |
| """ | |
| parser = argparse.ArgumentParser(description="Подготовка датасета с текстами пунктов") | |
| parser.add_argument('--input-dataset', type=str, default='data/dataset.xlsx', | |
| help='Путь к исходному датасету (Excel-файл)') | |
| parser.add_argument('--output-dataset', type=str, default='data/dataset_with_texts.xlsx', | |
| help='Путь для сохранения подготовленного датасета (Excel-файл)') | |
| parser.add_argument('--data-folder', type=str, default='data/docs', | |
| help='Путь к папке с документами') | |
| parser.add_argument('--debug', action='store_true', | |
| help='Включить режим отладки с дополнительным выводом информации') | |
| return parser.parse_args() | |
| def load_dataset(file_path: str, debug: bool = False) -> pd.DataFrame: | |
| """ | |
| Загружает исходный датасет с вопросами. | |
| Args: | |
| file_path: Путь к Excel-файлу | |
| debug: Режим отладки | |
| Returns: | |
| DataFrame с вопросами | |
| """ | |
| print(f"Загрузка исходного датасета из {file_path}...") | |
| df = pd.read_excel(file_path) | |
| # Преобразуем строковые списки в настоящие списки | |
| for col in ['puncts', 'appendices']: | |
| if col in df.columns: | |
| df[col] = df[col].apply(lambda x: | |
| eval(x) if isinstance(x, str) and x.strip() | |
| else ([] if pd.isna(x) else x)) | |
| # Вывод отладочной информации о форматах пунктов/приложений | |
| if debug: | |
| all_puncts = set() | |
| all_appendices = set() | |
| for _, row in df.iterrows(): | |
| if 'puncts' in row and row['puncts']: | |
| all_puncts.update(row['puncts']) | |
| if 'appendices' in row and row['appendices']: | |
| all_appendices.update(row['appendices']) | |
| print(f"\nУникальные форматы пунктов в датасете ({len(all_puncts)}):") | |
| for i, p in enumerate(sorted(all_puncts)): | |
| if i < 20 or i > len(all_puncts) - 20: | |
| print(f" - {repr(p)}") | |
| elif i == 20: | |
| print(" ... (пропущено)") | |
| print(f"\nУникальные форматы приложений в датасете ({len(all_appendices)}):") | |
| for app in sorted(all_appendices): | |
| print(f" - {repr(app)}") | |
| print(f"Загружено {len(df)} вопросов") | |
| return df | |
| def read_documents(folder_path: str) -> Dict[str, Any]: | |
| """ | |
| Читает все документы из указанной папки. | |
| Args: | |
| folder_path: Путь к папке с документами | |
| Returns: | |
| Словарь {имя_файла: parsed_document} | |
| """ | |
| print(f"Чтение документов из {folder_path}...") | |
| parser = UniversalParser() | |
| documents = {} | |
| for file_path in tqdm(list(Path(folder_path).glob("*.docx")), desc="Чтение документов"): | |
| try: | |
| doc_name = file_path.stem | |
| documents[doc_name] = parser.parse_by_path(str(file_path)) | |
| except Exception as e: | |
| print(f"Ошибка при чтении файла {file_path}: {e}") | |
| print(f"Прочитано {len(documents)} документов") | |
| return documents | |
| def normalize_punct_format(punct: str) -> str: | |
| """ | |
| Нормализует формат номера пункта для единообразного сравнения. | |
| Args: | |
| punct: Номер пункта | |
| Returns: | |
| Нормализованный номер пункта | |
| """ | |
| # Убираем пробелы | |
| punct = punct.strip() | |
| # Убираем завершающую точку, если она есть | |
| if punct.endswith('.'): | |
| punct = punct[:-1] | |
| return punct | |
| def normalize_appendix_format(appendix: str) -> str: | |
| """ | |
| Нормализует формат номера приложения для единообразного сравнения. | |
| Args: | |
| appendix: Номер приложения | |
| Returns: | |
| Нормализованный номер приложения | |
| """ | |
| # Убираем пробелы | |
| appendix = appendix.strip() | |
| # Обработка форматов с дефисом (например, "14-1") | |
| if "-" in appendix: | |
| return appendix | |
| return appendix | |
| def find_matching_key(search_key, available_keys, item_type='punct', debug_mode=False): | |
| """ | |
| Ищет наиболее подходящий ключ среди доступных ключей с учетом типа элемента | |
| Args: | |
| search_key: Ключ для поиска | |
| available_keys: Доступные ключи | |
| item_type: Тип элемента ('punct' или 'appendix') | |
| debug_mode: Режим отладки | |
| Returns: | |
| Найденный ключ или None | |
| """ | |
| if not available_keys: | |
| return None | |
| # Нормализуем ключ в зависимости от типа элемента | |
| if item_type == 'punct': | |
| normalized_search_key = normalize_punct_format(search_key) | |
| else: # appendix | |
| normalized_search_key = normalize_appendix_format(search_key) | |
| # Проверяем прямое совпадение ключей | |
| for key in available_keys: | |
| if item_type == 'punct': | |
| normalized_key = normalize_punct_format(key) | |
| else: # appendix | |
| normalized_key = normalize_appendix_format(key) | |
| if normalized_key == normalized_search_key: | |
| if debug_mode: | |
| print(f"Найдено прямое совпадение для {item_type} {search_key} -> {key}") | |
| return key | |
| # Если прямого совпадения нет, проверяем "мягкое" совпадение | |
| # Только для пунктов, не для приложений | |
| if item_type == 'punct': | |
| for key in available_keys: | |
| normalized_key = normalize_punct_format(key) | |
| # Если ключ содержит "/", это подпункт приложения, его не следует сопоставлять с обычным пунктом | |
| if '/' in key and '/' not in search_key: | |
| continue | |
| # Проверяем совпадение конца номера (например, "1.2" и "1.2.") | |
| if normalized_key.rstrip('.') == normalized_search_key.rstrip('.'): | |
| if debug_mode: | |
| print(f"Найдено мягкое совпадение для {search_key} -> {key}") | |
| return key | |
| return None | |
| def extract_item_texts(documents, debug_mode=False): | |
| """ | |
| Извлекает тексты пунктов и приложений из документов. | |
| Args: | |
| documents: Словарь с распарсенными документами {doc_name: document} | |
| debug_mode: Включать ли режим отладки | |
| Returns: | |
| Словарь с текстами пунктов и приложений, организованный по названиям документов | |
| """ | |
| print("Извлечение текстов пунктов и приложений...") | |
| item_texts = {} | |
| all_extracted_items = set() | |
| all_extracted_appendices = set() | |
| for doc_name, document in tqdm(documents.items(), desc="Применение стратегии numbered_items"): | |
| # Используем стратегию numbered_items с режимом отладки | |
| destructurer = Destructurer(document) | |
| destructurer.configure('numbered_items', debug_mode=debug_mode) | |
| entities, _ = destructurer.destructure() | |
| # Инициализируем структуру для документа, если она еще не создана | |
| if doc_name not in item_texts: | |
| item_texts[doc_name] = { | |
| 'puncts': {}, # Для пунктов основного текста | |
| 'appendices': {} # Для приложений | |
| } | |
| for entity in entities: | |
| # Пропускаем сущность документа | |
| if entity.type == "Document": | |
| continue | |
| # Работаем только с чанками для поиска | |
| if hasattr(entity, 'use_in_search') and entity.use_in_search: | |
| metadata = entity.metadata | |
| text = entity.text | |
| # Для пунктов | |
| if 'item_number' in metadata: | |
| item_number = metadata['item_number'] | |
| # Проверяем, является ли пункт подпунктом приложения | |
| if 'appendix_number' in metadata: | |
| # Это подпункт приложения | |
| appendix_number = metadata['appendix_number'] | |
| # Создаем структуру для приложения, если ее еще нет | |
| if appendix_number not in item_texts[doc_name]['appendices']: | |
| item_texts[doc_name]['appendices'][appendix_number] = { | |
| 'main_text': '', # Основной текст приложения | |
| 'subpuncts': {} # Подпункты приложения | |
| } | |
| # Добавляем подпункт в словарь подпунктов | |
| item_texts[doc_name]['appendices'][appendix_number]['subpuncts'][item_number] = text | |
| if debug_mode: | |
| print(f"Извлечен подпункт {item_number} приложения {appendix_number} из {doc_name}") | |
| all_extracted_items.add(item_number) | |
| else: | |
| # Обычный пункт | |
| item_texts[doc_name]['puncts'][item_number] = text | |
| if debug_mode: | |
| print(f"Извлечен пункт {item_number} из {doc_name}") | |
| all_extracted_items.add(item_number) | |
| # Для приложений | |
| elif 'appendix_number' in metadata and 'item_number' not in metadata: | |
| appendix_number = metadata['appendix_number'] | |
| # Создаем структуру для приложения, если ее еще нет | |
| if appendix_number not in item_texts[doc_name]['appendices']: | |
| item_texts[doc_name]['appendices'][appendix_number] = { | |
| 'main_text': text, # Основной текст приложения | |
| 'subpuncts': {} # Подпункты приложения | |
| } | |
| else: | |
| # Если приложение уже существует, обновляем основной текст | |
| item_texts[doc_name]['appendices'][appendix_number]['main_text'] = text | |
| if debug_mode: | |
| print(f"Извлечено приложение {appendix_number} из {doc_name}") | |
| all_extracted_appendices.add(appendix_number) | |
| # Выводим статистику, если включен режим отладки | |
| if debug_mode: | |
| print(f"\nВсего извлечено уникальных пунктов: {len(all_extracted_items)}") | |
| print(f"Примеры форматов пунктов: {', '.join(sorted(list(all_extracted_items))[:20])}") | |
| print(f"\nВсего извлечено уникальных приложений: {len(all_extracted_appendices)}") | |
| print(f"Форматы приложений: {', '.join(sorted(list(all_extracted_appendices)))}") | |
| # Подсчитываем общее количество пунктов и приложений | |
| total_puncts = sum(len(doc_data['puncts']) for doc_data in item_texts.values()) | |
| total_appendices = sum(len(doc_data['appendices']) for doc_data in item_texts.values()) | |
| print(f"Извлечено {total_puncts} пунктов и {total_appendices} приложений из {len(item_texts)} документов") | |
| return item_texts | |
| def is_subpunct(parent_punct: str, possible_subpunct: str) -> bool: | |
| """ | |
| Проверяет, является ли пункт подпунктом другого пункта. | |
| Args: | |
| parent_punct: Родительский пункт (например, "14") | |
| possible_subpunct: Возможный подпункт (например, "14.1") | |
| Returns: | |
| True, если possible_subpunct является подпунктом parent_punct | |
| """ | |
| # Нормализуем пункты | |
| parent = normalize_punct_format(parent_punct) | |
| child = normalize_punct_format(possible_subpunct) | |
| # Проверяем, начинается ли child с parent и после него идет точка или другой разделитель | |
| if child.startswith(parent): | |
| # Если длины равны, это тот же самый пункт | |
| if len(child) == len(parent): | |
| return False | |
| # Проверяем символ после parent - должна быть точка (дефис исключен, т.к. это разные пункты) | |
| next_char = child[len(parent)] | |
| return next_char in ['.'] | |
| return False | |
| def collect_subpuncts(punct: str, all_puncts: dict) -> dict: | |
| """ | |
| Собирает все подпункты для указанного пункта. | |
| Args: | |
| punct: Пункт, для которого нужно найти подпункты (например, "14") | |
| all_puncts: Словарь всех пунктов {punct: text} | |
| Returns: | |
| Словарь {punct: text} с пунктом и всеми его подпунктами | |
| """ | |
| result = {} | |
| normalized_punct = normalize_punct_format(punct) | |
| # Добавляем сам пункт, если он существует | |
| if normalized_punct in all_puncts: | |
| result[normalized_punct] = all_puncts[normalized_punct] | |
| # Ищем подпункты | |
| for possible_subpunct in all_puncts.keys(): | |
| if is_subpunct(normalized_punct, possible_subpunct): | |
| result[possible_subpunct] = all_puncts[possible_subpunct] | |
| return result | |
| def prepare_expanded_dataset(df, item_texts, output_path, debug_mode=False): | |
| """ | |
| Подготавливает расширенный датасет, добавляя тексты пунктов и приложений. | |
| Args: | |
| df: Исходный датасет | |
| item_texts: Словарь с текстами пунктов и приложений | |
| output_path: Путь для сохранения расширенного датасета | |
| debug_mode: Включать ли режим отладки | |
| Returns: | |
| Датафрейм с расширенным датасетом | |
| """ | |
| rows = [] | |
| skipped_items = 0 | |
| total_items = 0 | |
| for _, row in df.iterrows(): | |
| question_id = row['id'] | |
| question = row['question'] | |
| filepath = row.get('filepath', '') | |
| # Получаем имя файла без пути | |
| doc_name = Path(filepath).stem if filepath else '' | |
| # Пропускаем, если файл не найден | |
| if not doc_name or doc_name not in item_texts: | |
| if debug_mode and doc_name: | |
| print(f"Документ {doc_name} не найден в извлеченных данных") | |
| continue | |
| # Обрабатываем пункты | |
| puncts = row.get('puncts', []) | |
| if isinstance(puncts, str) and puncts.strip(): | |
| # Преобразуем строковое представление в список | |
| try: | |
| puncts = eval(puncts) | |
| except: | |
| puncts = [] | |
| if not isinstance(puncts, list): | |
| puncts = [] | |
| for punct in puncts: | |
| total_items += 1 | |
| if debug_mode: | |
| print(f"\nОбработка пункта {punct} для вопроса {question_id} из {doc_name}") | |
| # Ищем соответствующий пункт в документе | |
| available_keys = list(item_texts[doc_name]['puncts'].keys()) | |
| matching_key = find_matching_key(punct, available_keys, 'punct', debug_mode) | |
| if matching_key: | |
| # Сохраняем основной текст пункта | |
| item_text = item_texts[doc_name]['puncts'][matching_key] | |
| # Список всех включенных ключей (для отслеживания что было приконкатенировано) | |
| matched_keys = [matching_key] | |
| # Ищем все подпункты для этого пункта | |
| subpuncts = {} | |
| for key in available_keys: | |
| if is_subpunct(matching_key, key): | |
| subpuncts[key] = item_texts[doc_name]['puncts'][key] | |
| matched_keys.append(key) | |
| # Если есть подпункты, добавляем их к основному тексту | |
| if subpuncts: | |
| # Сортируем подпункты по номеру | |
| sorted_subpuncts = sorted(subpuncts.items(), key=lambda x: x[0]) | |
| # Добавляем разделитель и все подпункты | |
| combined_text = item_text | |
| for key, subtext in sorted_subpuncts: | |
| combined_text += f"\n\n{key} {subtext}" | |
| item_text = combined_text | |
| # Добавляем строку с пунктом и его подпунктами | |
| rows.append({ | |
| 'id': question_id, | |
| 'question': question, | |
| 'filename': doc_name, | |
| 'text': item_text, | |
| 'item_type': 'punct', | |
| 'item_id': punct, | |
| 'matching_keys': ", ".join(matched_keys) | |
| }) | |
| if debug_mode: | |
| print(f"Добавлен пункт {matching_key} для {question_id} с {len(matched_keys)} ключами") | |
| if len(matched_keys) > 1: | |
| print(f" Включены ключи: {', '.join(matched_keys)}") | |
| else: | |
| skipped_items += 1 | |
| if debug_mode: | |
| print(f"Не найден соответствующий пункт для {punct} в {doc_name}") | |
| # Обрабатываем приложения | |
| appendices = row.get('appendices', []) | |
| if isinstance(appendices, str) and appendices.strip(): | |
| # Преобразуем строковое представление в список | |
| try: | |
| appendices = eval(appendices) | |
| except: | |
| appendices = [] | |
| if not isinstance(appendices, list): | |
| appendices = [] | |
| for appendix in appendices: | |
| total_items += 1 | |
| if debug_mode: | |
| print(f"\nОбработка приложения {appendix} для вопроса {question_id} из {doc_name}") | |
| # Ищем соответствующее приложение в документе | |
| available_keys = list(item_texts[doc_name]['appendices'].keys()) | |
| matching_key = find_matching_key(appendix, available_keys, 'appendix', debug_mode) | |
| if matching_key: | |
| appendix_content = item_texts[doc_name]['appendices'][matching_key] | |
| # Список всех включенных ключей (для отслеживания что было приконкатенировано) | |
| matched_keys = [matching_key] | |
| # Формируем полный текст приложения, включая все подпункты | |
| if isinstance(appendix_content, dict): | |
| # Начинаем с основного текста | |
| full_text = appendix_content.get('main_text', '') | |
| # Добавляем все подпункты в отсортированном порядке | |
| if 'subpuncts' in appendix_content and appendix_content['subpuncts']: | |
| subpuncts = appendix_content['subpuncts'] | |
| sorted_subpuncts = sorted(subpuncts.items(), key=lambda x: x[0]) | |
| # Добавляем разделитель, если есть основной текст | |
| if full_text: | |
| full_text += "\n\n" | |
| # Добавляем все подпункты | |
| for i, (key, subtext) in enumerate(sorted_subpuncts): | |
| matched_keys.append(f"{matching_key}/{key}") | |
| if i > 0: | |
| full_text += "\n\n" | |
| full_text += f"{key} {subtext}" | |
| else: | |
| # Если приложение просто строка | |
| full_text = appendix_content | |
| # Добавляем строку с приложением | |
| rows.append({ | |
| 'id': question_id, | |
| 'question': question, | |
| 'filename': doc_name, | |
| 'text': full_text, | |
| 'item_type': 'appendix', | |
| 'item_id': appendix, | |
| 'matching_keys': ", ".join(matched_keys) | |
| }) | |
| if debug_mode: | |
| print(f"Добавлено приложение {matching_key} для {question_id} с {len(matched_keys)} ключами") | |
| if len(matched_keys) > 1: | |
| print(f" Включены ключи: {', '.join(matched_keys)}") | |
| else: | |
| skipped_items += 1 | |
| if debug_mode: | |
| print(f"Не найдено соответствующее приложение для {appendix} в {doc_name}") | |
| extended_df = pd.DataFrame(rows) | |
| # Сохраняем расширенный датасет | |
| extended_df.to_excel(output_path, index=False) | |
| print(f"Расширенный датасет сохранен в {output_path}") | |
| print(f"Всего обработано элементов: {total_items}") | |
| print(f"Всего элементов в расширенном датасете: {len(extended_df)}") | |
| print(f"Пропущено элементов из-за отсутствия соответствия: {skipped_items}") | |
| return extended_df | |
| def main(): | |
| # Парсим аргументы командной строки | |
| args = parse_args() | |
| # Определяем режим отладки | |
| debug = args.debug | |
| # Загружаем исходный датасет | |
| df = load_dataset(args.input_dataset, debug) | |
| # Читаем документы | |
| documents = read_documents(args.data_folder) | |
| # Извлекаем тексты пунктов и приложений | |
| item_texts = extract_item_texts(documents, debug) | |
| # Подготавливаем расширенный датасет | |
| expanded_df = prepare_expanded_dataset(df, item_texts, args.output_dataset, debug) | |
| print("Готово!") | |
| if __name__ == "__main__": | |
| main() |