Spaces:
Runtime error
Runtime error
| import gradio as gr | |
| import os | |
| import json | |
| import threading | |
| from pathlib import Path | |
| from moviepy.editor import VideoFileClip | |
| import hashlib | |
| import random | |
| import string | |
| from PIL import Image | |
# Checkbox labels for the physics-law task. The same strings are used as the
# keys of the "law_details" mapping written to the annotation files, so they
# must not be edited casually. The last entry is the explicit "nothing wrong"
# choice and has no example image.
PHYSICAL_LAWS = [
    "Violation of Newton's Law: Objects move without any external force.",
    "Violation of the Law of Conservation of Mass or Solid Constitutive Law: Objects deform or distort irregularly.",
    "Violation of Fluid Constitutive Law: Liquids flow in an unnatural or irregular manner.",
    "Violation of Non-physical Penetration: Objects unnaturally pass through each other.",
    "Violation of Gravity: Objects behave inconsistently with gravity, such as floating in the air.",
    "No violation!",
]

# Checkbox labels for the commonsense task; also used as keys of the
# "commonsense" mapping in saved annotations.
COMMON_SENSE = [
    "Poor Aesthetics: Visually unappealing or low-quality content.",
    "Temporal Inconsistency: Flickering, choppiness, or sudden appearance/disappearance of irrelevant objects.",
    "No violation!",
]

# Example image per physical-law violation, in the same order as the first
# five PHYSICAL_LAWS entries (the interface pairs them up by position, so
# only the insertion order and the values matter to the code in this file).
EXAMPLE_IMAGES = {
    key: f"test_images/law_violation{number}.jpg"
    for number, key in enumerate(
        ("newtons_law", "mass_conservation", "fluid.", "penetration", "gravity"),
        start=1,
    )
}
def string_to_md5(input_string, max_digits=12):
    """Return the first ``max_digits`` hex characters of the MD5 of ``input_string``.

    The string is encoded with the default ``str.encode()`` settings before
    hashing.
    """
    digest = hashlib.md5(input_string.encode())
    hex_digest = digest.hexdigest()
    return hex_digest[:max_digits]
def generate_random_id(length=6):
    """Return ``length`` random characters drawn from lowercase letters and digits."""
    alphabet = string.ascii_lowercase + string.digits
    picked = random.choices(alphabet, k=length)
    return "".join(picked)
