import os
from typing import Tuple, Optional

import cv2
import gradio as gr
import numpy as np
#import spaces
import supervision as sv
import torch
from PIL import Image
from tqdm import tqdm

from utils.video import generate_unique_name, create_directory, delete_directory
from utils.florence import load_florence_model, run_florence_inference, \
    FLORENCE_DETAILED_CAPTION_TASK, \
    FLORENCE_CAPTION_TO_PHRASE_GROUNDING_TASK, FLORENCE_OPEN_VOCABULARY_DETECTION_TASK
from utils.modes import IMAGE_INFERENCE_MODES, IMAGE_OPEN_VOCABULARY_DETECTION_MODE, \
    IMAGE_CAPTION_GROUNDING_MASKS_MODE, VIDEO_INFERENCE_MODES
from utils.sam import load_sam_image_model, run_sam_inference, load_sam_video_model

MARKDOWN = """
# Florence2 + SAM2 🔥

This demo integrates Florence2 and SAM2 by creating a two-stage inference pipeline. In
the first stage, Florence2 performs tasks such as object detection, open-vocabulary
object detection, image captioning, or phrase grounding. In the second stage, SAM2
performs object segmentation on the image.
"""

IMAGE_PROCESSING_EXAMPLES = [
    [IMAGE_OPEN_VOCABULARY_DETECTION_MODE, "https://media.roboflow.com/notebooks/examples/dog-2.jpeg", 'straw, white napkin, black napkin, hair'],
    [IMAGE_OPEN_VOCABULARY_DETECTION_MODE, "https://media.roboflow.com/notebooks/examples/dog-3.jpeg", 'tail'],
    [IMAGE_CAPTION_GROUNDING_MASKS_MODE, "https://media.roboflow.com/notebooks/examples/dog-2.jpeg", None],
    [IMAGE_CAPTION_GROUNDING_MASKS_MODE, "https://media.roboflow.com/notebooks/examples/dog-3.jpeg", None],
]
VIDEO_PROCESSING_EXAMPLES = [
    ["videos/clip-07-camera-1.mp4", "player in white outfit, player in black outfit, ball, rim"],
    ["videos/clip-07-camera-2.mp4", "player in white outfit, player in black outfit, ball, rim"],
    ["videos/clip-07-camera-3.mp4", "player in white outfit, player in black outfit, ball, rim"]
]

VIDEO_SCALE_FACTOR = 0.5
VIDEO_TARGET_DIRECTORY = "tmp"
create_directory(directory_path=VIDEO_TARGET_DIRECTORY)

DEVICE = torch.device("cuda")
# DEVICE = torch.device("cpu")

torch.autocast(device_type="cuda", dtype=torch.bfloat16).__enter__()
if torch.cuda.get_device_properties(0).major >= 8:
    torch.backends.cuda.matmul.allow_tf32 = True
    torch.backends.cudnn.allow_tf32 = True

FLORENCE_MODEL, FLORENCE_PROCESSOR = load_florence_model(device=DEVICE)
SAM_IMAGE_MODEL = load_sam_image_model(device=DEVICE)
SAM_VIDEO_MODEL = load_sam_video_model(device=DEVICE)

COLORS = ['#FF1493', '#00BFFF', '#FF6347', '#FFD700', '#32CD32', '#8A2BE2']
COLOR_PALETTE = sv.ColorPalette.from_hex(COLORS)
BOX_ANNOTATOR = sv.BoxAnnotator(color=COLOR_PALETTE, color_lookup=sv.ColorLookup.INDEX)
LABEL_ANNOTATOR = sv.LabelAnnotator(
    color=COLOR_PALETTE,
    color_lookup=sv.ColorLookup.INDEX,
    text_position=sv.Position.CENTER_OF_MASS,
    text_color=sv.Color.from_hex("#000000"),
    border_radius=5
)
MASK_ANNOTATOR = sv.MaskAnnotator(
    color=COLOR_PALETTE,
    color_lookup=sv.ColorLookup.INDEX
)


def annotate_image(image, detections):
    output_image = image.copy()
    output_image = MASK_ANNOTATOR.annotate(output_image, detections)
    output_image = BOX_ANNOTATOR.annotate(output_image, detections)
    output_image = LABEL_ANNOTATOR.annotate(output_image, detections)
    return output_image


def on_mode_dropdown_change(text):
    return [
        gr.Textbox(visible=text == IMAGE_OPEN_VOCABULARY_DETECTION_MODE),
        gr.Textbox(visible=text == IMAGE_CAPTION_GROUNDING_MASKS_MODE),
    ]


#@spaces.GPU
@torch.inference_mode()
@torch.autocast(device_type="cuda", dtype=torch.bfloat16)
def process_image(
    mode_dropdown, image_input, text_input
) -> Tuple[Optional[Image.Image], Optional[str]]:
    if not image_input:
        gr.Info("Please upload an image.")
        return None, None

    if mode_dropdown == IMAGE_OPEN_VOCABULARY_DETECTION_MODE:
        if not text_input:
            gr.Info("Please enter a text prompt.")
            return None, None

        texts = [prompt.strip() for prompt in text_input.split(",")]
        detections_list = []
        for text in texts:
            _, result = run_florence_inference(
                model=FLORENCE_MODEL,
                processor=FLORENCE_PROCESSOR,
                device=DEVICE,
                image=image_input,
                task=FLORENCE_OPEN_VOCABULARY_DETECTION_TASK,
                text=text
            )
            detections = sv.Detections.from_lmm(
                lmm=sv.LMM.FLORENCE_2,
                result=result,
                resolution_wh=image_input.size
            )
            detections = run_sam_inference(SAM_IMAGE_MODEL, image_input, detections)
            detections_list.append(detections)

        detections = sv.Detections.merge(detections_list)
        detections = run_sam_inference(SAM_IMAGE_MODEL, image_input, detections)
        return annotate_image(image_input, detections), None

    if mode_dropdown == IMAGE_CAPTION_GROUNDING_MASKS_MODE:
        _, result = run_florence_inference(
            model=FLORENCE_MODEL,
            processor=FLORENCE_PROCESSOR,
            device=DEVICE,
            image=image_input,
            task=FLORENCE_DETAILED_CAPTION_TASK
        )
        caption = result[FLORENCE_DETAILED_CAPTION_TASK]
        _, result = run_florence_inference(
            model=FLORENCE_MODEL,
            processor=FLORENCE_PROCESSOR,
            device=DEVICE,
            image=image_input,
            task=FLORENCE_CAPTION_TO_PHRASE_GROUNDING_TASK,
            text=caption
        )
        detections = sv.Detections.from_lmm(
            lmm=sv.LMM.FLORENCE_2,
            result=result,
            resolution_wh=image_input.size
        )
        detections = run_sam_inference(SAM_IMAGE_MODEL, image_input, detections)
        return annotate_image(image_input, detections), caption
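
# Minimal usage sketch (commented out, not executed at import time):
# process_image can also be called directly, outside the Gradio UI. The file
# name "dog.jpeg" and the prompt below are illustrative assumptions only and
# are not shipped with this repository.
#
#   image = Image.open("dog.jpeg").convert("RGB")
#   annotated_image, _ = process_image(
#       IMAGE_OPEN_VOCABULARY_DETECTION_MODE, image, "dog, collar")
#   annotated_image.save("annotated.jpeg")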


#@spaces.GPU(duration=300)
@torch.inference_mode()
@torch.autocast(device_type="cuda", dtype=torch.bfloat16)
def process_video(
    video_input, text_input, progress=gr.Progress(track_tqdm=True)
) -> Optional[str]:
    if not video_input:
        gr.Info("Please upload a video.")
        return None

    if not text_input:
        gr.Info("Please enter a text prompt.")
        return None

    # Run Florence2 + SAM2 on the first frame to obtain the initial masks.
    frame_generator = sv.get_video_frames_generator(video_input)
    frame = next(frame_generator)
    frame = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

    texts = [prompt.strip() for prompt in text_input.split(",")]
    detections_list = []
    for text in texts:
        _, result = run_florence_inference(
            model=FLORENCE_MODEL,
            processor=FLORENCE_PROCESSOR,
            device=DEVICE,
            image=frame,
            task=FLORENCE_OPEN_VOCABULARY_DETECTION_TASK,
            text=text
        )
        detections = sv.Detections.from_lmm(
            lmm=sv.LMM.FLORENCE_2,
            result=result,
            resolution_wh=frame.size
        )
        detections = run_sam_inference(SAM_IMAGE_MODEL, frame, detections)
        detections_list.append(detections)

    detections = sv.Detections.merge(detections_list)
    detections = run_sam_inference(SAM_IMAGE_MODEL, frame, detections)

    if detections.mask is None or len(detections.mask) == 0:
        gr.Info(
            f"No objects of class {text_input} found in the first frame of the video. "
            "Trim the video to make the object appear in the first frame or try a "
            "different text prompt."
        )
        return None

    # Split the downscaled video into frames so the SAM2 video model can track masks.
    name = generate_unique_name()
    frame_directory_path = os.path.join(VIDEO_TARGET_DIRECTORY, name)
    frames_sink = sv.ImageSink(
        target_dir_path=frame_directory_path,
        image_name_pattern="{:05d}.jpeg"
    )

    video_info = sv.VideoInfo.from_video_path(video_input)
    video_info.width = int(video_info.width * VIDEO_SCALE_FACTOR)
    video_info.height = int(video_info.height * VIDEO_SCALE_FACTOR)

    frames_generator = sv.get_video_frames_generator(video_input)
    with frames_sink:
        for frame in tqdm(
            frames_generator,
            total=video_info.total_frames,
            desc="splitting video into frames"
        ):
            frame = sv.scale_image(frame, VIDEO_SCALE_FACTOR)
            frames_sink.save_image(frame)

    # Seed the SAM2 video predictor with the first-frame masks, then propagate
    # them through the remaining frames and render the annotated output video.
    inference_state = SAM_VIDEO_MODEL.init_state(
        video_path=frame_directory_path,
        device=DEVICE
    )

    for mask_index, mask in enumerate(detections.mask):
        _, object_ids, mask_logits = SAM_VIDEO_MODEL.add_new_mask(
            inference_state=inference_state,
            frame_idx=0,
            obj_id=mask_index,
            mask=mask
        )

    video_path = os.path.join(VIDEO_TARGET_DIRECTORY, f"{name}.mp4")
    frames_generator = sv.get_video_frames_generator(video_input)
    masks_generator = SAM_VIDEO_MODEL.propagate_in_video(inference_state)
    with sv.VideoSink(video_path, video_info=video_info) as sink:
        for frame, (_, tracker_ids, mask_logits) in zip(frames_generator, masks_generator):
            frame = sv.scale_image(frame, VIDEO_SCALE_FACTOR)
            masks = (mask_logits > 0.0).cpu().numpy().astype(bool)
            if len(masks.shape) == 4:
                masks = np.squeeze(masks, axis=1)

            detections = sv.Detections(
                xyxy=sv.mask_to_xyxy(masks=masks),
                mask=masks,
                class_id=np.array(tracker_ids)
            )
            annotated_frame = frame.copy()
            annotated_frame = MASK_ANNOTATOR.annotate(
                scene=annotated_frame, detections=detections)
            annotated_frame = BOX_ANNOTATOR.annotate(
                scene=annotated_frame, detections=detections)
            sink.write_frame(annotated_frame)

    delete_directory(frame_directory_path)
    return video_path
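
# Minimal usage sketch (commented out, not executed at import time):
# process_video can likewise be called directly. The clip path mirrors
# VIDEO_PROCESSING_EXAMPLES and is assumed to exist next to this script; the
# return value is the path of the annotated video written under
# VIDEO_TARGET_DIRECTORY.
#
#   result_path = process_video(
#       "videos/clip-07-camera-1.mp4",
#       "player in white outfit, player in black outfit, ball, rim")
#   print(result_path)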


with gr.Blocks() as demo:
    gr.Markdown(MARKDOWN)
    with gr.Tab("Image"):
        image_processing_mode_dropdown_component = gr.Dropdown(
            choices=IMAGE_INFERENCE_MODES,
            value=IMAGE_INFERENCE_MODES[0],
            label="Mode",
            info="Select a mode to use.",
            interactive=True
        )
        with gr.Row():
            with gr.Column():
                image_processing_image_input_component = gr.Image(
                    type='pil', label='Upload image')
                image_processing_text_input_component = gr.Textbox(
                    label='Text prompt',
                    placeholder='Enter comma separated text prompts')
                image_processing_submit_button_component = gr.Button(
                    value='Submit', variant='primary')
            with gr.Column():
                image_processing_image_output_component = gr.Image(
                    type='pil', label='Image output')
                image_processing_text_output_component = gr.Textbox(
                    label='Caption output', visible=False)
        with gr.Row():
            gr.Examples(
                fn=process_image,
                examples=IMAGE_PROCESSING_EXAMPLES,
                inputs=[
                    image_processing_mode_dropdown_component,
                    image_processing_image_input_component,
                    image_processing_text_input_component
                ],
                outputs=[
                    image_processing_image_output_component,
                    image_processing_text_output_component
                ],
                run_on_click=True
            )
    with gr.Tab("Video"):
        video_processing_mode_dropdown_component = gr.Dropdown(
            choices=VIDEO_INFERENCE_MODES,
            value=VIDEO_INFERENCE_MODES[0],
            label="Mode",
            info="Select a mode to use.",
            interactive=True
        )
        with gr.Row():
            with gr.Column():
                video_processing_video_input_component = gr.Video(
                    label='Upload video')
                video_processing_text_input_component = gr.Textbox(
                    label='Text prompt',
                    placeholder='Enter comma separated text prompts')
                video_processing_submit_button_component = gr.Button(
                    value='Submit', variant='primary')
            with gr.Column():
                video_processing_video_output_component = gr.Video(
                    label='Video output')
        with gr.Row():
            gr.Examples(
                fn=process_video,
                examples=VIDEO_PROCESSING_EXAMPLES,
                inputs=[
                    video_processing_video_input_component,
                    video_processing_text_input_component
                ],
                outputs=video_processing_video_output_component,
                run_on_click=True
            )
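
    # Wire the UI events: the Submit buttons and the text boxes (on Enter) run
    # inference, and changing the image mode dropdown toggles which text fields
    # are visible.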
    image_processing_submit_button_component.click(
        fn=process_image,
        inputs=[
            image_processing_mode_dropdown_component,
            image_processing_image_input_component,
            image_processing_text_input_component
        ],
        outputs=[
            image_processing_image_output_component,
            image_processing_text_output_component
        ]
    )
    image_processing_text_input_component.submit(
        fn=process_image,
        inputs=[
            image_processing_mode_dropdown_component,
            image_processing_image_input_component,
            image_processing_text_input_component
        ],
        outputs=[
            image_processing_image_output_component,
            image_processing_text_output_component
        ]
    )
    image_processing_mode_dropdown_component.change(
        on_mode_dropdown_change,
        inputs=[image_processing_mode_dropdown_component],
        outputs=[
            image_processing_text_input_component,
            image_processing_text_output_component
        ]
    )
    video_processing_submit_button_component.click(
        fn=process_video,
        inputs=[
            video_processing_video_input_component,
            video_processing_text_input_component
        ],
        outputs=video_processing_video_output_component
    )
    video_processing_text_input_component.submit(
        fn=process_video,
        inputs=[
            video_processing_video_input_component,
            video_processing_text_input_component
        ],
        outputs=video_processing_video_output_component
    )

demo.launch(debug=False, show_error=True, share=True)
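
# Note: share=True requests a temporary public gradio.live link. As an
# alternative sketch (not part of the original configuration), a self-hosted
# deployment could pin the host and port instead, e.g.:
#   demo.launch(server_name="0.0.0.0", server_port=7860, show_error=True)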