Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import pandas as pd | |
| import os | |
| # # https://discuss.huggingface.co/t/issues-with-sadtalker-zerogpu-spaces-inquiry-about-community-grant/110625/10 | |
| # if os.environ.get("SPACES_ZERO_GPU") is not None: | |
| # import spaces | |
| # else: | |
| # class spaces: | |
| # @staticmethod | |
| # def GPU(func): | |
| # def wrapper(*args, **kwargs): | |
| # return func(*args, **kwargs) | |
| # return wrapper | |
| import sys | |
| from qatch.connectors.sqlite_connector import SqliteConnector | |
| from qatch.generate_dataset.orchestrator_generator import OrchestratorGenerator | |
| from qatch.evaluate_dataset.orchestrator_evaluator import OrchestratorEvaluator | |
| #from predictor.orchestrator_predictor import OrchestratorPredictor | |
| import utils_get_db_tables_info | |
| import utilities as us | |
| import time | |
| import plotly.express as px | |
| import plotly.graph_objects as go | |
| import plotly.colors as pc | |
| # @spaces.GPU | |
| # def model_prediction(): | |
| # pass | |
# Read the stylesheet text at import time.
# NOTE(review): `css` is not referenced in the visible part of this file
# (the Blocks constructor is given `css_paths='style.css'`) - confirm it is still needed.
with open('style.css', 'r') as file:
    css = file.read()

# Default DataFrame shown in the preview and used when "Use default DataFrame" is ticked
df_default = pd.DataFrame({
    'Name': ['Alice', 'Bob', 'Charlie'],
    'Age': [25, 30, 35],
    'City': ['New York', 'Los Angeles', 'Chicago']
})

# CSV listing the selectable models (read later through `us.read_models_csv`)
models_path = "models.csv"

# Global variable keeping track of the data currently shown/edited in the preview
df_current = df_default.copy()

# Shared, mutable description of the current run; the callbacks below read and write it in place
input_data = {
    'input_method': "",
    'data_path': "",
    'db_name': "",
    'data': {
        'data_frames': {}, # dictionary of dataframes
        'db': None # SQLITE3 database object
    },
    'models': []
}
def _build_sqlite_connector():
    """Attach a SqliteConnector for the loaded tables to ``input_data``.

    Does nothing when no tables are loaded. Every table is registered with
    ``'id'`` as its primary key.
    """
    tables = input_data["data"]['data_frames']
    if not tables:
        return
    input_data["data"]["db"] = SqliteConnector(
        relative_db_path=input_data["data_path"],
        db_name=input_data["db_name"],
        tables=tables,
        table2primary_key=dict.fromkeys(tables, 'id')
    )


def load_data(file, path, use_default):
    """Load the data from an uploaded file or fall back to the default DataFrame.

    Args:
        file: Path of the uploaded file, or ``None``.
        path: Filesystem path typed by the user. Path-based loading is
            currently disabled, so this only takes part in the
            "one input at a time" check.
        use_default: When truthy, the current preview DataFrame is used as
            the single table ``'MyTable'`` and the other inputs are ignored.

    Returns:
        The dictionary of loaded DataFrames on success, otherwise an error
        message string.
    """
    global df_current
    interface_dir = os.path.join(".", "data", "data_interface")

    if use_default:
        input_data["input_method"] = 'default'
        input_data["data_path"] = os.path.join(interface_dir, "mytable.sqlite")
        input_data["db_name"] = os.path.splitext(os.path.basename(input_data["data_path"]))[0]
        input_data["data"]['data_frames'] = {'MyTable': df_current}
        _build_sqlite_connector()
        # Restore the default data for the next preview
        df_current = df_default.copy()
        return input_data["data"]['data_frames']

    # The default branch has already returned, so only `file` and `path` can both be set here
    selected_inputs = sum([file is not None, bool(path), use_default])
    if selected_inputs > 1:
        return 'Errore: Selezionare solo un metodo di input alla volta.'

    # Loading from `path` is intentionally disabled; without a file the
    # previously loaded tables are returned unchanged.
    if file is None:
        return input_data["data"]['data_frames']

    try:
        input_data["input_method"] = 'uploaded_file'
        input_data["db_name"] = os.path.splitext(os.path.basename(file))[0]
        input_data["data_path"] = os.path.join(interface_dir, f"{input_data['db_name']}.sqlite")
        input_data["data"] = us.load_data(file, input_data["db_name"])
        # Keep the preview in sync with the uploaded table, if it is named 'MyTable'
        df_current = input_data["data"]['data_frames'].get('MyTable', df_default)
        _build_sqlite_connector()
        return input_data["data"]['data_frames']
    except Exception as e:
        return f'Errore nel caricamento del file: {e}'
def preview_default(use_default):
    """Pick the DataFrame to preview.

    Returns the default DataFrame when the checkbox is ticked, otherwise the
    current one (which may have been edited by the user).
    """
    return df_default if use_default else df_current
def update_df(new_df):
    """Store ``new_df`` as the current DataFrame and hand it back."""
    global df_current  # rebinding the module-level name, not a local
    df_current = new_df
    return new_df
def open_accordion(target):
    """Open one accordion section and close the others.

    Args:
        target: ``"reset"`` to go back to the upload section and clear the
            shared state, or ``"model_selection"`` to open the model section.

    Returns:
        For ``"reset"``: seven updates (upload, table, model, QATCH and
        metrics accordions, the default checkbox and the file input).
        For ``"model_selection"``: five updates (the five accordions).
        Any other target returns ``None``.
    """
    # Without this declaration the assignment below only created a local
    # variable and the reset never restored the module-level DataFrame.
    global df_current

    if target == "reset":
        df_current = df_default.copy()
        input_data['input_method'] = ""
        input_data['data_path'] = ""
        input_data['db_name'] = ""
        input_data['data']['data_frames'] = {}
        input_data['data']['db'] = None
        input_data['models'] = []
        return (
            gr.update(open=True),
            gr.update(open=False, visible=False),
            gr.update(open=False, visible=False),
            gr.update(open=False, visible=False),
            gr.update(open=False, visible=False),
            gr.update(value=False),
            gr.update(value=None),
        )
    elif target == "model_selection":
        return (
            gr.update(open=False),
            gr.update(open=False),
            gr.update(open=True, visible=True),
            gr.update(open=False),
            gr.update(open=False),
        )