class VideoAnnotator:
    """Walk through a list of videos and persist per-labeler annotations as JSON.

    Each labeler gets one JSON file inside ``annotation_base_dir`` whose name
    is derived from the MD5 of their email. Videos larger than
    ``max_resolution`` are re-encoded next to the source file with a
    ``resized_`` name prefix before being handed to the UI.
    """

    def __init__(self, videos, annotation_base_dir, max_resolution=(640, 480)):
        """Store the video list and annotation directory; no I/O happens here.

        Args:
            videos: Sequence of ``pathlib.Path`` objects (``.name`` and
                ``.with_name`` are used on the entries).
            annotation_base_dir: Directory that holds the per-labeler files.
            max_resolution: ``(width, height)`` above which a video is resized.
        """
        self.annotation_base_dir = Path(annotation_base_dir)
        self.max_resolution = max_resolution
        self.videos = videos
        self.current_index = 0
        self.file_locks = {}
        # Serialises creation of entries in ``file_locks`` so two threads can
        # never end up holding two different locks for the same file.
        self._file_locks_guard = threading.Lock()
        self.current_labeler = None
        self.current_labeler_file = None

    def _lock_for(self, file_path):
        """Return the lock guarding ``file_path``, creating it on first use."""
        with self._file_locks_guard:
            lock = self.file_locks.get(file_path)
            if lock is None:
                lock = self.file_locks[file_path] = threading.Lock()
            return lock

    def get_annotation_file_path(self, labeler_email):
        """Return the JSON path used for ``labeler_email``'s annotations."""
        md5_email = string_to_md5(labeler_email, max_digits=12)
        file_name = f"md5-{md5_email}.json"
        return self.annotation_base_dir / file_name

    def load_annotations(self, labeler_email):
        """Return the saved annotations for ``labeler_email`` (``{}`` if none exist)."""
        file_path = self.get_annotation_file_path(labeler_email)
        if not file_path.exists():
            return {}
        with open(file_path, "r", encoding="utf-8") as f:
            return json.load(f)

    def save_annotations(self, labeler_email, annotations):
        """Write ``annotations`` to the labeler's file, creating the directory if needed."""
        file_path = self.get_annotation_file_path(labeler_email)
        self.annotation_base_dir.mkdir(parents=True, exist_ok=True)
        with self._lock_for(file_path):
            with open(file_path, "w", encoding="utf-8") as f:
                json.dump(annotations, f, indent=2)

    def get_current_video(self):
        """Return the absolute path (as ``str``) of the current video, or ``None``.

        ``None`` is returned only when the video list is empty. The returned
        path points at the resized copy when the source exceeds
        ``max_resolution``.
        """
        if self.videos:
            video_path = self.videos[self.current_index]
            resized_path = self.resize_video_if_needed(video_path)
            return str(resized_path.resolve())
        return None

    def resize_video_if_needed(self, video_path):
        """Return a path to ``video_path`` that fits inside ``max_resolution``.

        When the source is within bounds it is returned unchanged. Otherwise a
        ``resized_<name>`` sibling (spaces in the name replaced by ``_``) is
        produced, scaled to the maximum height, and returned. An existing
        resized copy that is at least as new as the source is reused instead
        of re-encoding on every call; note that the reuse check does not know
        which ``max_resolution`` produced that copy.
        """
        from moviepy.video.io.ffmpeg_writer import ffmpeg_write_video

        max_width, max_height = self.max_resolution
        clip = VideoFileClip(str(video_path))
        try:
            width, height = clip.size
            if width <= max_width and height <= max_height:
                return video_path

            cleaned_name = video_path.name.replace(" ", "_")
            resized_path = video_path.with_name(f"resized_{cleaned_name}")
            if (
                resized_path.exists()
                and resized_path.stat().st_mtime >= video_path.stat().st_mtime
            ):
                return resized_path

            resized_clip = clip.resize(height=max_height)
            fps = clip.fps if clip.fps else 8.0
            # Encode to a scratch file and rename afterwards, so an
            # interrupted encode never leaves a truncated file under the
            # final name where the reuse check above would pick it up.
            partial_path = resized_path.with_name(f"partial_{resized_path.name}")
            ffmpeg_write_video(resized_clip, str(partial_path), fps, codec="libx264")
            partial_path.replace(resized_path)
            return resized_path
        finally:
            # Release the ffmpeg reader held by the clip.
            clip.close()

    def update_annotation(self, video_name, labeler_email, instruction_check, law_annotations, commonsense):
        """Record an annotation for ``video_name`` and save the labeler's file.

        The entry is only added when ``instruction_check`` is truthy and the
        (normalised) video name has no entry yet; an existing entry is never
        overwritten. The file is rewritten in either case.
        """
        video_name = postprocess_name_for_gradio(video_name)
        annotations = self.load_annotations(labeler_email)
        if instruction_check and video_name not in annotations:
            annotations[video_name] = {
                "labeler": labeler_email,
                "law_details": law_annotations,
                "commonsense": commonsense,
                "instruction": instruction_check,
            }
        self.save_annotations(labeler_email, annotations)

    def next_video(self):
        """Advance to the next video (clamped at the last) and return its path."""
        if self.videos:
            self.current_index = min(self.current_index + 1, len(self.videos) - 1)
        return self.get_current_video()

    def prev_video(self):
        """Step back to the previous video (clamped at the first) and return its path."""
        if self.videos:
            self.current_index = max(self.current_index - 1, 0)
        return self.get_current_video()

    def jump_to_video(self, index):
        """Move to ``index`` (clamped into the valid range) and return that video's path."""
        if self.videos:
            self.current_index = max(0, min(index, len(self.videos) - 1))
        return self.get_current_video()

    def set_current_labeler(self, labeler_email):
        """Remember the active labeler and the path of their annotation file."""
        self.current_labeler = labeler_email
        self.current_labeler_file = self.get_annotation_file_path(labeler_email)
def postprocess_name_for_gradio(name):
    """Normalise a file name by deleting punctuation and the ``t2v`` marker.

    Three passes run in a fixed order: first ``–``, ``+`` and ``-`` are
    deleted, then every ``t2v`` substring, then ``(``, ``)``, ``,``, ``_``
    and ``.``. The order matters: ``"t-2v"`` collapses to an empty string,
    while ``"t_2v"`` ends up as ``"t2v"`` because the underscore is only
    removed after the ``t2v`` pass.
    """
    dash_free = name.translate(str.maketrans("", "", "–+-"))
    tag_free = dash_free.replace("t2v", "")
    return tag_free.translate(str.maketrans("", "", "(),_."))
