# Copyright (c) 2025 Ye Liu. Licensed under the BSD-3-Clause License.

import random
import re

import decord
import nncore
import numpy as np
import pysrt
import torch
from decord import VideoReader
from PIL import Image


def _list_frame_paths(path, ext):
    """List the frame images found under ``path``, ordered by numeric name.

    Args:
        path: Directory containing the frame images.
        ext: Extension (or tuple of extensions) forwarded to ``nncore.ls``.

    Returns:
        The joined file paths, sorted by the integer obtained after removing
        any leading non-digit characters from ``nncore.pure_name(p)``.
    """
    paths = nncore.ls(path, ext=ext, join_path=True)
    # Sort numerically rather than lexicographically so '10' follows '9'.
    paths.sort(key=lambda p: int(re.sub(r'^\D*', '', nncore.pure_name(p))))
    return paths


def _open_video(path, num_threads):
    """Open ``path`` with decord after switching its bridge to ``'torch'``."""
    decord.bridge.set_bridge('torch')
    return VideoReader(path, num_threads=num_threads)


def _resolve_sample_frames(sample_frames):
    """Turn a sample-count specification into a concrete number.

    Args:
        sample_frames: Either a number (returned unchanged) or a string of the
            form ``'n'`` or ``'lo,hi'``. For ``'lo,hi'`` a count is drawn with
            ``random.randint(lo, hi)`` (both ends inclusive).

    Returns:
        The number of frames to sample.
    """
    if not isinstance(sample_frames, str):
        return sample_frames

    sep = [int(n) for n in sample_frames.split(',')]
    assert len(sep) in (1, 2)
    return int(random.randint(*sep)) if len(sep) > 1 else int(sep[0])


def _needs_sampling(vlen, sample_frames):
    """Return whether ``vlen`` items have to be reduced to ``sample_frames``."""
    # NOTE: some videos and images are shorter than sample_frames
    return sample_frames > 0 and vlen > sample_frames


def _sample_indices(vlen, sample_frames, sample_type='uniform', anchor=None):
    """Choose which of ``vlen`` positions to keep.

    Args:
        vlen: Number of available positions.
        sample_frames: Number of positions to keep. Every position is kept
            when this is not positive or not smaller than ``vlen``.
        sample_type: ``'uniform'`` picks an evenly spaced grid from the first
            to the last position; any other value draws one random position
            from each of ``sample_frames`` consecutive segments.
        anchor: Optional position that must be part of the result. When
            sampling takes place, the sampled position closest to ``anchor``
            is replaced by it.

    Returns:
        A list of integer positions in ``[0, vlen)``.
    """
    if not _needs_sampling(vlen, sample_frames):
        return list(range(vlen))

    if sample_type == 'uniform':
        # linspace yields the same grid as stepping by
        # (vlen - 1) / (sample_frames - 1), but stays well-defined for
        # sample_frames == 1 (it returns [0]) instead of dividing by zero.
        inds = np.linspace(0, vlen - 1, sample_frames).round().astype(int).tolist()
    else:
        seps = np.linspace(0, vlen - 1, sample_frames + 1).round().astype(int).tolist()
        # max(...) keeps each segment non-empty when two boundaries coincide.
        inds = [random.choice(range(lo, max(lo + 1, hi))) for lo, hi in zip(seps[:-1], seps[1:])]

    if anchor is not None:
        # ensure that the anchor is in the sampled indices
        dist = [abs(anchor - i) for i in inds]
        inds[dist.index(min(dist))] = anchor

    assert len(inds) == sample_frames
    return inds


def _to_seconds(t):
    """Convert a ``datetime.time``-like object to seconds since midnight."""
    return (t.hour * 60 + t.minute) * 60 + t.second + t.microsecond / 1000000


def load_image(path):
    """Load an image as an RGB tensor with a leading batch dimension.

    Args:
        path: Path of the image file.

    Returns:
        A tensor built from the RGB pixel array with a new axis inserted at
        position 0, i.e. shaped ``(1, H, W, 3)``.
    """
    image = Image.open(path).convert('RGB')
    return torch.from_numpy(np.array(image)).unsqueeze(0)


