import os import random import uuid import json import time import asyncio from threading import Thread from typing import Iterable import gradio as gr import spaces import torch import numpy as np from PIL import Image import cv2 from transformers import ( Qwen2VLForConditionalGeneration, Qwen2_5_VLForConditionalGeneration, AutoModelForImageTextToText, AutoProcessor, TextIteratorStreamer, ) from transformers.image_utils import load_image from gradio.themes import Soft from gradio.themes.utils import colors, fonts, sizes # --- Theme and CSS Definition --- # Define the Thistle color palette colors.thistle = colors.Color( name="thistle", c50="#F9F5F9", c100="#F0E8F1", c200="#E7DBE8", c300="#DECEE0", c400="#D2BFD8", c500="#D8BFD8", # Thistle base color c600="#B59CB7", c700="#927996", c800="#6F5675", c900="#4C3454", c950="#291233", ) colors.red_gray = colors.Color( name="red_gray", c50="#f7eded", c100="#f5dcdc", c200="#efb4b4", c300="#e78f8f", c400="#d96a6a", c500="#c65353", c600="#b24444", c700="#8f3434", c800="#732d2d", c900="#5f2626", c950="#4d2020", ) class ThistleTheme(Soft): def __init__( self, *, primary_hue: colors.Color | str = colors.gray, secondary_hue: colors.Color | str = colors.thistle, # Use the new color neutral_hue: colors.Color | str = colors.slate, text_size: sizes.Size | str = sizes.text_lg, font: fonts.Font | str | Iterable[fonts.Font | str] = ( fonts.GoogleFont("Outfit"), "Arial", "sans-serif", ), font_mono: fonts.Font | str | Iterable[fonts.Font | str] = ( fonts.GoogleFont("IBM Plex Mono"), "ui-monospace", "monospace", ), ): super().__init__( primary_hue=primary_hue, secondary_hue=secondary_hue, neutral_hue=neutral_hue, text_size=text_size, font=font, font_mono=font_mono, ) super().set( background_fill_primary="*primary_50", background_fill_primary_dark="*primary_900", body_background_fill="linear-gradient(135deg, *primary_200, *primary_100)", body_background_fill_dark="linear-gradient(135deg, *primary_900, *primary_800)", button_primary_text_color="black", button_primary_text_color_hover="white", button_primary_background_fill="linear-gradient(90deg, *secondary_400, *secondary_500)", button_primary_background_fill_hover="linear-gradient(90deg, *secondary_500, *secondary_600)", button_primary_background_fill_dark="linear-gradient(90deg, *secondary_600, *secondary_700)", button_primary_background_fill_hover_dark="linear-gradient(90deg, *secondary_500, *secondary_600)", button_secondary_text_color="black", button_secondary_text_color_hover="white", button_secondary_background_fill="linear-gradient(90deg, *primary_300, *primary_300)", button_secondary_background_fill_hover="linear-gradient(90deg, *primary_400, *primary_400)", button_secondary_background_fill_dark="linear-gradient(90deg, *primary_500, *primary_600)", button_secondary_background_fill_hover_dark="linear-gradient(90deg, *primary_500, *primary_500)", slider_color="*secondary_400", slider_color_dark="*secondary_600", block_title_text_weight="600", block_border_width="3px", block_shadow="*shadow_drop_lg", button_primary_shadow="*shadow_drop_lg", button_large_padding="11px", color_accent_soft="*primary_100", block_label_background_fill="*primary_200", ) # Instantiate the new theme thistle_theme = ThistleTheme() css = """ #main-title h1 { font-size: 2.3em !important; } #output-title h2 { font-size: 2.1em !important; } :root { --color-grey-50: #f9fafb; --banner-background: var(--secondary-400); --banner-text-color: var(--primary-100); --banner-background-dark: var(--secondary-800); --banner-text-color-dark: var(--primary-100); --banner-chrome-height: calc(16px + 43px); --chat-chrome-height-wide-no-banner: 320px; --chat-chrome-height-narrow-no-banner: 450px; --chat-chrome-height-wide: calc(var(--chat-chrome-height-wide-no-banner) + var(--banner-chrome-height)); --chat-chrome-height-narrow: calc(var(--chat-chrome-height-narrow-no-banner) + var(--banner-chrome-height)); } .banner-message { background-color: var(--banner-background); padding: 5px; margin: 0; border-radius: 5px; border: none; } .banner-message-text { font-size: 13px; font-weight: bolder; color: var(--banner-text-color) !important; } body.dark .banner-message { background-color: var(--banner-background-dark) !important; } body.dark .gradio-container .contain .banner-message .banner-message-text { color: var(--banner-text-color-dark) !important; } .toast-body { background-color: var(--color-grey-50); } .html-container:has(.css-styles) { padding: 0; margin: 0; } .css-styles { height: 0; } .model-message { text-align: end; } .model-dropdown-container { display: flex; align-items: center; gap: 10px; padding: 0; } .user-input-container .multimodal-textbox{ border: none !important; } .control-button { height: 51px; } button.cancel { border: var(--button-border-width) solid var(--button-cancel-border-color); background: var(--button-cancel-background-fill); color: var(--button-cancel-text-color); box-shadow: var(--button-cancel-shadow); } button.cancel:hover, .cancel[disabled] { background: var(--button-cancel-background-fill-hover); color: var(--button-cancel-text-color-hover); } .opt-out-message { top: 8px; } .opt-out-message .html-container, .opt-out-checkbox label { font-size: 14px !important; padding: 0 !important; margin: 0 !important; color: var(--neutral-400) !important; } div.block.chatbot { height: calc(100svh - var(--chat-chrome-height-wide)) !important; max-height: 900px !important; } div.no-padding { padding: 0 !important; } @media (max-width: 1280px) { div.block.chatbot { height: calc(100svh - var(--chat-chrome-height-wide)) !important; } } @media (max-width: 1024px) { .responsive-row { flex-direction: column; } .model-message { text-align: start; font-size: 10px !important; } .model-dropdown-container { flex-direction: column; align-items: flex-start; } div.block.chatbot { height: calc(100svh - var(--chat-chrome-height-narrow)) !important; } } @media (max-width: 400px) { .responsive-row { flex-direction: column; } .model-message { text-align: start; font-size: 10px !important; } .model-dropdown-container { flex-direction: column; align-items: flex-start; } div.block.chatbot { max-height: 360px !important; } } @media (max-height: 932px) { .chatbot { max-height: 500px !important; } } @media (max-height: 1280px) { div.block.chatbot { max-height: 800px !important; } } """ # Constants for text generation MAX_MAX_NEW_TOKENS = 2048 DEFAULT_MAX_NEW_TOKENS = 1024 MAX_INPUT_TOKEN_LENGTH = int(os.getenv("MAX_INPUT_TOKEN_LENGTH", "4096")) device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") print("CUDA_VISIBLE_DEVICES=", os.environ.get("CUDA_VISIBLE_DEVICES")) print("torch.__version__ =", torch.__version__) print("torch.version.cuda =", torch.version.cuda) print("cuda available:", torch.cuda.is_available()) print("cuda device count:", torch.cuda.device_count()) if torch.cuda.is_available(): print("current device:", torch.cuda.current_device()) print("device name:", torch.cuda.get_device_name(torch.cuda.current_device())) print("Using device:", device) # --- Model Loading --- # Load Nanonets-OCR-s MODEL_ID_V = "nanonets/Nanonets-OCR-s" processor_v = AutoProcessor.from_pretrained(MODEL_ID_V, trust_remote_code=True) model_v = Qwen2_5_VLForConditionalGeneration.from_pretrained( MODEL_ID_V, trust_remote_code=True, torch_dtype=torch.float16 ).to(device).eval() # Load Qwen2-VL-OCR-2B-Instruct MODEL_ID_X = "prithivMLmods/Qwen2-VL-OCR-2B-Instruct" processor_x = AutoProcessor.from_pretrained(MODEL_ID_X, trust_remote_code=True) model_x = Qwen2VLForConditionalGeneration.from_pretrained( MODEL_ID_X, trust_remote_code=True, torch_dtype=torch.float16 ).to(device).eval() # Load Aya-Vision-8b MODEL_ID_A = "CohereForAI/aya-vision-8b" processor_a = AutoProcessor.from_pretrained(MODEL_ID_A, trust_remote_code=True) model_a = AutoModelForImageTextToText.from_pretrained( MODEL_ID_A, trust_remote_code=True, torch_dtype=torch.float16 ).to(device).eval() # Load olmOCR-7B-0725 MODEL_ID_W = "allenai/olmOCR-7B-0725" processor_w = AutoProcessor.from_pretrained(MODEL_ID_W, trust_remote_code=True) model_w = Qwen2_5_VLForConditionalGeneration.from_pretrained( MODEL_ID_W, trust_remote_code=True, torch_dtype=torch.float16 ).to(device).eval() # Load RolmOCR MODEL_ID_M = "reducto/RolmOCR" processor_m = AutoProcessor.from_pretrained(MODEL_ID_M, trust_remote_code=True) model_m = Qwen2_5_VLForConditionalGeneration.from_pretrained( MODEL_ID_M, trust_remote_code=True, torch_dtype=torch.float16 ).to(device).eval() def downsample_video(video_path): """ Downsamples the video to evenly spaced frames. Each frame is returned as a PIL image along with its timestamp. """ vidcap = cv2.VideoCapture(video_path) total_frames = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT)) fps = vidcap.get(cv2.CAP_PROP_FPS) frames = [] frame_indices = np.linspace(0, total_frames - 1, 10, dtype=int) for i in frame_indices: vidcap.set(cv2.CAP_PROP_POS_FRAMES, i) success, image = vidcap.read() if success: image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) pil_image = Image.fromarray(image) timestamp = round(i / fps, 2) frames.append((pil_image, timestamp)) vidcap.release() return frames @spaces.GPU def generate_image(model_name: str, text: str, image: Image.Image, max_new_tokens: int = 1024, temperature: float = 0.6, top_p: float = 0.9, top_k: int = 50, repetition_penalty: float = 1.2): """ Generates responses using the selected model for image input. Yields raw text and Markdown-formatted text. """ if model_name == "RolmOCR-7B": processor = processor_m model = model_m elif model_name == "Qwen2-VL-OCR-2B": processor = processor_x model = model_x elif model_name == "Nanonets-OCR-s": processor = processor_v model = model_v elif model_name == "Aya-Vision-8B": processor = processor_a model = model_a elif model_name == "olmOCR-7B-0725": processor = processor_w model = model_w else: yield "Invalid model selected.", "Invalid model selected." return if image is None: yield "Please upload an image.", "Please upload an image." return messages = [{ "role": "user", "content": [ {"type": "image"}, {"type": "text", "text": text}, ] }] prompt_full = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True) inputs = processor( text=[prompt_full], images=[image], return_tensors="pt", padding=True, truncation=True, max_length=MAX_INPUT_TOKEN_LENGTH ).to(device) streamer = TextIteratorStreamer(processor, skip_prompt=True, skip_special_tokens=True) generation_kwargs = {**inputs, "streamer": streamer, "max_new_tokens": max_new_tokens} thread = Thread(target=model.generate, kwargs=generation_kwargs) thread.start() buffer = "" for new_text in streamer: buffer += new_text buffer = buffer.replace("<|im_end|>", "") time.sleep(0.01) yield buffer, buffer @spaces.GPU def generate_video(model_name: str, text: str, video_path: str, max_new_tokens: int = 1024, temperature: float = 0.6, top_p: float = 0.9, top_k: int = 50, repetition_penalty: float = 1.2): """ Generates responses using the selected model for video input. Yields raw text and Markdown-formatted text. """ if model_name == "RolmOCR-7B": processor = processor_m model = model_m elif model_name == "Qwen2-VL-OCR-2B": processor = processor_x model = model_x elif model_name == "Nanonets-OCR-s": processor = processor_v model = model_v elif model_name == "Aya-Vision-8B": processor = processor_a model = model_a elif model_name == "olmOCR-7B-0725": processor = processor_w model = model_w else: yield "Invalid model selected.", "Invalid model selected." return if video_path is None: yield "Please upload a video.", "Please upload a video." return frames_with_ts = downsample_video(video_path) images_for_processor = [frame for frame, ts in frames_with_ts] messages = [{"role": "user", "content": [{"type": "text", "text": text}]}] for frame in images_for_processor: messages[0]["content"].insert(0, {"type": "image"}) prompt_full = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True) inputs = processor( text=[prompt_full], images=images_for_processor, return_tensors="pt", padding=True, truncation=True, max_length=MAX_INPUT_TOKEN_LENGTH ).to(device) streamer = TextIteratorStreamer(processor, skip_prompt=True, skip_special_tokens=True) generation_kwargs = { **inputs, "streamer": streamer, "max_new_tokens": max_new_tokens, "do_sample": True, "temperature": temperature, "top_p": top_p, "top_k": top_k, "repetition_penalty": repetition_penalty, } thread = Thread(target=model.generate, kwargs=generation_kwargs) thread.start() buffer = "" for new_text in streamer: buffer += new_text buffer = buffer.replace("<|im_end|>", "") time.sleep(0.01) yield buffer, buffer # Define examples for image and video inference image_examples = [ ["Extract the full page.", "images/ocr.png"], ["Extract the content.", "images/4.png"], ["Convert this page to doc [table] precisely for markdown.", "images/0.png"] ] video_examples = [ ["Explain the Ad in Detail.", "videos/1.mp4"], ] # Create the Gradio Interface with gr.Blocks(css=css, theme=thistle_theme) as demo: gr.Markdown("# **Multimodal OCR**", elem_id="main-title") with gr.Row(): with gr.Column(scale=2): with gr.Tabs(): with gr.TabItem("Image Inference"): image_query = gr.Textbox(label="Query Input", placeholder="Enter your query here...") image_upload = gr.Image(type="pil", label="Upload Image", height=290) image_submit = gr.Button("Submit", variant="primary") gr.Examples( examples=image_examples, inputs=[image_query, image_upload] ) with gr.TabItem("Video Inference"): video_query = gr.Textbox(label="Query Input", placeholder="Enter your query here...") video_upload = gr.Video(label="Upload Video", height=290) video_submit = gr.Button("Submit", variant="primary") gr.Examples( examples=video_examples, inputs=[video_query, video_upload] ) with gr.Accordion("Advanced options", open=False): max_new_tokens = gr.Slider(label="Max new tokens", minimum=1, maximum=MAX_MAX_NEW_TOKENS, step=1, value=DEFAULT_MAX_NEW_TOKENS) temperature = gr.Slider(label="Temperature", minimum=0.1, maximum=4.0, step=0.1, value=0.6) top_p = gr.Slider(label="Top-p (nucleus sampling)", minimum=0.05, maximum=1.0, step=0.05, value=0.9) top_k = gr.Slider(label="Top-k", minimum=1, maximum=1000, step=1, value=50) repetition_penalty = gr.Slider(label="Repetition penalty", minimum=1.0, maximum=2.0, step=0.05, value=1.2) with gr.Column(scale=3): gr.Markdown("## Output", elem_id="output-title") output = gr.Textbox(label="Raw Output Stream", interactive=False, lines=11, show_copy_button=True) with gr.Accordion("(Result.md)", open=False): markdown_output = gr.Markdown(label="(Result.Md)", latex_delimiters=[ {"left": "$$", "right": "$$", "display": True}, {"left": "$", "right": "$", "display": False} ]) model_choice = gr.Radio( choices=["olmOCR-7B-0725", "Nanonets-OCR-s", "RolmOCR-7B", "Aya-Vision-8B", "Qwen2-VL-OCR-2B"], label="Select Model", value="olmOCR-7B-0725" ) image_submit.click( fn=generate_image, inputs=[model_choice, image_query, image_upload, max_new_tokens, temperature, top_p, top_k, repetition_penalty], outputs=[output, markdown_output] ) video_submit.click( fn=generate_video, inputs=[model_choice, video_query, video_upload, max_new_tokens, temperature, top_p, top_k, repetition_penalty], outputs=[output, markdown_output] ) if __name__ == "__main__": demo.queue(max_size=50).launch(mcp_server=True, ssr_mode=False, show_error=True)