Spaces:
Running
Running
| import json | |
| import csv | |
| import os | |
| from dataclasses import dataclass | |
| import dateutil | |
| import numpy as np | |
| from src.display.utils import AutoEvalColumn, Tasks | |
| class EvalResult: | |
| """Represents one full evaluation. Built from a combination of the result and request file for a given run. | |
| """ | |
| eval_name: str # org_model_precision (uid) | |
| full_model: str # org/model (path on hub) | |
| results: dict | |
| date: str = "" # submission date of request file | |
| modelmap = {} | |
| def init_model_map(self, mapfile): | |
| with open(mapfile) as f: | |
| reader = csv.reader(f) | |
| for row in reader: | |
| self.modelmap[row[0]] = row[1] | |
| def init_from_json_file(self, json_filepath): | |
| """Inits the result from the specific model result file""" | |
| with open(json_filepath) as fp: | |
| data = json.load(fp) | |
| env_info = data.get("environment_info").get("parsed_arguments") | |
| full_model = env_info.get("model") | |
| # Use the display name, if available | |
| full_model = self.modelmap.get(full_model,full_model) | |
| # Extract results available in this file (some results are split in several files) | |
| results = {} | |
| for task in Tasks: | |
| task = task.value | |
| # We average all scores of a given metric (not all metrics are present in all files) | |
| accs = np.array([v.get(task.metric, None) for k, v in data["results"].items() if task.benchmark == k]) | |
| if accs.size == 0 or any([acc is None for acc in accs]): | |
| continue | |
| mean_acc = np.mean(accs) * 100.0 | |
| results[task.benchmark] = mean_acc | |
| return self( | |
| eval_name=full_model, | |
| full_model=full_model, | |
| results=results, | |
| ) | |
| def to_dict(self): | |
| """Converts the Eval Result to a dict compatible with our dataframe display""" | |
| average = sum([v for v in self.results.values() if v is not None]) / len(Tasks) | |
| data_dict = { | |
| "eval_name": self.eval_name, # not a column, just a save name, | |
| AutoEvalColumn.model.name: self.full_model, | |
| AutoEvalColumn.average.name: average, | |
| } | |
| for task in Tasks: | |
| data_dict[task.value.col_name] = self.results[task.value.benchmark] | |
| return data_dict | |
| def get_raw_eval_results(results_path: str) -> list[EvalResult]: | |
| """From the path of the results folder root, extract all needed info for results""" | |
| model_result_filepaths = [] | |
| EvalResult.init_model_map(results_path+"/modelmap.csv") | |
| for root, _, files in os.walk(results_path): | |
| # We should only have json files in model results | |
| # if len(files) == 0 or any([not f.endswith(".json") for f in files]): | |
| # continue | |
| # skip anything not results | |
| files = [f for f in files if (f.endswith("_evaluation_results.json"))] | |
| # Sort the files by date | |
| try: | |
| files.sort(key=lambda x: x.removesuffix("_evaluation_results.json")) | |
| except dateutil.parser._parser.ParserError: | |
| files = [files[-1]] | |
| for file in files: | |
| model_result_filepaths.append(os.path.join(root, file)) | |
| eval_results = {} | |
| for model_result_filepath in model_result_filepaths: | |
| # Creation of result | |
| eval_result = EvalResult.init_from_json_file(model_result_filepath) | |
| # eval_result.update_with_request_file(requests_path) | |
| # Store results of same eval together | |
| eval_name = eval_result.eval_name | |
| if eval_name in eval_results.keys(): | |
| eval_results[eval_name].results.update({k: v for k, v in eval_result.results.items() if v is not None}) | |
| else: | |
| eval_results[eval_name] = eval_result | |
| results = [] | |
| for v in eval_results.values(): | |
| try: | |
| v.to_dict() # we test if the dict version is complete | |
| results.append(v) | |
| except KeyError: # not all eval values present | |
| continue | |
| return results | |