def load_video(path, sample_frames=-1):
    """Load a video file or a directory of frames, optionally subsampled.

    Args:
        path: A video file, or a directory of ``jpg`` / ``png`` frames.
        sample_frames: Number of uniformly spaced frames to keep. All frames
            are kept when it is not positive or not smaller than the length.

    Returns:
        A tuple ``(frames, images)``. ``frames`` holds the selected frames as
        one tensor. ``images`` holds the frame paths (directory input) or
        ``PIL.Image`` objects (video input).
    """
    frame_mode = nncore.is_dir(path)

    if frame_mode:
        paths = _list_frame_paths(path, ('jpg', 'png'))
        vlen = len(paths)
    else:
        vr = _open_video(path, 1)
        vlen = len(vr)

    inds = _sample_indices(vlen, sample_frames)

    if frame_mode:
        images = [paths[i] for i in inds]
        frames = torch.cat([load_image(p) for p in images])
    else:
        frames = vr.get_batch(inds)
        images = [Image.fromarray(t.numpy()) for t in frames]

    return frames, images


def load_frames(paths, sample_frames=-1, sample_type='uniform', sample_for_llm_only=False):
    """Load frames from a list of image paths, optionally subsampled.

    Args:
        paths: Image paths in temporal order.
        sample_frames: Number of frames to keep, or a ``'n'`` / ``'lo,hi'``
            string (see ``_resolve_sample_frames``).
        sample_type: ``'uniform'`` or ``'random'``.
        sample_for_llm_only: When true, the returned tensor contains every
            frame and only the returned paths / indices are subsampled.

    Returns:
        A tuple ``(frames, paths, inds)``: the frame tensor, the selected
        paths, and the selected positions within the input ``paths``.
    """
    assert sample_type in ('uniform', 'random')

    sample_frames = _resolve_sample_frames(sample_frames)
    inds = _sample_indices(len(paths), sample_frames, sample_type)

    if sample_for_llm_only:
        frames = torch.cat([load_image(p) for p in paths])
    else:
        frames = torch.cat([load_image(paths[i]) for i in inds])

    paths = [paths[i] for i in inds]

    return frames, paths, inds


def load_frames_with_inds(path,
                          keep,
                          single_frame_mode=False,
                          sample_frames=-1,
                          sample_type='uniform',
                          sample_for_llm_only=False,
                          num_threads=0):
    """Load a chosen subset of frames from a video file or frame directory.

    Args:
        path: A video file, or a directory of ``jpg`` frames.
        keep: Frame indices of interest. In single-frame mode it must hold
            exactly one index.
        single_frame_mode: When true, sampling runs over the whole video, the
            frame ``keep[0]`` is forced into the sample, and the returned
            tensor holds that frame only.
        sample_frames: Number of frames to keep, or a ``'n'`` / ``'lo,hi'``
            string (see ``_resolve_sample_frames``).
        sample_type: ``'uniform'`` or ``'random'``.
        sample_for_llm_only: When true (and not in single-frame mode), the
            returned tensor holds every candidate frame and only the returned
            images / indices are subsampled.
        num_threads: Forwarded to ``VideoReader`` for video input.

    Returns:
        A tuple ``(frames, images, inds)``. ``inds`` are positions within the
        candidate list (``keep``, or all frames in single-frame mode).
        ``images`` are the sampled frames as paths (directory input) or
        ``PIL.Image`` objects (video input).
    """
    assert sample_type in ('uniform', 'random')

    frame_mode = nncore.is_dir(path)

    if frame_mode:
        paths = _list_frame_paths(path, 'jpg')
    else:
        vr = _open_video(path, num_threads)

    if single_frame_mode:
        vlen = len(paths) if frame_mode else len(vr)
        assert vlen > 1 and len(keep) == 1
        imap = list(range(vlen))
        anchor = keep[0]
    else:
        vlen = len(keep)
        imap = keep
        anchor = None

    sample_frames = _resolve_sample_frames(sample_frames)
    inds = _sample_indices(vlen, sample_frames, sample_type, anchor=anchor)

    if frame_mode:
        images = [paths[imap[i]] for i in inds]
    else:
        img_tensor = vr.get_batch([imap[i] for i in inds])
        images = [Image.fromarray(t.numpy()) for t in img_tensor]

    if single_frame_mode:
        frames = load_image(paths[keep[0]]) if frame_mode else vr.get_batch(keep)
    elif sample_for_llm_only:
        if frame_mode:
            # Index through imap so that frames[i] matches position i of the
            # candidate list, exactly like the video branch below.
            frames = torch.cat([load_image(paths[i]) for i in imap])
        else:
            frames = vr.get_batch(imap)
    elif frame_mode:
        frames = torch.cat([load_image(paths[imap[i]]) for i in inds])
    else:
        frames = img_tensor.clone()

    return frames, images, inds


