import pandas as pd
import gradio as gr
import csv
import json
import os
import shutil
from huggingface_hub import Repository

HF_TOKEN = os.environ.get("HF_TOKEN")
SUBJECTS = ["Classification", "VQA", "Retrieval", "Grounding"]

MODEL_INFO = [
    "Models", "Model Size(B)", "Data Source",
    "Overall", "IND", "OOD",
    "Classification", "VQA", "Retrieval", "Grounding"
]
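
# One Gradio Dataframe datatype per MODEL_INFO column, in the same order.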
DATA_TITLE_TYPE = ['markdown', 'str', 'markdown', 'number', 'number', 'number',
                   'number', 'number', 'number', 'number']
# TODO: submission process not implemented yet
SUBMISSION_NAME = ""
SUBMISSION_URL = ""
CSV_DIR = "results.csv"  # TODO: temporary file, to be updated with the actual file

COLUMN_NAMES = MODEL_INFO
LEADERBOARD_INTRODUCTION = """# MMEB Leaderboard

## Introduction
We introduce MMEB (Massive Multimodal Embedding Benchmark), a benchmark for evaluating multimodal embedding models across four meta-tasks: Classification, VQA, Retrieval, and Grounding. Models are evaluated on 36 datasets.
"""

TABLE_INTRODUCTION = """"""

LEADERBOARD_INFO = """
## Dataset Summary
"""

CITATION_BUTTON_LABEL = "Copy the following snippet to cite these results"
CITATION_BUTTON_TEXT = """"""
SUBMIT_INTRODUCTION = """# Submit to the MMEB Leaderboard

## ⚠ Please note that you need to submit the JSON file in the following format:
```json
[
    {
        "question_id": 123,
        "question": "abc",
        "options": ["abc", "xyz", ...],
        "answer": "ABC",
        "answer_index": 1,
        "category": "abc",
        "pred": "B",
        "model_outputs": ""
    }, ...
]
```
...
"""
def get_df():
    # TODO: update this after the HF dataset has been created!
    # repo = Repository(local_dir=SUBMISSION_NAME, clone_from=SUBMISSION_URL, use_auth_token=HF_TOKEN)
    # repo.git_pull()
    df = pd.read_csv(CSV_DIR)
    df['Model Size(B)'] = df['Model Size(B)'].apply(process_model_size)
    df = df.sort_values(by=['Overall'], ascending=False)
    return df
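
# Handle a user-submitted JSON result file (passed as the raw str/bytes of
# the upload): build a CSV row and, if the model is not already listed,
# append it to the results file and push the dataset repo to the Hub.
# The TODO above still applies: SUBMISSION_NAME / SUBMISSION_URL are empty.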
def add_new_eval(input_file):
    if input_file is None:
        return "Error! Empty file!"
    upload_data = json.loads(input_file)
    print("upload_data:\n", upload_data)
    data_row = [f'{upload_data["Model"]}', upload_data['Overall']]
    for subject in SUBJECTS:
        data_row += [upload_data[subject]]
    print("data_row:\n", data_row)
    submission_repo = Repository(local_dir=SUBMISSION_NAME, clone_from=SUBMISSION_URL,
                                 use_auth_token=HF_TOKEN, repo_type="dataset")
    submission_repo.git_pull()

    already_submitted = []
    with open(CSV_DIR, mode='r') as file:
        reader = csv.reader(file, delimiter=',')
        for row in reader:
            already_submitted.append(row[0])
    if data_row[0] not in already_submitted:
        with open(CSV_DIR, mode='a', newline='') as file:
            writer = csv.writer(file)
            writer.writerow(data_row)
        submission_repo.push_to_hub()
        print('Submission Successful')
        return 'Submission successful!'
    else:
        print('The entry already exists')
        return 'Error! This entry has already been submitted.'
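
# Re-read the results CSV so the displayed table picks up new rows.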
def refresh_data():
    df = get_df()
    return df[COLUMN_NAMES]
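
# Filter rows by a name substring and a size range. 'unknown' sizes are
# treated as a 1000.0B sentinel, so such models only pass the filter when
# the selected range reaches the top of the slider.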
def search_and_filter_models(df, query, min_size, max_size):
    filtered_df = df.copy()
    if query:
        filtered_df = filtered_df[filtered_df['Models'].str.contains(query, case=False, na=False)]
    size_mask = filtered_df['Model Size(B)'].apply(
        lambda x: (min_size <= 1000.0 <= max_size) if x == 'unknown'
        else (min_size <= x <= max_size))
    filtered_df = filtered_df[size_mask]
    return filtered_df[COLUMN_NAMES]
# def search_and_filter_models(df, query, min_size, max_size):
#     filtered_df = df.copy()
#     if query:
#         filtered_df = filtered_df[filtered_df['Models'].str.contains(query, case=False, na=False)]
#     def size_filter(x):
#         if isinstance(x, (int, float)):
#             return min_size <= x <= max_size
#         return True
#     filtered_df = filtered_df[filtered_df['Model Size(B)'].apply(size_filter)]
#     return filtered_df[COLUMN_NAMES]
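
# Name-only search, for views without the size slider.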
def search_models(df, query):
    if query:
        return df[df['Models'].str.contains(query, case=False, na=False)]
    return df
# def get_size_range(df):
#     numeric_sizes = df[df['Model Size(B)'].apply(lambda x: isinstance(x, (int, float)))]['Model Size(B)']
#     if len(numeric_sizes) > 0:
#         return float(numeric_sizes.min()), float(numeric_sizes.max())
#     return 0, 1000
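
# Bounds for the size slider; 'unknown' maps to the 1000.0B sentinel here too.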
def get_size_range(df):
    sizes = df['Model Size(B)'].apply(lambda x: 1000.0 if x == 'unknown' else x)
    return float(sizes.min()), float(sizes.max())
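
# Normalize raw size values: NaN or the literal 'unk' become 'unknown';
# everything else is coerced to float (falling back to 'unknown').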
def process_model_size(size):
    if pd.isna(size) or size == 'unk':
        return 'unknown'
    try:
        val = float(size)
        return val
    except (ValueError, TypeError):
        return 'unknown'
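
# Keep the base columns plus any selected meta-task columns; an empty
# selection shows the full table.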
def filter_columns_by_subjects(df, selected_subjects=None):
    if selected_subjects is None or len(selected_subjects) == 0:
        return df[COLUMN_NAMES]
    base_columns = ['Models', 'Model Size(B)', 'Data Source', 'Overall']
    selected_columns = base_columns + selected_subjects
    available_columns = [col for col in selected_columns if col in df.columns]
    return df[available_columns]
def get_subject_choices():
    return SUBJECTS
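

# --- Minimal usage sketch (not part of the original Space logic) ---
# A hedged example of wiring the helpers above into a Gradio Blocks app;
# the layout and component choices here are illustrative assumptions.
if __name__ == "__main__":
    with gr.Blocks() as demo:
        gr.Markdown(LEADERBOARD_INTRODUCTION)
        search_box = gr.Textbox(label="Search models")
        table = gr.Dataframe(
            value=get_df()[COLUMN_NAMES],
            headers=COLUMN_NAMES,
            datatype=DATA_TITLE_TYPE,
            interactive=False,
        )
        refresh_btn = gr.Button("Refresh")
        # Reload the CSV on demand; filter rows as the user types.
        refresh_btn.click(fn=refresh_data, outputs=table)
        search_box.change(
            fn=lambda q: search_models(get_df(), q)[COLUMN_NAMES],
            inputs=search_box,
            outputs=table,
        )
    demo.launch()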