def get_cur_data(instruction_data, video_name):
    """Return the instruction entry whose key occurs inside ``video_name``.

    ``video_name`` is normalised with ``postprocess_name_for_gradio`` and each
    key of ``instruction_data`` is tested as a substring of the result, so a
    name carrying extra text (for example a ``resized`` prefix) still matches.
    When several keys match, the last one in the mapping's iteration order
    wins.

    Args:
        instruction_data: Mapping from normalised file name to instruction
            record.
        video_name: File name of the video being shown.

    Returns:
        The matching value from ``instruction_data``.

    Raises:
        KeyError: If no key of ``instruction_data`` occurs in the normalised
            name.
    """
    # Normalisation removes every "_", so no separate handling of a
    # "resized_" prefix is possible (or needed) after this point.
    clean_name = postprocess_name_for_gradio(video_name)

    matched_key = None
    for key in instruction_data:
        if key in clean_name:
            # Deliberately no break: the last matching key wins.
            matched_key = key

    if matched_key is None:
        raise KeyError(
            f"no instruction entry matches video name {video_name!r} "
            f"(normalised: {clean_name!r})"
        )
    return instruction_data[matched_key]
def create_interface(instruction_data, videos, annotation_base_dir):
    """Build the Gradio Blocks app used to annotate ``videos``.

    Args:
        instruction_data: Mapping from normalised video file name to an
            instruction record; each record must provide
            ``"text_instruction"``.
        videos: Sequence of video paths handed to ``VideoAnnotator``.
        annotation_base_dir: Directory for the per-labeler annotation files.

    Returns:
        The ``gr.Blocks`` interface (not yet launched).

    Every callback wired to ``all_outputs`` returns exactly
    ``5 + len(PHYSICAL_LAWS) + len(COMMON_SENSE)`` values, in the order
    video, labeler, annotation count, instruction choice, text prompt,
    law checkboxes, commonsense checkboxes.
    """
    annotator = VideoAnnotator(videos, annotation_base_dir)
    num_laws = len(PHYSICAL_LAWS)

    def blank_annotation(labeler_email, instruction=None):
        """Return an annotation record with every checkbox cleared."""
        return {
            "labeler": labeler_email,
            "law_details": {law: False for law in PHYSICAL_LAWS},
            "commonsense": {cs: False for cs in COMMON_SENSE},
            "instruction": instruction,
        }

    def build_outputs(video_path, labeler_email, count, annotation, prompt):
        """Flatten one UI state into the list expected by ``all_outputs``."""
        return [
            video_path,
            labeler_email,
            count,
            annotation["instruction"],
            prompt,
            *[annotation["law_details"].get(law, False) for law in PHYSICAL_LAWS],
            *[annotation["commonsense"].get(cs, False) for cs in COMMON_SENSE],
        ]

    def split_checkbox_values(checkbox_values):
        """Split the flat checkbox tuple into (law values, commonsense values)."""
        return checkbox_values[:num_laws], checkbox_values[num_laws:]

    def update_video():
        """Return the UI state for the annotator's current video."""
        video_path = annotator.get_current_video()
        if video_path is None:
            # Must have the same length as every other return value; Gradio
            # rejects a callback whose result does not match ``outputs``.
            return build_outputs(
                None,
                annotator.current_labeler or "",
                "0",
                blank_annotation(""),
                "[system] Video not in benchmark",
            )

        video_name = Path(video_path).name
        cur_data = get_cur_data(instruction_data, video_name)

        if annotator.current_labeler:
            annotations = annotator.load_annotations(annotator.current_labeler)
            current_annotations = annotations.get(
                postprocess_name_for_gradio(video_name),
                blank_annotation(annotator.current_labeler),
            )
            num_annotations = str(len(annotations))
        else:
            current_annotations = blank_annotation("")
            num_annotations = "0"

        return build_outputs(
            video_path,
            current_annotations["labeler"] or "",
            num_annotations,
            current_annotations,
            cur_data["text_instruction"],
        )

    def save_current_annotation(video_path, labeler_email, instruction_check, law_values, commonsense_values, skipped: bool = False):
        """Persist the current form state and return a status message.

        With ``skipped=True`` the labeler-email check is bypassed; a missing
        video is reported in both modes instead of failing on ``Path(None)``.
        """
        if video_path is None:
            return "No video loaded to save annotations."
        if not skipped and not labeler_email:
            return "Please enter a valid labeler email before saving annotations."

        video_name = Path(video_path).name
        law_annotations = {law: bool(value) for law, value in zip(PHYSICAL_LAWS, law_values)}
        commonsense_annotations = {cs: bool(value) for cs, value in zip(COMMON_SENSE, commonsense_values)}
        annotator.set_current_labeler(labeler_email)
        annotator.update_annotation(video_name, labeler_email, instruction_check, law_annotations, commonsense_annotations)
        return f"Annotation saved successfully for {labeler_email}!"

    def load_anns_callback(labeler_email):
        """
        Load annotations for the given labeler email and jump to the next unlabeled video.
        Returns the updated interface state.
        """
        if not labeler_email:
            return update_video()

        annotator.set_current_labeler(labeler_email)
        annotations = annotator.load_annotations(labeler_email)

        def is_annotated(video):
            # A video is stored under its resized name when it had to be
            # re-encoded and under its plain name otherwise, so both forms
            # have to be checked.
            name = Path(video).name
            return (
                postprocess_name_for_gradio("resized_" + name) in annotations
                or postprocess_name_for_gradio(name) in annotations
            )

        next_unannotated_index = next(
            (i for i, video in enumerate(annotator.videos) if not is_annotated(video)),
            None,
        )

        if next_unannotated_index is not None:
            video_path = annotator.jump_to_video(next_unannotated_index)
            cur_data = get_cur_data(instruction_data, Path(video_path).name)
            # Fresh video: start from a cleared form.
            return build_outputs(
                video_path,
                labeler_email,
                str(len(annotations)),
                blank_annotation(labeler_email),
                cur_data["text_instruction"],
            )

        # Everything is annotated: stay on the current video and show its state.
        current_video = annotator.get_current_video()
        if current_video:
            video_name = Path(current_video).name
            current_annotations = annotations.get(
                postprocess_name_for_gradio(video_name),
                blank_annotation(labeler_email, instruction="3"),
            )
            cur_data = get_cur_data(instruction_data, video_name)
            return build_outputs(
                current_video,
                labeler_email,
                str(len(annotations)),
                current_annotations,
                cur_data["text_instruction"],
            )

        # Fallback for an empty video list.
        return build_outputs(
            None,
            labeler_email,
            "0",
            blank_annotation(labeler_email),
            "[system] No videos available",
        )

    def check_inputs(labeler_email, instruction_check):
        """Helper function to check input validity"""
        if not labeler_email:
            return False, "Please enter your email before proceeding."
        if not instruction_check:
            return False, "Please select whether the video follows the instruction before proceeding."
        return True, ""

    def confirm_callback(video_path, labeler_email, instruction_check, *checkbox_values):
        """Placeholder: the Confirm button is not wired to any behaviour yet."""
        pass

    def skip_callback(video_path, labeler_email, instruction_check, *checkbox_values):
        """Save the current form without validation and move to the next video."""
        law_values, commonsense_values = split_checkbox_values(checkbox_values)
        save_current_annotation(video_path, labeler_email, instruction_check, law_values, commonsense_values, skipped=True)
        annotator.next_video()
        return update_video()

    def next_video_callback(video_path, labeler_email, instruction_check, *checkbox_values):
        """Validate and save the current form, then show the next video."""
        is_valid, message = check_inputs(labeler_email, instruction_check)
        if not is_valid:
            # Stay on the current video and surface the problem to the user.
            gr.Warning(message)
            return update_video()
        law_values, commonsense_values = split_checkbox_values(checkbox_values)
        save_current_annotation(video_path, labeler_email, instruction_check, law_values, commonsense_values)
        annotator.next_video()
        return update_video()

    def prev_video_callback(video_path, labeler_email, instruction_check, *checkbox_values):
        """Validate and save the current form, then show the previous video."""
        is_valid, message = check_inputs(labeler_email, instruction_check)
        if not is_valid:
            # Stay on the current video and surface the problem to the user.
            gr.Warning(message)
            return update_video()
        law_values, commonsense_values = split_checkbox_values(checkbox_values)
        save_current_annotation(video_path, labeler_email, instruction_check, law_values, commonsense_values)
        annotator.prev_video()
        return update_video()

    # NOTE(review): the row/column nesting below is reconstructed from the
    # order of the statements (video and controls on the left, the two
    # checkbox tasks on the right) - confirm against the intended layout.
    with gr.Blocks() as interface:
        with gr.Row():
            with gr.Column(scale=1):
                video = gr.Video(label="Current Video", format="mp4", height=450, width=800)
                with gr.Row():
                    with gr.Column(scale=2):
                        labeler = gr.Textbox(
                            label="Labeler ID (your email)",
                            placeholder="Enter your email",
                            interactive=True,
                        )
                    with gr.Column(scale=1):
                        num_annotations = gr.Textbox(
                            label="Annotations Count",
                            placeholder="0",
                            interactive=False,
                        )
                text_instruction = gr.Textbox(label="Text prompt", interactive=False)
                # NOTE(review): the initial value "3" is not one of the
                # choice strings below - confirm whether a preselected
                # answer is intended.
                instruction_check = gr.Radio(
                    label="Task1: Does this video follow the instruction?",
                    choices=[
                        "0: Not at all!!!",
                        "1: Correct object, wrong motion (or vice versa).",
                        "2: Follow instruction, fail task.",
                        "3: Follow instruction, complete task.",
                    ],
                    type="value",
                    value="3",
                )
                with gr.Row():
                    with gr.Column(scale=1):
                        skip_btn = gr.Button("Skip! Video Corrupted")
                    with gr.Column(scale=1):
                        confirm_btn = gr.Button("Confirm!")
                with gr.Row():
                    with gr.Column(scale=1):
                        prev_btn = gr.Button("Previous Video")
                    with gr.Column(scale=1):
                        next_btn = gr.Button("Next Video")
                load_btn = gr.Button("Load Annotations")
            with gr.Column(scale=1):
                gr.Markdown("Task2: [Based on your first impression] Select the major <span style='color: blue;'>commonsense violations</span> in the video: <span style='color: red;'>[multiple (0-2) choices]</span>")
                commonsense_checkboxes = [gr.Checkbox(label=cs) for cs in COMMON_SENSE]
                gr.Markdown("Task3: Please select all physics laws the video <span style='color: blue;'>violates</span>: <span style='color: red;'>[multiple (0-5) choices]</span>")
                law_checkboxes = []
                example_paths = list(EXAMPLE_IMAGES.values())
                for i, law in enumerate(PHYSICAL_LAWS):
                    law_checkboxes.append(gr.Checkbox(label=law, interactive=True))
                    # The final entry ("No violation!") has no example image.
                    if i != len(PHYSICAL_LAWS) - 1:
                        with Image.open(example_paths[i]) as raw_image:
                            image = raw_image.convert("RGB")
                        gr.Image(value=image, label=f"Example {i+1}", show_label=True, height=68, width=700)

        # Flat component lists; the callbacks rely on this exact ordering.
        all_inputs = [video, labeler, instruction_check] + law_checkboxes + commonsense_checkboxes
        all_outputs = [video, labeler, num_annotations, instruction_check, text_instruction] + law_checkboxes + commonsense_checkboxes

        skip_btn.click(
            skip_callback,
            inputs=all_inputs,
            outputs=all_outputs,
        )
        load_btn.click(
            load_anns_callback,
            inputs=[labeler],
            outputs=all_outputs,
        )
        next_btn.click(
            next_video_callback,
            inputs=all_inputs,
            outputs=all_outputs,
        )
        prev_btn.click(
            prev_video_callback,
            inputs=all_inputs,
            outputs=all_outputs,
        )
        interface.load(
            fn=update_video,
            inputs=None,
            outputs=all_outputs,
        )
    return interface