| # Interfaccia Gradio | |
| with gr.Blocks(theme='d8ahazard/rd_blue', css_paths='style.css') as interface: | |
| with gr.Row(): | |
| gr.Column(scale=1) | |
| gr.Image( | |
| value="https://github.com/CristianDegni01/Automatic-LLM-Benchmark-Analysis-for-Text2SQL-GRADIO/blob/master/models_logo/QATCH.png?raw=true", | |
| show_label=False, | |
| container=False, | |
| height=200, # in pixel | |
| width=400 | |
| ) | |
| gr.Column(scale=1) | |
| data_state = gr.State(None) # Memorizza i dati caricati | |
| upload_acc = gr.Accordion("Upload your data section", open=True, visible=True) | |
| select_table_acc = gr.Accordion("Select tables", open=False, visible=False) | |
| select_model_acc = gr.Accordion("Select models", open=False, visible=False) | |
| qatch_acc = gr.Accordion("QATCH execution", open=False, visible=False) | |
| metrics_acc = gr.Accordion("Metrics", open=False, visible=False) | |
| #metrics_acc = gr.Accordion("Metrics", open=False, visible=False, render=False) | |
| ################################# | |
| # DATABASE INSERTION # | |
| ################################# | |
| with upload_acc: | |
| gr.Markdown("## Data Upload") | |
| file_input = gr.File(label="Drag and drop a file", file_types=[".csv", ".xlsx", ".sqlite"]) | |
| with gr.Row(): | |
| default_checkbox = gr.Checkbox(label="Use default DataFrame") | |
| preview_output = gr.DataFrame(interactive=True, visible=True, value=df_default) | |
| submit_button = gr.Button("Load Data", interactive=False) # Disabled by default | |
| output = gr.JSON(visible=False) # Dictionary output | |
| # Function to enable the button if there is data to load | |
def enable_submit(file, use_default):
    """Make the "Load Data" button clickable only when there is something to load."""
    has_input = bool(file) or bool(use_default)
    return gr.update(interactive=has_input)
| # Function to uncheck the checkbox if a file is uploaded | |
def deselect_default(file):
    """Untick the default checkbox once a file is uploaded; otherwise leave it as is."""
    return gr.update(value=False) if file else gr.update()
| # Enable the button when inputs are provided | |
| file_input.change(fn=enable_submit, inputs=[file_input, default_checkbox], outputs=[submit_button]) | |
| default_checkbox.change(fn=enable_submit, inputs=[file_input, default_checkbox], outputs=[submit_button]) | |
| # Show preview of the default DataFrame when checkbox is selected | |
| default_checkbox.change(fn=preview_default, inputs=[default_checkbox], outputs=[preview_output]) | |
| preview_output.change(fn=update_df, inputs=[preview_output], outputs=[preview_output]) | |
| # Uncheck the checkbox when a file is uploaded | |
| file_input.change(fn=deselect_default, inputs=[file_input], outputs=[default_checkbox]) | |
def handle_output(file, use_default):
    """Handle a click on "Load Data" and decide which section to show next.

    Returns seven values, in the order of the wired outputs: JSON output
    update, JSON value, table-selection accordion, data state, submit
    button, model-selection accordion, upload accordion.
    """
    loaded = load_data(file, None, use_default)

    if not isinstance(loaded, dict):
        # load_data returns an error string on failure: stay on the upload
        # section and keep the button usable so the user can retry.
        return (
            gr.update(visible=False),
            None,
            gr.update(open=False, visible=True),
            None,
            gr.update(interactive=True),
            gr.update(visible=False),
            gr.update(visible=True, open=True),
        )

    if len(loaded) == 1:
        # A single table needs no selection step: go straight to the models
        table_acc_update = gr.update(visible=False)
        model_acc_update = gr.update(visible=True, open=True)
        upload_acc_update = gr.update(visible=True, open=False)
    else:
        # Several tables: let the user choose which ones to use
        table_acc_update = gr.update(open=True, visible=True)
        model_acc_update = gr.update(visible=False)
        upload_acc_update = gr.update(visible=True, open=True)

    return (
        gr.update(visible=False),      # hide the JSON output
        loaded,                        # JSON value
        table_acc_update,
        loaded,                        # data state
        gr.update(interactive=False),  # disable the submit button
        model_acc_update,
        upload_acc_update,
    )
| submit_button.click( | |
| fn=handle_output, | |
| inputs=[file_input, default_checkbox], | |
| outputs=[output, output, select_table_acc, data_state, submit_button, select_model_acc, upload_acc] | |
| ) | |
| ###################################### | |
| # TABLE SELECTION PART # | |
| ###################################### | |
| with select_table_acc: | |
| table_selector = gr.CheckboxGroup(choices=[], label="Select tables to display", value=[]) | |
| table_outputs = [gr.DataFrame(label=f"Table {i+1}", interactive=True, visible=False) for i in range(5)] | |
| selected_table_names = gr.Textbox(label="Selected tables", visible=False, interactive=False) | |
| # Model selection button (initially disabled) | |
| open_model_selection = gr.Button("Choose your models", interactive=False) | |
def update_table_list(data):
    """Refresh the table checkbox choices from the loaded data and clear the selection."""
    has_tables = isinstance(data, dict) and data
    names = list(data.keys()) if has_tables else []
    return gr.update(choices=names, value=[])
