Spaces:
Runtime error
Runtime error
| import logging | |
| from collections import Counter | |
| from dataclasses import dataclass | |
| import pandas as pd | |
| from components.parser.abbreviations.abbreviation import Abbreviation | |
| logger = logging.getLogger(__name__) | |
| class ParsedRow: | |
| """ | |
| Класс для хранения данных, полученных из строки таблицы. | |
| """ | |
| index: int | |
| cols: list[str] | |
| def apply_abbreviations(self, abbreviations: list) -> None: | |
| """ | |
| Применяет список аббревиатур к строке таблицы. | |
| Args: | |
| abbreviations: list[Abbreviation] - список аббревиатур, которые нужно применить | |
| """ | |
| for abbreviation in abbreviations: | |
| self.cols = [abbreviation.apply(column) for column in self.cols] | |
| def to_text(self, header: list[str] | None = None) -> str: | |
| """ | |
| Преобразование строки таблицы в текст. | |
| Пример такого преобразования: | |
| ``` | |
| ПиП : Привет и Пока | |
| ``` | |
| Args: | |
| header: list[str] | None - шапка таблицы, если обрабатывается многоколоночная таблица | |
| Returns: | |
| str - строка таблицы в текстовом формате | |
| """ | |
| if header is not None: | |
| return '\n'.join(self._apply_header(header)).strip() | |
| else: | |
| return ' : '.join(self.cols).strip() | |
| def _apply_header(self, header: list[str]) -> list[str]: | |
| """ | |
| Применение шапки таблицы к строке. | |
| Args: | |
| header: list[str] - шапка таблицы | |
| Returns: | |
| list[str] - список колонок с применённой шапкой | |
| """ | |
| if len(header) != len(self.cols): | |
| logging.debug( | |
| f'Количество колонок в строке {self.index} не совпадает с количеством колонок в шапке таблицы' | |
| ) | |
| named_part = [ | |
| f'{header[col_index]}: {col_value}' | |
| for col_index, col_value in enumerate(self.cols[: len(header)]) | |
| ] | |
| unnamed_part = self.cols[len(header) :] | |
| return named_part + unnamed_part | |
| class ParsedTable: | |
| """ | |
| Класс для хранения данных, полученных из таблицы. | |
| index: int - номер таблицы | |
| short_type: str | None - либо "сокращения", либо "регламентирующие документы", для других таблиц не заполняется | |
| rows: list[ParsedRow] - строки таблицы | |
| name: str | None - название таблицы, если найдено | |
| """ | |
| index: int | |
| short_type: str | None | |
| header: list[str] | None | |
| rows: list[ParsedRow] | |
| name: str | None = None | |
| subtables: list['ParsedTable'] | None = None | |
| note: str | None = None | |
| rows_count: int = 0 # Количество строк в таблице | |
| modal_cols_count: int = 0 # Модальное (самое частое) количество столбцов | |
| has_merged_cells: bool = False # Наличие объединенных ячеек | |
| def apply_abbreviations(self, abbreviations) -> None: | |
| """ | |
| Применяет список аббревиатур ко всем элементам таблицы. | |
| Args: | |
| abbreviations: list[Abbreviation] | Abbreviation - объект или список объектов аббревиатур | |
| """ | |
| # Преобразуем одиночную аббревиатуру в список для унификации обработки | |
| if not isinstance(abbreviations, list): | |
| abbreviations = [abbreviations] | |
| # Применяем к названию таблицы, если оно есть | |
| if self.name: | |
| for abbreviation in abbreviations: | |
| self.name = abbreviation.apply(self.name) | |
| # Применяем к заголовку таблицы, если он есть | |
| if self.header: | |
| for abbreviation in abbreviations: | |
| self.header = [abbreviation.apply(column) for column in self.header] | |
| # Применяем к строкам таблицы | |
| for row in self.rows: | |
| row.apply_abbreviations(abbreviations) | |
| # Применяем к примечанию, если оно есть | |
| if self.note: | |
| for abbreviation in abbreviations: | |
| self.note = abbreviation.apply(self.note) | |
| # Применяем к подтаблицам, если они есть | |
| if self.subtables: | |
| for subtable in self.subtables: | |
| subtable.apply_abbreviations(abbreviations) | |
| def to_text(self) -> str: | |
| """ | |
| Преобразование таблицы в текст для дальнейшего разбиения на чанки. | |
| Если таблица имеет менее 12 строк, менее 5 столбцов и не содержит объединенных ячеек, | |
| то она будет преобразована в формат Markdown. | |
| Returns: | |
| str - таблица в текстовом формате | |
| """ | |
| # Если таблица соответствует критериям для Markdown форматирования | |
| if (self.rows_count < 12 and self.modal_cols_count < 5 and not self.has_merged_cells): | |
| return self._to_markdown() | |
| # Иначе используем стандартный текстовый формат | |
| result = [] | |
| # Основная таблица | |
| result.append('\n\n'.join(self._rich_row(row) for row in self.rows)) | |
| # Подтаблицы | |
| if self.subtables: | |
| for subtable in self.subtables: | |
| result.append(subtable.to_text()) | |
| # Примечание | |
| if self.note: | |
| result.append(f"Примечание к таблице {self.index + 1}: {self.note}") | |
| return '\n\n'.join(result) | |
| def _to_markdown(self) -> str: | |
| """ | |
| Преобразование таблицы в формат Markdown. | |
| Returns: | |
| str - таблица в формате Markdown | |
| """ | |
| result = [] | |
| # Добавляем название таблицы, если оно есть | |
| if self.name: | |
| result.append(f"### {self.name}") | |
| result.append("") | |
| # Собираем заголовок таблицы | |
| if self.header: | |
| header_row = "| " + " | ".join(self.header) + " |" | |
| separator = "| " + " | ".join(["---"] * len(self.header)) + " |" | |
| result.append(header_row) | |
| result.append(separator) | |
| else: | |
| # Если нет заголовка, используем максимальное количество колонок | |
| max_cols = max([len(row.cols) for row in self.rows]) if self.rows else 0 | |
| if max_cols > 0: | |
| separator = "| " + " | ".join(["---"] * max_cols) + " |" | |
| result.append(separator) | |
| # Добавляем строки таблицы | |
| for row in self.rows: | |
| # Формируем строку в формате Markdown | |
| markdown_row = "| " + " | ".join(row.cols) + " |" | |
| result.append(markdown_row) | |
| # Добавляем примечание, если оно есть | |
| if self.note: | |
| result.append("") | |
| result.append(f"*Примечание: {self.note}*") | |
| # Добавляем подтаблицы, если они есть | |
| if self.subtables: | |
| for subtable in self.subtables: | |
| result.append("") | |
| result.append(subtable.to_text()) | |
| return "\n".join(result) | |
| def _rich_row(self, row: ParsedRow) -> str: | |
| """ | |
| Преобразование строки таблицы в текст с учётом самой таблицы. | |
| Примеры такого преобразования: | |
| ``` | |
| Т1 сокращения [Название таблицы] | |
| 1 | |
| ПиП : Привет и Пока | |
| ``` | |
| ``` | |
| Т2 [Название таблицы] | |
| 1 | |
| Столбец 1 : Значение 1 | |
| Столбец 2 : Значение 2 | |
| ``` | |
| Args: | |
| row: ParsedRow - строка таблицы | |
| Returns: | |
| str - строка таблицы в текстовом формате | |
| """ | |
| table_header = f'Т{self.index + 1}' | |
| if self.short_type is not None: | |
| table_header += f' {self.short_type}' | |
| if self.name is not None: | |
| table_header += f' [{self.name}]' | |
| return f'{table_header}\n{row.index}\n{row.to_text(self.header)}' | |
| def normalize(self) -> 'ParsedTable': | |
| """ | |
| Нормализует таблицу, обрабатывая подтаблицы и примечания. | |
| Нормализация включает: | |
| 1. Определение нормального количества столбцов (мода) | |
| 2. Выделение подтаблиц, если встречаются строки с одним столбцом, | |
| когда нормальное число столбцов не равно 1 | |
| 3. Обработка примечаний (последняя строка с одним столбцом) | |
| 4. Вычисление количества строк, модального количества столбцов | |
| 5. Определение наличия объединенных ячеек | |
| Returns: | |
| ParsedTable - нормализованная таблица | |
| """ | |
| if not self.rows: | |
| return self | |
| # Находим моду по количеству столбцов | |
| col_counts = [len(row.cols) for row in self.rows] | |
| mode_count = Counter(col_counts).most_common(1)[0][0] | |
| # Устанавливаем статистику таблицы | |
| rows_count = len(self.rows) | |
| modal_cols_count = mode_count | |
| # Проверяем наличие объединенных ячеек - если есть строки с разным количеством колонок | |
| has_merged_cells = len(set(col_counts)) > 1 | |
| # Если мода не равна 1, ищем строки с одним столбцом для обработки | |
| if mode_count != 1: | |
| normalized_rows = [] | |
| subtables = [] | |
| current_subtable_rows = [] | |
| current_subtable_name = None | |
| last_row_index = len(self.rows) - 1 | |
| note = None | |
| for i, row in enumerate(self.rows): | |
| if len(row.cols) == 1 and i != last_row_index: | |
| # Это может быть подзаголовок подтаблицы | |
| if current_subtable_rows: | |
| # Создаем подтаблицу из накопленных строк | |
| subtable = ParsedTable( | |
| index=len(subtables), | |
| short_type=self.short_type, | |
| header=self.header, # Используем хедер основной таблицы | |
| rows=current_subtable_rows, | |
| name=current_subtable_name, | |
| rows_count=len(current_subtable_rows), | |
| modal_cols_count=mode_count, | |
| has_merged_cells=has_merged_cells | |
| ) | |
| subtables.append(subtable) | |
| # Начинаем новую подтаблицу | |
| # Формируем имя подтаблицы как комбинацию имени таблицы и текста подзаголовка | |
| current_subtable_name = ( | |
| f"{self.name}: {row.cols[0]}" if self.name else row.cols[0] | |
| ) | |
| current_subtable_rows = [] | |
| elif len(row.cols) == 1 and i == last_row_index: | |
| # Это примечание | |
| note = row.cols[0] | |
| else: | |
| # Обычная строка | |
| if current_subtable_name: | |
| # Добавляем в текущую подтаблицу | |
| current_subtable_rows.append(row) | |
| else: | |
| # Добавляем в основную таблицу | |
| normalized_rows.append(row) | |
| # Добавляем последнюю подтаблицу, если она есть | |
| if current_subtable_rows: | |
| subtable = ParsedTable( | |
| index=len(subtables), | |
| short_type=self.short_type, # Используем тип основной таблицы | |
| header=self.header, # Используем хедер основной таблицы | |
| rows=current_subtable_rows, | |
| name=current_subtable_name, | |
| rows_count=len(current_subtable_rows), | |
| modal_cols_count=mode_count, | |
| has_merged_cells=has_merged_cells | |
| ) | |
| subtables.append(subtable) | |
| # Создаем новую таблицу с обновленными статистическими полями | |
| return ParsedTable( | |
| index=self.index, | |
| short_type=self.short_type, | |
| header=self.header, | |
| rows=normalized_rows, | |
| name=self.name, | |
| subtables=subtables if subtables else None, | |
| note=note, | |
| rows_count=len(normalized_rows), | |
| modal_cols_count=modal_cols_count, | |
| has_merged_cells=has_merged_cells | |
| ) | |
| # Если нет специальной обработки, просто обновляем статистические поля | |
| self.rows_count = rows_count | |
| self.modal_cols_count = modal_cols_count | |
| self.has_merged_cells = has_merged_cells | |
| return self | |
| class ParsedTables: | |
| """ | |
| Класс для хранения данных, полученных из всех таблиц файла. | |
| """ | |
| tables: list[ParsedTable] | |
| def apply_abbreviations(self, abbreviations) -> None: | |
| """ | |
| Применяет список аббревиатур ко всем таблицам. | |
| Args: | |
| abbreviations: list[Abbreviation] | Abbreviation - объект или список объектов аббревиатур | |
| """ | |
| # Преобразуем одиночную аббревиатуру в список для унификации обработки | |
| if not isinstance(abbreviations, list): | |
| abbreviations = [abbreviations] | |
| for table in self.tables: | |
| table.apply_abbreviations(abbreviations) | |
| def to_text(self) -> str: | |
| """ | |
| Преобразование всех таблиц в текст для дальнейшего разбиения на чанки. | |
| Returns: | |
| str - все таблицы в текстовом формате | |
| """ | |
| return '\n\n'.join(table.to_text() for table in self.tables) | |
| def normalize(self) -> 'ParsedTables': | |
| """ | |
| Нормализует все таблицы, обрабатывая подтаблицы и примечания. | |
| Returns: | |
| ParsedTables - нормализованные таблицы | |
| """ | |
| normalized_tables = [table.normalize() for table in self.tables] | |
| return ParsedTables(tables=normalized_tables) | |
| class ParsedText: | |
| """ | |
| Класс для хранения текста, полученного из XML файла. | |
| """ | |
| content: list[str] | |
| def apply_abbreviations( | |
| self, abbreviations: list[Abbreviation] | Abbreviation | |
| ) -> None: | |
| """ | |
| Применяет список аббревиатур ко всем строкам текста. | |
| Args: | |
| abbreviations: list[Abbreviation] | Abbreviation - объект или список объектов аббревиатур | |
| """ | |
| # Преобразуем одиночную аббревиатуру в список для унификации обработки | |
| if not isinstance(abbreviations, list): | |
| abbreviations = [abbreviations] | |
| for abbreviation in abbreviations: | |
| self.content = [abbreviation.apply(line) for line in self.content] | |
| def to_text(self) -> str: | |
| """ | |
| Возвращает текстовое представление. | |
| Returns: | |
| str - текст документа | |
| """ | |
| return "\n\n".join(self.content) | |
| class ParsedXML: | |
| """ | |
| Класс для хранения данных, полученных из xml файла. | |
| """ | |
| status: str | |
| name: str | None | |
| owner: str | None | |
| filename: str | |
| tables: ParsedTables | None = None | |
| text: ParsedText | None = None | |
| abbreviations: list = None # Список аббревиатур, извлеченных из документа | |
| id: int | None = None | |
| def apply_abbreviations( | |
| self, abbreviations: list[Abbreviation] | Abbreviation | |
| ) -> None: | |
| """ | |
| Применяет список аббревиатур ко всем элементам документа. | |
| Args: | |
| abbreviations: list[Abbreviation] | Abbreviation - объект или список объектов аббревиатур | |
| """ | |
| # Преобразуем одиночную аббревиатуру в список для унификации обработки | |
| if not isinstance(abbreviations, list): | |
| abbreviations = [abbreviations] | |
| # Применяем к содержимому таблиц, если они есть | |
| if self.tables: | |
| self.tables.apply_abbreviations(abbreviations) | |
| # Применяем к текстовому содержимому, если оно есть | |
| if self.text: | |
| self.text.apply_abbreviations(abbreviations) | |
| def apply_document_abbreviations(self) -> None: | |
| """ | |
| Применяет аббревиатуры, извлеченные из документа, ко всему его содержимому. | |
| """ | |
| if self.abbreviations: | |
| self.apply_abbreviations(self.abbreviations) | |
| def __post_init__(self) -> None: | |
| """ | |
| Пост-инициализация объекта ParsedXML. | |
| """ | |
| logger.debug( | |
| f'Initializing ParsedXML: name="{self.name}", owner="{self.owner}", status="{self.status}"' | |
| ) | |
| def only_info(self) -> 'ParsedXML': | |
| """ | |
| Создает новый объект ParsedXML только с базовой информацией, без контента. | |
| """ | |
| return ParsedXML( | |
| status=self.status, | |
| name=self.name, | |
| owner=self.owner, | |
| filename=self.filename, | |
| id=self.id, | |
| ) | |
| def to_text(self) -> str: | |
| """ | |
| Возвращает текстовое представление всего документа, включая таблицы и текст. | |
| Returns: | |
| str - полный текст документа | |
| """ | |
| result = [] | |
| # Добавляем текст таблиц, если они есть | |
| if self.tables: | |
| result.append(self.tables.to_text()) | |
| # Добавляем основной текст, если он есть | |
| if self.text: | |
| result.append(self.text.to_text()) | |
| return "\n\n".join(result) | |
| class ParsedXMLs: | |
| """ | |
| Класс для хранения данных, полученных из всех xml файлов. | |
| """ | |
| xmls: list[ParsedXML] | |
| def to_pandas(self) -> pd.DataFrame: | |
| """ | |
| Преобразование данных в pandas DataFrame. | |
| """ | |
| return pd.DataFrame( | |
| [ | |
| { | |
| 'status': xml.status, | |
| 'name': xml.name, | |
| 'owner': xml.owner, | |
| 'filename': xml.filename, | |
| } | |
| for xml in self.xmls | |
| ] | |
| ) | |