# How each model source treats the "t2v" marker in a video file stem:
# True  -> the stem must contain "t2v" (text-to-video outputs),
# False -> the stem must not contain "t2v" (image-to-video outputs),
# None  -> the marker is ignored.
SRC_T2V_FILTER = {
    "CogVideo-I2V": False,
    "CogVideo-T2V": True,
    "Open-Sora-I2V": False,
    "Open-Sora-T2V": True,
    "Pandora": None,
    "TurboT2V": True,
    "Open-Sora-Plan-I2V": False,
    "Open-Sora-Plan-T2V": True,
}


def collect_videos(video_folder, src, domain):
    """Return the sorted ``*.mp4`` files in ``video_folder`` for one source and domain.

    A file is kept when its stem contains ``f"{domain}_"``, does not contain
    ``"resized_"`` (those are derived copies), and satisfies the ``t2v`` rule
    of ``src`` given in ``SRC_T2V_FILTER``.

    Raises:
        KeyError: If ``src`` is not a key of ``SRC_T2V_FILTER``.
    """
    wants_t2v = SRC_T2V_FILTER[src]
    selected = []
    for candidate in Path(video_folder).glob("*.mp4"):
        stem = candidate.stem
        if "resized_" in stem or f"{domain}_" not in stem:
            continue
        if wants_t2v is not None and ("t2v" in stem) != wants_t2v:
            continue
        selected.append(candidate)
    return sorted(selected)


