Spaces:
Runtime error
Runtime error
| #!/usr/bin/env python | |
| # -*- coding: utf-8 -*- | |
| """ | |
| Скрипт для запуска множества пайплайнов оценки (`pipeline.py`) | |
| с различными комбинациями параметров. | |
| Собирает команды для `pipeline.py` и запускает их последовательно, | |
| логируя вывод каждого запуска. | |
| """ | |
| import argparse | |
| import json | |
| import os | |
| import pathlib | |
| import subprocess | |
| import sys | |
| import time | |
| from datetime import datetime | |
| from itertools import product | |
| from uuid import uuid4 | |
| # --- Конфигурация Экспериментов --- | |
| # Модели для тестирования | |
| MODELS_TO_TEST = [ | |
| # "intfloat/e5-base", | |
| # "intfloat/e5-large", | |
| "BAAI/bge-m3", | |
| # "deepvk/USER-bge-m3" | |
| # "ai-forever/FRIDA" # Требует --use-sentence-transformers | |
| ] | |
| # Параметры чанкинга (слова / перекрытие) | |
| CHUNKING_PARAMS = [ | |
| # Пример для стратегии "fixed_size" | |
| {"strategy": "fixed_size", "params": {"words_per_chunk": 50, "overlap_words": 25}}, | |
| # {"strategy": "fixed_size", "params": {"words_per_chunk": 100, "overlap_words": 25}}, | |
| # {"strategy": "fixed_size", "params": {"words_per_chunk": 50, "overlap_words": 0}}, | |
| # TODO: Добавить другие стратегии и их параметры, если нужно | |
| # {"strategy": "some_other_strategy", "params": {"param1": "value1"}} | |
| ] | |
| # Значения Top-N для ретривера | |
| TOP_N_VALUES = [20, 50, 100] | |
| # Использовать ли сборку контекста (InjectionBuilder) | |
| USE_INJECTION_OPTIONS = [False, True] | |
| # Порог схожести для fuzzy сравнения (чанк/пункт) | |
| SIMILARITY_THRESHOLDS = [0.7] | |
| # Опции использования Query Expansion | |
| USE_QE_OPTIONS = [False, True] | |
| # Опции обработки таблиц | |
| PROCESS_TABLES_OPTIONS = [True] | |
| # Опции включения соседей | |
| INCLUDE_NEIGHBORS_OPTIONS = [True] | |
| # --- Настройки Скрипта --- | |
| DEFAULT_LOG_DIR = "logs" # Директория для логов отдельных запусков pipeline.py | |
| DEFAULT_INTERMEDIATE_DIR = "data/intermediate" # Куда pipeline.py сохраняет свои результаты | |
| DEFAULT_PYTHON_EXECUTABLE = sys.executable # Использовать тот же python, что и для запуска этого скрипта | |
| def parse_args(): | |
| """Парсит аргументы командной строки.""" | |
| parser = argparse.ArgumentParser(description="Запуск серии оценочных пайплайнов") | |
| # Флаги для пропуска определенных измерений | |
| parser.add_argument("--skip-models", action="store_true", | |
| help="Пропустить итерацию по разным моделям (использовать первую в списке)") | |
| parser.add_argument("--skip-chunking", action="store_true", | |
| help="Пропустить итерацию по разным параметрам чанкинга (использовать первую в списке)") | |
| parser.add_argument("--skip-top-n", action="store_true", | |
| help="Пропустить итерацию по разным top_n (использовать первое значение)") | |
| parser.add_argument("--skip-injection", action="store_true", | |
| help="Пропустить итерацию по опциям сборки контекста (использовать False)") | |
| parser.add_argument("--skip-thresholds", action="store_true", | |
| help="Пропустить итерацию по порогам схожести (использовать первый)") | |
| parser.add_argument("--skip-process-tables", action="store_true", | |
| help="Пропустить итерацию по обработке таблиц (использовать True)") | |
| parser.add_argument("--skip-include-neighbors", action="store_true", | |
| help="Пропустить итерацию по включению соседей (использовать False)") | |
| parser.add_argument("--skip-qe", action="store_true", | |
| help="Пропустить итерацию по использованию Query Expansion (использовать False)") | |
| # Настройки путей и выполнения | |
| parser.add_argument("--log-dir", type=str, default=DEFAULT_LOG_DIR, | |
| help=f"Директория для сохранения логов запусков (по умолчанию: {DEFAULT_LOG_DIR})") | |
| parser.add_argument("--intermediate-dir", type=str, default=DEFAULT_INTERMEDIATE_DIR, | |
| help=f"Директория для промежуточных результатов pipeline.py (по умолчанию: {DEFAULT_INTERMEDIATE_DIR})") | |
| parser.add_argument("--device", type=str, default="cuda:0", | |
| help="Устройство для вычислений в pipeline.py (напр., cpu, cuda:0)") | |
| parser.add_argument("--python-executable", type=str, default=DEFAULT_PYTHON_EXECUTABLE, | |
| help="Путь к интерпретатору Python для запуска pipeline.py") | |
| # Параметры, передаваемые в pipeline.py (если не перебираются) | |
| parser.add_argument("--data-folder", type=str, default="data/input/docs", help="Папка с документами для pipeline.py") | |
| parser.add_argument("--search-dataset-path", type=str, default="data/input/search_dataset_text.xlsx", help="Поисковый датасет для pipeline.py") | |
| parser.add_argument("--qa-dataset-path", type=str, default="data/input/question_answering.xlsx", help="QA датасет для pipeline.py") | |
| return parser.parse_args() | |
| def run_single_pipeline(cmd: list[str], log_path: str): | |
| """ | |
| Запускает один экземпляр pipeline.py и логирует его вывод. | |
| Args: | |
| cmd: Список аргументов команды для subprocess. | |
| log_path: Путь к файлу для сохранения лога. | |
| Returns: | |
| Код возврата процесса. | |
| """ | |
| print(f"\n--- Запуск: {' '.join(cmd)} ---") | |
| print(f"--- Лог: {log_path} --- ") | |
| start_time = time.time() | |
| return_code = -1 | |
| try: | |
| with open(log_path, "w", encoding="utf-8") as log_file: | |
| log_file.write(f"Команда: {' '.join(cmd)}\n") | |
| log_file.write(f"Время запуска: {datetime.now()}\n\n") | |
| log_file.flush() | |
| # Запускаем процесс | |
| process = subprocess.Popen( | |
| cmd, | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.STDOUT, # Перенаправляем stderr в stdout | |
| text=True, | |
| encoding='utf-8', # Указываем кодировку | |
| errors='replace', # Заменяем ошибки кодирования | |
| bufsize=1 # Построчная буферизация | |
| ) | |
| # Читаем и пишем вывод построчно | |
| for line in process.stdout: | |
| print(line, end="") # Выводим в консоль | |
| log_file.write(line) # Пишем в лог | |
| log_file.flush() | |
| # Ждем завершения и получаем код возврата | |
| process.wait() | |
| return_code = process.returncode | |
| except Exception as e: | |
| print(f"\nОшибка при запуске процесса: {e}") | |
| with open(log_path, "a", encoding="utf-8") as log_file: | |
| log_file.write(f"\nОшибка при запуске: {e}\n") | |
| return_code = 1 # Считаем ошибкой | |
| end_time = time.time() | |
| duration = end_time - start_time | |
| result_message = f"Успешно завершено за {duration:.2f} сек." | |
| if return_code != 0: | |
| result_message = f"Завершено с ошибкой (код {return_code}) за {duration:.2f} сек." | |
| print(f"--- {result_message} ---") | |
| with open(log_path, "a", encoding="utf-8") as log_file: | |
| log_file.write(f"\nВремя завершения: {datetime.now()}") | |
| log_file.write(f"\nДлительность: {duration:.2f} сек.") | |
| log_file.write(f"\nКод возврата: {return_code}\n") | |
| return return_code | |
| def main(): | |
| """Основная функция скрипта.""" | |
| args = parse_args() | |
| # --- Генерируем ID для всей серии запусков --- | |
| batch_run_id = f"batch_{datetime.now().strftime('%Y%m%d_%H%M%S')}" | |
| print(f"Запуск серии экспериментов. Batch ID: {batch_run_id}") | |
| # Создаем директории для логов и промежуточных результатов | |
| os.makedirs(args.log_dir, exist_ok=True) | |
| os.makedirs(args.intermediate_dir, exist_ok=True) | |
| # Определяем абсолютный путь к pipeline.py | |
| RUN_PIPELINES_SCRIPT_PATH = pathlib.Path(__file__).resolve() | |
| SCRIPTS_TESTING_DIR = RUN_PIPELINES_SCRIPT_PATH.parent | |
| PIPELINE_SCRIPT_PATH = SCRIPTS_TESTING_DIR / "pipeline.py" | |
| # --- Определяем параметры для перебора --- | |
| models = [MODELS_TO_TEST[0]] if args.skip_models else MODELS_TO_TEST | |
| chunking_configs = [CHUNKING_PARAMS[0]] if args.skip_chunking else CHUNKING_PARAMS | |
| top_n_list = [TOP_N_VALUES[0]] if args.skip_top_n else TOP_N_VALUES | |
| use_injection_list = [False] if args.skip_injection else USE_INJECTION_OPTIONS | |
| threshold_list = [SIMILARITY_THRESHOLDS[0]] if args.skip_thresholds else SIMILARITY_THRESHOLDS | |
| # Определяем списки для новых измерений | |
| process_tables_list = [PROCESS_TABLES_OPTIONS[0]] if args.skip_process_tables else PROCESS_TABLES_OPTIONS | |
| include_neighbors_list = [INCLUDE_NEIGHBORS_OPTIONS[0]] if args.skip_include_neighbors else INCLUDE_NEIGHBORS_OPTIONS | |
| use_qe_list = [USE_QE_OPTIONS[0]] if args.skip_qe else USE_QE_OPTIONS | |
| # --- Создаем список всех комбинаций параметров --- | |
| parameter_combinations = list(product( | |
| models, | |
| chunking_configs, | |
| top_n_list, | |
| use_injection_list, | |
| threshold_list, | |
| process_tables_list, | |
| include_neighbors_list, | |
| use_qe_list | |
| )) | |
| total_runs = len(parameter_combinations) | |
| print(f"Всего запланировано запусков: {total_runs}") | |
| # --- Запускаем пайплайны для каждой комбинации --- | |
| completed_runs = 0 | |
| failed_runs = 0 | |
| start_time_all = time.time() | |
| for i, (model, chunk_cfg, top_n, use_injection, threshold, process_tables, include_neighbors, use_qe) in enumerate(parameter_combinations): | |
| print(f"\n{'='*80}") | |
| print(f"Запуск {i+1}/{total_runs}") | |
| print(f" Модель: {model}") | |
| # Логируем параметры чанкинга | |
| strategy = chunk_cfg['strategy'] | |
| params = chunk_cfg['params'] | |
| params_str = json.dumps(params, ensure_ascii=False) | |
| print(f" Чанкинг: Стратегия='{strategy}', Параметры={params_str}") | |
| print(f" Обработка таблиц: {process_tables}") | |
| print(f" Top-N: {top_n}") | |
| print(f" Сборка контекста: {use_injection}") | |
| print(f" Query Expansion: {use_qe}") | |
| print(f" Включение соседей: {include_neighbors}") | |
| print(f" Порог схожести: {threshold}") | |
| print(f"{'='*80}") | |
| # Генерируем уникальный ID для этого запуска | |
| run_id = f"run_{datetime.now().strftime('%Y%m%d%H%M%S')}_{uuid4().hex[:8]}" | |
| # Формируем команду для pipeline.py | |
| cmd = [ | |
| args.python_executable, | |
| str(PIPELINE_SCRIPT_PATH), # Используем абсолютный путь | |
| "--run-id", run_id, | |
| "--batch-id", batch_run_id, | |
| "--data-folder", args.data_folder, | |
| "--search-dataset-path", args.search_dataset_path, | |
| "--output-dir", args.intermediate_dir, | |
| "--model-name", model, | |
| "--chunking-strategy", strategy, | |
| "--strategy-params", params_str, | |
| "--top-n", str(top_n), | |
| "--similarity-threshold", str(threshold), | |
| "--device", args.device, | |
| ] | |
| # Добавляем флаг --use-injection, если нужно | |
| if use_injection: | |
| cmd.append("--use-injection") | |
| # Добавляем флаг --no-process-tables, если process_tables == False | |
| if not process_tables: | |
| cmd.append("--no-process-tables") | |
| # Добавляем флаг --include-neighbors, если include_neighbors == True | |
| if include_neighbors: | |
| cmd.append("--include-neighbors") | |
| # Добавляем флаг --use-qe, если use_qe == True | |
| if use_qe: | |
| cmd.append("--use-qe") | |
| # Добавляем флаг --use-sentence-transformers для определенных моделей | |
| if "FRIDA" in model or "sentence-transformer" in model.lower(): # Пример | |
| cmd.append("--use-sentence-transformers") | |
| # Формируем путь к лог-файлу | |
| log_filename = f"{run_id}_log.txt" | |
| log_path = os.path.join(args.log_dir, log_filename) | |
| # Запускаем пайплайн | |
| return_code = run_single_pipeline(cmd, log_path) | |
| if return_code == 0: | |
| completed_runs += 1 | |
| else: | |
| failed_runs += 1 | |
| print(f"*** ВНИМАНИЕ: Запуск {i+1} завершился с ошибкой! Лог: {log_path} ***") | |
| # --- Вывод итоговой статистики --- | |
| end_time_all = time.time() | |
| total_duration = end_time_all - start_time_all | |
| print(f"\n{'='*80}") | |
| print("Все запуски завершены.") | |
| print(f"Общее время выполнения: {total_duration:.2f} сек ({total_duration/60:.2f} мин)") | |
| print(f"Всего запусков: {total_runs}") | |
| print(f"Успешно завершено: {completed_runs}") | |
| print(f"Завершено с ошибками: {failed_runs}") | |
| print(f"Промежуточные результаты сохранены в: {args.intermediate_dir}") | |
| print(f"Логи запусков сохранены в: {args.log_dir}") | |
| print(f"{'='*80}") | |
| if __name__ == "__main__": | |
| main() | |