#!/usr/bin/env python3
import os
import tempfile
import subprocess
import sqlite3
from datetime import datetime
from pathlib import Path
from typing import Optional, Dict, Any, List
import logging
import wave
import shutil
logger = logging.getLogger("videogenerator")
try:
import torch
TORCH_AVAILABLE = True
except Exception:
TORCH_AVAILABLE = False
torch = None
try:
from PIL import Image
PIL_AVAILABLE = True
except Exception:
PIL_AVAILABLE = False
try:
import ffmpeg
FFMPEG_AVAILABLE = True
except Exception:
FFMPEG_AVAILABLE = False
try:
from TTS.api import TTS
TTS_AVAILABLE = True
except Exception:
TTS_AVAILABLE = False
class VideoGenerator:
"""
Offline text-to-video generator with local enhancement (ESRGAN + RIFE).
No external API or internet required.
IMPORTANT: If you want the returned URLs to be reachable by your frontend,
configure your web server or app to serve the `workdir` directory under the
public URL prefix defined by the environment variable VIDEO_PUBLIC_URL
(default: /static/video_sandbox). Example (FastAPI):
app.mount("/static/video_sandbox", StaticFiles(directory="/tmp/video_sandbox"), name="videos")
"""
    def __init__(self, workdir="/tmp/video_sandbox", db_name="history.db"):
        # Writable base directory: the VIDEO_SANDBOX_DIR env var overrides the workdir argument
        base_dir = Path(os.getenv("VIDEO_SANDBOX_DIR", workdir))
        self.workdir = base_dir.resolve()
self.output_dir = self.workdir / "output"
self.frames_dir = self.workdir / "frames"
self.audio_dir = self.workdir / "audio"
self.db_path = self.workdir / db_name
# Public base URL mapping (what frontend will use). No trailing slash.
self.public_base_url = os.getenv("VIDEO_PUBLIC_URL", "/static/video_sandbox").rstrip("/")
# Create directories safely
for d in [self.workdir, self.output_dir, self.frames_dir, self.audio_dir]:
d.mkdir(parents=True, exist_ok=True)
self._init_db()
self.device = "cuda" if (TORCH_AVAILABLE and torch and torch.cuda.is_available()) else "cpu"
logger.info(f"VideoGenerator initialized with device: {self.device}, public_base_url: {self.public_base_url}")
# ---------------- Database ----------------
def _init_db(self):
conn = sqlite3.connect(self.db_path)
c = conn.cursor()
c.execute(
"""CREATE TABLE IF NOT EXISTS history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
prompt TEXT,
image_path TEXT,
audio_path TEXT,
output_path TEXT,
enhanced_path TEXT,
created_at TEXT
)"""
)
conn.commit()
conn.close()
def _save_history(self, prompt, image_path, audio_path, output_path, enhanced_path=None):
conn = sqlite3.connect(self.db_path)
c = conn.cursor()
c.execute(
"INSERT INTO history (prompt, image_path, audio_path, output_path, enhanced_path, created_at) VALUES (?, ?, ?, ?, ?, ?)",
(prompt, image_path, audio_path, output_path, enhanced_path, datetime.now().isoformat()),
)
conn.commit()
conn.close()
# ---------------- Frame + Audio Generation ----------------
def _generate_frames_from_text(self, prompt, num_frames=16, resolution=(512, 512)):
"""
Generate creative video frames with realistic scenes based on prompt.
Simulates beings (humans, animals) and objects with motion and detail.
"""
        if not PIL_AVAILABLE:
            raise RuntimeError("Pillow (PIL) is required for frame generation")
        frames = []
        from PIL import ImageDraw, ImageFont
        import math
        import random
