Spaces:
Paused
Paused
| # Imports | |
| import gradio as gr | |
| import spaces | |
| import torch | |
| import os | |
| import math | |
| import gc | |
| import librosa | |
| import tempfile | |
| from PIL import Image, ImageSequence | |
| from decord import VideoReader, cpu | |
| from moviepy.editor import VideoFileClip | |
| from transformers import AutoModel, AutoTokenizer, AutoProcessor | |
| # Variables | |
# Compute device. "auto" resolves to CUDA when torch reports it as available,
# otherwise to the CPU.
DEVICE = "auto"
if DEVICE == "auto":
    if torch.cuda.is_available():
        DEVICE = "cuda"
    else:
        DEVICE = "cpu"
print(f"[SYSTEM] | Using {DEVICE} type compute device.")

# Instruction used when the caller does not supply one.
DEFAULT_INPUT = "Describe in one short sentence."
# Upper bound on the number of frames sampled from a video or GIF.
MAX_FRAMES = 64
# Sample rate (Hz) every audio signal is resampled to; also the length, in
# samples, of the one-second chunks paired with video frames.
AUDIO_SR = 16000

# Model, tokenizer and processor all come from the same checkpoint.
model_name = "openbmb/MiniCPM-o-2_6"
repo = AutoModel.from_pretrained(
    model_name,
    trust_remote_code=True,
    attn_implementation="sdpa",
    torch_dtype=torch.bfloat16,
)
repo = repo.to(DEVICE)
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
# NOTE(review): `processor` is not referenced anywhere else in the visible
# source - confirm whether it is still needed.
processor = AutoProcessor.from_pretrained(model_name, trust_remote_code=True)
# Custom stylesheet handed to gr.Blocks: caps the container width at 560px,
# centres <h1> headings and hides the page footer.
css = '''
.gradio-container{max-width: 560px !important}
h1{text-align:center}
footer {
visibility: hidden
}
'''
# Prompt prefix per media category. The "β" character is a placeholder that
# generate() replaces with the uploaded file's base name before the user's
# instruction is appended.
input_prefixes = {
    "Image": "(A image file called β has been attached, describe the image content) ",
    "GIF": "(A GIF file called β has been attached, describe the GIF content) ",
    "Video": "(A audio video file called β has been attached, describe the video content and the audio content) ",
    "Audio": "(A audio file called β has been attached, describe the audio content) ",
}
# Lower-case file extensions (leading dot included) recognised for each media
# category; infer_filetype() looks an extension up in these lists.
filetypes = {
    "Image": [".jpg", ".jpeg", ".png", ".bmp"],
    "GIF": [".gif"],
    "Video": [".mp4", ".mov", ".avi", ".mkv"],
    "Audio": [".wav", ".mp3", ".flac", ".aac"],
}
| # Functions | |
def infer_filetype(ext):
    """Return the media category whose extension list contains ``ext``.

    Categories are checked in the order they appear in ``filetypes``; the
    first match wins. ``None`` is returned when no category lists ``ext``.
    The comparison is exact, so callers must pass the extension in the same
    form as the table (lower-case, with the leading dot).
    """
    for category, extensions in filetypes.items():
        if ext in extensions:
            return category
    return None