def show_selected_tables(data, selected_tables, max_tables=5):
    """Display the tables selected by the user and toggle the models button.

    Args:
        data: Dictionary mapping table names to DataFrames (the data state).
        selected_tables: Names ticked in the checkbox group.
        max_tables: Number of DataFrame slots available in the UI. The
            returned list always holds exactly ``max_tables`` table updates
            followed by one button update, so it matches the wired outputs
            even when more tables are selected than there are slots.

    Returns:
        A list of ``max_tables + 1`` component updates.
    """
    selected_tables = selected_tables or []

    if isinstance(data, dict) and data:
        # Drop selections that are not among the loaded tables
        selected_tables = [name for name in selected_tables if name in data]
        # Only the first `max_tables` selections can be displayed
        shown = selected_tables[:max_tables]
        updates = [
            gr.update(value=data[name], label=f"Table: {name}", visible=True)
            for name in shown
        ]
        # Hide the slots left unused
        updates.extend(gr.update(visible=False) for _ in range(max_tables - len(shown)))
    else:
        updates = [gr.update(value=pd.DataFrame(), visible=False) for _ in range(max_tables)]

    # The button is enabled as soon as at least one table is selected
    updates.append(gr.update(interactive=bool(selected_tables)))
    return updates
def show_selected_table_names(selected_tables):
    """Write the comma-separated selected table names into the (hidden) textbox."""
    joined = ", ".join(selected_tables) if selected_tables else ""
    return gr.update(value=joined, visible=False)
| # Automatically updates the checkbox list when `data_state` changes | |
| data_state.change(fn=update_table_list, inputs=[data_state], outputs=[table_selector]) | |
| # Updates the visible tables and the button state based on user selections | |
| table_selector.change(fn=show_selected_tables, inputs=[data_state, table_selector], outputs=table_outputs + [open_model_selection]) | |
| # Shows the list of selected tables when "Choose your models" is clicked | |
| open_model_selection.click(fn=show_selected_table_names, inputs=[table_selector], outputs=[selected_table_names]) | |
| open_model_selection.click(open_accordion, inputs=gr.State("model_selection"), outputs=[upload_acc, select_table_acc, select_model_acc, qatch_acc, metrics_acc]) | |
| #################################### | |
| # MODEL SELECTION PART # | |
| #################################### | |
| with select_model_acc: | |
| gr.Markdown("**Model Selection**") | |
| # Assume that `us.read_models_csv` also returns the image path | |
| model_list_dict = us.read_models_csv(models_path) | |
| model_list = [model["code"] for model in model_list_dict] | |
| model_images = [model["image_path"] for model in model_list_dict] | |
| model_checkboxes = [] | |
| rows = [] | |
| # Dynamically create checkboxes with images (3 per row) | |
| for i in range(0, len(model_list), 3): | |
| with gr.Row(): | |
| cols = [] | |
| for j in range(3): | |
| if i + j < len(model_list): | |
| model = model_list[i + j] | |
| image_path = model_images[i + j] | |
| with gr.Column(): | |
| gr.Image(image_path, show_label=False) | |
| checkbox = gr.Checkbox(label=model, value=False) | |
| model_checkboxes.append(checkbox) | |
| cols.append(checkbox) | |
| rows.append(cols) | |
| selected_models_output = gr.JSON(visible=False) | |
| # Function to get selected models | |
def get_selected_models(*model_selections):
    """Record the ticked models in ``input_data`` and update the UI.

    Args:
        *model_selections: One boolean per model checkbox, in the same
            order as ``model_list``.

    Returns:
        The selected model codes, an update keeping the model accordion
        open, and an update enabling the submit button only when at least
        one model is selected.
    """
    chosen = []
    for code, ticked in zip(model_list, model_selections):
        if ticked:
            chosen.append(code)
    input_data['models'] = chosen
    return (
        chosen,
        gr.update(open=True, visible=True),
        gr.update(interactive=bool(chosen)),
    )
# Submit button (initially disabled; enabled by get_selected_models once a model is ticked)
submit_models_button = gr.Button("Submit Models", interactive=False)

# Re-evaluate the selection whenever any model checkbox changes
for checkbox in model_checkboxes:
    checkbox.change(
        fn=get_selected_models,
        inputs=model_checkboxes,
        outputs=[selected_models_output, select_model_acc, submit_models_button]
    )

# On submit: store the selection, collapse the model section and open the QATCH section.
# get_selected_models returns (models, accordion update, button update); only the
# model list belongs in the JSON output, so take element 0 instead of the whole tuple.
submit_models_button.click(
    fn=lambda *args: (
        get_selected_models(*args)[0],
        gr.update(open=False, visible=True),
        gr.update(open=True, visible=True),
    ),
    inputs=model_checkboxes,
    outputs=[selected_models_output, select_model_acc, qatch_acc]
)
def enable_disable(enable):
    """Build one ``interactive`` update per input widget.

    The count matches the wired outputs: every model checkbox, six
    individual widgets, every table DataFrame, and the models button.
    """
    widget_count = len(model_checkboxes) + 6 + len(table_outputs) + 1
    return tuple(gr.update(interactive=enable) for _ in range(widget_count))
| reset_data = gr.Button("Back to upload data section") | |
| submit_models_button.click( | |
| fn=enable_disable, | |
| inputs=[gr.State(False)], | |
| outputs=[ | |
| *model_checkboxes, | |
| submit_models_button, | |
| preview_output, | |
| submit_button, | |
| file_input, | |
| default_checkbox, | |
| table_selector, | |
| *table_outputs, | |
| open_model_selection | |
| ] | |
| ) | |
| reset_data.click(open_accordion, inputs=gr.State("reset"), outputs=[upload_acc, select_table_acc, select_model_acc, qatch_acc, metrics_acc, default_checkbox, file_input]) | |
| reset_data.click( | |
| fn=enable_disable, | |
| inputs=[gr.State(True)], | |
| outputs=[ | |
| *model_checkboxes, | |
| submit_models_button, | |
| preview_output, | |
| submit_button, | |
| file_input, | |
| default_checkbox, | |
| table_selector, | |
| *table_outputs, | |
| open_model_selection | |
| ] | |
| ) | |
| ############################# | |
| # QATCH EXECUTION # | |
| ############################# | |
| with qatch_acc: | |
def change_text(text):
    """Return ``text`` unchanged (used to copy a value from one component to another)."""
    return text
