# Copyright (c) 2025 Ye Liu. Licensed under the BSD-3-Clause License.

import argparse
import copy
import json
from contextlib import nullcontext

import nncore
import torch

from videomind.constants import GROUNDER_PROMPT, PLANNER_PROMPT, VERIFIER_PROMPT
from videomind.dataset.hybrid import DATASETS
from videomind.dataset.utils import process_vision_info
from videomind.model.builder import build_model
from videomind.utils.io import get_duration, load_subtitle
from videomind.utils.parser import parse_query, parse_span


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('--dataset')
    parser.add_argument('--pred_path')
    parser.add_argument('--model_gnd_path')
    parser.add_argument('--model_ver_path')
    parser.add_argument('--model_pla_path')
    parser.add_argument('--model_ans_path')
    parser.add_argument('--split', default='test', choices=['train', 'valid', 'test'])
    parser.add_argument('--style', default='mcq', choices=['mcq', 'options', 'direct'])
    parser.add_argument('--use_subtitle', action='store_true')
    parser.add_argument('--auto_rephrasing', action='store_true')
    parser.add_argument('--auto_planning', action='store_true')
    parser.add_argument('--num_threads', type=int, default=1)
    parser.add_argument('--device', default='auto')
    parser.add_argument('--chunk', type=int, default=1)
    parser.add_argument('--index', type=int, default=0)
    args = parser.parse_args()
    return args
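
# Example invocation (illustrative only; the script name, dataset name, and
# checkpoint paths below are hypothetical placeholders):
#   python infer_auto.py --dataset qvhighlights --pred_path outputs/qvhighlights \
#       --model_gnd_path model_zoo/VideoMind-2B --split test --chunk 8 --index 0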


if __name__ == '__main__':
    args = parse_args()

    if args.chunk > 1:
        pred_path = nncore.join(args.pred_path, f'output_{args.index}.json')
    else:
        pred_path = nncore.join(args.pred_path, 'output.json')

    print(f'Dataset: {args.dataset}({args.split}) Chunk: {args.chunk} Index: {args.index} Output Path: {pred_path}')

    # NOTE:
    # 1. the grounder is always active, so its state is not tracked here
    # 2. the answerer is always used (when its adapter is absent, the base model acts as the answerer)
    adapter_state = dict(planner=False, verifier=False, answerer=False)

    print('Initializing role *grounder*')
    model, processor = build_model(args.model_gnd_path, device=args.device)
    device = next(model.parameters()).device
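
    # The planner, verifier, and answerer roles are realized as adapters loaded
    # on top of the shared grounder base model below; a role whose adapter
    # directory is missing stays disabled and the pipeline falls back
    # accordingly (see the NOTE above for the grounder/answerer defaults).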
    if args.model_pla_path is not None:
        adapter_path = nncore.join(args.model_pla_path, 'planner')
        if nncore.is_dir(adapter_path):
            print('Initializing role *planner*')
            model.load_adapter(adapter_path, adapter_name='planner')
            adapter_state['planner'] = True

    if args.model_ver_path is not None:
        adapter_path = nncore.join(args.model_ver_path, 'verifier')
        if nncore.is_dir(adapter_path):
            print('Initializing role *verifier*')
            model.load_adapter(adapter_path, adapter_name='verifier')
            adapter_state['verifier'] = True

    if args.model_ans_path is not None:
        adapter_path = nncore.join(args.model_ans_path, 'answerer')
        if nncore.is_dir(adapter_path):
            print('Initializing role *answerer*')
            model.load_adapter(adapter_path, adapter_name='answerer')
            adapter_state['answerer'] = True
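
    # Round-robin sharding: sample i goes to shard i % args.chunk, so each of
    # the parallel jobs evaluates an interleaved subset of the annotations.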
    annos = DATASETS.get(args.dataset).load_annos(split=args.split)
    annos = [annos[i::args.chunk] for i in range(args.chunk)][args.index]

    dumps = []
    for i in nncore.ProgressBar(range(len(annos))):
        anno = copy.deepcopy(annos[i])
        dump = copy.deepcopy(annos[i])

        video_path, duration, span = anno['video_path'], anno.get('duration'), anno.get('span')

        if duration is None:
            duration = get_duration(video_path, num_threads=args.num_threads)
        dump['duration'] = duration

        print()
        print(video_path)
        print(duration)

        # sometimes the sample is for grounding only
        do_answering = all(k in anno for k in ('question', 'options'))

        if do_answering:
            question, options, ans = anno['question'], anno['options'], anno['ans']

            if args.style in ('mcq', 'options'):
                prompt = question + '\nOptions:'
                for idx, opt in enumerate(options):
                    prompt += f"\n({chr(ord('A') + idx)}) {opt.capitalize()}"
                prompt += '\nPlease only give the best option.'
            else:
                prompt = question

            print(prompt)
            print(options)
            print(ans)
        else:
            question = anno['query']
            print(question)

        # do grounding by default
        do_grounding = True

        # initialize grounding query as question
        query = question

        # initialize agent list
        dump['agents'] = []
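
        # Planner stage: optionally rephrase the question into a grounding
        # query (--auto_rephrasing) and/or decide whether grounding can be
        # skipped entirely (--auto_planning).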
        if adapter_state['planner'] and (args.auto_rephrasing or args.auto_planning):
            print('=============== planner ===============')

            dump['agents'].append('planner')

            messages = [{
                'role': 'user',
                'content': [{
                    'type': 'video',
                    'video': video_path,
                    'num_threads': args.num_threads,
                    'min_pixels': 36 * 28 * 28,
                    'max_pixels': 64 * 28 * 28,
                    'max_frames': 100,
                    'fps': 1.0
                }, {
                    'type': 'text',
                    'text': PLANNER_PROMPT.format(question)
                }]
            }]

            text = processor.apply_chat_template(messages, add_generation_prompt=True)
            print(text)
            images, videos = process_vision_info(messages)
            data = processor(text=[text], images=images, videos=videos, return_tensors='pt')
            data = data.to(device)
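
            # Toggling the adapter layers off and on resets their state before
            # set_adapter() activates the adapter for the current role.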
            model.base_model.disable_adapter_layers()
            model.base_model.enable_adapter_layers()
            model.set_adapter('planner')

            output_ids = model.generate(
                **data,
                do_sample=False,
                temperature=None,
                top_p=None,
                top_k=None,
                repetition_penalty=None,
                max_new_tokens=256)

            assert data.input_ids.size(0) == output_ids.size(0) == 1
            output_ids = output_ids[0, data.input_ids.size(1):]
            if output_ids[-1] == processor.tokenizer.eos_token_id:
                output_ids = output_ids[:-1]
            response = processor.decode(output_ids, clean_up_tokenization_spaces=False)
            print(response)

            dump['planner_response'] = response
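
            # The planner is expected to reply with a JSON action (or a list of
            # actions), e.g. {"type": "grounder", "value": "<query>"};
            # unparsable replies leave the default routing untouched.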
            try:
                parsed = json.loads(response)
                action = parsed[0] if isinstance(parsed, list) else parsed
                if args.auto_rephrasing and action['type'].lower() == 'grounder' and action['value']:
                    query = action['value']
                    dump['planner_parsed_query'] = query
                elif args.auto_planning and action['type'].lower() == 'answerer':
                    do_grounding = False
            except Exception:
                print('WARNING: Failed to parse planner response')

        if do_grounding:
            print('=============== grounder ===============')

            dump['agents'].append('grounder')

            query = parse_query(query)

            messages = [{
                'role': 'user',
                'content': [{
                    'type': 'video',
                    'video': video_path,
                    'num_threads': args.num_threads,
                    'min_pixels': 36 * 28 * 28,
                    'max_pixels': 64 * 28 * 28,
                    'max_frames': 150,
                    'fps': 1.0
                }, {
                    'type': 'text',
                    'text': GROUNDER_PROMPT.format(query)
                }]
            }]

            text = processor.apply_chat_template(messages, add_generation_prompt=True)
            print(text)
            images, videos = process_vision_info(messages)
            data = processor(text=[text], images=images, videos=videos, return_tensors='pt')
            data = data.to(device)

            model.base_model.disable_adapter_layers()
            model.base_model.enable_adapter_layers()
            model.set_adapter('grounder')

            output_ids = model.generate(
                **data,
                do_sample=False,
                temperature=None,
                top_p=None,
                top_k=None,
                repetition_penalty=None,
                max_new_tokens=256)

            assert data.input_ids.size(0) == output_ids.size(0) == 1
            output_ids = output_ids[0, data.input_ids.size(1):]
            if output_ids[-1] == processor.tokenizer.eos_token_id:
                output_ids = output_ids[:-1]
            response = processor.decode(output_ids, clean_up_tokenization_spaces=False)
            print(response)

            dump['grounder_response'] = response
            dump['grounder_success'] = len(model.reg) > 0
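
            # model.reg holds the grounder's regressed candidate moments: each
            # row is a normalized (start, end) pair in [0, 1] with a confidence
            # score in the last column.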
            if dump['grounder_success']:
                # 1. extract timestamps and confidences
                blob = model.reg[0].cpu().float()
                pred, conf = blob[:, :2] * duration, blob[:, -1].tolist()

                # 2. clamp timestamps into the video
                pred = pred.clamp(min=0, max=duration)

                # 3. round timestamps to the dataset's time unit
                unit = getattr(DATASETS.get(args.dataset), 'UNIT', 0.001)
                pred = torch.round(pred / unit).long() * unit

                # 4. fix reversed spans by swapping start/end (roll along dim 1)
                inds = (pred[:, 1] - pred[:, 0] < 0).nonzero()[:, 0]
                pred[inds] = pred[inds].roll(1, 1)

                # 5. convert timestamps to list
                pred = pred.tolist()
            else:
                print('WARNING: Failed to parse grounder response')
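
                # Fallback candidates: with a verifier available, propose five
                # overlapping windows (each one third of the video, stride of
                # one sixth) for it to re-rank; otherwise use the whole video.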
                if adapter_state['verifier']:
                    pred = [[i * duration / 6, (i + 2) * duration / 6] for i in range(5)]
                    conf = [0] * 5
                else:
                    pred = [[0, duration]]
                    conf = [0]

            print(pred[0], span, duration)

            dump['pred'] = pred
            dump['conf'] = conf
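
        # Verifier stage: zoom into each of the top-5 candidate moments and
        # re-rank them by how confidently the verifier judges the moment to
        # contain the answer ("Yes" vs "No").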
        if do_grounding and adapter_state['verifier'] and len(pred) > 1:
            print('=============== verifier ===============')

            dump['agents'].append('verifier')

            # verify at most the top-5 predictions
            probs = []
            for cand in pred[:5]:
                s0, e0 = parse_span(cand, duration, 2)
                # expand the candidate to twice its length for extra context
                offset = (e0 - s0) / 2
                s1, e1 = parse_span([s0 - offset, e0 + offset], duration)

                # normalized position of [s0, e0] within the expanded window [s1, e1]
                s = (s0 - s1) / (e1 - s1)
                e = (e0 - s1) / (e1 - s1)

                messages = [{
                    'role': 'user',
                    'content': [{
                        'type': 'video',
                        'video': video_path,
                        'num_threads': args.num_threads,
                        'video_start': s1,
                        'video_end': e1,
                        'min_pixels': 36 * 28 * 28,
                        'max_pixels': 64 * 28 * 28,
                        'max_frames': 64,
                        'fps': 2.0
                    }, {
                        'type': 'text',
                        'text': VERIFIER_PROMPT.format(question)
                    }]
                }]

                text = processor.apply_chat_template(messages, add_generation_prompt=True)
                print(text)
                images, videos = process_vision_info(messages)
                data = processor(text=[text], images=images, videos=videos, return_tensors='pt')
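
                # Make the candidate boundaries explicit to the verifier by
                # splicing segment start/end marker tokens into the input at
                # the token positions of the corresponding frames.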
                # ===== insert segment start/end tokens =====
                video_grid_thw = data['video_grid_thw'][0]
                # window: number of video tokens per frame (2x2 patches merge into one token)
                num_frames, window = int(video_grid_thw[0]), int(video_grid_thw[1] * video_grid_thw[2] / 4)
                assert num_frames * window * 4 == data['pixel_values_videos'].size(0)

                # map the normalized span boundaries to frame indices
                pos_s, pos_e = round(s * num_frames), round(e * num_frames)
                pos_s, pos_e = min(max(0, pos_s), num_frames), min(max(0, pos_e), num_frames)
                assert pos_s <= pos_e, (num_frames, s, e)

                # +1 skips the vision-start token; the extra +1 for pos_e accounts
                # for the seg_s token inserted before it
                base_idx = torch.nonzero(data['input_ids'][0] == model.config.vision_start_token_id).item()
                pos_s, pos_e = pos_s * window + base_idx + 1, pos_e * window + base_idx + 2

                input_ids = data['input_ids'][0].tolist()
                input_ids.insert(pos_s, model.config.seg_s_token_id)
                input_ids.insert(pos_e, model.config.seg_e_token_id)
                data['input_ids'] = torch.LongTensor([input_ids])
                data['attention_mask'] = torch.ones_like(data['input_ids'])
                # ===========================================

                data = data.to(device)

                model.base_model.disable_adapter_layers()
                model.base_model.enable_adapter_layers()
                model.set_adapter('verifier')

                with torch.inference_mode():
                    logits = model(**data).logits[0, -1].softmax(dim=-1)

                # NOTE: magic numbers here
                # In Qwen2-VL vocab: 9454 -> Yes, 2753 -> No
                # score = sigmoid(P('Yes') - P('No')) at the first generated position
                score = (logits[9454] - logits[2753]).sigmoid().item()
                probs.append(score)

            ranks = torch.Tensor(probs).argsort(descending=True).tolist()
            print(probs)
            print(ranks)

            pred = [pred[idx] for idx in ranks]
            conf = [conf[idx] for idx in ranks]

            print(pred[0], span, duration)

            dump['probs'] = probs
            dump['ranks'] = ranks
            dump['pred_ori'] = dump['pred']
            dump['conf_ori'] = dump['conf']
            dump['pred'] = pred
            dump['conf'] = conf
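
        # Answerer stage: crop the video to the best (possibly re-ranked)
        # moment, expanded to the dataset's minimum length, then answer the
        # question with the answerer adapter or the base model.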
        if do_answering:
            print('=============== answerer ===============')

            dump['agents'].append('answerer')

            # choose the potentially best moment
            selected = pred[0] if 'pred' in dump else [0, duration]
            min_len = getattr(DATASETS.get(args.dataset), 'MIN_LEN', 32)
            s, e = parse_span(selected, duration, min_len)
            print([s, e], span, duration)

            if args.use_subtitle and 'subtitle_path' in anno and nncore.is_file(anno['subtitle_path']):
                # use only the first 100 subtitles to save memory
                subs = load_subtitle(anno['subtitle_path'])[:100]
                subs = [f'{round(a - s, 1)}s - {round(b - s, 1)}s, {t}\n' for a, b, t in subs if a >= s and b <= e]
                subs = ''.join(subs)
                prompt = f'You are given a video with {round(e - s, 1)} seconds long.\nSubtitles:\n{subs}' + prompt

            messages = [{
                'role': 'user',
                'content': [{
                    'type': 'video',
                    'video': video_path,
                    'num_threads': args.num_threads,
                    'video_start': s,
                    'video_end': e,
                    'min_pixels': 128 * 28 * 28,
                    'max_pixels': 256 * 28 * 28,
                    'max_frames': 32,
                    'fps': 2.0
                }, {
                    'type': 'text',
                    'text': prompt
                }]
            }]

            text = processor.apply_chat_template(messages, add_generation_prompt=True)
            text += 'Best Option: (' if args.style == 'mcq' else ''
            print(text)
            images, videos = process_vision_info(messages)
            data = processor(text=[text], images=images, videos=videos, return_tensors='pt')
            data = data.to(device)
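
            # Without an answerer adapter, generate with all adapters disabled
            # so that the plain base model produces the answer.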
            if adapter_state['answerer']:
                model.base_model.disable_adapter_layers()
                model.base_model.enable_adapter_layers()
                model.set_adapter('answerer')
                context = nullcontext
            else:
                context = model.disable_adapter

            with context():
                output_ids = model.generate(
                    **data,
                    do_sample=False,
                    temperature=None,
                    top_p=None,
                    top_k=None,
                    repetition_penalty=None,
                    max_new_tokens=256)

            assert data.input_ids.size(0) == output_ids.size(0) == 1
            output_ids = output_ids[0, data.input_ids.size(1):]
            if output_ids[-1] == processor.tokenizer.eos_token_id:
                output_ids = output_ids[:-1]
            response = processor.decode(output_ids, clean_up_tokenization_spaces=False)
            print(response)

            dump['answerer_response'] = response
            dump['response'] = response

        dumps.append(dump)

    nncore.dump(dumps, pred_path)