# Parse prompt for content type
prompt_lower = prompt.lower()
# Detect subjects
has_human = any(word in prompt_lower for word in ["person", "man", "woman", "human", "people", "walking", "running"])
has_animal = any(word in prompt_lower for word in ["dog", "cat", "bird", "animal", "flying", "swimming"])
has_nature = any(word in prompt_lower for word in ["tree", "forest", "mountain", "sky", "cloud", "sunset", "sunrise"])
has_city = any(word in prompt_lower for word in ["city", "building", "car", "street", "urban", "skyline"])
# Scene setup
scene_type = "abstract"
if has_human:
scene_type = "human"
elif has_animal:
scene_type = "animal"
elif has_nature:
scene_type = "nature"
elif has_city:
scene_type = "city"
for i in range(num_frames):
img = Image.new("RGB", resolution, (0, 0, 0))
draw = ImageDraw.Draw(img)
progress = i / max(1, num_frames - 1)
# Dynamic background based on scene
if scene_type == "nature":
# Sky gradient
for y in range(resolution[1]):
sky_progress = y / resolution[1]
r = int(135 + 50 * sky_progress)
g = int(206 - 50 * sky_progress)
b = int(235 - 30 * sky_progress)
draw.line([(0, y), (resolution[0], y)], fill=(r, g, b))
# Ground
ground_y = int(resolution[1] * 0.7)
draw.rectangle([0, ground_y, resolution[0], resolution[1]], fill=(34, 139, 34))
# Trees
for tree_x in range(50, resolution[0], 100):
trunk_x = tree_x + int(20 * math.sin(progress * 2 * math.pi))
draw.rectangle([trunk_x, ground_y-60, trunk_x+20, ground_y], fill=(101, 67, 33))
draw.ellipse([trunk_x-30, ground_y-100, trunk_x+50, ground_y-40], fill=(0, 128, 0))
# Clouds
for cloud_x in range(100, resolution[0], 150):
cloud_offset = int(progress * 50)
cx = (cloud_x + cloud_offset) % resolution[0]
cy = 80 + int(10 * math.sin(progress * math.pi))
draw.ellipse([cx-40, cy-20, cx+40, cy+20], fill=(255, 255, 255))
draw.ellipse([cx-20, cy-15, cx+60, cy+25], fill=(255, 255, 255))
elif scene_type == "city":
# Sky
for y in range(resolution[1] // 2):
sky_val = int(100 + 100 * (y / (resolution[1] // 2)))
draw.line([(0, y), (resolution[0], y)], fill=(sky_val, sky_val, sky_val + 50))
                # Buildings (fixed seed so heights and colors stay stable across frames)
                random.seed(1234)
                for bldg_x in range(0, resolution[0], 80):
                    height = random.randint(150, 300)
y_start = resolution[1] - height
draw.rectangle([bldg_x, y_start, bldg_x+70, resolution[1]],
fill=(random.randint(100, 150), random.randint(100, 150), random.randint(100, 150)))
for win_y in range(y_start + 20, resolution[1], 30):
for win_x in range(bldg_x + 10, bldg_x + 60, 20):
light = random.choice([True, False])
color = (255, 255, 200) if light else (50, 50, 50)
draw.rectangle([win_x, win_y, win_x+10, win_y+15], fill=color)
# Moving car
car_x = int(progress * resolution[0])
car_y = resolution[1] - 40
draw.rectangle([car_x, car_y, car_x+60, car_y+25], fill=(255, 0, 0))
draw.ellipse([car_x+10, car_y+20, car_x+25, car_y+35], fill=(0, 0, 0))
draw.ellipse([car_x+45, car_y+20, car_x+60, car_y+35], fill=(0, 0, 0))
elif scene_type == "human":
draw.rectangle([0, 0, resolution[0], resolution[1]], fill=(200, 220, 255))
ground_y = int(resolution[1] * 0.75)
draw.rectangle([0, ground_y, resolution[0], resolution[1]], fill=(150, 150, 150))
person_x = int(100 + progress * (resolution[0] - 200))
person_y = ground_y - 100
leg_offset = int(20 * math.sin(progress * 10))
draw.ellipse([person_x+15, person_y, person_x+45, person_y+30], fill=(255, 220, 177))
draw.rectangle([person_x+20, person_y+30, person_x+40, person_y+70], fill=(0, 0, 255))
draw.line([(person_x+20, person_y+40), (person_x+5, person_y+60)], fill=(255, 220, 177), width=5)
draw.line([(person_x+40, person_y+40), (person_x+55, person_y+60)], fill=(255, 220, 177), width=5)
draw.line([(person_x+25, person_y+70), (person_x+20+leg_offset, ground_y)], fill=(0, 0, 139), width=5)
draw.line([(person_x+35, person_y+70), (person_x+40-leg_offset, ground_y)], fill=(0, 0, 139), width=5)
elif scene_type == "animal":
for y in range(resolution[1]):
val = int(180 + 50 * (y / resolution[1]))
draw.line([(0, y), (resolution[0], y)], fill=(val, val-20, val-40))
ground_y = int(resolution[1] * 0.8)
draw.rectangle([0, ground_y, resolution[0], resolution[1]], fill=(139, 90, 43))
animal_x = int(50 + progress * (resolution[0] - 150))
animal_y = ground_y - 60
draw.ellipse([animal_x, animal_y, animal_x+80, animal_y+40], fill=(139, 69, 19))
draw.ellipse([animal_x+60, animal_y-20, animal_x+100, animal_y+20], fill=(139, 69, 19))
draw.polygon([(animal_x+65, animal_y-20), (animal_x+70, animal_y-35), (animal_x+75, animal_y-20)], fill=(101, 67, 33))
draw.polygon([(animal_x+85, animal_y-20), (animal_x+90, animal_y-35), (animal_x+95, animal_y-20)], fill=(101, 67, 33))
leg_anim = int(5 * math.sin(progress * 15))
for leg_x in [animal_x+10, animal_x+30, animal_x+50, animal_x+70]:
draw.rectangle([leg_x, animal_y+40, leg_x+8, ground_y+leg_anim], fill=(101, 67, 33))
tail_angle = 20 * math.sin(progress * 10)
tail_end_x = animal_x - 20 + int(tail_angle)
tail_end_y = animal_y + 10
draw.line([(animal_x, animal_y+20), (tail_end_x, tail_end_y)], fill=(101, 67, 33), width=5)
else:
for y in range(resolution[1]):
color_val = int(y / resolution[1] * 255)
r = int(50 + 100 * progress + color_val // 3)
g = int(100 + 80 * progress + color_val // 3)
b = int(150 + 50 * progress + color_val // 3)
draw.line([(0, y), (resolution[0], y)], fill=(min(255, r), min(255, g), min(255, b)))
for j in range(5):
x = int((j * 100 + progress * 200) % resolution[0])
y = int(resolution[1] // 2 + 50 * math.sin(progress * 2 * math.pi + j))
radius = 20 + int(10 * math.sin(progress * math.pi + j))
color = (
int(255 * abs(math.sin(progress * math.pi + j))),
int(255 * abs(math.cos(progress * math.pi + j))),
int(255 * abs(math.sin(progress * 2 * math.pi + j)))
)
draw.ellipse([x-radius, y-radius, x+radius, y+radius], fill=color)
try:
font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 32)
            except Exception:
font = ImageFont.load_default()
text = prompt[:40] if len(prompt) <= 40 else prompt[:37] + "..."
bbox = draw.textbbox((0, 0), text, font=font)
text_width = bbox[2] - bbox[0]
x = (resolution[0] - text_width) // 2
y = 20
for offset in [(-1,-1), (-1,1), (1,-1), (1,1)]:
draw.text((x+offset[0], y+offset[1]), text, font=font, fill=(0, 0, 0))
draw.text((x, y), text, font=font, fill=(255, 255, 255))
frames.append(img)
return frames
def _combine_frames_to_video(self, frames, out_path, fps=8):
if not FFMPEG_AVAILABLE:
raise RuntimeError("ffmpeg-python not available")
# ensure a string dir is passed to mkdtemp
tmp_dir = tempfile.mkdtemp(dir=str(self.frames_dir))
try:
for i, frame in enumerate(frames):
frame_path = os.path.join(tmp_dir, f"frame_{i:03d}.png")
frame.save(frame_path)
(
ffmpeg
.input(os.path.join(tmp_dir, "frame_%03d.png"), framerate=fps)
.output(out_path, vcodec='libx264', pix_fmt='yuv420p')
.overwrite_output()
.run(quiet=True, capture_stdout=True, capture_stderr=True)
)
except Exception as e:
logger.error(f"Failed to combine frames: {e}")
raise
finally:
# Cleanup temp directory
try:
shutil.rmtree(tmp_dir, ignore_errors=True)
except Exception:
pass
return out_path
def _synthesize_audio(self, text, out_path):
"""Synthesize audio with better quality and error handling"""
if not TTS_AVAILABLE:
logger.warning("TTS not available, creating audio with beeps")
duration = max(2.0, len(text.split()) * 0.5)
# Try ffmpeg with tone generation
if shutil.which("ffmpeg"):
try:
subprocess.run([
"ffmpeg", "-f", "lavfi",
"-i", f"sine=frequency=440:duration={duration}",
"-ar", "22050",
"-y", out_path
], capture_output=True, check=True, timeout=30)
logger.info(f"Created tone audio at {out_path}")
return out_path
except Exception as e:
logger.warning(f"ffmpeg tone generation failed: {e}")
# Fallback to silent WAV
try:
sample_rate = 22050
n_channels = 1
sampwidth = 2
n_frames = int(duration * sample_rate)
with wave.open(out_path, "wb") as wf:
wf.setnchannels(n_channels)
wf.setsampwidth(sampwidth)
wf.setframerate(sample_rate)
wf.writeframes(b'\x00\x00' * n_frames)
logger.info(f"Created silent WAV at {out_path}")
return out_path
except Exception as e:
logger.error(f"Failed to create audio: {e}")
raise
try:
# Use TTS with better error handling
logger.info("Synthesizing audio with TTS...")
tts = TTS(model_name="tts_models/en/ljspeech/tacotron2-DDC", progress_bar=False)
tts.tts_to_file(text=text, file_path=out_path)
logger.info(f"TTS synthesis successful: {out_path}")
except Exception as e:
logger.error(f"TTS synthesis failed: {e}, falling back to tone")
# Fallback to tone
if shutil.which("ffmpeg"):
duration = max(2.0, len(text.split()) * 0.5)
subprocess.run([
"ffmpeg", "-f", "lavfi",
"-i", f"sine=frequency=440:duration={duration}",
"-ar", "22050",
"-y", out_path
], capture_output=True, check=True, timeout=30)
return out_path
def _merge_audio_video(self, video_path, audio_path, out_path):
"""Merge audio and video with better error handling"""
if not FFMPEG_AVAILABLE:
raise RuntimeError("ffmpeg-python not available")
try:
logger.info(f"Merging video {video_path} with audio {audio_path}")
video_in = ffmpeg.input(video_path)
audio_in = ffmpeg.input(audio_path)
(
ffmpeg
.output(video_in, audio_in, out_path,
vcodec='libx264',
acodec='aac',
audio_bitrate='128k',
shortest=None,
**{'b:v': '2M'})
.overwrite_output()
.run(capture_stdout=True, capture_stderr=True)
)
logger.info(f"Successfully merged video and audio into {out_path}")
except ffmpeg.Error as e:
logger.error(f"FFmpeg error: {e.stderr.decode() if e.stderr else str(e)}")
raise
except Exception as e:
logger.error(f"Failed to merge audio and video: {e}")
raise
return out_path
# ---------------- Enhancement Modules ----------------
def enhance_video(self, input_path, scale=2, smooth=True):
"""
Uses Real-ESRGAN and RIFE (local binaries) for upscale and motion smoothing.
Requires realesrgan-ncnn-vulkan and rife-ncnn-vulkan in workdir.
"""
realesrgan_bin = str(self.workdir / "realesrgan" / "realesrgan-ncnn-vulkan")
rife_bin = str(self.workdir / "rife" / "rife-ncnn-vulkan")
# Add .exe extension on Windows
if os.name == "nt":
realesrgan_bin += ".exe"
rife_bin += ".exe"
input_p = Path(input_path)
upscaled = str(input_p.with_name(input_p.stem + "_upscaled" + input_p.suffix))
smoothed = str(input_p.with_name(input_p.stem + "_smoothed" + input_p.suffix))
if os.path.exists(realesrgan_bin):
try:
result = subprocess.run(
[realesrgan_bin, "-i", input_path, "-o", upscaled, "-s", str(scale)],
capture_output=True, timeout=300
)
if result.returncode != 0:
logger.warning(f"ESRGAN failed: {result.stderr.decode(errors='ignore')}")
upscaled = input_path
except Exception as e:
logger.warning(f"ESRGAN enhancement failed: {e}")
upscaled = input_path
else:
logger.info("ESRGAN binary not found, skipping upscaling")
upscaled = input_path
if smooth and os.path.exists(rife_bin):
try:
result = subprocess.run(
[rife_bin, "-i", upscaled, "-o", smoothed],
capture_output=True, timeout=300
)
if result.returncode != 0:
logger.warning(f"RIFE failed: {result.stderr.decode(errors='ignore')}")
return upscaled
return smoothed
except Exception as e:
logger.warning(f"RIFE smoothing failed: {e}")
return upscaled
else:
logger.info("RIFE binary not found or smoothing disabled")
return upscaled
# ---------------- Path -> URL mapping ----------------
def _fs_path_to_public_url(self, fs_path: Optional[str]) -> Optional[str]:
"""
Convert an absolute filesystem path under self.workdir into a public URL
using self.public_base_url. If the path is not under workdir, returns None.
"""
if not fs_path:
return None
try:
p = Path(fs_path).resolve()
rel = p.relative_to(self.workdir)
# Use POSIX-style path for URL
return f"{self.public_base_url}/{rel.as_posix()}"
except Exception:
# If the file is not inside the workdir, we can't map it safely.
return None
# ---------------- Core Generator ----------------
def generate(
self,
prompt: str,
image_path: str = None,
audio_path: str = None,
output_name: str = None,
num_frames: int = 16,
fps: int = 8,
enhance=True,
duration_minutes: float = None,
):
"""Generate video with support for longer durations up to 10 minutes"""
if duration_minutes:
duration_minutes = min(duration_minutes, 10)
num_frames = int(duration_minutes * 60 * fps)
logger.info(f"Generating {duration_minutes} minute video with {num_frames} frames at {fps} fps")
try:
if not output_name:
output_name = f"video_{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp4"
output_name_safe = Path(output_name).name
output_path = str(self.output_dir / output_name_safe)
frames = self._generate_frames_from_text(prompt, num_frames)
raw_video = self._combine_frames_to_video(frames, out_path=output_path, fps=fps)
if not audio_path:
audio_out = str(self.audio_dir / f"{Path(output_name_safe).stem}.wav")
self._synthesize_audio(prompt, audio_out)
audio_path = audio_out
final_out = str(self.output_dir / f"final_{Path(output_name_safe).name}")
self._merge_audio_video(raw_video, audio_path, final_out)
enhanced_path = None
if enhance:
try:
enhanced_path = self.enhance_video(final_out)
except Exception as e:
logger.warning(f"Enhancement step failed: {e}")
enhanced_path = None
# Save history with filesystem paths
self._save_history(prompt, image_path, audio_path, final_out, enhanced_path)
# Convert filesystem paths to public URLs when possible
video_url = self._fs_path_to_public_url(final_out) or ""
enhanced_url = self._fs_path_to_public_url(enhanced_path) or ""
audio_url = self._fs_path_to_public_url(audio_path) or ""
return {
"video": final_out or "",
"video_url": video_url,
"enhanced": enhanced_path or "",
"enhanced_url": enhanced_url,
"audio": audio_path or "",
"audio_url": audio_url,
"frames": len(frames),
"status": "success"
}
except Exception as e:
logger.error(f"Video generation failed: {e}")
# ✅ Always return valid keys even when failing
return {
"video": "",
"video_url": "",
"enhanced": "",
"enhanced_url": "",
"audio": "",
"audio_url": "",
"frames": 0,
"status": "error",
"error": str(e)
}
# ---------------- Utility ----------------
def get_history(self, limit=20) -> List[Dict[str, Any]]:
"""Get video generation history"""
try:
conn = sqlite3.connect(self.db_path)
c = conn.cursor()
c.execute("SELECT * FROM history ORDER BY id DESC LIMIT ?", (limit,))
rows = c.fetchall()
conn.close()
# Convert to dict format
history = []
for row in rows:
history.append({
"id": row[0],
"prompt": row[1],
"image_path": row[2],
"audio_path": row[3],
"output_path": row[4],
"enhanced_path": row[5],
"created_at": row[6]
})
return history
except Exception as e:
logger.error(f"Failed to get history: {e}")
return []
def get_status(self) -> Dict[str, Any]:
"""Get generator status and capabilities"""
return {
"torch_available": TORCH_AVAILABLE,
"pil_available": PIL_AVAILABLE,
"ffmpeg_available": FFMPEG_AVAILABLE,
"tts_available": TTS_AVAILABLE,
"device": self.device,
"workdir": str(self.workdir),
"output_dir": str(self.output_dir),
"public_base_url": self.public_base_url
}
if __name__ == "__main__":
# Basic demo run
vg = VideoGenerator()
result = vg.generate("A bright futuristic city skyline with flying cars and neon lights.")
print("Generated:", result)#!/usr/bin/env python3
import os
import tempfile
import subprocess
import sqlite3
from datetime import datetime
from pathlib import Path
from typing import Optional, Dict, Any, List
import logging
import wave
import shutil
logger = logging.getLogger("videogenerator")
try:
import torch
TORCH_AVAILABLE = True
except Exception:
TORCH_AVAILABLE = False
torch = None
try:
from PIL import Image
PIL_AVAILABLE = True
except Exception:
PIL_AVAILABLE = False
try:
import ffmpeg
FFMPEG_AVAILABLE = True
except Exception:
FFMPEG_AVAILABLE = False
try:
from TTS.api import TTS
TTS_AVAILABLE = True
except Exception:
TTS_AVAILABLE = False
class VideoGenerator:
"""
Offline text-to-video generator with local enhancement (ESRGAN + RIFE).
No external API or internet required.
IMPORTANT: If you want the returned URLs to be reachable by your frontend,
configure your web server or app to serve the `workdir` directory under the
public URL prefix defined by the environment variable VIDEO_PUBLIC_URL
(default: /static/video_sandbox). Example (FastAPI):
app.mount("/static/video_sandbox", StaticFiles(directory="/tmp/video_sandbox"), name="videos")
"""
def __init__(self, workdir="video_sandbox", db_name="history.db"):
# Use a writable directory in environment or /tmp
base_dir = Path(os.getenv("VIDEO_SANDBOX_DIR", "/tmp/video_sandbox"))
self.workdir = base_dir.resolve()
self.output_dir = self.workdir / "output"
self.frames_dir = self.workdir / "frames"
self.audio_dir = self.workdir / "audio"
self.db_path = self.workdir / db_name
# Public base URL mapping (what frontend will use). No trailing slash.
self.public_base_url = os.getenv("VIDEO_PUBLIC_URL", "/static/video_sandbox").rstrip("/")
# Create directories safely
for d in [self.workdir, self.output_dir, self.frames_dir, self.audio_dir]:
d.mkdir(parents=True, exist_ok=True)
self._init_db()
self.device = "cuda" if (TORCH_AVAILABLE and torch and torch.cuda.is_available()) else "cpu"
logger.info(f"VideoGenerator initialized with device: {self.device}, public_base_url: {self.public_base_url}")
# ---------------- Database ----------------
def _init_db(self):
conn = sqlite3.connect(self.db_path)
c = conn.cursor()
c.execute(
"""CREATE TABLE IF NOT EXISTS history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
prompt TEXT,
image_path TEXT,
audio_path TEXT,
output_path TEXT,
enhanced_path TEXT,
created_at TEXT
)"""
)
conn.commit()
conn.close()
def _save_history(self, prompt, image_path, audio_path, output_path, enhanced_path=None):
conn = sqlite3.connect(self.db_path)
c = conn.cursor()
c.execute(
"INSERT INTO history (prompt, image_path, audio_path, output_path, enhanced_path, created_at) VALUES (?, ?, ?, ?, ?, ?)",
(prompt, image_path, audio_path, output_path, enhanced_path, datetime.now().isoformat()),
)
conn.commit()
conn.close()
# ---------------- Frame + Audio Generation ----------------
def _generate_frames_from_text(self, prompt, num_frames=16, resolution=(512, 512)):
"""
Generate creative video frames with realistic scenes based on prompt.
Simulates beings (humans, animals) and objects with motion and detail.
"""
frames = []
from PIL import ImageDraw, ImageFont
import math
import random
# Parse prompt for content type
prompt_lower = prompt.lower()
# Detect subjects
has_human = any(word in prompt_lower for word in ["person", "man", "woman", "human", "people", "walking", "running"])
has_animal = any(word in prompt_lower for word in ["dog", "cat", "bird", "animal", "flying", "swimming"])
has_nature = any(word in prompt_lower for word in ["tree", "forest", "mountain", "sky", "cloud", "sunset", "sunrise"])
has_city = any(word in prompt_lower for word in ["city", "building", "car", "street", "urban", "skyline"])
# Scene setup
scene_type = "abstract"
if has_human:
scene_type = "human"
elif has_animal:
scene_type = "animal"
elif has_nature:
scene_type = "nature"
elif has_city:
scene_type = "city"
for i in range(num_frames):
img = Image.new("RGB", resolution, (0, 0, 0))
draw = ImageDraw.Draw(img)
progress = i / max(1, num_frames - 1)
# Dynamic background based on scene
if scene_type == "nature":
# Sky gradient
for y in range(resolution[1]):
sky_progress = y / resolution[1]
r = int(135 + 50 * sky_progress)
g = int(206 - 50 * sky_progress)
b = int(235 - 30 * sky_progress)
draw.line([(0, y), (resolution[0], y)], fill=(r, g, b))
# Ground
ground_y = int(resolution[1] * 0.7)
draw.rectangle([0, ground_y, resolution[0], resolution[1]], fill=(34, 139, 34))
# Trees
for tree_x in range(50, resolution[0], 100):
trunk_x = tree_x + int(20 * math.sin(progress * 2 * math.pi))
draw.rectangle([trunk_x, ground_y-60, trunk_x+20, ground_y], fill=(101, 67, 33))
draw.ellipse([trunk_x-30, ground_y-100, trunk_x+50, ground_y-40], fill=(0, 128, 0))
# Clouds
for cloud_x in range(100, resolution[0], 150):
cloud_offset = int(progress * 50)
cx = (cloud_x + cloud_offset) % resolution[0]
cy = 80 + int(10 * math.sin(progress * math.pi))
draw.ellipse([cx-40, cy-20, cx+40, cy+20], fill=(255, 255, 255))
draw.ellipse([cx-20, cy-15, cx+60, cy+25], fill=(255, 255, 255))
elif scene_type == "city":
# Sky
for y in range(resolution[1] // 2):
sky_val = int(100 + 100 * (y / (resolution[1] // 2)))
draw.line([(0, y), (resolution[0], y)], fill=(sky_val, sky_val, sky_val + 50))
# Buildings
for bldg_x in range(0, resolution[0], 80):
height = random.randint(150, 300)
y_start = resolution[1] - height
draw.rectangle([bldg_x, y_start, bldg_x+70, resolution[1]],
fill=(random.randint(100, 150), random.randint(100, 150), random.randint(100, 150)))
for win_y in range(y_start + 20, resolution[1], 30):
for win_x in range(bldg_x + 10, bldg_x + 60, 20):
light = random.choice([True, False])
color = (255, 255, 200) if light else (50, 50, 50)
draw.rectangle([win_x, win_y, win_x+10, win_y+15], fill=color)
# Moving car
car_x = int(progress * resolution[0])
car_y = resolution[1] - 40
draw.rectangle([car_x, car_y, car_x+60, car_y+25], fill=(255, 0, 0))
draw.ellipse([car_x+10, car_y+20, car_x+25, car_y+35], fill=(0, 0, 0))
draw.ellipse([car_x+45, car_y+20, car_x+60, car_y+35], fill=(0, 0, 0))
elif scene_type == "human":
draw.rectangle([0, 0, resolution[0], resolution[1]], fill=(200, 220, 255))
ground_y = int(resolution[1] * 0.75)
draw.rectangle([0, ground_y, resolution[0], resolution[1]], fill=(150, 150, 150))
person_x = int(100 + progress * (resolution[0] - 200))
person_y = ground_y - 100
leg_offset = int(20 * math.sin(progress * 10))
draw.ellipse([person_x+15, person_y, person_x+45, person_y+30], fill=(255, 220, 177))
draw.rectangle([person_x+20, person_y+30, person_x+40, person_y+70], fill=(0, 0, 255))
draw.line([(person_x+20, person_y+40), (person_x+5, person_y+60)], fill=(255, 220, 177), width=5)
draw.line([(person_x+40, person_y+40), (person_x+55, person_y+60)], fill=(255, 220, 177), width=5)
draw.line([(person_x+25, person_y+70), (person_x+20+leg_offset, ground_y)], fill=(0, 0, 139), width=5)
draw.line([(person_x+35, person_y+70), (person_x+40-leg_offset, ground_y)], fill=(0, 0, 139), width=5)
elif scene_type == "animal":
for y in range(resolution[1]):
val = int(180 + 50 * (y / resolution[1]))
draw.line([(0, y), (resolution[0], y)], fill=(val, val-20, val-40))
ground_y = int(resolution[1] * 0.8)
draw.rectangle([0, ground_y, resolution[0], resolution[1]], fill=(139, 90, 43))
animal_x = int(50 + progress * (resolution[0] - 150))
animal_y = ground_y - 60
draw.ellipse([animal_x, animal_y, animal_x+80, animal_y+40], fill=(139, 69, 19))
draw.ellipse([animal_x+60, animal_y-20, animal_x+100, animal_y+20], fill=(139, 69, 19))
draw.polygon([(animal_x+65, animal_y-20), (animal_x+70, animal_y-35), (animal_x+75, animal_y-20)], fill=(101, 67, 33))
draw.polygon([(animal_x+85, animal_y-20), (animal_x+90, animal_y-35), (animal_x+95, animal_y-20)], fill=(101, 67, 33))
leg_anim = int(5 * math.sin(progress * 15))
for leg_x in [animal_x+10, animal_x+30, animal_x+50, animal_x+70]:
draw.rectangle([leg_x, animal_y+40, leg_x+8, ground_y+leg_anim], fill=(101, 67, 33))
tail_angle = 20 * math.sin(progress * 10)
tail_end_x = animal_x - 20 + int(tail_angle)
tail_end_y = animal_y + 10
draw.line([(animal_x, animal_y+20), (tail_end_x, tail_end_y)], fill=(101, 67, 33), width=5)
else:
for y in range(resolution[1]):
color_val = int(y / resolution[1] * 255)
r = int(50 + 100 * progress + color_val // 3)
g = int(100 + 80 * progress + color_val // 3)
b = int(150 + 50 * progress + color_val // 3)
draw.line([(0, y), (resolution[0], y)], fill=(min(255, r), min(255, g), min(255, b)))
for j in range(5):
x = int((j * 100 + progress * 200) % resolution[0])
y = int(resolution[1] // 2 + 50 * math.sin(progress * 2 * math.pi + j))
radius = 20 + int(10 * math.sin(progress * math.pi + j))
color = (
int(255 * abs(math.sin(progress * math.pi + j))),
int(255 * abs(math.cos(progress * math.pi + j))),
int(255 * abs(math.sin(progress * 2 * math.pi + j)))
)
draw.ellipse([x-radius, y-radius, x+radius, y+radius], fill=color)
try:
font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 32)
except:
font = ImageFont.load_default()
text = prompt[:40] if len(prompt) <= 40 else prompt[:37] + "..."
bbox = draw.textbbox((0, 0), text, font=font)
text_width = bbox[2] - bbox[0]
x = (resolution[0] - text_width) // 2
y = 20
for offset in [(-1,-1), (-1,1), (1,-1), (1,1)]:
draw.text((x+offset[0], y+offset[1]), text, font=font, fill=(0, 0, 0))
draw.text((x, y), text, font=font, fill=(255, 255, 255))
frames.append(img)
return frames
def _combine_frames_to_video(self, frames, out_path, fps=8):
if not FFMPEG_AVAILABLE:
raise RuntimeError("ffmpeg-python not available")
# ensure a string dir is passed to mkdtemp
tmp_dir = tempfile.mkdtemp(dir=str(self.frames_dir))
try:
for i, frame in enumerate(frames):
frame_path = os.path.join(tmp_dir, f"frame_{i:03d}.png")
frame.save(frame_path)
(
ffmpeg
.input(os.path.join(tmp_dir, "frame_%03d.png"), framerate=fps)
.output(out_path, vcodec='libx264', pix_fmt='yuv420p')
.overwrite_output()
.run(quiet=True, capture_stdout=True, capture_stderr=True)
)
except Exception as e:
logger.error(f"Failed to combine frames: {e}")
raise
finally:
# Cleanup temp directory
try:
shutil.rmtree(tmp_dir, ignore_errors=True)
except Exception:
pass
return out_path
def _synthesize_audio(self, text, out_path):
"""Synthesize audio with better quality and error handling"""
if not TTS_AVAILABLE:
logger.warning("TTS not available, creating audio with beeps")
duration = max(2.0, len(text.split()) * 0.5)
# Try ffmpeg with tone generation
if shutil.which("ffmpeg"):
try:
subprocess.run([
"ffmpeg", "-f", "lavfi",
"-i", f"sine=frequency=440:duration={duration}",
"-ar", "22050",
"-y", out_path
], capture_output=True, check=True, timeout=30)
logger.info(f"Created tone audio at {out_path}")
return out_path
except Exception as e:
logger.warning(f"ffmpeg tone generation failed: {e}")
# Fallback to silent WAV
try:
sample_rate = 22050
n_channels = 1
sampwidth = 2
n_frames = int(duration * sample_rate)
with wave.open(out_path, "wb") as wf:
wf.setnchannels(n_channels)
wf.setsampwidth(sampwidth)
wf.setframerate(sample_rate)
wf.writeframes(b'\x00\x00' * n_frames)
logger.info(f"Created silent WAV at {out_path}")
return out_path
except Exception as e:
logger.error(f"Failed to create audio: {e}")
raise
try:
# Use TTS with better error handling
logger.info("Synthesizing audio with TTS...")
tts = TTS(model_name="tts_models/en/ljspeech/tacotron2-DDC", progress_bar=False)
tts.tts_to_file(text=text, file_path=out_path)
logger.info(f"TTS synthesis successful: {out_path}")
except Exception as e:
logger.error(f"TTS synthesis failed: {e}, falling back to tone")
# Fallback to tone
if shutil.which("ffmpeg"):
duration = max(2.0, len(text.split()) * 0.5)
subprocess.run([
"ffmpeg", "-f", "lavfi",
"-i", f"sine=frequency=440:duration={duration}",
"-ar", "22050",
"-y", out_path
], capture_output=True, check=True, timeout=30)
return out_path
def _merge_audio_video(self, video_path, audio_path, out_path):
"""Merge audio and video with better error handling"""
if not FFMPEG_AVAILABLE:
raise RuntimeError("ffmpeg-python not available")
try:
logger.info(f"Merging video {video_path} with audio {audio_path}")
video_in = ffmpeg.input(video_path)
audio_in = ffmpeg.input(audio_path)
(
ffmpeg
.output(video_in, audio_in, out_path,
vcodec='libx264',
acodec='aac',
audio_bitrate='128k',
shortest=None,
**{'b:v': '2M'})
.overwrite_output()
.run(capture_stdout=True, capture_stderr=True)
)
logger.info(f"Successfully merged video and audio into {out_path}")
except ffmpeg.Error as e:
logger.error(f"FFmpeg error: {e.stderr.decode() if e.stderr else str(e)}")
raise
except Exception as e:
logger.error(f"Failed to merge audio and video: {e}")
raise
return out_path
# ---------------- Enhancement Modules ----------------
def enhance_video(self, input_path, scale=2, smooth=True):
"""
Uses Real-ESRGAN and RIFE (local binaries) for upscale and motion smoothing.
Requires realesrgan-ncnn-vulkan and rife-ncnn-vulkan in workdir.
"""
realesrgan_bin = str(self.workdir / "realesrgan" / "realesrgan-ncnn-vulkan")
rife_bin = str(self.workdir / "rife" / "rife-ncnn-vulkan")
# Add .exe extension on Windows
if os.name == "nt":
realesrgan_bin += ".exe"
rife_bin += ".exe"
input_p = Path(input_path)
upscaled = str(input_p.with_name(input_p.stem + "_upscaled" + input_p.suffix))
smoothed = str(input_p.with_name(input_p.stem + "_smoothed" + input_p.suffix))
if os.path.exists(realesrgan_bin):
try:
result = subprocess.run(
[realesrgan_bin, "-i", input_path, "-o", upscaled, "-s", str(scale)],
capture_output=True, timeout=300
)
if result.returncode != 0:
logger.warning(f"ESRGAN failed: {result.stderr.decode(errors='ignore')}")
upscaled = input_path
except Exception as e:
logger.warning(f"ESRGAN enhancement failed: {e}")
upscaled = input_path
else:
logger.info("ESRGAN binary not found, skipping upscaling")
upscaled = input_path
if smooth and os.path.exists(rife_bin):
try:
result = subprocess.run(
[rife_bin, "-i", upscaled, "-o", smoothed],
capture_output=True, timeout=300
)
if result.returncode != 0:
logger.warning(f"RIFE failed: {result.stderr.decode(errors='ignore')}")
return upscaled
return smoothed
except Exception as e:
logger.warning(f"RIFE smoothing failed: {e}")
return upscaled
else:
logger.info("RIFE binary not found or smoothing disabled")
return upscaled
# ---------------- Path -> URL mapping ----------------
def _fs_path_to_public_url(self, fs_path: Optional[str]) -> Optional[str]:
"""
Convert an absolute filesystem path under self.workdir into a public URL
using self.public_base_url. If the path is not under workdir, returns None.
"""
if not fs_path:
return None
try:
p = Path(fs_path).resolve()
rel = p.relative_to(self.workdir)
# Use POSIX-style path for URL
return f"{self.public_base_url}/{rel.as_posix()}"
except Exception:
# If the file is not inside the workdir, we can't map it safely.
return None
# ---------------- Core Generator ----------------
def generate(
self,
prompt: str,
image_path: str = None,
audio_path: str = None,
output_name: str = None,
num_frames: int = 16,
fps: int = 8,
enhance=True,
duration_minutes: float = None,
):
"""Generate video with support for longer durations up to 10 minutes"""
if duration_minutes:
duration_minutes = min(duration_minutes, 10)
num_frames = int(duration_minutes * 60 * fps)
logger.info(f"Generating {duration_minutes} minute video with {num_frames} frames at {fps} fps")
try:
if not output_name:
output_name = f"video_{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp4"
output_name_safe = Path(output_name).name
output_path = str(self.output_dir / output_name_safe)
frames = self._generate_frames_from_text(prompt, num_frames)
raw_video = self._combine_frames_to_video(frames, out_path=output_path, fps=fps)
if not audio_path:
audio_out = str(self.audio_dir / f"{Path(output_name_safe).stem}.wav")
self._synthesize_audio(prompt, audio_out)
audio_path = audio_out
final_out = str(self.output_dir / f"final_{Path(output_name_safe).name}")
self._merge_audio_video(raw_video, audio_path, final_out)
enhanced_path = None
if enhance:
try:
enhanced_path = self.enhance_video(final_out)
except Exception as e:
logger.warning(f"Enhancement step failed: {e}")
enhanced_path = None
# Save history with filesystem paths
self._save_history(prompt, image_path, audio_path, final_out, enhanced_path)
# Convert filesystem paths to public URLs when possible
video_url = self._fs_path_to_public_url(final_out) or ""
enhanced_url = self._fs_path_to_public_url(enhanced_path) or ""
audio_url = self._fs_path_to_public_url(audio_path) or ""
return {
"video": final_out or "",
"video_url": video_url,
"enhanced": enhanced_path or "",
"enhanced_url": enhanced_url,
"audio": audio_path or "",
"audio_url": audio_url,
"frames": len(frames),
"status": "success"
}
except Exception as e:
logger.error(f"Video generation failed: {e}")
# ✅ Always return valid keys even when failing
return {
"video": "",
"video_url": "",
"enhanced": "",
"enhanced_url": "",
"audio": "",
"audio_url": "",
"frames": 0,
"status": "error",
"error": str(e)
}
# ---------------- Utility ----------------
def get_history(self, limit=20) -> List[Dict[str, Any]]:
"""Get video generation history"""
try:
conn = sqlite3.connect(self.db_path)
c = conn.cursor()
c.execute("SELECT * FROM history ORDER BY id DESC LIMIT ?", (limit,))
rows = c.fetchall()
conn.close()
# Convert to dict format
history = []
for row in rows:
history.append({
"id": row[0],
"prompt": row[1],
"image_path": row[2],
"audio_path": row[3],
"output_path": row[4],
"enhanced_path": row[5],
"created_at": row[6]
})
return history
except Exception as e:
logger.error(f"Failed to get history: {e}")
return []
def get_status(self) -> Dict[str, Any]:
"""Get generator status and capabilities"""
return {
"torch_available": TORCH_AVAILABLE,
"pil_available": PIL_AVAILABLE,
"ffmpeg_available": FFMPEG_AVAILABLE,
"tts_available": TTS_AVAILABLE,
"device": self.device,
"workdir": str(self.workdir),
"output_dir": str(self.output_dir),
"public_base_url": self.public_base_url
}
if __name__ == "__main__":
# Basic demo run
vg = VideoGenerator()
result = vg.generate("A bright futuristic city skyline with flying cars and neon lights.")
print("Generated:", result)