# Progress-bar icons: entry n holds n space-separated symbols, for n = 1..10
loading_symbols = {count: " ".join(["π"] * count) for count in range(1, 11)}


def generate_loading_text(percent):
    """Build the HTML snippet of the animated loading bar.

    Args:
        percent: Completion percentage; it is shown verbatim in the label and
            its rounded value selects how many symbols are drawn.

    Returns:
        An HTML string with the symbols, the "Generation N%" label and the
        same symbols again in a mirrored span.
    """
    # Cycles through 1..11; 11 has no entry and falls back to a single symbol
    slot = round(percent) % 11 + 1
    icons = loading_symbols.get(slot, "π").strip()
    fish_span = f'<span class="fish">{icons}</span>'
    mirrored_span = f'<span class="mirrored">{icons}</span>'
    return f"<div class='barcontainer'>{fish_span} <span class='loading'>Generation {percent}%</span>{mirrored_span}</div>"
def qatch_flow():
    """Generator driving the QATCH run for the selected models.

    Generates the test set from the loaded database, collects one
    (currently placeholder) prediction per question for every model in
    ``input_data["models"]``, then evaluates the predictions.

    Yields:
        Tuples of (model logo image, loading-bar markdown, question
        markdown, prediction markdown, metrics DataFrame, one prediction
        DataFrame per entry of ``model_list``).

    NOTE(review): the nesting below was reconstructed from a listing that
    had lost its indentation - confirm the placement of the yields.
    """
    orchestrator_generator = OrchestratorGenerator()
    # TODO: add to target_df column target_df["columns_used"], tables selection
    # print(input_data['data']['db'])
    target_df = orchestrator_generator.generate_dataset(connector=input_data['data']['db'])
    # Only needed by the real predictor call, which is still commented out below
    schema_text = utils_get_db_tables_info.utils_extract_db_schema_as_string(
        db_id = input_data["db_name"],
        base_path = input_data["data_path"],
        normalize=False,
        sql=None
    )
    # TODO: QUERY PREDICTION
    # One (initially empty) prediction table per known model, selected or not
    predictions_dict = {model: pd.DataFrame(columns=['id', 'question', 'predicted_sql', 'time', 'query', 'db_path']) for model in model_list}
    metrics_conc = pd.DataFrame()
    for model in input_data["models"]:
        # Show the logo of the model about to run
        model_image_path = next((m["image_path"] for m in model_list_dict if m["code"] == model), None)
        yield gr.Image(model_image_path), gr.Markdown(), gr.Markdown(), gr.Markdown(), metrics_conc, *[predictions_dict[model] for model in model_list]
        for index, row in target_df.iterrows():
            # NOTE(review): assumes the index of target_df counts 0..len-1 - confirm
            percent_complete = round(((index+1) / len(target_df)) * 100, 2)
            load_text = f"{generate_loading_text(percent_complete)}"
            question = row['question']
            display_question = f"<div class='loading' style ='font-size: 1.7rem;'>Natural Language: </div> <div class='sqlquery'>{row['question']}</div>"
            # yield gr.Textbox(question), gr.Textbox(), *[predictions_dict[model] for model in input_data["models"]], None
            yield gr.Image(), gr.Markdown(load_text), gr.Markdown(display_question), gr.Markdown(), metrics_conc, *[predictions_dict[model] for model in model_list]
            start_time = time.time()
            # Simulate prediction
            time.sleep(0.4)
            prediction = "Prediction_placeholder"
            display_prediction = f"<div class='loading' style ='font-size: 1.7rem;'>Generated SQL: </div><div class='sqlquery'>{prediction}</div>"
            # Run real prediction here
            # prediction = predictor.run(model, schema_text, question)
            end_time = time.time()
            # Create a new row as dataframe
            new_row = pd.DataFrame([{
                'id': index,
                'question': question,
                'predicted_sql': prediction,
                'time': end_time - start_time,
                'query': row["query"],
                'db_path': input_data["data_path"]
            }]).dropna(how="all") # Remove only completely empty rows
            # Carry over every remaining target_df column for this row
            for col in target_df.columns:
                if col not in new_row.columns:
                    new_row[col] = row[col]
            # Update model's prediction dataframe incrementally
            if not new_row.empty:
                predictions_dict[model] = pd.concat([predictions_dict[model], new_row], ignore_index=True)
            # yield gr.Textbox(), gr.Textbox(prediction), *[predictions_dict[model] for model in input_data["models"]], None
            yield gr.Image(), gr.Markdown(load_text), gr.Markdown(), gr.Markdown(display_prediction), metrics_conc, *[predictions_dict[model] for model in model_list]
        # NOTE(review): `load_text` and `display_prediction` are only bound inside the
        # loop above, so this line fails if target_df has no rows - confirm that cannot happen
        yield gr.Image(), gr.Markdown(load_text), gr.Markdown(), gr.Markdown(display_prediction), metrics_conc, *[predictions_dict[model] for model in model_list]
    # END
    # Evaluate each selected model's predictions and stack the results
    evaluator = OrchestratorEvaluator()
    for model in input_data["models"]:
        metrics_df_model = evaluator.evaluate_df(
            df=predictions_dict[model],
            target_col_name="query",
            prediction_col_name="predicted_sql",
            db_path_name="db_path"
        )
        metrics_df_model['model'] = model
        metrics_conc = pd.concat([metrics_conc, metrics_df_model], ignore_index=True)
    # Expose the 'VES' column under the name 'valid_efficiency_score' when that column is missing
    if 'valid_efficiency_score' not in metrics_conc.columns:
        metrics_conc['valid_efficiency_score'] = metrics_conc['VES']
    yield gr.Image(), gr.Markdown(), gr.Markdown(), gr.Markdown(), metrics_conc, *[predictions_dict[model] for model in model_list]
