Spaces:
Runtime error
Runtime error
| #!/usr/bin/env python | |
| """ | |
| Скрипт для отладки и анализа чанков, найденных для конкретного вопроса. | |
| Показывает, какие чанки находятся, какие пункты ожидаются и значения метрик нечеткого сравнения. | |
| """ | |
| import argparse | |
| import json | |
| import os | |
| import sys | |
| from difflib import SequenceMatcher | |
| from pathlib import Path | |
| import numpy as np | |
| import pandas as pd | |
| from sklearn.metrics.pairwise import cosine_similarity | |
| sys.path.insert(0, str(Path(__file__).parent.parent)) | |
| # Константы для настройки | |
| DATA_FOLDER = "data/docs" # Путь к папке с документами | |
| MODEL_NAME = "intfloat/e5-base" # Название модели для векторизации | |
| DATASET_PATH = "data/dataset.xlsx" # Путь к Excel-датасету с вопросами | |
| OUTPUT_DIR = "data" # Директория для сохранения результатов | |
| TOP_N_VALUES = [5, 10, 20, 30, 50, 100] # Значения N для анализа | |
| THRESHOLD = 0.6 | |
| def parse_args(): | |
| """ | |
| Парсит аргументы командной строки. | |
| Returns: | |
| Аргументы командной строки | |
| """ | |
| parser = argparse.ArgumentParser(description="Скрипт для отладки чанкинга на конкретном вопросе") | |
| parser.add_argument("--data-folder", type=str, default=DATA_FOLDER, | |
| help=f"Путь к папке с документами (по умолчанию: {DATA_FOLDER})") | |
| parser.add_argument("--model-name", type=str, default=MODEL_NAME, | |
| help=f"Название модели для векторизации (по умолчанию: {MODEL_NAME})") | |
| parser.add_argument("--dataset-path", type=str, default=DATASET_PATH, | |
| help=f"Путь к Excel-датасету с вопросами (по умолчанию: {DATASET_PATH})") | |
| parser.add_argument("--output-dir", type=str, default=OUTPUT_DIR, | |
| help=f"Директория для сохранения результатов (по умолчанию: {OUTPUT_DIR})") | |
| parser.add_argument("--question-id", type=int, required=True, | |
| help="ID вопроса для отладки") | |
| parser.add_argument("--top-n", type=int, default=20, | |
| help="Количество чанков в топе для отладки (по умолчанию: 20)") | |
| parser.add_argument("--words-per-chunk", type=int, default=50, | |
| help="Количество слов в чанке для fixed_size стратегии (по умолчанию: 50)") | |
| parser.add_argument("--overlap-words", type=int, default=25, | |
| help="Количество слов перекрытия для fixed_size стратегии (по умолчанию: 25)") | |
| return parser.parse_args() | |
| def load_questions_dataset(file_path: str) -> pd.DataFrame: | |
| """ | |
| Загружает датасет с вопросами из Excel-файла. | |
| Args: | |
| file_path: Путь к Excel-файлу | |
| Returns: | |
| DataFrame с вопросами и пунктами | |
| """ | |
| print(f"Загрузка датасета из {file_path}...") | |
| df = pd.read_excel(file_path) | |
| print(f"Загружен датасет со столбцами: {df.columns.tolist()}") | |
| # Преобразуем NaN в пустые строки для текстовых полей | |
| text_columns = ['question', 'text', 'item_type'] | |
| for col in text_columns: | |
| if col in df.columns: | |
| df[col] = df[col].fillna('') | |
| return df | |
| def load_embeddings_and_data(filename: str, output_dir: str) -> tuple[np.ndarray | None, pd.DataFrame | None]: | |
| """ | |
| Загружает эмбеддинги и соответствующие данные из файлов. | |
| Args: | |
| filename: Базовое имя файла | |
| output_dir: Директория, где хранятся файлы | |
| Returns: | |
| Кортеж (эмбеддинги, данные) или (None, None), если файлы не найдены | |
| """ | |
| embeddings_path = os.path.join(output_dir, f"{filename}_embeddings.npy") | |
| data_path = os.path.join(output_dir, f"{filename}_data.csv") | |
| if os.path.exists(embeddings_path) and os.path.exists(data_path): | |
| print(f"Загрузка данных из {embeddings_path} и {data_path}...") | |
| embeddings = np.load(embeddings_path) | |
| data = pd.read_csv(data_path) | |
| return embeddings, data | |
| print(f"Ошибка: файлы {embeddings_path} и {data_path} не найдены.") | |
| print("Сначала запустите скрипт evaluate_chunking.py для создания эмбеддингов.") | |
| sys.exit(1) | |
| def calculate_chunk_overlap(chunk_text: str, punct_text: str) -> float: | |
| """ | |
| Рассчитывает степень перекрытия между чанком и пунктом. | |
| Args: | |
| chunk_text: Текст чанка | |
| punct_text: Текст пункта | |
| Returns: | |
| Коэффициент перекрытия от 0 до 1 | |
| """ | |
| # Если чанк входит в пункт, возвращаем 1.0 (полное вхождение) | |
| if chunk_text in punct_text: | |
| return 1.0 | |
| # Если пункт входит в чанк, возвращаем соотношение длин | |
| if punct_text in chunk_text: | |
| return len(punct_text) / len(chunk_text) | |
| # Используем SequenceMatcher для нечеткого сравнения | |
| matcher = SequenceMatcher(None, chunk_text, punct_text) | |
| # Находим наибольшую общую подстроку | |
| match = matcher.find_longest_match(0, len(chunk_text), 0, len(punct_text)) | |
| # Если совпадений нет | |
| if match.size == 0: | |
| return 0.0 | |
| # Возвращаем соотношение длины совпадения к минимальной длине | |
| return match.size / min(len(chunk_text), len(punct_text)) | |
| def format_text_for_display(text: str, max_length: int = 100) -> str: | |
| """ | |
| Форматирует текст для отображения, обрезая его при необходимости. | |
| Args: | |
| text: Исходный текст | |
| max_length: Максимальная длина для отображения | |
| Returns: | |
| Отформатированный текст | |
| """ | |
| if len(text) <= max_length: | |
| return text | |
| return text[:max_length] + "..." | |
| def analyze_question( | |
| question_id: int, | |
| questions_df: pd.DataFrame, | |
| chunks_df: pd.DataFrame, | |
| question_embeddings: np.ndarray, | |
| chunk_embeddings: np.ndarray, | |
| question_id_to_idx: dict, | |
| top_n: int | |
| ) -> dict: | |
| """ | |
| Анализирует конкретный вопрос и его релевантные чанки. | |
| Args: | |
| question_id: ID вопроса для анализа | |
| questions_df: DataFrame с вопросами | |
| chunks_df: DataFrame с чанками | |
| question_embeddings: Эмбеддинги вопросов | |
| chunk_embeddings: Эмбеддинги чанков | |
| question_id_to_idx: Словарь соответствия ID вопроса и его индекса | |
| top_n: Количество чанков в топе | |
| Returns: | |
| Словарь с результатами анализа | |
| """ | |
| # Проверяем, есть ли вопрос с таким ID | |
| if question_id not in question_id_to_idx: | |
| print(f"Ошибка: вопрос с ID {question_id} не найден в данных") | |
| sys.exit(1) | |
| # Получаем строки для выбранного вопроса | |
| question_rows = questions_df[questions_df['id'] == question_id] | |
| if len(question_rows) == 0: | |
| print(f"Ошибка: вопрос с ID {question_id} не найден в исходном датасете") | |
| sys.exit(1) | |
| # Получаем текст вопроса и его индекс в массиве эмбеддингов | |
| question_text = question_rows['question'].iloc[0] | |
| question_idx = question_id_to_idx[question_id] | |
| # Получаем ожидаемые пункты для вопроса | |
| expected_puncts = question_rows['text'].tolist() | |
| # Вычисляем косинусную близость между вопросом и всеми чанками | |
| similarity = cosine_similarity([question_embeddings[question_idx]], chunk_embeddings)[0] | |
| # Получаем связанные документы, если есть | |
| related_docs = [] | |
| if 'filename' in question_rows.columns: | |
| related_docs = question_rows['filename'].unique().tolist() | |
| related_docs = [doc for doc in related_docs if doc and not pd.isna(doc)] | |
| # Результаты для всех документов | |
| all_results = [] | |
| # Обрабатываем каждый связанный документ | |
| if related_docs: | |
| for doc_name in related_docs: | |
| # Фильтруем чанки по имени документа | |
| doc_chunks = chunks_df[chunks_df['doc_name'] == doc_name] | |
| if doc_chunks.empty: | |
| continue | |
| # Индексы чанков для документа | |
| doc_chunk_indices = doc_chunks.index.tolist() | |
| # Получаем значения близости для чанков документа | |
| doc_similarities = [similarity[chunks_df.index.get_loc(idx)] for idx in doc_chunk_indices] | |
| # Создаем словарь индекс -> схожесть | |
| similarity_dict = {idx: sim for idx, sim in zip(doc_chunk_indices, doc_similarities)} | |
| # Сортируем индексы по убыванию похожести | |
| sorted_indices = sorted(similarity_dict.keys(), key=lambda x: similarity_dict[x], reverse=True) | |
| # Берем топ-N | |
| top_indices = sorted_indices[:min(top_n, len(sorted_indices))] | |
| # Получаем топ-N чанков | |
| top_chunks = chunks_df.iloc[top_indices] | |
| # Формируем результаты для документа | |
| doc_results = { | |
| 'doc_name': doc_name, | |
| 'top_chunks': [] | |
| } | |
| # Для каждого чанка | |
| for idx, chunk in top_chunks.iterrows(): | |
| # Вычисляем перекрытие с каждым пунктом | |
| overlaps = [] | |
| for punct in expected_puncts: | |
| overlap = calculate_chunk_overlap(chunk['text'], punct) | |
| overlaps.append({ | |
| 'punct': format_text_for_display(punct), | |
| 'overlap': overlap | |
| }) | |
| # Находим максимальное перекрытие | |
| max_overlap = max(overlaps, key=lambda x: x['overlap']) if overlaps else {'overlap': 0} | |
| # Добавляем в результаты | |
| doc_results['top_chunks'].append({ | |
| 'chunk_id': chunk['id'], | |
| 'chunk_text': format_text_for_display(chunk['text']), | |
| 'similarity': similarity_dict[idx], | |
| 'overlaps': overlaps, | |
| 'max_overlap': max_overlap['overlap'], | |
| 'is_relevant': max_overlap['overlap'] >= THRESHOLD # Используем порог 0.7 | |
| }) | |
| all_results.append(doc_results) | |
| else: | |
| # Если нет связанных документов, анализируем чанки из всех документов | |
| # Получаем индексы для топ-N чанков по близости | |
| top_indices = np.argsort(similarity)[-top_n:][::-1] | |
| # Получаем топ-N чанков | |
| top_chunks = chunks_df.iloc[top_indices] | |
| # Группируем чанки по документам | |
| doc_groups = top_chunks.groupby('doc_name') | |
| for doc_name, group in doc_groups: | |
| doc_results = { | |
| 'doc_name': doc_name, | |
| 'top_chunks': [] | |
| } | |
| for idx, chunk in group.iterrows(): | |
| # Вычисляем перекрытие с каждым пунктом | |
| overlaps = [] | |
| for punct in expected_puncts: | |
| overlap = calculate_chunk_overlap(chunk['text'], punct) | |
| overlaps.append({ | |
| 'punct': format_text_for_display(punct), | |
| 'overlap': overlap | |
| }) | |
| # Находим максимальное перекрытие | |
| max_overlap = max(overlaps, key=lambda x: x['overlap']) if overlaps else {'overlap': 0} | |
| # Добавляем в результаты | |
| doc_results['top_chunks'].append({ | |
| 'chunk_id': chunk['id'], | |
| 'chunk_text': format_text_for_display(chunk['text']), | |
| 'similarity': similarity[chunks_df.index.get_loc(idx)], | |
| 'overlaps': overlaps, | |
| 'max_overlap': max_overlap['overlap'], | |
| 'is_relevant': max_overlap['overlap'] >= THRESHOLD # Используем порог 0.7 | |
| }) | |
| all_results.append(doc_results) | |
| # Формируем общие результаты для вопроса | |
| results = { | |
| 'question_id': question_id, | |
| 'question_text': question_text, | |
| 'expected_puncts': [format_text_for_display(punct) for punct in expected_puncts], | |
| 'related_docs': related_docs, | |
| 'results_by_doc': all_results | |
| } | |
| return results | |
| def main(): | |
| """ | |
| Основная функция скрипта. | |
| """ | |
| args = parse_args() | |
| # Загружаем датасет с вопросами | |
| questions_df = load_questions_dataset(args.dataset_path) | |
| # Формируем уникальное имя для сохраненных файлов на основе параметров стратегии и модели | |
| strategy_config_str = f"fixed_size_w{args.words_per_chunk}_o{args.overlap_words}" | |
| chunks_filename = f"chunks_{strategy_config_str}_{args.model_name.replace('/', '_')}" | |
| questions_filename = f"questions_{args.model_name.replace('/', '_')}" | |
| # Загружаем сохраненные эмбеддинги и данные | |
| chunk_embeddings, chunks_df = load_embeddings_and_data(chunks_filename, args.output_dir) | |
| question_embeddings, questions_df_with_embeddings = load_embeddings_and_data(questions_filename, args.output_dir) | |
| # Создаем словарь соответствия id вопроса и его индекса в эмбеддингах | |
| question_id_to_idx = { | |
| int(row['id']): i | |
| for i, (_, row) in enumerate(questions_df_with_embeddings.iterrows()) | |
| } | |
| # Анализируем выбранный вопрос для указанного top_n | |
| results = analyze_question( | |
| args.question_id, | |
| questions_df, | |
| chunks_df, | |
| question_embeddings, | |
| chunk_embeddings, | |
| question_id_to_idx, | |
| args.top_n | |
| ) | |
| # Сохраняем результаты в JSON файл | |
| output_filename = f"debug_question_{args.question_id}_top{args.top_n}.json" | |
| output_path = os.path.join(args.output_dir, output_filename) | |
| with open(output_path, 'w', encoding='utf-8') as f: | |
| json.dump(results, f, ensure_ascii=False, indent=2) | |
| print(f"Результаты сохранены в {output_path}") | |
| # Выводим краткую информацию | |
| print(f"\nАнализ вопроса ID {args.question_id}: {results['question_text']}") | |
| print(f"Ожидаемые пункты: {len(results['expected_puncts'])}") | |
| print(f"Связанные документы: {results['related_docs']}") | |
| # Статистика релевантности | |
| relevant_chunks = 0 | |
| total_chunks = 0 | |
| for doc_result in results['results_by_doc']: | |
| doc_relevant = sum(1 for chunk in doc_result['top_chunks'] if chunk['is_relevant']) | |
| doc_total = len(doc_result['top_chunks']) | |
| print(f"\nДокумент: {doc_result['doc_name']}") | |
| print(f"Релевантных чанков: {doc_relevant} из {doc_total} ({doc_relevant/doc_total*100:.1f}%)") | |
| relevant_chunks += doc_relevant | |
| total_chunks += doc_total | |
| if total_chunks > 0: | |
| print(f"\nОбщая точность: {relevant_chunks/total_chunks*100:.1f}%") | |
| else: | |
| print("\nНе найдено чанков для анализа") | |
| if __name__ == "__main__": | |
| main() |