def load_frames_with_inds_keep(path,
                               all_frame_inds,
                               frame_idx,
                               sample_frames=-1,
                               sample_type='uniform',
                               sample_for_llm_only=False,
                               num_threads=0):
    """Load a subset of frames while guaranteeing one frame is sampled.

    Args:
        path: A video file, or a directory of ``jpg`` frames.
        all_frame_inds: List of candidate frame indices.
        frame_idx: A member of ``all_frame_inds`` that must survive sampling.
            It is only looked up when subsampling actually takes place.
        sample_frames: Number of frames to keep, or a ``'n'`` / ``'lo,hi'``
            string (see ``_resolve_sample_frames``).
        sample_type: ``'uniform'`` or ``'random'``.
        sample_for_llm_only: When true, the returned tensor holds every
            candidate frame and only the returned images / indices are
            subsampled.
        num_threads: Forwarded to ``VideoReader`` for video input.

    Returns:
        A tuple ``(frames, images, inds)``. ``inds`` are positions within
        ``all_frame_inds``. ``images`` are the sampled frames as paths
        (directory input) or ``PIL.Image`` objects (video input).
    """
    assert sample_type in ('uniform', 'random')

    frame_mode = nncore.is_dir(path)

    if frame_mode:
        paths = _list_frame_paths(path, 'jpg')
    else:
        vr = _open_video(path, num_threads)

    vlen = len(all_frame_inds)
    imap = all_frame_inds

    sample_frames = _resolve_sample_frames(sample_frames)

    # Resolve the anchor lazily: without subsampling every candidate is kept,
    # so frame_idx does not need to be located (or even present).
    anchor = all_frame_inds.index(frame_idx) if _needs_sampling(vlen, sample_frames) else None
    inds = _sample_indices(vlen, sample_frames, sample_type, anchor=anchor)

    if frame_mode:
        images = [paths[imap[i]] for i in inds]
    else:
        img_tensor = vr.get_batch([imap[i] for i in inds])
        images = [Image.fromarray(t.numpy()) for t in img_tensor]

    if sample_for_llm_only:
        if frame_mode:
            # Index through imap so that frames[i] matches position i of the
            # candidate list, exactly like the video branch below.
            frames = torch.cat([load_image(paths[i]) for i in imap])
        else:
            frames = vr.get_batch(imap)
    elif frame_mode:
        frames = torch.cat([load_image(paths[imap[i]]) for i in inds])
    else:
        frames = img_tensor.clone()

    return frames, images, inds


def load_frames_with_stride(path,
                            every_n_frames=4,
                            sample_frames=-1,
                            sample_type='uniform',
                            sample_for_llm_only=False,
                            num_threads=0):
    """Load every n-th frame of a video file, optionally subsampled further.

    Args:
        path: Path of the video file.
        every_n_frames: Stride used to build the candidate frame list.
        sample_frames: Number of candidates to keep, or a ``'n'`` / ``'lo,hi'``
            string (see ``_resolve_sample_frames``).
        sample_type: ``'uniform'`` or ``'random'``.
        sample_for_llm_only: When true, the returned tensor holds every
            strided candidate and only the returned images / indices are
            subsampled.
        num_threads: Forwarded to ``VideoReader``.

    Returns:
        A tuple ``(frames, images, inds)``. ``inds`` are positions within the
        strided candidate list and ``images`` are ``PIL.Image`` objects.
    """
    assert sample_type in ('uniform', 'random')

    vr = _open_video(path, num_threads)

    keep = list(range(0, len(vr), every_n_frames))

    sample_frames = _resolve_sample_frames(sample_frames)
    inds = _sample_indices(len(keep), sample_frames, sample_type)

    img_tensor = vr.get_batch([keep[i] for i in inds])
    images = [Image.fromarray(t.numpy()) for t in img_tensor]

    frames = vr.get_batch(keep) if sample_for_llm_only else img_tensor.clone()

    return frames, images, inds


def load_subtitle(path):
    """Parse a subtitle file into timed text entries.

    Args:
        path: Path of the subtitle file, opened with ``pysrt.open``.

    Returns:
        A list of ``(start, end, text)`` tuples with times in seconds.
    """
    subs = pysrt.open(path)

    parsed = []
    for sub in subs:
        s = _to_seconds(sub.start.to_time())
        e = _to_seconds(sub.end.to_time())
        parsed.append((s, e, sub.text))

    return parsed


def get_duration(path, num_threads=1):
    """Return the duration of a video.

    Args:
        path: Path of a video file, or a list of frames.
        num_threads: Forwarded to ``VideoReader`` for video input.

    Returns:
        The number of frames when ``path`` is a list, otherwise the frame
        count divided by the average FPS reported by the reader.
    """
    # sometimes the video is loaded as a list of frames
    if isinstance(path, list):
        return len(path)

    vr = VideoReader(path, num_threads=num_threads)
    return len(vr) / vr.get_avg_fps()