| # Loading Bar | |
| with gr.Row(): | |
| # progress = gr.Progress() | |
| variable = gr.Markdown() | |
| # NL -> MODEL -> Generated Query | |
| with gr.Row(): | |
| with gr.Column(): | |
| with gr.Column(): | |
| question_display = gr.Markdown() | |
| with gr.Column(): | |
| gr.Markdown("<div class='leftarrow'>‴</div>") | |
| with gr.Column(): | |
| model_logo = gr.Image(visible=True, show_label=False) | |
| with gr.Column(): | |
| with gr.Column(): | |
| prediction_display = gr.Markdown() | |
| with gr.Column(): | |
| gr.Markdown("<div class='rightarrow'>‴</div>") | |
| dataframe_per_model = {} | |
| with gr.Tabs() as model_tabs: | |
| tab_dict = {} | |
| for model in model_list: | |
| with gr.TabItem(model, visible=(model in input_data["models"])) as tab: | |
| gr.Markdown(f"**Results for {model}**") | |
| tab_dict[model] = tab | |
| dataframe_per_model[model] = gr.DataFrame() | |
| # download_pred_model = gr.DownloadButton(label="Download Prediction per Model", visible=False) | |
def change_tab():
    """Show only the result tabs of the models that were selected."""
    active_models = input_data["models"]
    tab_updates = []
    for code in model_list:
        tab_updates.append(gr.update(visible=(code in active_models)))
    return tab_updates
| submit_models_button.click( | |
| change_tab, | |
| inputs=[], | |
| outputs=[tab_dict[model] for model in model_list] # Update TabItem visibility | |
| ) | |
| selected_models_display = gr.JSON(label="Final input data", visible=False) | |
| metrics_df = gr.DataFrame(visible=False) | |
| metrics_df_out = gr.DataFrame(visible=False) | |
| submit_models_button.click( | |
| fn=qatch_flow, | |
| inputs=[], | |
| outputs=[model_logo, variable, question_display, prediction_display, metrics_df] + list(dataframe_per_model.values()) | |
| ) | |
| submit_models_button.click( | |
| fn=lambda: gr.update(value=input_data), | |
| outputs=[selected_models_display] | |
| ) | |
| # Works for METRICS | |
| metrics_df.change(fn=change_text, inputs=[metrics_df], outputs=[metrics_df_out]) | |
| proceed_to_metrics_button = gr.Button("Proceed to Metrics") | |
| proceed_to_metrics_button.click( | |
| fn=lambda: (gr.update(open=False, visible=True), gr.update(open=True, visible=True)), | |
| outputs=[qatch_acc, metrics_acc] | |
| ) | |
def allow_download(metrics_df_out):
    """Write the metrics to a CSV file and reveal the download button.

    Args:
        metrics_df_out: DataFrame holding the evaluation metrics.

    Returns:
        An update pointing the download button at the written file and
        making it visible.
    """
    results_dir = os.path.join(".", "data", "data_results")
    # to_csv does not create missing directories, so make sure the target exists
    os.makedirs(results_dir, exist_ok=True)
    path = os.path.join(results_dir, "results.csv")
    metrics_df_out.to_csv(path, index=False)
    return gr.update(value=path, visible=True)
