import logging

import torch
from PIL import Image

from ltx_video.utils.prompt_enhance_utils import I2V_CINEMATIC_PROMPT

from ..managers.ltx_manager import ltx_manager_singleton

logger = logging.getLogger(__name__)


class Deformes3DThinker:
    """
    The tactical specialist that now directly implements the prompt-enhancement
    logic, using the models provided by the LTX pipeline.
    """

    def __init__(self):
        pipeline = ltx_manager_singleton.prompt_enhancement_pipeline
        if not pipeline:
            raise RuntimeError("Deformes3DThinker could not access the LTX pipeline.")

        # Reuse the enhancement models already loaded by the LTX pipeline
        # instead of loading separate copies.
        self.caption_model = pipeline.prompt_enhancer_image_caption_model
        self.caption_processor = pipeline.prompt_enhancer_image_caption_processor
        self.llm_model = pipeline.prompt_enhancer_llm_model
        self.llm_tokenizer = pipeline.prompt_enhancer_llm_tokenizer

        if not all([self.caption_model, self.caption_processor, self.llm_model, self.llm_tokenizer]):
            logger.warning(
                "Deformes3DThinker initialized, but one or more enhancement models "
                "were not loaded by the LTX pipeline. A fallback prompt will be used."
            )
        else:
            logger.info("Deformes3DThinker initialized and successfully linked to LTX enhancement models.")

    @torch.no_grad()
    def get_enhanced_motion_prompt(self, global_prompt: str, story_history: str,
                                   past_keyframe_path: str, present_keyframe_path: str, future_keyframe_path: str,
                                   past_scene_desc: str, present_scene_desc: str, future_scene_desc: str) -> str:
        """
        Generates a refined motion prompt by directly executing the enhancement pipeline logic.
        """
        if not all([self.caption_model, self.caption_processor, self.llm_model, self.llm_tokenizer]):
            logger.warning("Enhancement models not available. Using fallback prompt.")
            return f"A cinematic transition from '{present_scene_desc}' to '{future_scene_desc}'."

        try:
            # Only the present keyframe is captioned; the surrounding narrative
            # context is carried by the scene descriptions passed to the LLM below.
            present_image = Image.open(present_keyframe_path).convert("RGB")
            image_captions = self._generate_image_captions([present_image])

            # Mirror the LTX enhancer's chat format: the cinematic system prompt,
            # then the target scene description paired with the image caption.
            messages = [
                {"role": "system", "content": I2V_CINEMATIC_PROMPT},
                {"role": "user", "content": f"user_prompt: {future_scene_desc}\nimage_caption: {image_captions[0]}"},
            ]

            enhanced_prompt = self._generate_and_decode_prompts(messages)
            logger.info(f"Deformes3DThinker received enhanced prompt: '{enhanced_prompt}'")
            return enhanced_prompt

        except Exception as e:
            logger.error(f"The Film Director (Deformes3DThinker) failed during enhancement: {e}. Using fallback.", exc_info=True)
            return f"A smooth, continuous cinematic transition from '{present_scene_desc}' to '{future_scene_desc}'."

    def _generate_image_captions(self, images: list[Image.Image]) -> list[str]:
        """
        Internal caption-generation logic, copied from the LTX utils.
        """
        # Florence-2-style task token asking the caption model for its most
        # detailed description of the image.
        task_prompt = "<MORE_DETAILED_CAPTION>"
        inputs = self.caption_processor(
            text=[task_prompt] * len(images), images=images, return_tensors="pt"
        ).to(self.caption_model.device)

        generated_ids = self.caption_model.generate(
            input_ids=inputs["input_ids"],
            pixel_values=inputs["pixel_values"],
            max_new_tokens=1024,
            num_beams=3,
        )

        # Note: only the first image's output is decoded and post-processed,
        # so a single caption is returned even when a batch is passed in.
        generated_text = self.caption_processor.batch_decode(generated_ids, skip_special_tokens=False)[0]
        processed_result = self.caption_processor.post_process_generation(
            generated_text,
            task=task_prompt,
            image_size=(images[0].width, images[0].height),
        )
        return [processed_result[task_prompt]]

    def _generate_and_decode_prompts(self, messages: list[dict]) -> str:
        """
        Internal LLM generation logic, copied from the LTX utils.
        """
        text = self.llm_tokenizer.apply_chat_template(
            messages, tokenize=False, add_generation_prompt=True
        )
        model_inputs = self.llm_tokenizer([text], return_tensors="pt").to(self.llm_model.device)

        output_ids = self.llm_model.generate(**model_inputs, max_new_tokens=256)

        # Decode only the newly generated tokens, skipping the echoed input prompt.
        input_ids_len = model_inputs.input_ids.shape[1]
        decoded_prompts = self.llm_tokenizer.batch_decode(
            output_ids[:, input_ids_len:], skip_special_tokens=True
        )
        return decoded_prompts[0].strip()


try:
    deformes3d_thinker_singleton = Deformes3DThinker()
except Exception:
    logger.critical("Failed to initialize the Deformes3DThinker singleton.", exc_info=True)
    deformes3d_thinker_singleton = None
    raise
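
# A minimal usage sketch (hypothetical file paths and scene descriptions,
# assuming the LTX pipeline successfully loaded its enhancement models):
#
#     motion_prompt = deformes3d_thinker_singleton.get_enhanced_motion_prompt(
#         global_prompt="A lone astronaut explores a ruined city.",
#         story_history="The astronaut has just left the crashed ship.",
#         past_keyframe_path="keyframes/scene_01.png",
#         present_keyframe_path="keyframes/scene_02.png",
#         future_keyframe_path="keyframes/scene_03.png",
#         past_scene_desc="The astronaut walks away from the wreck.",
#         present_scene_desc="The astronaut stands at the city gates.",
#         future_scene_desc="The astronaut enters a collapsed plaza.",
#     )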