Spaces:
Running
Running
| import os | |
| import json | |
| import asyncio | |
| import aiofiles | |
| from agent_monitor.monitor import analyze_agent_steps | |
| from agent_monitor.failure_report import analyze_agent_performance, AsyncOpenAIClient | |
| import traceback | |
| from tqdm import tqdm | |
async def check_and_process_uploads():
    """Process every new JSON upload that has not been handled yet.

    Lists the ``.json`` files in ``evals_upload`` and skips any whose file
    name already exists in ``evals_processed`` or ``evals_live``. Each
    remaining upload is handed to ``process_single_upload`` together with its
    target path inside ``evals_processed``.

    Returns:
        None. Progress and skip reasons are reported with ``print``.

    Raises:
        OSError: Propagated from ``os.listdir`` when ``evals_upload`` cannot
            be listed (for example, when the directory does not exist).
    """
    upload_dir = "evals_upload"
    processed_dir = "evals_processed"
    live_dir = "evals_live"

    new_uploads = [f for f in os.listdir(upload_dir) if f.endswith('.json')]
    if not new_uploads:
        print("No new uploads found.")
        return

    # An upload counts as already handled when a file with the same name
    # exists in the processed or live directory. Only names are compared;
    # file contents are not checked for equality.
    unprocessed_uploads = []
    for upload in new_uploads:
        if os.path.exists(os.path.join(processed_dir, upload)):
            print(f"Upload {upload} is already in processed directory.")
        elif os.path.exists(os.path.join(live_dir, upload)):
            print(f"Upload {upload} is already in live directory.")
        else:
            unprocessed_uploads.append(upload)

    print(f"Processing {len(unprocessed_uploads)} new uploads.")

    # Uploads are awaited one at a time. Collecting the coroutines and
    # passing them to asyncio.gather would process them concurrently instead.
    for upload in tqdm(unprocessed_uploads):
        upload_path = os.path.join(upload_dir, upload)
        processed_path = os.path.join(processed_dir, upload)
        await process_single_upload(upload_path, processed_path)
async def process_single_upload(upload_path, processed_path):
    """Validate a single upload and process it when its structure is valid.

    Args:
        upload_path: Path of the uploaded JSON file to validate and process.
        processed_path: Path where the processed result is written.

    Returns:
        None. A failed structure check is reported with ``print`` and the
        upload is left unprocessed.
    """
    validation = await check_upload_structure(upload_path)

    # Guard clause: report invalid uploads and stop before any processing.
    if not validation['is_valid']:
        print(f"Upload check failed for {upload_path}: {validation['message']}")
        return

    # The uploaded file itself is not moved; processing writes a separate
    # file to processed_path.
    await process_upload(upload_path, processed_path)
async def check_upload_structure(file_path):
    """Check that an uploaded JSON file has the expected top-level layout.

    The file must parse as JSON and contain the keys ``config``, ``results``,
    ``raw_eval_results`` and ``raw_logging_results``. Unless the benchmark
    name in ``config`` contains ``"inspect"``, ``raw_logging_results`` must
    also be a list whose items each contain ``weave_task_id``, ``inputs`` and
    ``outputs``.

    Args:
        file_path: Path of the JSON file to inspect.

    Returns:
        A dict with ``is_valid`` (bool) and ``message`` (str). Every failure,
        including unreadable files and malformed JSON, is reported through
        this dict instead of being raised.
    """
    top_level_keys = ['config', 'results', 'raw_eval_results', 'raw_logging_results']
    per_item_keys = ['weave_task_id', 'inputs', 'outputs']

    try:
        async with aiofiles.open(file_path, 'r') as handle:
            payload = json.loads(await handle.read())

        absent = [key for key in top_level_keys if key not in payload]
        if absent:
            return {'is_valid': False, 'message': f"Missing required keys: {', '.join(absent)}"}

        # Uploads whose benchmark name contains "inspect" skip the checks on
        # raw_logging_results entirely.
        is_inspect = "inspect" in payload['config']['benchmark_name']
        if not is_inspect:
            log_entries = payload['raw_logging_results']
            if not isinstance(log_entries, list):
                return {'is_valid': False, 'message': "raw_logging_results should be a list"}
            for entry in log_entries:
                if any(key not in entry for key in per_item_keys):
                    return {'is_valid': False, 'message': "Each item in raw_logging_results should have weave_task_id, inputs, and outputs"}

        return {'is_valid': True, 'message': "File structure is valid"}
    except json.JSONDecodeError:
        return {'is_valid': False, 'message': "Invalid JSON format"}
    except Exception as e:
        # Any other failure (I/O error, unexpected data shape) is reported
        # as an invalid upload rather than propagated.
        return {'is_valid': False, 'message': f"Unexpected error: {str(e)}"}
async def process_upload(input_path, output_path):
    """Process one upload file and write the result to ``output_path``.

    Loads the JSON at ``input_path``. When ``raw_logging_results`` is a list
    it is passed through ``analyze_agent_steps`` (with ``llm_eval=False``);
    any other value is kept as-is. The result replaces
    ``raw_logging_results``, ``failure_report`` is set to ``None`` (failure
    report generation is currently disabled), and the whole document is
    written to ``output_path`` with 4-space indentation.

    Args:
        input_path: Path of the uploaded JSON file.
        output_path: Path where the processed JSON is written.

    Returns:
        None. If the analysis step raises, the traceback and an error message
        are printed and no output file is written.

    Raises:
        AssertionError: If the file has no ``raw_logging_results`` key.
        OSError, json.JSONDecodeError: Propagated if the input cannot be read
            or parsed, or the output cannot be written.
    """
    print(f"Processing {input_path}...")

    with open(input_path, 'r') as f:
        data = json.load(f)

    # Raised explicitly rather than via `assert` so the check still runs
    # under `python -O`; the exception type and message are unchanged.
    if 'raw_logging_results' not in data:
        raise AssertionError("raw_logging_results key not found in the file")

    try:
        if isinstance(data['raw_logging_results'], list):
            openai_client = AsyncOpenAIClient(model="gpt-4o-mini")
            processed_calls = await analyze_agent_steps(data['raw_logging_results'], openai_client, llm_eval=False)
        else:
            # Non-list logging results are passed through unchanged.
            processed_calls = data['raw_logging_results']

        data['raw_logging_results'] = processed_calls
        data['failure_report'] = None
    except Exception as e:
        # Best-effort: report the failure and skip writing an output file.
        traceback.print_exc()
        print(f"Error in processing: {str(e)}")
        return

    with open(output_path, 'w') as f:
        json.dump(data, f, indent=4)

    print(f"Processing of {input_path} successful. Results saved to {output_path}")
if __name__ == "__main__":
    # Manual test entry point: validate and process one hard-coded upload.
    test_upload_path = "evals_upload/inspect_evalsswe_bench_1729538131_UPLOAD.json"
    test_processed_path = "evals_processed/inspect_evalsswe_bench_1729538131_UPLOAD.json"
    asyncio.run(process_single_upload(test_upload_path, test_processed_path))