| download_metrics = gr.DownloadButton(label="Download Metrics Evaluation", visible=False) | |
| submit_models_button.click( | |
| fn=lambda: gr.update(visible=False), | |
| outputs=[download_metrics] | |
| ) | |
| #TODO WHY? | |
| # download_metrics.click( | |
| # fn=lambda: gr.update(open=True, visible=True), | |
| # outputs=[download_metrics] | |
| # ) | |
| metrics_df_out.change(fn=allow_download, inputs=[metrics_df_out], outputs=[download_metrics]) | |
| reset_data = gr.Button("Back to upload data section") | |
| reset_data.click(open_accordion, inputs=gr.State("reset"), outputs=[upload_acc, select_table_acc, select_model_acc, qatch_acc, metrics_acc, default_checkbox, file_input]) | |
| #WHY NOT WORKING? | |
| reset_data.click( | |
| fn=lambda: gr.update(visible=False), | |
| outputs=[download_metrics] | |
| ) | |
| reset_data.click( | |
| fn=enable_disable, | |
| inputs=[gr.State(True)], | |
| outputs=[ | |
| *model_checkboxes, | |
| submit_models_button, | |
| preview_output, | |
| submit_button, | |
| file_input, | |
| default_checkbox, | |
| table_selector, | |
| *table_outputs, | |
| open_model_selection | |
| ] | |
| ) | |
| ########################################## | |
| # METRICS VISUALIZATION SECTION # | |
| ########################################## | |
| with metrics_acc: | |
| #confirmation_text = gr.Markdown("## Metrics successfully loaded") | |
| data_path = 'test_results.csv' | |
| def function_metrics(metrics_df_out): | |
| def load_data_csv_es(): | |
| return pd.read_csv(data_path) | |
| #return metrics_df_out | |
| def calculate_average_metrics(df, selected_metrics): | |
| df['avg_metric'] = df[selected_metrics].mean(axis=1) | |
| return df | |
| def generate_model_colors(): | |
| """Generates a unique color map for models in the dataset.""" | |
| df = load_data_csv_es() | |
| unique_models = df['model'].unique() # Extract unique models | |
| num_models = len(unique_models) | |
| # Use the Plotly color scale (you can change it if needed) | |
| color_palette = pc.qualitative.Plotly # ['#636EFA', '#EF553B', '#00CC96', ...] | |
| # If there are more models than colors, cycle through them | |
| colors = {model: color_palette[i % len(color_palette)] for i, model in enumerate(unique_models)} | |
| return colors | |
| MODEL_COLORS = generate_model_colors() | |
| # BAR CHART FOR AVERAGE METRICS WITH UPDATE FUNCTION | |
| def plot_metric(df, selected_metrics, group_by, selected_models): | |
| df = df[df['model'].isin(selected_models)] | |
| df = calculate_average_metrics(df, selected_metrics) | |
| # Ensure the group_by value is always valid | |
| if group_by not in [["tbl_name", "model"], ["model"]]: | |
| group_by = ["tbl_name", "model"] # Default | |
| avg_metrics = df.groupby(group_by)['avg_metric'].mean().reset_index() | |
| fig = px.bar( | |
| avg_metrics, | |
| x=group_by[0], | |
| y='avg_metric', | |
| color='model', | |
| color_discrete_map=MODEL_COLORS, | |
| barmode='group', | |
| title=f'Average metric per {group_by[0]} π', | |
| labels={group_by[0]: group_by[0].capitalize(), 'avg_metric': 'Average Metric'}, | |
| template='plotly_dark' | |
| ) | |
| return gr.Plot(fig, visible=True) | |
| def update_plot(selected_metrics, group_by, selected_models): | |
| df = load_data_csv_es() | |
| return plot_metric(df, selected_metrics, group_by, selected_models) | |
| # RADAR CHART FOR AVERAGE METRICS PER MODEL WITH UPDATE FUNCTION | |
| def plot_radar(df, selected_models): | |
| # Filter only selected models | |
| df = df[df['model'].isin(selected_models)] | |
| # Select relevant metrics | |
| selected_metrics = ["cell_precision", "cell_recall", "execution_accuracy", "tuple_cardinality", "tuple_constraint"] | |
| # Compute average metrics per test_category and model | |
| df = calculate_average_metrics(df, selected_metrics) | |
| avg_metrics = df.groupby(['model', 'test_category'])['avg_metric'].mean().reset_index() | |
| # Check if data is available | |
| if avg_metrics.empty: | |
| print("Error: No data available to compute averages.") | |
| return go.Figure() | |
| fig = go.Figure() | |
| categories = avg_metrics['test_category'].unique() | |
| for model in selected_models: | |
| model_data = avg_metrics[avg_metrics['model'] == model] | |
| # Build a list of values for each category (if a value is missing, set it to 0) | |
| values = [ | |
| model_data[model_data['test_category'] == cat]['avg_metric'].values[0] | |
| if cat in model_data['test_category'].values else 0 | |
| for cat in categories | |
| ] | |
| fig.add_trace(go.Scatterpolar( | |
| r=values, | |
| theta=categories, | |
| fill='toself', | |
| name=model, | |
| line=dict(color=MODEL_COLORS.get(model, "gray")) | |
| )) | |
| fig.update_layout( | |
| polar=dict(radialaxis=dict(visible=True, range=[0, max(avg_metrics['avg_metric'].max(), 0.5)])), # Set the radar range | |
| title='βοΈ Radar Plot of Metrics per Model (Average per Category) βοΈ ', | |
| template='plotly_dark', | |
| width=700, height=700 | |
| ) | |
| return fig | |
| def update_radar(selected_models): | |
| df = load_data_csv_es() | |
| return plot_radar(df, selected_models) | |
| # LINE CHART FOR CUMULATIVE TIME WITH UPDATE FUNCTION | |
| def plot_cumulative_flow(df, selected_models): | |
| df = df[df['model'].isin(selected_models)] | |
| fig = go.Figure() | |
| for model in selected_models: | |
| model_df = df[df['model'] == model].copy() | |
| # Calculate cumulative time | |
| model_df['cumulative_time'] = model_df['time'].cumsum() | |
| # Calculate cumulative number of queries over time | |
| model_df['cumulative_queries'] = range(1, len(model_df) + 1) | |
| # Select a color for the model | |
| color = MODEL_COLORS.get(model, "gray") # Assigned model color | |
| fillcolor = color.replace("rgb", "rgba").replace(")", ", 0.2)") # πΉ Makes the area semi-transparent | |
| #color = f"rgba({hash(model) % 256}, {hash(model * 2) % 256}, {hash(model * 3) % 256}, 1)" | |
| fig.add_trace(go.Scatter( | |
| x=model_df['cumulative_time'], | |
| y=model_df['cumulative_queries'], | |
| mode='lines+markers', | |
| name=model, | |
| line=dict(width=2, color=color) | |
| )) | |
| # Adds the underlying colored area (same color but transparent) | |
| """ | |
| fig.add_trace(go.Scatter( | |
| x=model_df['cumulative_time'], | |
| y=model_df['cumulative_queries'], | |
| fill='tozeroy', | |
| mode='none', | |
| showlegend=False, # Hides the area in the legend | |
| fillcolor=fillcolor | |
| )) | |
| """ | |
| fig.update_layout( | |
| title="Cumulative Query Flow Chart π", | |
| xaxis_title="Cumulative Time (s)", | |
| yaxis_title="Number of Queries Completed", | |
| template='plotly_dark', | |
| legend_title="Models" | |
| ) | |
| return fig | |
| def update_query_rate(selected_models): | |
| df = load_data_csv_es() | |
| return plot_cumulative_flow(df, selected_models) | |
| # RANKING FOR THE TOP 3 MODELS WITH UPDATE FUNCTION | |
def ranking_text(df, selected_models, ranking_type):
    """Build the Markdown/HTML snippet that lists the top three models.

    Args:
        df: DataFrame with a ``model`` column and a
            ``valid_efficiency_score`` column; ``"time"`` ranking also needs
            a numeric ``time`` column, and ``"metrics"`` ranking needs the
            columns consumed by ``calculate_average_metrics``.
        selected_models: Model names to keep; rows of other models are
            ignored.
        ranking_type: One of ``"valid_efficiency_score"`` (per-model mean,
            higher is better), ``"time"`` (per-model sum, lower is better)
            or ``"metrics"`` (per-model mean of ``avg_metric``, higher is
            better).

    Returns:
        A string made of a level-2 heading followed by one ``<span>`` line
        per ranked model (at most three).

    Raises:
        ValueError: If ``ranking_type`` is not one of the supported values.
    """
    # Copy the filtered rows so the numeric coercion below is applied to an
    # independent frame (no SettingWithCopyWarning, caller's data untouched).
    df = df[df['model'].isin(selected_models)].copy()
    df['valid_efficiency_score'] = pd.to_numeric(df['valid_efficiency_score'], errors='coerce')

    unit = ""
    if ranking_type == "valid_efficiency_score":
        rank_df = df.groupby('model')['valid_efficiency_score'].mean().reset_index()
        ascending_order = False  # Higher is better
    elif ranking_type == "time":
        rank_df = df.groupby('model')['time'].sum().reset_index()
        ascending_order = True  # For time, lower is better
        unit = " s"  # Appended only when rendering, never before sorting
    elif ranking_type == "metrics":
        selected_metrics = ["cell_precision", "cell_recall", "execution_accuracy", "tuple_cardinality", "tuple_constraint"]
        df = calculate_average_metrics(df, selected_metrics)
        rank_df = df.groupby('model')['avg_metric'].mean().reset_index()
        ascending_order = False  # Higher is better
    else:
        raise ValueError(f"Unsupported ranking_type: {ranking_type!r}")

    # Keep the ranking value numeric until after the sort: sorting the
    # formatted "<value> s" strings would order them lexicographically
    # ("100.0 s" before "3.0 s").
    rank_df = rank_df.rename(columns={rank_df.columns[1]: "Ranking Value"})
    rank_df["Ranking Value"] = rank_df["Ranking Value"].round(2)
    rank_df = rank_df.sort_values(by="Ranking Value", ascending=ascending_order)

    # Select only the top 3 models and pair each with its medal
    rank_df = rank_df.head(3)
    medals = ["🥇", "🥈", "🥉"]
    ranked_lines = [
        f"<span style='font-size:18px;'>{medal} {model} ({value}{unit})</span><br>\n"
        for medal, model, value in zip(
            medals,
            rank_df["model"].tolist(),
            rank_df["Ranking Value"].tolist(),
        )
    ]

    # NOTE(review): the glyph in this heading looks mis-encoded (probably an
    # emoji originally) - confirm the intended character before changing it.
    return "## π Model Ranking\n" + "".join(ranked_lines)
