|
|
|
|
|
import os |
|
|
import tempfile |
|
|
import subprocess |
|
|
import sqlite3 |
|
|
from datetime import datetime |
|
|
from pathlib import Path |
|
|
from typing import Optional, Dict, Any, List |
|
|
import logging |
|
|
import wave |
|
|
import shutil |
|
|
|
|
|
logger = logging.getLogger("videogenerator") |
|
|
|
|
|
try: |
|
|
import torch |
|
|
TORCH_AVAILABLE = True |
|
|
except Exception: |
|
|
TORCH_AVAILABLE = False |
|
|
torch = None |
|
|
|
|
|
try: |
|
|
from PIL import Image |
|
|
PIL_AVAILABLE = True |
|
|
except Exception: |
|
|
PIL_AVAILABLE = False |
|
|
|
|
|
try: |
|
|
import ffmpeg |
|
|
FFMPEG_AVAILABLE = True |
|
|
except Exception: |
|
|
FFMPEG_AVAILABLE = False |
|
|
|
|
|
try: |
|
|
from TTS.api import TTS |
|
|
TTS_AVAILABLE = True |
|
|
except Exception: |
|
|
TTS_AVAILABLE = False |
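
# Optional dependencies (assumption: installed via pip as `torch`, `Pillow`,
# `ffmpeg-python`, and `TTS`); each feature degrades gracefully when missing.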
|
|
|
|
|
|
|
|
class VideoGenerator: |
|
|
""" |
|
|
Offline text-to-video generator with local enhancement (ESRGAN + RIFE). |
|
|
No external API or internet required. |
|
|
IMPORTANT: If you want the returned URLs to be reachable by your frontend, |
|
|
configure your web server or app to serve the `workdir` directory under the |
|
|
public URL prefix defined by the environment variable VIDEO_PUBLIC_URL |
|
|
(default: /static/video_sandbox). Example (FastAPI): |
|
|
app.mount("/static/video_sandbox", StaticFiles(directory="/tmp/video_sandbox"), name="videos") |
|
|
""" |
|
|
|
|
|
    def __init__(self, workdir="/tmp/video_sandbox", db_name="history.db"):
        # The VIDEO_SANDBOX_DIR environment variable overrides the workdir argument.
        base_dir = Path(os.getenv("VIDEO_SANDBOX_DIR", workdir))
        self.workdir = base_dir.resolve()
|
|
self.output_dir = self.workdir / "output" |
|
|
self.frames_dir = self.workdir / "frames" |
|
|
self.audio_dir = self.workdir / "audio" |
|
|
self.db_path = self.workdir / db_name |
|
|
|
|
|
|
|
|
self.public_base_url = os.getenv("VIDEO_PUBLIC_URL", "/static/video_sandbox").rstrip("/") |
|
|
|
|
|
|
|
|
for d in [self.workdir, self.output_dir, self.frames_dir, self.audio_dir]: |
|
|
d.mkdir(parents=True, exist_ok=True) |
|
|
|
|
|
self._init_db() |
|
|
self.device = "cuda" if (TORCH_AVAILABLE and torch and torch.cuda.is_available()) else "cpu" |
|
|
logger.info(f"VideoGenerator initialized with device: {self.device}, public_base_url: {self.public_base_url}") |
|
|
|
|
|
|
|
|
|
|
|
def _init_db(self): |
|
|
conn = sqlite3.connect(self.db_path) |
|
|
c = conn.cursor() |
|
|
c.execute( |
|
|
"""CREATE TABLE IF NOT EXISTS history ( |
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT, |
|
|
prompt TEXT, |
|
|
image_path TEXT, |
|
|
audio_path TEXT, |
|
|
output_path TEXT, |
|
|
enhanced_path TEXT, |
|
|
created_at TEXT |
|
|
)""" |
|
|
) |
|
|
conn.commit() |
|
|
conn.close() |
|
|
|
|
|
def _save_history(self, prompt, image_path, audio_path, output_path, enhanced_path=None): |
|
|
conn = sqlite3.connect(self.db_path) |
|
|
c = conn.cursor() |
|
|
c.execute( |
|
|
"INSERT INTO history (prompt, image_path, audio_path, output_path, enhanced_path, created_at) VALUES (?, ?, ?, ?, ?, ?)", |
|
|
(prompt, image_path, audio_path, output_path, enhanced_path, datetime.now().isoformat()), |
|
|
) |
|
|
conn.commit() |
|
|
conn.close() |
|
|
|
|
|
|
|
|
def _generate_frames_from_text(self, prompt, num_frames=16, resolution=(512, 512)): |
|
|
""" |
|
|
Generate creative video frames with realistic scenes based on prompt. |
|
|
Simulates beings (humans, animals) and objects with motion and detail. |
|
|
""" |
|
|
        if not PIL_AVAILABLE:
            raise RuntimeError("Pillow is required for frame generation")
        from PIL import ImageDraw, ImageFont
        import math
        import random

        frames = []
|
|
|
|
|
|
|
|
prompt_lower = prompt.lower() |
|
|
|
|
|
|
|
|
has_human = any(word in prompt_lower for word in ["person", "man", "woman", "human", "people", "walking", "running"]) |
|
|
has_animal = any(word in prompt_lower for word in ["dog", "cat", "bird", "animal", "flying", "swimming"]) |
|
|
has_nature = any(word in prompt_lower for word in ["tree", "forest", "mountain", "sky", "cloud", "sunset", "sunrise"]) |
|
|
has_city = any(word in prompt_lower for word in ["city", "building", "car", "street", "urban", "skyline"]) |
|
|
|
|
|
|
|
|
scene_type = "abstract" |
|
|
if has_human: |
|
|
scene_type = "human" |
|
|
elif has_animal: |
|
|
scene_type = "animal" |
|
|
elif has_nature: |
|
|
scene_type = "nature" |
|
|
elif has_city: |
|
|
scene_type = "city" |
|
|
|
|
|
for i in range(num_frames): |
|
|
img = Image.new("RGB", resolution, (0, 0, 0)) |
|
|
draw = ImageDraw.Draw(img) |
|
|
progress = i / max(1, num_frames - 1) |
|
|
|
|
|
|
|
|
if scene_type == "nature": |
|
|
|
|
|
for y in range(resolution[1]): |
|
|
sky_progress = y / resolution[1] |
|
|
r = int(135 + 50 * sky_progress) |
|
|
g = int(206 - 50 * sky_progress) |
|
|
b = int(235 - 30 * sky_progress) |
|
|
draw.line([(0, y), (resolution[0], y)], fill=(r, g, b)) |
|
|
|
|
|
|
|
|
ground_y = int(resolution[1] * 0.7) |
|
|
draw.rectangle([0, ground_y, resolution[0], resolution[1]], fill=(34, 139, 34)) |
|
|
|
|
|
|
|
|
for tree_x in range(50, resolution[0], 100): |
|
|
trunk_x = tree_x + int(20 * math.sin(progress * 2 * math.pi)) |
|
|
draw.rectangle([trunk_x, ground_y-60, trunk_x+20, ground_y], fill=(101, 67, 33)) |
|
|
draw.ellipse([trunk_x-30, ground_y-100, trunk_x+50, ground_y-40], fill=(0, 128, 0)) |
|
|
|
|
|
|
|
|
for cloud_x in range(100, resolution[0], 150): |
|
|
cloud_offset = int(progress * 50) |
|
|
cx = (cloud_x + cloud_offset) % resolution[0] |
|
|
cy = 80 + int(10 * math.sin(progress * math.pi)) |
|
|
draw.ellipse([cx-40, cy-20, cx+40, cy+20], fill=(255, 255, 255)) |
|
|
draw.ellipse([cx-20, cy-15, cx+60, cy+25], fill=(255, 255, 255)) |
|
|
|
|
|
elif scene_type == "city": |
|
|
|
|
|
for y in range(resolution[1] // 2): |
|
|
sky_val = int(100 + 100 * (y / (resolution[1] // 2))) |
|
|
draw.line([(0, y), (resolution[0], y)], fill=(sky_val, sky_val, sky_val + 50)) |
|
|
|
|
|
|
|
|
for bldg_x in range(0, resolution[0], 80): |
|
|
height = random.randint(150, 300) |
|
|
y_start = resolution[1] - height |
|
|
draw.rectangle([bldg_x, y_start, bldg_x+70, resolution[1]], |
|
|
fill=(random.randint(100, 150), random.randint(100, 150), random.randint(100, 150))) |
|
|
for win_y in range(y_start + 20, resolution[1], 30): |
|
|
for win_x in range(bldg_x + 10, bldg_x + 60, 20): |
|
|
light = random.choice([True, False]) |
|
|
color = (255, 255, 200) if light else (50, 50, 50) |
|
|
draw.rectangle([win_x, win_y, win_x+10, win_y+15], fill=color) |
|
|
|
|
|
|
|
|
car_x = int(progress * resolution[0]) |
|
|
car_y = resolution[1] - 40 |
|
|
draw.rectangle([car_x, car_y, car_x+60, car_y+25], fill=(255, 0, 0)) |
|
|
draw.ellipse([car_x+10, car_y+20, car_x+25, car_y+35], fill=(0, 0, 0)) |
|
|
draw.ellipse([car_x+45, car_y+20, car_x+60, car_y+35], fill=(0, 0, 0)) |
|
|
|
|
|
elif scene_type == "human": |
|
|
draw.rectangle([0, 0, resolution[0], resolution[1]], fill=(200, 220, 255)) |
|
|
ground_y = int(resolution[1] * 0.75) |
|
|
draw.rectangle([0, ground_y, resolution[0], resolution[1]], fill=(150, 150, 150)) |
|
|
|
|
|
person_x = int(100 + progress * (resolution[0] - 200)) |
|
|
person_y = ground_y - 100 |
|
|
leg_offset = int(20 * math.sin(progress * 10)) |
|
|
|
|
|
draw.ellipse([person_x+15, person_y, person_x+45, person_y+30], fill=(255, 220, 177)) |
|
|
draw.rectangle([person_x+20, person_y+30, person_x+40, person_y+70], fill=(0, 0, 255)) |
|
|
draw.line([(person_x+20, person_y+40), (person_x+5, person_y+60)], fill=(255, 220, 177), width=5) |
|
|
draw.line([(person_x+40, person_y+40), (person_x+55, person_y+60)], fill=(255, 220, 177), width=5) |
|
|
draw.line([(person_x+25, person_y+70), (person_x+20+leg_offset, ground_y)], fill=(0, 0, 139), width=5) |
|
|
draw.line([(person_x+35, person_y+70), (person_x+40-leg_offset, ground_y)], fill=(0, 0, 139), width=5) |
|
|
|
|
|
elif scene_type == "animal": |
|
|
for y in range(resolution[1]): |
|
|
val = int(180 + 50 * (y / resolution[1])) |
|
|
draw.line([(0, y), (resolution[0], y)], fill=(val, val-20, val-40)) |
|
|
|
|
|
ground_y = int(resolution[1] * 0.8) |
|
|
draw.rectangle([0, ground_y, resolution[0], resolution[1]], fill=(139, 90, 43)) |
|
|
|
|
|
animal_x = int(50 + progress * (resolution[0] - 150)) |
|
|
animal_y = ground_y - 60 |
|
|
|
|
|
draw.ellipse([animal_x, animal_y, animal_x+80, animal_y+40], fill=(139, 69, 19)) |
|
|
draw.ellipse([animal_x+60, animal_y-20, animal_x+100, animal_y+20], fill=(139, 69, 19)) |
|
|
draw.polygon([(animal_x+65, animal_y-20), (animal_x+70, animal_y-35), (animal_x+75, animal_y-20)], fill=(101, 67, 33)) |
|
|
draw.polygon([(animal_x+85, animal_y-20), (animal_x+90, animal_y-35), (animal_x+95, animal_y-20)], fill=(101, 67, 33)) |
|
|
|
|
|
leg_anim = int(5 * math.sin(progress * 15)) |
|
|
for leg_x in [animal_x+10, animal_x+30, animal_x+50, animal_x+70]: |
|
|
draw.rectangle([leg_x, animal_y+40, leg_x+8, ground_y+leg_anim], fill=(101, 67, 33)) |
|
|
|
|
|
tail_angle = 20 * math.sin(progress * 10) |
|
|
tail_end_x = animal_x - 20 + int(tail_angle) |
|
|
tail_end_y = animal_y + 10 |
|
|
draw.line([(animal_x, animal_y+20), (tail_end_x, tail_end_y)], fill=(101, 67, 33), width=5) |
|
|
|
|
|
else: |
|
|
for y in range(resolution[1]): |
|
|
color_val = int(y / resolution[1] * 255) |
|
|
r = int(50 + 100 * progress + color_val // 3) |
|
|
g = int(100 + 80 * progress + color_val // 3) |
|
|
b = int(150 + 50 * progress + color_val // 3) |
|
|
draw.line([(0, y), (resolution[0], y)], fill=(min(255, r), min(255, g), min(255, b))) |
|
|
|
|
|
for j in range(5): |
|
|
x = int((j * 100 + progress * 200) % resolution[0]) |
|
|
y = int(resolution[1] // 2 + 50 * math.sin(progress * 2 * math.pi + j)) |
|
|
radius = 20 + int(10 * math.sin(progress * math.pi + j)) |
|
|
color = ( |
|
|
int(255 * abs(math.sin(progress * math.pi + j))), |
|
|
int(255 * abs(math.cos(progress * math.pi + j))), |
|
|
int(255 * abs(math.sin(progress * 2 * math.pi + j))) |
|
|
) |
|
|
draw.ellipse([x-radius, y-radius, x+radius, y+radius], fill=color) |
|
|
|
|
|
            try:
                font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 32)
            except Exception:
                font = ImageFont.load_default()
|
|
|
|
|
            text = prompt if len(prompt) <= 40 else prompt[:37] + "..."
|
|
bbox = draw.textbbox((0, 0), text, font=font) |
|
|
text_width = bbox[2] - bbox[0] |
|
|
x = (resolution[0] - text_width) // 2 |
|
|
y = 20 |
|
|
|
|
|
for offset in [(-1,-1), (-1,1), (1,-1), (1,1)]: |
|
|
draw.text((x+offset[0], y+offset[1]), text, font=font, fill=(0, 0, 0)) |
|
|
draw.text((x, y), text, font=font, fill=(255, 255, 255)) |
|
|
|
|
|
frames.append(img) |
|
|
|
|
|
return frames |
|
|
|
|
|
def _combine_frames_to_video(self, frames, out_path, fps=8): |
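        """Encode PIL frames to H.264 video via ffmpeg-python.

        Frames are written as numbered PNGs to a temp dir, then encoded.
        Rough CLI equivalent (for reference):
          ffmpeg -framerate <fps> -i frame_%03d.png -c:v libx264 -pix_fmt yuv420p <out_path>
        """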
|
|
if not FFMPEG_AVAILABLE: |
|
|
raise RuntimeError("ffmpeg-python not available") |
|
|
|
|
|
|
|
|
tmp_dir = tempfile.mkdtemp(dir=str(self.frames_dir)) |
|
|
try: |
|
|
for i, frame in enumerate(frames): |
|
|
frame_path = os.path.join(tmp_dir, f"frame_{i:03d}.png") |
|
|
frame.save(frame_path) |
|
|
|
|
|
( |
|
|
ffmpeg |
|
|
.input(os.path.join(tmp_dir, "frame_%03d.png"), framerate=fps) |
|
|
.output(out_path, vcodec='libx264', pix_fmt='yuv420p') |
|
|
.overwrite_output() |
|
|
.run(quiet=True, capture_stdout=True, capture_stderr=True) |
|
|
) |
|
|
except Exception as e: |
|
|
logger.error(f"Failed to combine frames: {e}") |
|
|
raise |
|
|
        finally:
            # Best-effort cleanup of the intermediate PNG frames.
            shutil.rmtree(tmp_dir, ignore_errors=True)
|
|
|
|
|
return out_path |
|
|
|
|
|
def _synthesize_audio(self, text, out_path): |
|
|
"""Synthesize audio with better quality and error handling""" |
|
|
        if not TTS_AVAILABLE:
            logger.warning("TTS not available, generating placeholder tone audio")
            duration = max(2.0, len(text.split()) * 0.5)
|
|
|
|
|
|
|
|
if shutil.which("ffmpeg"): |
|
|
try: |
|
|
subprocess.run([ |
|
|
"ffmpeg", "-f", "lavfi", |
|
|
"-i", f"sine=frequency=440:duration={duration}", |
|
|
"-ar", "22050", |
|
|
"-y", out_path |
|
|
], capture_output=True, check=True, timeout=30) |
|
|
logger.info(f"Created tone audio at {out_path}") |
|
|
return out_path |
|
|
except Exception as e: |
|
|
logger.warning(f"ffmpeg tone generation failed: {e}") |
|
|
|
|
|
|
|
|
try: |
|
|
sample_rate = 22050 |
|
|
n_channels = 1 |
|
|
sampwidth = 2 |
|
|
n_frames = int(duration * sample_rate) |
|
|
with wave.open(out_path, "wb") as wf: |
|
|
wf.setnchannels(n_channels) |
|
|
wf.setsampwidth(sampwidth) |
|
|
wf.setframerate(sample_rate) |
|
|
wf.writeframes(b'\x00\x00' * n_frames) |
|
|
logger.info(f"Created silent WAV at {out_path}") |
|
|
return out_path |
|
|
except Exception as e: |
|
|
logger.error(f"Failed to create audio: {e}") |
|
|
raise |
|
|
|
|
|
try: |
|
|
|
|
|
logger.info("Synthesizing audio with TTS...") |
|
|
tts = TTS(model_name="tts_models/en/ljspeech/tacotron2-DDC", progress_bar=False) |
|
|
tts.tts_to_file(text=text, file_path=out_path) |
|
|
logger.info(f"TTS synthesis successful: {out_path}") |
|
|
except Exception as e: |
|
|
logger.error(f"TTS synthesis failed: {e}, falling back to tone") |
|
|
|
|
|
            if shutil.which("ffmpeg"):
                duration = max(2.0, len(text.split()) * 0.5)
                subprocess.run([
                    "ffmpeg", "-f", "lavfi",
                    "-i", f"sine=frequency=440:duration={duration}",
                    "-ar", "22050",
                    "-y", out_path
                ], capture_output=True, check=True, timeout=30)
            else:
                # No ffmpeg available for the tone fallback; re-raise the TTS error.
                raise
|
|
|
|
|
return out_path |
|
|
|
|
|
def _merge_audio_video(self, video_path, audio_path, out_path): |
|
|
"""Merge audio and video with better error handling""" |
|
|
if not FFMPEG_AVAILABLE: |
|
|
raise RuntimeError("ffmpeg-python not available") |
|
|
|
|
|
try: |
|
|
logger.info(f"Merging video {video_path} with audio {audio_path}") |
|
|
video_in = ffmpeg.input(video_path) |
|
|
audio_in = ffmpeg.input(audio_path) |
|
|
( |
|
|
ffmpeg |
|
|
.output(video_in, audio_in, out_path, |
|
|
vcodec='libx264', |
|
|
acodec='aac', |
|
|
audio_bitrate='128k', |
|
|
shortest=None, |
|
|
**{'b:v': '2M'}) |
|
|
.overwrite_output() |
|
|
.run(capture_stdout=True, capture_stderr=True) |
|
|
) |
|
|
logger.info(f"Successfully merged video and audio into {out_path}") |
|
|
except ffmpeg.Error as e: |
|
|
logger.error(f"FFmpeg error: {e.stderr.decode() if e.stderr else str(e)}") |
|
|
raise |
|
|
except Exception as e: |
|
|
logger.error(f"Failed to merge audio and video: {e}") |
|
|
raise |
|
|
|
|
|
return out_path |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def enhance_video(self, input_path, scale=2, smooth=True): |
|
|
""" |
|
|
        Uses Real-ESRGAN and RIFE (local ncnn-vulkan binaries) for upscaling and
        motion smoothing. Expects realesrgan-ncnn-vulkan under workdir/realesrgan
        and rife-ncnn-vulkan under workdir/rife; each step falls back to its
        input when the binary is missing or fails.
        """
|
|
realesrgan_bin = str(self.workdir / "realesrgan" / "realesrgan-ncnn-vulkan") |
|
|
rife_bin = str(self.workdir / "rife" / "rife-ncnn-vulkan") |
|
|
|
|
|
|
|
|
if os.name == "nt": |
|
|
realesrgan_bin += ".exe" |
|
|
rife_bin += ".exe" |
|
|
|
|
|
input_p = Path(input_path) |
|
|
upscaled = str(input_p.with_name(input_p.stem + "_upscaled" + input_p.suffix)) |
|
|
smoothed = str(input_p.with_name(input_p.stem + "_smoothed" + input_p.suffix)) |
|
|
|
|
|
if os.path.exists(realesrgan_bin): |
|
|
try: |
|
|
result = subprocess.run( |
|
|
[realesrgan_bin, "-i", input_path, "-o", upscaled, "-s", str(scale)], |
|
|
capture_output=True, timeout=300 |
|
|
) |
|
|
if result.returncode != 0: |
|
|
logger.warning(f"ESRGAN failed: {result.stderr.decode(errors='ignore')}") |
|
|
upscaled = input_path |
|
|
except Exception as e: |
|
|
logger.warning(f"ESRGAN enhancement failed: {e}") |
|
|
upscaled = input_path |
|
|
else: |
|
|
logger.info("ESRGAN binary not found, skipping upscaling") |
|
|
upscaled = input_path |
|
|
|
|
|
if smooth and os.path.exists(rife_bin): |
|
|
try: |
|
|
result = subprocess.run( |
|
|
[rife_bin, "-i", upscaled, "-o", smoothed], |
|
|
capture_output=True, timeout=300 |
|
|
) |
|
|
if result.returncode != 0: |
|
|
logger.warning(f"RIFE failed: {result.stderr.decode(errors='ignore')}") |
|
|
return upscaled |
|
|
return smoothed |
|
|
except Exception as e: |
|
|
logger.warning(f"RIFE smoothing failed: {e}") |
|
|
return upscaled |
|
|
else: |
|
|
logger.info("RIFE binary not found or smoothing disabled") |
|
|
|
|
|
return upscaled |
|
|
|
|
|
|
|
|
def _fs_path_to_public_url(self, fs_path: Optional[str]) -> Optional[str]: |
|
|
""" |
|
|
Convert an absolute filesystem path under self.workdir into a public URL |
|
|
using self.public_base_url. If the path is not under workdir, returns None. |
|
|
""" |
|
|
if not fs_path: |
|
|
return None |
|
|
try: |
|
|
p = Path(fs_path).resolve() |
|
|
rel = p.relative_to(self.workdir) |
|
|
|
|
|
return f"{self.public_base_url}/{rel.as_posix()}" |
|
|
except Exception: |
|
|
|
|
|
return None |
|
|
|
|
|
|
|
|
def generate( |
|
|
self, |
|
|
prompt: str, |
|
|
image_path: str = None, |
|
|
audio_path: str = None, |
|
|
output_name: str = None, |
|
|
num_frames: int = 16, |
|
|
fps: int = 8, |
|
|
enhance=True, |
|
|
duration_minutes: float = None, |
|
|
): |
|
|
"""Generate video with support for longer durations up to 10 minutes""" |
|
|
if duration_minutes: |
|
|
duration_minutes = min(duration_minutes, 10) |
|
|
num_frames = int(duration_minutes * 60 * fps) |
|
|
logger.info(f"Generating {duration_minutes} minute video with {num_frames} frames at {fps} fps") |
|
|
try: |
|
|
if not output_name: |
|
|
output_name = f"video_{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp4" |
|
|
output_name_safe = Path(output_name).name |
|
|
output_path = str(self.output_dir / output_name_safe) |
|
|
|
|
|
frames = self._generate_frames_from_text(prompt, num_frames) |
|
|
raw_video = self._combine_frames_to_video(frames, out_path=output_path, fps=fps) |
|
|
|
|
|
if not audio_path: |
|
|
audio_out = str(self.audio_dir / f"{Path(output_name_safe).stem}.wav") |
|
|
self._synthesize_audio(prompt, audio_out) |
|
|
audio_path = audio_out |
|
|
|
|
|
            final_out = str(self.output_dir / f"final_{output_name_safe}")
|
|
self._merge_audio_video(raw_video, audio_path, final_out) |
|
|
|
|
|
enhanced_path = None |
|
|
if enhance: |
|
|
try: |
|
|
enhanced_path = self.enhance_video(final_out) |
|
|
except Exception as e: |
|
|
logger.warning(f"Enhancement step failed: {e}") |
|
|
enhanced_path = None |
|
|
|
|
|
|
|
|
self._save_history(prompt, image_path, audio_path, final_out, enhanced_path) |
|
|
|
|
|
|
|
|
video_url = self._fs_path_to_public_url(final_out) or "" |
|
|
enhanced_url = self._fs_path_to_public_url(enhanced_path) or "" |
|
|
audio_url = self._fs_path_to_public_url(audio_path) or "" |
|
|
|
|
|
return { |
|
|
"video": final_out or "", |
|
|
"video_url": video_url, |
|
|
"enhanced": enhanced_path or "", |
|
|
"enhanced_url": enhanced_url, |
|
|
"audio": audio_path or "", |
|
|
"audio_url": audio_url, |
|
|
"frames": len(frames), |
|
|
"status": "success" |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Video generation failed: {e}") |
|
|
|
|
|
return { |
|
|
"video": "", |
|
|
"video_url": "", |
|
|
"enhanced": "", |
|
|
"enhanced_url": "", |
|
|
"audio": "", |
|
|
"audio_url": "", |
|
|
"frames": 0, |
|
|
"status": "error", |
|
|
"error": str(e) |
|
|
} |
|
|
|
|
|
|
|
|
def get_history(self, limit=20) -> List[Dict[str, Any]]: |
|
|
"""Get video generation history""" |
|
|
try: |
|
|
conn = sqlite3.connect(self.db_path) |
|
|
c = conn.cursor() |
|
|
c.execute("SELECT * FROM history ORDER BY id DESC LIMIT ?", (limit,)) |
|
|
rows = c.fetchall() |
|
|
conn.close() |
|
|
|
|
|
|
|
|
history = [] |
|
|
for row in rows: |
|
|
history.append({ |
|
|
"id": row[0], |
|
|
"prompt": row[1], |
|
|
"image_path": row[2], |
|
|
"audio_path": row[3], |
|
|
"output_path": row[4], |
|
|
"enhanced_path": row[5], |
|
|
"created_at": row[6] |
|
|
}) |
|
|
return history |
|
|
except Exception as e: |
|
|
logger.error(f"Failed to get history: {e}") |
|
|
return [] |
|
|
|
|
|
def get_status(self) -> Dict[str, Any]: |
|
|
"""Get generator status and capabilities""" |
|
|
return { |
|
|
"torch_available": TORCH_AVAILABLE, |
|
|
"pil_available": PIL_AVAILABLE, |
|
|
"ffmpeg_available": FFMPEG_AVAILABLE, |
|
|
"tts_available": TTS_AVAILABLE, |
|
|
"device": self.device, |
|
|
"workdir": str(self.workdir), |
|
|
"output_dir": str(self.output_dir), |
|
|
"public_base_url": self.public_base_url |
|
|
} |
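
# Serving sketch (assumption: FastAPI + Starlette StaticFiles are installed).
# Mounting the sandbox directory makes the *_url fields returned by
# generate() resolvable by clients:
#
#   from fastapi import FastAPI
#   from fastapi.staticfiles import StaticFiles
#
#   app = FastAPI()
#   app.mount("/static/video_sandbox",
#             StaticFiles(directory="/tmp/video_sandbox"), name="videos")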
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
|
|
vg = VideoGenerator() |
|
|
result = vg.generate("A bright futuristic city skyline with flying cars and neon lights.") |
|
|
print("Generated:", result) |
|
|