def uniform_sample(seq, n):
    """Pick at most ``n`` items from ``seq``, spread evenly over its full length.

    Args:
        seq: Any sized, indexable sequence (list, range, ...).
        n: Maximum number of items to return.

    Returns:
        A new list. When ``seq`` has ``n`` items or fewer, all of them are
        returned in order. Otherwise exactly ``n`` items are returned, taken
        at positions ``floor(i * len(seq) / n)`` for ``i`` in ``0..n-1``, so
        the selection always reaches into the tail of the sequence. An empty
        ``seq`` or a non-positive ``n`` yields an empty list.
    """
    total = len(seq)
    if n <= 0 or total == 0:
        return []
    if total <= n:
        return list(seq)
    # An integer step followed by truncation would only cover the start of
    # the sequence whenever `total` is not a multiple of `n` (e.g. 100 items
    # and n=64 would give the first 64). Scaled indices span the whole range
    # and are strictly increasing because total > n.
    return [seq[(i * total) // n] for i in range(n)]
def frames_from_video(path):
    """Decode a sampled subset of a video's frames into PIL images.

    The video at ``path`` is opened with decord on the CPU, at most
    ``MAX_FRAMES`` frame indices are chosen with ``uniform_sample``, and the
    selected frames are fetched in a single batch.

    Returns:
        A list of ``PIL.Image`` objects, one per selected frame.
    """
    reader = VideoReader(path, ctx=cpu(0))
    frame_ids = uniform_sample(range(len(reader)), MAX_FRAMES)
    decoded = reader.get_batch(frame_ids).asnumpy()
    return [Image.fromarray(pixels.astype("uint8")) for pixels in decoded]
def audio_from_video(path):
    """Extract a video's audio track as a mono waveform sampled at ``AUDIO_SR``.

    The track is written to a temporary 16-bit PCM WAV file with moviepy and
    read back with librosa.

    Args:
        path: Path of the video file.

    Returns:
        The waveform returned by ``librosa.load`` (mono, ``AUDIO_SR`` Hz).

    Raises:
        ValueError: If the video has no audio track.
    """
    clip = VideoFileClip(path)
    try:
        if clip.audio is None:
            # Fail with a clear message rather than an AttributeError on None.
            raise ValueError(f"video has no audio track: {path}")
        # Write into a private temporary directory instead of re-opening a
        # still-open NamedTemporaryFile by name, which is not portable.
        with tempfile.TemporaryDirectory() as tmp_dir:
            wav_path = os.path.join(tmp_dir, "audio.wav")
            clip.audio.write_audiofile(
                wav_path,
                codec="pcm_s16le",
                fps=AUDIO_SR,
                verbose=False,
                logger=None,
            )
            audio_np, _ = librosa.load(wav_path, sr=AUDIO_SR, mono=True)
    finally:
        # Close the clip even when extraction fails.
        clip.close()
    return audio_np
def load_audio(path):
    """Load the audio file at ``path`` as a mono waveform at ``AUDIO_SR`` Hz.

    Only the waveform is returned; the sample rate reported by librosa is
    discarded.
    """
    return librosa.load(path, sr=AUDIO_SR, mono=True)[0]
def build_video_omni(path, prefix, instruction):
    """Build the interleaved frame/audio message content for a video.

    The result starts with the text prompt (``prefix + instruction``) and is
    followed, for every unit, by the literal marker ``"<unit>"``, one frame
    and one ``AUDIO_SR``-sample slice of the audio track.

    The number of units is the larger of the sampled frame count and the
    audio duration rounded up to whole seconds. Once the frames run out, the
    last frame is repeated; once the audio runs out, the slices are empty.

    NOTE(review): frames are sampled across the video while audio is cut into
    consecutive one-second slices, so unit ``i`` pairs frame ``i`` with audio
    second ``i`` regardless of the frame's timestamp - confirm this pairing
    is what the model expects.
    """
    frames = frames_from_video(path)
    audio = audio_from_video(path)

    audio_seconds = math.ceil(len(audio) / AUDIO_SR)
    unit_count = max(len(frames), audio_seconds)

    contents = [prefix + instruction]
    for unit in range(unit_count):
        if unit < len(frames):
            frame = frames[unit]
        else:
            frame = frames[-1]
        start = AUDIO_SR * unit
        contents += ["<unit>", frame, audio[start:start + AUDIO_SR]]
    return contents
def build_image_omni(path, prefix, instruction):
    """Build the message content for a still image.

    Args:
        path: Path of the image file.
        prefix: Text placed before the instruction.
        instruction: The user's instruction.

    Returns:
        ``[prefix + instruction, image]`` where ``image`` is the file decoded
        as an RGB ``PIL.Image``.
    """
    # convert() returns an independent, fully loaded image, so the file
    # handle can be released as soon as the conversion is done.
    with Image.open(path) as source:
        image = source.convert("RGB")
    return [prefix + instruction, image]
def build_gif_omni(path, prefix, instruction):
    """Build the message content for an animated GIF.

    At most ``MAX_FRAMES`` frame positions are chosen with ``uniform_sample``
    and only those frames are converted to RGB.

    Args:
        path: Path of the GIF file.
        prefix: Text placed before the instruction.
        instruction: The user's instruction.

    Returns:
        ``[prefix + instruction, frame_0, frame_1, ...]`` with every frame an
        RGB ``PIL.Image``.
    """
    frames = []
    # The context manager releases the file handle once the frames are copied.
    with Image.open(path) as img:
        frame_count = getattr(img, "n_frames", 1)
        # Sample positions first so that frames which would be discarded
        # anyway are never copied or converted.
        for index in uniform_sample(range(frame_count), MAX_FRAMES):
            img.seek(index)
            frames.append(img.convert("RGB"))
    return [prefix + instruction, *frames]
def build_audio_omni(path, prefix, instruction):
    """Build the message content for an audio file.

    Returns:
        A two-item list: the text prompt (``prefix + instruction``) followed
        by the waveform produced by ``load_audio``.
    """
    waveform = load_audio(path)
    prompt = prefix + instruction
    return [prompt, waveform]
def generate(input,
             instruction=DEFAULT_INPUT,
             sampling=False,
             temperature=0.7,
             top_p=0.8,
             top_k=100,
             repetition_penalty=1.05,
             max_tokens=512):
    """Run the omni model on an uploaded file and return its answer.

    Args:
        input: Path of the uploaded file; its extension selects the media
            category (image, GIF, video or audio).
        instruction: Text appended to the category-specific prompt prefix.
            ``None`` falls back to ``DEFAULT_INPUT``.
        sampling, temperature, top_p, top_k, repetition_penalty: Forwarded
            unchanged to ``repo.chat``.
        max_tokens: Forwarded to ``repo.chat`` as ``max_new_tokens``.

    Returns:
        The value returned by ``repo.chat``, or a short message string when
        no file was given or its extension is not supported.
    """
    if not input:
        return "no input provided."

    extension = os.path.splitext(input)[1].lower()
    filetype = infer_filetype(extension)
    if not filetype:
        return "unsupported file type."

    if instruction is None:
        # Avoid a TypeError when the prompt is concatenated below.
        instruction = DEFAULT_INPUT

    builder_map = {
        "Image": build_image_omni,
        "GIF": build_gif_omni,
        "Video": build_video_omni,
        "Audio": build_audio_omni,
    }
    filename = os.path.basename(input)
    prefix = input_prefixes[filetype].replace("β", filename)

    omni_content = msgs = None
    try:
        omni_content = builder_map[filetype](input, prefix, instruction)
        sys_msg = repo.get_sys_prompt(mode="omni", language="en")
        msgs = [sys_msg, {"role": "user", "content": omni_content}]
        return repo.chat(
            msgs=msgs,
            tokenizer=tokenizer,
            sampling=sampling,
            temperature=temperature,
            top_p=top_p,
            top_k=top_k,
            repetition_penalty=repetition_penalty,
            max_new_tokens=max_tokens,
            omni_input=True,
            use_image_id=False,
            max_slice_nums=2,
        )
    finally:
        # Runs on failure too, so a failed request does not keep memory
        # pinned. Drop the large inputs and collect garbage first, so the
        # freed blocks can be handed back by empty_cache().
        omni_content = msgs = None
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
def cloud():
    """Print a keep-alive marker line to standard output; returns nothing."""
    message = "[CLOUD] | Space maintained."
    print(message)
| # Initialize | |
# User interface: one column with the file input, prompt and decoding
# controls, and a second column with the model's answer.
with gr.Blocks(css=css) as main:
    with gr.Column():
        input = gr.File(label="Input", file_types=["image", "video", "audio"], type="filepath")
        instruction = gr.Textbox(lines=1, value=DEFAULT_INPUT, label="Instruction")
        sampling = gr.Checkbox(value=False, label="Sampling")
        temperature = gr.Slider(minimum=0.01, maximum=1.99, step=0.01, value=0.7, label="Temperature")
        top_p = gr.Slider(minimum=0, maximum=1, step=0.01, value=0.8, label="Top P")
        top_k = gr.Slider(minimum=0, maximum=1000, step=1, value=100, label="Top K")
        repetition_penalty = gr.Slider(minimum=0.01, maximum=1.99, step=0.01, value=1.05, label="Repetition Penalty")
        max_tokens = gr.Slider(minimum=1, maximum=4096, step=1, value=512, label="Max Tokens")
        # The labels were mis-encoded ("βΆ" / "βοΈ"); they are written as
        # escapes so the glyphs survive any further re-encoding.
        submit = gr.Button("\u25b6")  # play triangle
        maintain = gr.Button("\u2601\ufe0f")  # cloud emoji

    with gr.Column():
        output = gr.Textbox(lines=1, value="", label="Output")

    # The order of `generate_inputs` must match generate()'s parameters.
    generate_inputs = [
        input,
        instruction,
        sampling,
        temperature,
        top_p,
        top_k,
        repetition_penalty,
        max_tokens,
    ]
    submit.click(fn=generate, inputs=generate_inputs, outputs=[output], queue=False)
    maintain.click(cloud, inputs=[], outputs=[], queue=False)

main.launch(show_api=True)