def update_ranking_text(selected_models, ranking_type):
    """Reload the data and rebuild the top-three ranking text.

    Args:
        selected_models: Model names forwarded to ``ranking_text``.
        ranking_type: Ranking criterion forwarded to ``ranking_text``.

    Returns:
        The string produced by ``ranking_text`` for the data freshly
        obtained from ``load_data_csv_es``.
    """
    return ranking_text(load_data_csv_es(), selected_models, ranking_type)
| # RANKING FOR THE 3 WORST RESULTS WITH UPDATE FUNCTION | |
def worst_cases_text(df, selected_models):
    """Build the Markdown/HTML snippet describing the three worst cases.

    Rows are restricted to ``selected_models``, ``calculate_average_metrics``
    adds the ``avg_metric`` column, and the mean ``avg_metric`` is computed
    per (model, table, test category, question, query, predicted SQL)
    combination. The three combinations with the lowest mean are reported.

    Args:
        df: DataFrame with the columns ``model``, ``tbl_name``,
            ``test_category``, ``question``, ``query``, ``predicted_sql``
            and the metric columns listed in ``selected_metrics`` below.
        selected_models: Model names to keep.

    Returns:
        A string made of a level-2 heading followed by one formatted entry
        per worst case (at most three).
    """
    # Copy so that any column added downstream lands on an independent frame.
    df = df[df['model'].isin(selected_models)].copy()
    selected_metrics = ["cell_precision", "cell_recall", "execution_accuracy", "tuple_cardinality", "tuple_constraint"]
    df = calculate_average_metrics(df, selected_metrics)

    group_columns = ['model', 'tbl_name', 'test_category', 'question', 'query', 'predicted_sql']
    worst_cases_df = df.groupby(group_columns)['avg_metric'].mean().reset_index()
    worst_cases_df = worst_cases_df.sort_values(by="avg_metric", ascending=True)

    # head() returns a slice of the sorted frame; copy it before assigning
    # the rounded column to avoid SettingWithCopyWarning.
    worst_cases_top_3 = worst_cases_df.head(3).copy()
    worst_cases_top_3["avg_metric"] = worst_cases_top_3["avg_metric"].round(2)

    medals = ["🥇", "🥈", "🥉"]
    entries = []
    # Pair medals by position instead of by index label so the loop does not
    # depend on the frame having a 0..2 index.
    for medal, (_, row) in zip(medals, worst_cases_top_3.iterrows()):
        entries.append(
            f"<span style='font-size:18px;'><b>{medal} {row['model']} - {row['tbl_name']} - {row['test_category']}</b> ({row['avg_metric']})</span> \n"
            f"<span style='font-size:16px;'>- <b>Question:</b> {row['question']}</span> \n"
            f"<span style='font-size:16px;'>- <b>Original Query:</b> `{row['query']}`</span> \n"
            f"<span style='font-size:16px;'>- <b>Predicted SQL:</b> `{row['predicted_sql']}`</span> \n\n"
        )

    # NOTE(review): the glyph in this heading looks mis-encoded (probably an
    # emoji originally) - confirm the intended character before changing it.
    return "## β Top 3 Worst Cases\n" + "".join(entries)