def main():
    """Parse the command line, load videos and instructions, and launch the UI."""
    import argparse

    domains = ["robotics", "humans", "general", "av", "game"]
    src = ["CogVideo-I2V", "CogVideo-T2V", "Open-Sora-I2V", "Open-Sora-T2V", "Pandora", "TurboT2V", "Open-Sora-Plan-I2V", "Open-Sora-Plan-T2V"]

    parser = argparse.ArgumentParser(description="Annotation")
    # ``choices`` makes argparse reject unknown values with a usage message,
    # and unlike ``assert`` it is not stripped when Python runs with -O.
    parser.add_argument("--domain", type=str, default="robotics", choices=domains, help="")
    parser.add_argument("--src", type=str, default="CogVideo-T2V", choices=src, help="")
    args = parser.parse_args()

    src_video_map = {
        "CogVideo-I2V": "/home/yunhaof/workspace/datasets/outputs_v2",
        "CogVideo-T2V": "/home/yunhaof/workspace/datasets/outputs_v2",
        "Pandora": "/lustre/fsw/portfolios/nvr/users/dachengl/VILA-EWM/outputs",
        "Open-Sora-I2V": "/lustre/fsw/portfolios/nvr/users/dachengl/Open-Sora/outputs",
        "Open-Sora-T2V": "/lustre/fsw/portfolios/nvr/users/dachengl/Open-Sora/outputs",
        "TurboT2V": "",
        "Open-Sora-Plan-I2V": "/home/yunhaof/workspace/projects/Open-Sora-Plan/ewm_benchmark/gradio_videos",
        "Open-Sora-Plan-T2V": "/home/yunhaof/workspace/projects/Open-Sora-Plan/ewm_benchmark/gradio_videos",
    }
    # Adhoc solution to naming mismatch
    domain_name_map = {
        "humans": "humans",
        "game": "game",
        "general": "general",
        "av": "av",
        "robotics": "robotics",
    }
    cur_domain = domain_name_map[args.domain]

    video_folder = Path(src_video_map[args.src])
    videos = collect_videos(video_folder, args.src, cur_domain)
    print(f"Number of videos: {len(videos)}")

    instruction_file = f"domains/{args.domain}/dataset_v2/instruction_ewm.json"
    annotation_base = "annotations"
    os.makedirs(annotation_base, exist_ok=True)
    annotation_dir = os.path.join(annotation_base, f"{args.domain}_{args.src}")

    instruction_data = {}
    with open(instruction_file, "r") as f:
        instructions = json.load(f)
    for instruction in instructions:
        file_name = os.path.basename(instruction["video_path"])
        # Keys are normalised the same way as the names looked up later.
        file_name = postprocess_name_for_gradio(file_name)
        instruction_data[file_name] = instruction

    # Check up front that every video resolves to an instruction entry, both
    # under its plain name and under its "resized_" name.
    for _video in videos:
        base_name = postprocess_name_for_gradio(Path(_video).name)
        for prefix in ("", "resized_"):
            try:
                get_cur_data(instruction_data, prefix + base_name)
            except (LookupError, NameError) as err:
                # A failed lookup surfaces as KeyError or, from a lookup
                # helper that leaves its result unbound, as UnboundLocalError
                # (a NameError subclass).
                raise SystemExit(
                    f"parsing name {prefix}{_video} fails, you may want to look at the name in instruction_ewm.json"
                ) from err

    iface = create_interface(instruction_data, videos, annotation_dir)
    iface.launch(share=True, allowed_paths=[src_video_map[args.src]])


if __name__ == "__main__":
    main()