from pathlib import Path
import json
import os

import gradio as gr
from huggingface_hub import snapshot_download
from gradio_leaderboard import Leaderboard, SelectColumns
import pandas as pd
from apscheduler.schedulers.background import BackgroundScheduler
from ttsds.benchmarks.benchmark import BenchmarkCategory
from ttsds import BenchmarkSuite

from src.envs import API, EVAL_REQUESTS_PATH, EVAL_RESULTS_PATH, QUEUE_REPO, REPO_ID, RESULTS_REPO, TOKEN, TAGS
from src.texts import LLM_BENCHMARKS_TEXT, EVALUATION_QUEUE_TEXT, CITATION_TEXT
from src.css_html_js import custom_css
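
# Module-level state: f_b_df holds the per-benchmark leaderboard and f_a_df the
# aggregated per-category leaderboard (both assigned further down, after the
# results are loaded). The Gradio callbacks below read these globals and return
# filtered/recomputed copies for display.

# filter_dfs is wired to the tag dropdowns. It detects which leaderboard is shown
# (only the aggregated one has an "Environment" column), restores the unfiltered
# copy, and keeps rows whose Tags contain at least one of the selected tags.
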
def filter_dfs(tags, lb):
    global f_b_df, f_a_df
    is_agg = False
    if "Environment" in lb.columns:
        is_agg = True
    if is_agg:
        lb = f_a_df.copy()
    else:
        lb = f_b_df.copy()
    if tags and len(lb) > 0:
        lb = lb[lb["Tags"].apply(lambda x: any(tag in x for tag in tags))]
    lb = rounded_df(lb)
    return lb
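
# change_mean is wired to the "Exclude environment from mean" checkbox: it
# recomputes the Mean column of the aggregated leaderboard with or without the
# Environment category.
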
def change_mean(env, lb):
    global f_b_df, f_a_df
    lb = f_a_df.copy()
    if env:
        mean_cols = [col for col in lb.columns if str(col) not in ["Mean", "Environment", "Model", "Tags"]]
    else:
        mean_cols = [col for col in lb.columns if str(col) not in ["Mean", "Model", "Tags"]]
    lb["Mean"] = lb[mean_cols].mean(axis=1)
    lb = rounded_df(lb)
    return lb
def restart_space():
    API.restart_space(repo_id=REPO_ID)
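
# submit_eval handles the "Submit here!" form: it validates the URLs and the
# uploaded .tar.gz, builds a markdown display name with links, writes a pending
# request JSON, and uploads both files to the queue dataset repo on the Hub.
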
def submit_eval(model_name, model_tags, web_url, hf_url, code_url, paper_url, inference_details, file_path):
    model_id = model_name.lower().replace(" ", "_")
    # check if model already exists
    if Path(f"{EVAL_REQUESTS_PATH}/{model_id}.json").exists():
        return "Model already exists in the evaluation queue"
    # check which urls are valid
    if web_url and not web_url.startswith("http"):
        return "Please enter a valid URL"
    if hf_url and not hf_url.startswith("http"):
        return "Please enter a valid URL"
    if code_url and not code_url.startswith("http"):
        return "Please enter a valid URL"
    if paper_url and not paper_url.startswith("http"):
        return "Please enter a valid URL"
    # move file to correct location
    if not file_path.endswith(".tar.gz"):
        return "Please upload a .tar.gz file"
    Path(file_path).rename(f"{EVAL_REQUESTS_PATH}/{model_id}.tar.gz")
    # build display name - use web_url to link text if available, and emojis for the other urls
    display_name = model_name + " "
    if web_url:
        display_name = f"[{display_name}]({web_url}) "
    if hf_url:
        display_name += f"[🤗]({hf_url})"
    if code_url:
        display_name += f"[💻]({code_url})"
    if paper_url:
        display_name += f"[📄]({paper_url})"
    request_obj = {
        "model_name": model_name,
        "display_name": display_name,
        "model_tags": model_tags,
        "web_url": web_url,
        "hf_url": hf_url,
        "code_url": code_url,
        "paper_url": paper_url,
        "inference_details": inference_details,
        "status": "pending",
    }
    try:
        with open(f"{EVAL_REQUESTS_PATH}/{model_id}.json", "w") as f:
            json.dump(request_obj, f)
        API.upload_file(
            path_or_fileobj=f"{EVAL_REQUESTS_PATH}/{model_id}.json",
            path_in_repo=f"{model_id}.json",
            repo_id=QUEUE_REPO,
            repo_type="dataset",
            commit_message=f"Add {model_name} to evaluation queue",
        )
        API.upload_file(
            path_or_fileobj=f"{EVAL_REQUESTS_PATH}/{model_id}.tar.gz",
            path_in_repo=f"{model_id}.tar.gz",
            repo_id=QUEUE_REPO,
            repo_type="dataset",
            commit_message=f"Add {model_name} to evaluation queue",
        )
    except Exception as e:
        os.remove(f"{EVAL_REQUESTS_PATH}/{model_id}.json")
        return f"Error: {e}"
| return "Model submitted successfully π" | |
### Space initialisation
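# Sync the evaluation queue and results datasets from the Hub into local
# folders; if a download fails (e.g. a transient Hub error), restart the Space
# (presumably to retry with a clean state).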
try:
    print(EVAL_REQUESTS_PATH)
    snapshot_download(
        repo_id=QUEUE_REPO,
        local_dir=EVAL_REQUESTS_PATH,
        repo_type="dataset",
        tqdm_class=None,
        etag_timeout=30,
        token=TOKEN,
    )
except Exception:
    restart_space()
try:
    print(EVAL_RESULTS_PATH)
    snapshot_download(
        repo_id=RESULTS_REPO,
        local_dir=EVAL_RESULTS_PATH,
        repo_type="dataset",
        tqdm_class=None,
        etag_timeout=30,
        token=TOKEN,
    )
except Exception:
    restart_space()
def rounded_df(df):
    df = df.copy()
    for col in df.columns:
        if isinstance(df[col].values[0], float):
            df[col] = df[col].apply(lambda x: round(x, 2))
    return df
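
# Build the two leaderboard tables from results.csv: agg_df aggregates scores
# per benchmark category (with a Mean column placed first), while benchmark_df
# keeps one column per individual benchmark, ordered by category.
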
results_df = pd.read_csv(EVAL_RESULTS_PATH + "/results.csv")

agg_df = BenchmarkSuite.aggregate_df(results_df)
agg_df = agg_df.pivot(index="dataset", columns="benchmark_category", values="score")
agg_df.rename(columns={"OVERALL": "General"}, inplace=True)
agg_df.columns = [x.capitalize() for x in agg_df.columns]
mean_cols = [col for col in agg_df.columns if str(col) not in ["Mean", "Environment", "Model", "Tags"]]
agg_df["Mean"] = agg_df[mean_cols].mean(axis=1)
# make sure mean is the first column
agg_df = agg_df[["Mean"] + [col for col in agg_df.columns if col != "Mean"]]
agg_df["Tags"] = ""
agg_df.reset_index(inplace=True)
agg_df.rename(columns={"dataset": "Model"}, inplace=True)
agg_df.sort_values("Mean", ascending=False, inplace=True)

benchmark_df = results_df.pivot(index="dataset", columns="benchmark_name", values="score")
# get benchmark name order by category
benchmark_order = list(results_df.sort_values("benchmark_category")["benchmark_name"].unique())
benchmark_df = benchmark_df[benchmark_order]
benchmark_df = benchmark_df.reset_index()
benchmark_df.rename(columns={"dataset": "Model"}, inplace=True)
# set index
benchmark_df.set_index("Model", inplace=True)
benchmark_df["Mean"] = benchmark_df.mean(axis=1)
# make sure mean is the first column
benchmark_df = benchmark_df[["Mean"] + [col for col in benchmark_df.columns if col != "Mean"]]
benchmark_df["Tags"] = ""
benchmark_df.reset_index(inplace=True)
benchmark_df.sort_values("Mean", ascending=False, inplace=True)
# get details for each model
model_detail_files = Path(EVAL_REQUESTS_PATH).glob("*.json")
model_details = {}
for model_detail_file in model_detail_files:
    with open(model_detail_file) as f:
        model_detail = json.load(f)
    model_details[model_detail_file.stem] = model_detail

# replace .tar.gz
benchmark_df["Model"] = benchmark_df["Model"].apply(lambda x: x.replace(".tar.gz", ""))
agg_df["Model"] = agg_df["Model"].apply(lambda x: x.replace(".tar.gz", ""))
benchmark_df["Tags"] = benchmark_df["Model"].apply(lambda x: model_details.get(x, {}).get("model_tags", ""))
agg_df["Tags"] = agg_df["Model"].apply(lambda x: model_details.get(x, {}).get("model_tags", ""))
benchmark_df["Model"] = benchmark_df["Model"].apply(lambda x: model_details.get(x, {}).get("display_name", x))
agg_df["Model"] = agg_df["Model"].apply(lambda x: model_details.get(x, {}).get("display_name", x))

f_b_df = benchmark_df.copy()
f_a_df = agg_df.copy()
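
# init_leaderboard wraps a dataframe in a gradio_leaderboard.Leaderboard
# component: Model and Tags render as markdown (they contain links), every other
# column as a number, and Tags is left out of the default column selection but
# remains searchable.
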
def init_leaderboard(dataframe):
    if dataframe is None or dataframe.empty:
        raise ValueError("Leaderboard DataFrame is empty or None.")
    df_types = []
    for col in dataframe.columns:
        if col == "Model":
            df_types.append("markdown")
        elif col == "Tags":
            df_types.append("markdown")
        else:
            df_types.append("number")
    cols = list(dataframe.columns)
    cols.remove("Tags")
    return Leaderboard(
        value=rounded_df(dataframe),
        select_columns=SelectColumns(
            default_selection=cols,
            cant_deselect=["Model", "Mean"],
            label="Select Columns to Display:",
        ),
        search_columns=["Model", "Tags"],
        filter_columns=[],
        interactive=False,
        datatype=df_types,
    )
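
# Gradio UI: one tab per view (aggregated scores, individual benchmarks, about
# text, submission form) plus a citation accordion at the bottom.
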
app = gr.Blocks(css=custom_css, title="TTS Benchmark Leaderboard")
with app:
    with gr.Tabs(elem_classes="tab-buttons") as tabs:
| with gr.TabItem("π TTSDS Scores", elem_id="llm-benchmark-tab-table", id=0): | |
| with gr.Group(): | |
| env = gr.Checkbox(value=True, label="Exclude environment from mean.") | |
| gr.Markdown("**Environment** measures how well the system can reproduce noise in the training data. This doesn't correlate with human judgements for 'naturalness'") | |
            tags = gr.Dropdown(
                TAGS,
                value=[],
                multiselect=True,
                label="Tags",
                info="Select tags to filter the leaderboard. You can suggest new tags here: https://huggingface.co/spaces/ttsds/benchmark/discussions/1",
            )
            leaderboard = init_leaderboard(f_a_df)
            tags.change(filter_dfs, [tags, leaderboard], [leaderboard])
            env.change(change_mean, [env, leaderboard], [leaderboard])
| with gr.TabItem("π Individual Benchmarks", elem_id="llm-benchmark-tab-table", id=1): | |
            tags = gr.Dropdown(
                TAGS,
                value=[],
                multiselect=True,
                label="Tags",
                info="Select tags to filter the leaderboard",
            )
            leaderboard = init_leaderboard(f_b_df)
            tags.change(filter_dfs, [tags, leaderboard], [leaderboard])
| with gr.TabItem("π About", elem_id="llm-benchmark-tab-table", id=2): | |
| gr.Markdown(LLM_BENCHMARKS_TEXT, elem_classes="markdown-text") | |
| with gr.TabItem("π Submit here!", elem_id="llm-benchmark-tab-table", id=3): | |
| with gr.Column(): | |
| with gr.Row(): | |
| gr.Markdown(EVALUATION_QUEUE_TEXT, elem_classes="markdown-text") | |
| with gr.Row(): | |
| gr.Markdown("# βοΈβ¨ Submit a TTS dataset here!", elem_classes="markdown-text") | |
                with gr.Row():
                    with gr.Column():
                        model_name_textbox = gr.Textbox(label="Model name")
                        model_tags_dropdown = gr.Dropdown(
                            label="Model tags",
                            choices=TAGS,
                            multiselect=True,
                        )
                        website_url_textbox = gr.Textbox(label="Website URL (optional)")
                        hf_url_textbox = gr.Textbox(label="Huggingface URL (optional)")
                        code_url_textbox = gr.Textbox(label="Code URL (optional)")
                        paper_url_textbox = gr.Textbox(label="Paper URL (optional)")
                        inference_details_textbox = gr.TextArea(label="Inference details (optional)")
                        file_input = gr.File(file_types=[".gz"], interactive=True, label=".tar.gz TTS dataset")
                        submit_button = gr.Button("Submit Eval")
                        submission_result = gr.Markdown()
                        submit_button.click(
                            submit_eval,
                            [
                                model_name_textbox,
                                model_tags_dropdown,
                                website_url_textbox,
                                hf_url_textbox,
                                code_url_textbox,
                                paper_url_textbox,
                                inference_details_textbox,
                                file_input,
                            ],
                            submission_result,
                        )
    with gr.Row():
        with gr.Accordion("Citation", open=False):
| gr.Markdown(f"Copy the BibTeX citation to cite this source:\n\n```bibtext\n{CITATION_TEXT}\n```") | |
scheduler = BackgroundScheduler()
scheduler.add_job(restart_space, "interval", seconds=5 * 86400)
scheduler.start()

app.queue(default_concurrency_limit=40).launch()