import ast
import re
from pathlib import Path

import gradio as gr  # used by the (currently disabled) UI wiring further down
import requests


def group_files_by_index(file_paths, data_type="audio"):
    # Regular expression pattern to extract the key from each file path
    if data_type == "audio":
        pattern = r"audio_(\d+)\.(png|wav)"
    elif data_type == "video":
        pattern = r"video_(\d+)\.(png|mkv)"
    else:
        pattern = r"img_(\d+)\.png"
    # Dictionary to store the grouped files
    grouped_files = {}
    # Iterate over each file path
    for file_path in file_paths:
        # Extract the key using the regular expression pattern
        match = re.search(pattern, file_path)
        if match:
            key = int(match.group(1))
            # Add the file path to the corresponding group in the dictionary
            if key not in grouped_files:
                grouped_files[key] = []
            grouped_files[key].append(file_path)
    # Sort the dictionary by keys
    sorted_grouped_files = dict(sorted(grouped_files.items()))
    return sorted_grouped_files
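
# Illustrative usage of group_files_by_index (paths below are made up):
#   group_files_by_index(["a/img_00001.png", "a/wmd_img_00001.png", "a/img_00000.png"], data_type="image")
#   -> {0: ["a/img_00000.png"], 1: ["a/img_00001.png", "a/wmd_img_00001.png"]}

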
def build_description(
    i, data_none, data_attack, quality_metrics=["psnr", "ssim", "lpips"]
):
    # TODO: handle this at data generation
    if isinstance(data_none["fake_det"], str):
        data_none["fake_det"] = ast.literal_eval(data_none["fake_det"])
    if isinstance(data_none["watermark_det"], str):
        data_none["watermark_det"] = ast.literal_eval(data_none["watermark_det"])
    if isinstance(data_attack["fake_det"], str):
        data_attack["fake_det"] = ast.literal_eval(data_attack["fake_det"])
    if isinstance(data_attack["watermark_det"], str):
        data_attack["watermark_det"] = ast.literal_eval(data_attack["watermark_det"])

    if i == 0:
        fake_det = data_none["fake_det"]
        return f"detected: {fake_det}"
    elif i == 1:
        # Fixed metrics
        det = data_none["watermark_det"]
        p_value = float(data_none["p_value"])
        bit_acc = data_none["bit_acc"]
        # Dynamic metrics
        metrics_output = []
        for metric in quality_metrics:
            value = float(data_none[metric])
            metrics_output.append(f"{metric}: {value:.2f}")
        # Fixed metrics output
        fixed_metrics_output = (
            f" detected: {det} p_value: {p_value:.2f} bit_acc: {bit_acc:.2f}"
        )
        # Combine all outputs
        return " ".join(metrics_output) + fixed_metrics_output
    elif i == 2:
        fake_det = data_attack["fake_det"]
        return f"det: {fake_det}"
    elif i == 3:
        det = data_attack["watermark_det"]
        p_value = float(data_attack["p_value"])
        word_acc = data_attack["word_acc"]
        bit_acc = data_attack["bit_acc"]
        return f"word_acc: {word_acc:.2f} detected: {det} p_value: {p_value:.2f} bit_acc: {bit_acc:.2f}"
def build_infos(abs_path: str, datatype: str, dataset_name: str, db_key: str):
    def generate_file_patterns(prefixes, extensions):
        indices = [0, 1, 3, 4, 5]
        return [
            f"{prefix}_{index:05d}.{ext}"
            for prefix in prefixes
            for index in indices
            for ext in extensions
        ]
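    # Illustrative output of generate_file_patterns (not executed here):
    #   generate_file_patterns(["img"], ["png"])
    #   -> ["img_00000.png", "img_00001.png", "img_00003.png", "img_00004.png", "img_00005.png"]
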
    if datatype == "audio":
        quality_metrics = ["snr", "sisnr", "stoi", "pesq"]
        extensions = ["png", "wav"]
        datatype_abbr = "audio"
        eval_results_path = abs_path + f"{dataset_name}_1k/examples_eval_results.json"
    elif datatype == "image":
        quality_metrics = ["psnr", "ssim", "lpips"]
        extensions = ["png"]
        datatype_abbr = "img"
        eval_results_path = abs_path + f"{dataset_name}_1k/examples_eval_results.json"
    elif datatype == "video":
        quality_metrics = ["psnr", "ssim", "lpips", "msssim", "vmaf"]
        extensions = ["mkv"]
        datatype_abbr = "video"
        eval_results_path = abs_path + f"{dataset_name}/examples_eval_results.json"

    response = requests.get(eval_results_path)
    if response.status_code == 200:
        results_data = response.json()
    else:
        return {}

    dataset = results_data["eval"][db_key]
    prefixes = [
        f"attacked_{datatype_abbr}",
        f"attacked_wmd_{datatype_abbr}",
        datatype_abbr,
        f"wmd_{datatype_abbr}",
    ]
    file_patterns = generate_file_patterns(prefixes, extensions)

    infos = {}
    for model_name in dataset.keys():
        model_infos = {}
        default_attack_name = "none"
        if datatype == "audio":
            default_attack_name = "identity"
        elif datatype == "video":
            default_attack_name = "Identity"
        identity_attack_rows = dataset[model_name][default_attack_name]["default"]
        for attack_name, attack_variants_data in dataset[model_name].items():
            for attack_variant, attack_rows in attack_variants_data.items():
                if attack_variant == "default":
                    attack = attack_name
                else:
                    attack = f"{attack_name}_{attack_variant}"
                if len(attack_rows) == 0:
                    model_infos[attack] = []
                    continue
                if datatype == "video":
                    file_paths = [
                        f"{abs_path}{dataset_name}/examples/{datatype}/{model_name}/{attack}/{pattern}"
                        for pattern in file_patterns
                    ]
                else:
                    file_paths = [
                        f"{abs_path}{dataset_name}_1k/examples/{datatype}/{model_name}/{attack}/{pattern}"
                        for pattern in file_patterns
                    ]
                all_files = []
                for i, files in group_files_by_index(
                    file_paths,
                    data_type=datatype,
                ).items():
                    data_none = [e for e in identity_attack_rows if e["idx"] == i][0]
                    data_attack = [e for e in attack_rows if e["idx"] == i][0]
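                    # Sort the entries by filename stem and move the first two to the end;
                    # for image/video (one file per prefix) the resulting order is
                    # original, watermarked, attacked original, attacked watermarked,
                    # which is the enumeration index i passed to build_description below.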
                    files = sorted(
                        [(f, Path(f).stem) for f in files], key=lambda x: x[1]
                    )
                    files = files[2:] + files[:2]
                    files = [
                        {
                            "image_url": f,
                            "description": f"{n}\n{build_description(i, data_none, data_attack, quality_metrics)}",
                            **(
                                {"audio_url": f.replace(".png", ".wav")}
                                if datatype == "audio" and f.endswith(".png")
                                else {}
                            ),
                        }
                        for i, (f, n) in enumerate(files)
                    ]
                    all_files.extend(files)
                model_infos[attack] = all_files
        infos[model_name] = model_infos
    return infos


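# The *_examples_tab helpers below simply return the result of build_infos, whose
# shape (inferred from the code above; names are placeholders) is roughly:
# {
#     "<model_name>": {
#         "<attack>" or "<attack>_<variant>": [
#             {"image_url": ..., "description": ..., "audio_url": ...},  # audio_url only on audio .png entries
#             ...
#         ],
#     },
# }

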
def image_examples_tab(abs_path: str):
    dataset_name = "coco_val2014"
    datatype = "image"
    db_key = "coco_val2014"
    image_infos = build_infos(
        abs_path,
        datatype=datatype,
        dataset_name=dataset_name,
        db_key=db_key,
    )

    # First combo box (category selection)
    # model_choice = gr.Dropdown(
    #     choices=list(image_infos.keys()),
    #     label="Select a Model",
    #     value=None,
    # )

    # Second combo box (subcategory selection)
    # Initialize with options from the first category by default
    # attack_choice = gr.Dropdown(
    #     choices=list(image_infos["wam"].keys()),
    #     label="Select an Attack",
    #     value=None,
    # )

    # # Gallery component to display images
    # gallery = gr.Gallery(
    #     label="Image Gallery",
    #     columns=4,
    #     rows=1,
    # )

    # Update options for the second combo box when the first one changes
    # def update_subcategories(selected_category):
    #     values = list(image_infos[selected_category].keys())
    #     values = [(v, v) for v in values]
    #     attack_choice.choices = values
    #     # return gr.Dropdown.update(choices=list(image_infos[selected_category].keys()))

    # # Function to load images based on selections from both combo boxes
    # def load_images(category, subcategory):
    #     return image_infos.get(category, {}).get(subcategory, [])

    # # Update gallery based on both combo box selections
    # model_choice.change(
    #     fn=update_subcategories, inputs=model_choice, outputs=attack_choice
    # )
    # attack_choice.change(
    #     fn=load_images, inputs=[model_choice, attack_choice], outputs=gallery
    # )

    return image_infos


def video_examples_tab(abs_path: str):
    dataset_name = "sav_val_full"
    datatype = "video"
    db_key = "sa-v_sav_val_videos"
    image_infos = build_infos(
        abs_path,
        datatype=datatype,
        dataset_name=dataset_name,
        db_key=db_key,
    )
    return image_infos

    # First combo box (category selection)
    # model_choice = gr.Dropdown(
    #     choices=list(image_infos.keys()),
    #     label="Select a Model",
    #     value=None,
    # )

    # Second combo box (subcategory selection)
    # Initialize with options from the first category by default
    # attack_choice = gr.Dropdown(
    #     choices=list(image_infos["videoseal_0.0"].keys()),
    #     label="Select an Attack",
    #     value=None,
    # )

    # Gallery component to display images
    # gallery = gr.Gallery(
    #     label="Video Gallery",
    #     columns=4,
    #     rows=1,
    # )

    # Update options for the second combo box when the first one changes
    # def update_subcategories(selected_category):
    #     values = list(image_infos[selected_category].keys())
    #     values = [(v, v) for v in values]
    #     attack_choice.choices = values
    #     # return gr.Dropdown.update(choices=list(image_infos[selected_category].keys()))

    # Function to load images based on selections from both combo boxes
    # def load_images(category, subcategory):
    #     return image_infos.get(category, {}).get(subcategory, [])

    # # Update gallery based on both combo box selections
    # model_choice.change(
    #     fn=update_subcategories, inputs=model_choice, outputs=attack_choice
    # )
    # attack_choice.change(
    #     fn=load_images, inputs=[model_choice, attack_choice], outputs=gallery
    # )


def audio_examples_tab(abs_path: str):
    dataset_name = "voxpopuli"
    datatype = "audio"
    db_key = "voxpopuli"
    audio_infos = build_infos(
        abs_path,
        datatype=datatype,
        dataset_name=dataset_name,
        db_key=db_key,
    )
    return audio_infos
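    # NOTE: the early return above makes everything below unreachable; it sketches the
    # Gradio gallery + audio-player wiring and still references model_choice, whose
    # construction is commented out further down.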
    print(audio_infos)

    # First combo box (category selection)
    # model_choice = gr.Dropdown(
    #     choices=list(audio_infos.keys()),
    #     label="Select a Model",
    #     value=None,
    # )

    # Second combo box (subcategory selection)
    # Initialize with options from the first category by default
    attack_choice = gr.Dropdown(
        choices=list(audio_infos["audioseal"].keys()),
        label="Select an Attack",
        value=None,
    )

    # Gallery component to display images
    gallery = gr.Gallery(
        label="Image Gallery", columns=4, rows=1, object_fit="scale-down"
    )
    audio_player = gr.Audio(visible=False)
    audio_map_state = gr.State({})

    # Update options for the second combo box when the first one changes
    def update_subcategories(selected_category):
        values = list(audio_infos[selected_category].keys())
        values = [(v, v) for v in values]
        attack_choice.choices = values
        # return gr.Dropdown.update(choices=list(image_infos[selected_category].keys()))

    # Function to load images based on selections from both combo boxes
    def load_audios(category, subcategory):
        files = audio_infos.get(category, {}).get(subcategory, [])
        images = [f for f in files if f[0].endswith(".png")]
        audios = {f[0]: f[0].replace(".png", ".wav") for f in images}
        return images, audios

    def play_audio(selected_image, audios):
        image_path = selected_image["image"]["path"]
        audio_file = audios.get(image_path)
        return gr.update(value=audio_file, visible=audio_file is not None)

    def hide_audio_player():
        # Hide the audio player when the preview is closed
        return gr.update(visible=False)

    def get_selected_image(select_data: gr.SelectData, audios):
        if select_data is None:
            return gr.update(visible=False)
        selected_image = select_data.value
        return play_audio(selected_image, audios)

    # Update gallery based on both combo box selections
    model_choice.change(
        fn=update_subcategories, inputs=model_choice, outputs=attack_choice
    )
    attack_choice.change(
        fn=load_audios,
        inputs=[model_choice, attack_choice],
        outputs=[gallery, audio_map_state],
    )
    gallery.select(
        fn=get_selected_image,
        inputs=[audio_map_state],
        outputs=audio_player,
    )
    gallery.preview_close(
        fn=hide_audio_player,
        outputs=audio_player,
    )
    return gr.Column([model_choice, attack_choice, gallery, audio_player])
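
# Illustrative wiring (not part of this module; the real entry point and base URL live elsewhere):
#   base_url = "https://example.com/eval_results/"  # hypothetical host serving the JSON and media files
#   image_infos = image_examples_tab(base_url)
#   video_infos = video_examples_tab(base_url)
#   audio_infos = audio_examples_tab(base_url)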