def update_worst_cases_text(selected_models):
    """Reload the data and rebuild the worst-cases text.

    Args:
        selected_models: Model names forwarded to ``worst_cases_text``.

    Returns:
        The string produced by ``worst_cases_text`` for the data freshly
        obtained from ``load_data_csv_es``.
    """
    return worst_cases_text(load_data_csv_es(), selected_models)
# Metric columns offered (and pre-selected) in the "Select metrics" group.
metrics = [
    "cell_precision",
    "cell_recall",
    "execution_accuracy",
    "tuple_cardinality",
    "tuple_constraint",
]

# Grouping label shown in the UI -> column list handed to ``update_plot``.
group_options = {
    "Table": ["tbl_name", "model"],
    "Model": ["model"],
}

# Loaded once while the UI is built, to populate the model checkbox choices.
df_initial = load_data_csv_es()
models = df_initial['model'].unique().tolist()
# NOTE(review): the original indentation of this section is not recoverable
# here; the placement of components inside the Row/Column containers below
# is a best-effort reconstruction - confirm against the intended layout.
gr.Markdown("""## π Model Performance Analysis π
Select one or more metrics to calculate the average and visualize histograms and radar plots.
""")

# Options selection section
with gr.Row():
    metric_multiselect = gr.CheckboxGroup(
        choices=metrics,
        label="Select metrics",
        value=metrics,
    )
    model_multiselect = gr.CheckboxGroup(
        choices=models,
        label="Select models",
        value=models,
    )
    group_radio = gr.Radio(
        choices=list(group_options.keys()),
        label="Select grouping",
        value="Table",
    )

# Created hidden; it is only ever used as an event output further down.
output_plot = gr.Plot(visible=False)

# Every component below is pre-populated for all models at build time.
query_rate_plot = gr.Plot(value=update_query_rate(models))

with gr.Row():
    with gr.Column(scale=1):
        radar_plot = gr.Plot(value=update_radar(models))

    with gr.Column(scale=1):
        ranking_type_radio = gr.Radio(
            ["valid_efficiency_score", "time", "metrics"],
            label="Choose ranking criteria",
            value="valid_efficiency_score",
        )
        ranking_text_display = gr.Markdown(
            value=update_ranking_text(models, "valid_efficiency_score")
        )
        worst_cases_display = gr.Markdown(value=update_worst_cases_text(models))
| # Callback functions for updating charts | |
def on_change(selected_metrics, selected_group, selected_models):
    """Translate the grouping label to its columns and delegate to ``update_plot``.

    Args:
        selected_metrics: Metric names forwarded to ``update_plot``.
        selected_group: A key of ``group_options`` (a ``KeyError`` is raised
            for any other value).
        selected_models: Model names forwarded to ``update_plot``.

    Returns:
        The value returned by ``update_plot``.
    """
    group_columns = group_options[selected_group]
    return update_plot(selected_metrics, group_columns, selected_models)
def on_radar_change(selected_models):
    """Return the result of ``update_radar`` for ``selected_models``."""
    radar_figure = update_radar(selected_models)
    return radar_figure
# Inputs shared by every trigger that refreshes ``output_plot``.
_plot_inputs = [metric_multiselect, group_radio, model_multiselect]

# Entering the metrics section refreshes the main plot and the query-rate plot.
proceed_to_metrics_button.click(on_change, inputs=_plot_inputs, outputs=output_plot)
proceed_to_metrics_button.click(update_query_rate, inputs=[model_multiselect], outputs=query_rate_plot)

# Any change to the metric, grouping or model selection redraws the main plot.
for _selector in (metric_multiselect, group_radio, model_multiselect):
    _selector.change(on_change, inputs=_plot_inputs, outputs=output_plot)

# The model selection additionally drives the radar, ranking, worst-case and
# query-rate views; the ranking is also refreshed when its criterion changes.
_ranking_inputs = [model_multiselect, ranking_type_radio]
model_multiselect.change(update_radar, inputs=model_multiselect, outputs=radar_plot)
model_multiselect.change(update_ranking_text, inputs=_ranking_inputs, outputs=ranking_text_display)
ranking_type_radio.change(update_ranking_text, inputs=_ranking_inputs, outputs=ranking_text_display)
model_multiselect.change(update_worst_cases_text, inputs=model_multiselect, outputs=worst_cases_display)
model_multiselect.change(update_query_rate, inputs=[model_multiselect], outputs=query_rate_plot)
# Button that takes the user back to the data-upload step.
reset_data = gr.Button("Back to upload data section")

# Ask ``open_accordion`` for the "reset" state of the accordions and of the
# upload inputs.
reset_data.click(
    open_accordion,
    inputs=gr.State("reset"),
    outputs=[upload_acc, select_table_acc, select_model_acc, qatch_acc, metrics_acc, default_checkbox, file_input],
)

# Hide the metrics download component. This handler was previously
# registered twice with identical arguments; a single registration has the
# same visible effect.
reset_data.click(
    fn=lambda: gr.update(visible=False),
    outputs=[download_metrics],
)

# Call ``enable_disable(True)`` on the interactive controls listed below
# (presumably re-enabling them - confirm against ``enable_disable``).
reset_data.click(
    fn=enable_disable,
    inputs=[gr.State(True)],
    outputs=[
        *model_checkboxes,
        submit_models_button,
        preview_output,
        submit_button,
        file_input,
        default_checkbox,
        table_selector,
        *table_outputs,
        open_model_selection,
    ],
)
# Start serving the app by calling launch() on the ``interface`` object.
interface.launch()