# creatorplus / app.py
# nitubhai's picture
# Upload 13 files
# 0a512d5 verified
from flask import Flask, render_template, request, jsonify, send_file, redirect, url_for, session
import os
import uuid
import threading
from datetime import datetime
from typing import Dict, Any, Optional
import logging
from dotenv import load_dotenv
import secrets
# Load environment variables from .env file
load_dotenv()
# Allow insecure (plain HTTP) transport for local OAuth (development only).
# NOTE: this is set unconditionally at import time; the production branch of the
# environment configuration near the end of this file removes it again.
os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1'
# Import our modules
from downloader import download_reel_with_audio # Ensure downloader.py defines this function
from uploader import upload_to_youtube, check_authentication, authenticate_youtube, get_youtube_service, get_channel_info, logout_youtube
from ai_genrator import AIMetadataGenerator # Fixed import to use the correct class
from video_editor import VideoEditor # Import video editor
# Configure logging (INFO level) and create this module's logger
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = Flask(__name__)
# Configuration
DOWNLOAD_FOLDER = 'downloads'  # relative folder used for downloaded and edited videos
GEMINI_API_KEY = os.getenv('GEMINI_API_KEY')  # None when the variable is not set
# Store user tokens in a separate folder (one token file per session user id)
USER_TOKENS_FOLDER = 'user_tokens'
os.makedirs(USER_TOKENS_FOLDER, exist_ok=True)
# Ensure download folder exists
os.makedirs(DOWNLOAD_FOLDER, exist_ok=True)
# Task storage (in production, use Redis or database)
# Maps task_id -> TaskStatus; kept only in this process's memory.
tasks = {}
class TaskStatus:
    """Mutable progress record for a single background task."""

    def __init__(self, task_id: str):
        """Create the record for ``task_id`` in its initial 'started' state."""
        self.task_id = task_id
        # Initial progress information shown to pollers
        self.status, self.progress, self.message = 'started', 0, 'Task started'
        # Outcome fields are filled in later via update_task_status()
        self.error = self.result = self.metadata = self.youtube_url = None
        self.created_at = datetime.now()
def update_task_status(task_id: str, status: str, message: str = '', progress: int = 0, **kwargs):
    """Update the stored record of a known task; unknown task ids are ignored.

    Args:
        task_id: Key of the task in the module-level ``tasks`` dict.
        status: New status value.
        message: New message (defaults to an empty string).
        progress: New progress value (defaults to 0).
        **kwargs: Extra attributes to set on the record (e.g. ``error``,
            ``result``, ``metadata``, ``youtube_url``).
    """
    if task_id not in tasks:
        return

    record = tasks[task_id]
    record.status, record.message, record.progress = status, message, progress

    # Apply any additional fields supplied by the caller
    for attr_name in kwargs:
        setattr(record, attr_name, kwargs[attr_name])

    logger.info(f"Task {task_id}: {status} - {message}")
# Helper function to get the user-specific token path
def get_user_token_path():
    """Return the token file path for the current session's user.

    A new random user id is generated and stored in the session the first
    time this is called for a session that has none.
    """
    if 'user_id' in session:
        current_user = session['user_id']
    else:
        current_user = str(uuid.uuid4())
        session['user_id'] = current_user
    token_filename = f'token_{current_user}.json'
    return os.path.join(USER_TOKENS_FOLDER, token_filename)
def _is_inside_folder(path: str, folder: str) -> bool:
    """Return True when ``path`` resolves to a location strictly inside ``folder``."""
    folder_real = os.path.realpath(folder)
    path_real = os.path.realpath(path)
    if path_real == folder_real:
        return False
    try:
        return os.path.commonpath([folder_real, path_real]) == folder_real
    except ValueError:
        # commonpath() refuses paths it cannot compare (e.g. different drives)
        return False


def background_upload_task(task_id: str, reel_url: str, editing_options: Optional[dict] = None, token_path: str = 'token.json'):
    """Background task for downloading, editing, and uploading a reel.

    Steps: download the reel, optionally edit it, generate metadata (with a
    static fallback when AI generation fails), then upload to YouTube.
    Progress and the final outcome are recorded through update_task_status();
    no exception escapes this function - failures mark the task 'failed'.

    Args:
        task_id: Id of the task record to update.
        reel_url: URL of the reel to download.
        editing_options: Optional dict; editing runs only when it has a truthy
            ``enabled`` entry. Reads ``music_url``, ``music_file``,
            ``music_volume`` and ``text_overlays``.
        token_path: Token file passed to upload_to_youtube().
    """
    video_path = None
    edited_video_path = None
    uploaded_music_path = None  # Uploaded music file scheduled for cleanup
    try:
        update_task_status(task_id, 'downloading', 'Downloading reel from Instagram...', 10)

        # Get sessionid from environment
        sessionid = os.getenv("IG_SESSIONID")
        if not sessionid:
            raise Exception("IG_SESSIONID not configured in .env file")

        # Validate sessionid format (basic check)
        if len(sessionid) < 20 or not sessionid.replace('%', '').replace(':', '').isalnum():
            raise Exception("IG_SESSIONID appears to be invalid. Please update it in the .env file.")

        # Download the reel with sessionid; translate known failures into
        # actionable messages while keeping the original error as the cause.
        try:
            video_path = download_reel_with_audio(reel_url, DOWNLOAD_FOLDER, sessionid)
        except Exception as download_error:
            error_msg = str(download_error)
            # Check for Instagram authentication/permission errors
            if "403" in error_msg or "Forbidden" in error_msg or "metadata failed" in error_msg.lower():
                raise Exception(
                    "Instagram session expired or invalid. Please update your IG_SESSIONID in the .env file"
                ) from download_error
            elif "Login required" in error_msg or "Not logged in" in error_msg:
                raise Exception(
                    "Instagram login required. Your session has expired. Please update IG_SESSIONID in .env file."
                ) from download_error
            else:
                raise Exception(f"Failed to download reel: {error_msg}") from download_error

        if not video_path or not os.path.exists(video_path):
            raise Exception("Failed to download video file")

        logger.info(f"βœ… Video downloaded: {video_path}")

        # Determine final video path (replaced by the edited file when editing succeeds)
        final_video_path = video_path

        # Video editing step (supports a music URL or a local music file)
        if editing_options and editing_options.get('enabled'):
            try:
                update_task_status(task_id, 'editing', 'Editing video (adding music & text overlays)...', 30)
                logger.info(f"🎬 Starting video editing with options: {editing_options}")

                # Initialize video editor
                editor = VideoEditor()

                # Generate edited video filename
                base_name = os.path.splitext(os.path.basename(video_path))[0]
                edited_video_path = os.path.join(DOWNLOAD_FOLDER, f"{base_name}_edited.mp4")

                # Determine music source (URL or local file)
                music_file = editing_options.get('music_file')
                music_source = editing_options.get('music_url') or music_file

                # ``music_file`` arrives with the request's editing options, so it
                # is only scheduled for deletion when it lives inside the download
                # folder; otherwise a request could make this task delete any file
                # the process can access.
                # NOTE(review): music files stored elsewhere are now left in place -
                # confirm where uploaded music is saved.
                if music_file:
                    if isinstance(music_file, str) and _is_inside_folder(music_file, DOWNLOAD_FOLDER):
                        uploaded_music_path = music_file
                    else:
                        logger.warning(
                            f"Not scheduling music file for cleanup (outside {DOWNLOAD_FOLDER}): {music_file}"
                        )

                # Apply edits
                editor.edit_video(
                    video_path=video_path,
                    output_path=edited_video_path,
                    music_url=music_source,  # Can be either a URL or a local file path
                    music_volume=editing_options.get('music_volume', 0.3),
                    text_overlays=editing_options.get('text_overlays')
                )

                logger.info(f"βœ… Video editing completed: {edited_video_path}")

                # Use edited video for upload
                final_video_path = edited_video_path
            except Exception as edit_error:
                error_msg = str(edit_error)
                logger.error(f"❌ Video editing failed: {error_msg}")

                # Editing is best-effort: fall back to the original video
                logger.warning("⚠️ Using original video due to editing failure")
                final_video_path = video_path

                # Update status to show editing was skipped
                update_task_status(
                    task_id,
                    'generating_metadata',
                    f'Skipped editing (error: {error_msg[:50]}...). Using original video...',
                    50
                )

        update_task_status(task_id, 'generating_metadata', 'AI analyzing video content and generating metadata...', 60)

        # Generate metadata using AI with actual video analysis
        try:
            ai_generator = AIMetadataGenerator(GEMINI_API_KEY)

            # Generate metadata based on actual video content
            generated_metadata = ai_generator.generate_complete_metadata(
                video_path=final_video_path,
                target_audience="social media users"
            )

            # Extract needed fields for YouTube upload
            metadata = {
                'title': generated_metadata['title'],
                'description': generated_metadata['description'],
                'tags': generated_metadata['tags'],
                'keywords': generated_metadata['keywords'],
                'hashtags': generated_metadata['hashtags'],
                'video_analysis': generated_metadata.get('video_analysis', 'Content analysis unavailable')
            }

            logger.info(f"βœ… AI metadata generated successfully")
            logger.info(f"πŸ“ Title: {metadata['title']}")
        except Exception as e:
            logger.warning(f"AI metadata generation failed: {str(e)}. Using fallback metadata.")

            # Fallback metadata
            filename = os.path.basename(final_video_path)
            metadata = {
                'title': f'Amazing Social Media Content - {filename}',
                'description': f'Check out this amazing content!\n\nOriginal source: {reel_url}\n\n#SocialMedia #Viral #Content #Entertainment',
                'tags': ['social media', 'viral', 'entertainment', 'content', 'video'],
                'keywords': ['social media video', 'viral content', 'entertainment'],
                'hashtags': ['#SocialMedia', '#Viral', '#Content']
            }

        update_task_status(task_id, 'uploading', 'Uploading to YouTube...', 85, metadata=metadata)

        # Upload with the caller-supplied token path
        try:
            video_id = upload_to_youtube(
                video_path=final_video_path,
                title=metadata['title'],
                description=metadata['description'],
                tags=metadata['tags'],
                privacy_status="public",
                token_path=token_path
            )

            youtube_url = f"https://www.youtube.com/watch?v={video_id}"

            update_task_status(
                task_id,
                'completed',
                'Upload completed successfully!',
                100,
                result={'video_id': video_id},
                youtube_url=youtube_url,
                metadata=metadata
            )
        except Exception as upload_error:
            raise Exception(f"YouTube upload failed: {str(upload_error)}") from upload_error

    except Exception as e:
        logger.error(f"Task {task_id} failed: {str(e)}")
        update_task_status(task_id, 'failed', str(e), error=str(e))

    finally:
        # Clean up temporary files after successful upload or failure
        for path in [video_path, edited_video_path, uploaded_music_path]:
            if path and os.path.exists(path):
                try:
                    os.remove(path)
                    logger.info(f"🧹 Cleaned up temporary file: {path}")
                except Exception as cleanup_error:
                    logger.warning(f"⚠️ Could not clean up file {path}: {cleanup_error}")
@app.route('/')
def index():
    """Serve the main page (index.html)."""
    page = render_template('index.html')
    return page
@app.route('/downloader')
def downloader_page():
    """Serve the downloader page (downloader.html)."""
    page = render_template('downloader.html')
    return page
@app.route('/metadata-generator')
def metadata_generator_page():
    """Serve the metadata generator page (metadata_generator.html)."""
    page = render_template('metadata_generator.html')
    return page
@app.route('/uploader')
def uploader_page():
    """Render the uploader page with the current user's authentication state.

    The check uses the session user's own token file, matching every other
    route in this module; calling check_authentication() without a path would
    not look at the per-user token.
    """
    is_authenticated = check_authentication(get_user_token_path())
    return render_template('uploader.html', authenticated=is_authenticated)
@app.route('/check-auth')
def check_auth():
    """Report whether the current user is authenticated with YouTube.

    Returns JSON with ``authenticated`` and ``channel`` (channel info when
    authenticated, otherwise None). On any error, returns
    ``authenticated: False`` together with the error text.
    """
    try:
        # Look at the token file that belongs to this session's user
        user_token = get_user_token_path()
        authenticated = check_authentication(user_token)
        channel = get_channel_info(user_token) if authenticated else None
        payload = {
            'authenticated': authenticated,
            'channel': channel
        }
        return jsonify(payload)
    except Exception as exc:
        logger.error(f"Error checking authentication: {str(exc)}")
        return jsonify({'authenticated': False, 'error': str(exc)})
@app.route('/authenticate', methods=['POST'])
def authenticate():
    """Run YouTube authentication for the current user and report the result as JSON."""
    try:
        credentials = authenticate_youtube(get_user_token_path())
        if not credentials:
            return jsonify({'success': False, 'error': 'Authentication failed'})
        return jsonify({'success': True, 'message': 'Authentication successful'})
    except Exception as exc:
        logger.error(f"Authentication error: {str(exc)}")
        return jsonify({'success': False, 'error': str(exc)})
@app.route('/auth/start')
def start_auth():
    """Begin the OAuth flow for the current user and return the authorization URL.

    Stores the OAuth state and the data needed to rebuild the flow in the
    user's session. Returns JSON ``{'auth_url': ...}`` or ``{'error': ...}``.
    """
    try:
        import google_auth_oauthlib.flow

        scopes = ["https://www.googleapis.com/auth/youtube.upload"]
        client_secrets_file = "client_secret.json"
        if not os.path.exists(client_secrets_file):
            return jsonify({'error': 'client_secret.json not found'})

        flow = google_auth_oauthlib.flow.InstalledAppFlow.from_client_secrets_file(
            client_secrets_file, scopes)

        redirect_uri = f"{request.url_root.rstrip('/')}/auth/callback"

        # Outside production, local hosts are always addressed over plain HTTP
        outside_production = os.getenv('ENVIRONMENT') != 'production'
        points_to_local_host = any(host in redirect_uri for host in ('localhost', '127.0.0.1'))
        if outside_production and points_to_local_host:
            redirect_uri = redirect_uri.replace('https://', 'http://')

        flow.redirect_uri = redirect_uri

        auth_url, state = flow.authorization_url(
            prompt='consent',
            access_type='offline',
            include_granted_scopes='true'
        )

        # Keep everything needed by the callback in the user's own session
        # (not shared globally)
        session['oauth_flow_state'] = state
        session['oauth_flow_data'] = {
            'client_secrets_file': client_secrets_file,
            'redirect_uri': redirect_uri,
            'scopes': scopes
        }

        return jsonify({'auth_url': auth_url})
    except Exception as exc:
        logger.error(f"Error starting auth: {str(exc)}")
        return jsonify({'error': str(exc)})
# Static page returned after a successful OAuth callback.
_AUTH_SUCCESS_PAGE = """
<html>
<head>
<style>
body {
font-family: 'Inter', sans-serif;
background: linear-gradient(135deg, #0f0f23 0%, #1a1a3e 100%);
color: white;
display: flex;
justify-content: center;
align-items: center;
height: 100vh;
margin: 0;
}
.container {
text-align: center;
background: rgba(255, 255, 255, 0.05);
backdrop-filter: blur(20px);
border: 1px solid rgba(255, 255, 255, 0.1);
border-radius: 20px;
padding: 50px;
box-shadow: 0 20px 60px rgba(0, 0, 0, 0.5);
}
h2 {
color: #00E676;
font-size: 32px;
margin-bottom: 20px;
}
p {
font-size: 18px;
color: rgba(255, 255, 255, 0.7);
}
.icon {
font-size: 80px;
margin-bottom: 20px;
}
</style>
</head>
<body>
<div class="container">
<div class="icon">βœ…</div>
<h2>Authentication Successful!</h2>
<p>You can now close this tab and return to the application.</p>
</div>
<script>
setTimeout(function() {
window.close();
}, 3000);
</script>
</body>
</html>
"""

# Page returned when the callback fails; ``{message}`` is filled via str.format()
# with an HTML-escaped error text (literal CSS braces are doubled for format()).
_AUTH_FAILURE_PAGE = """
<html>
<head>
<style>
body {{
font-family: 'Inter', sans-serif;
background: linear-gradient(135deg, #0f0f23 0%, #1a1a3e 100%);
color: white;
display: flex;
justify-content: center;
align-items: center;
height: 100vh;
margin: 0;
}}
.container {{
text-align: center;
background: rgba(255, 255, 255, 0.05);
backdrop-filter: blur(20px);
border: 1px solid rgba(255, 255, 255, 0.1);
border-radius: 20px;
padding: 50px;
box-shadow: 0 20px 60px rgba(0, 0, 0, 0.5);
}}
h2 {{
color: #FF1744;
font-size: 32px;
margin-bottom: 20px;
}}
p {{
font-size: 16px;
color: rgba(255, 255, 255, 0.7);
}}
.icon {{
font-size: 80px;
margin-bottom: 20px;
}}
</style>
</head>
<body>
<div class="container">
<div class="icon">❌</div>
<h2>Authentication Failed</h2>
<p>{message}</p>
</div>
</body>
</html>
"""


@app.route('/auth/callback')
def auth_callback():
    """Handle the OAuth callback for the current user.

    Rebuilds the flow from the data stored in the session by the start
    route, verifies the ``state`` parameter against the stored value,
    exchanges the authorization code for credentials and writes them to the
    user's token file.

    Returns:
        The success page, or a plain-text/HTML error with status 400 or 500.
        Text taken from the request or from exceptions is HTML-escaped
        before being placed in a response.
    """
    from html import escape

    try:
        # Reconstruct flow from session data
        flow_data = session.get('oauth_flow_data')
        expected_state = session.get('oauth_flow_state')

        if not flow_data:
            return "OAuth flow not found. Please restart the authentication process.", 400

        import google_auth_oauthlib.flow

        flow = google_auth_oauthlib.flow.InstalledAppFlow.from_client_secrets_file(
            flow_data['client_secrets_file'], flow_data['scopes'])
        flow.redirect_uri = flow_data['redirect_uri']

        auth_code = request.args.get('code')
        error = request.args.get('error')

        if error:
            # ``error`` is taken from the query string: escape before echoing it
            return f"Authentication failed: {escape(error)}", 400

        if not auth_code:
            return "No authorization code received", 400

        # Reject callbacks whose state does not match the one issued for this
        # session, so a forged callback cannot attach someone else's account.
        if not expected_state or request.args.get('state') != expected_state:
            logger.error("OAuth state mismatch in auth callback")
            return "OAuth state mismatch. Please restart the authentication process.", 400

        try:
            flow.fetch_token(code=auth_code)
            credentials = flow.credentials
        except Exception as token_error:
            logger.error(f"Token exchange failed: {str(token_error)}")
            return f"Token exchange failed: {escape(str(token_error))}", 500

        # Save credentials to the user-specific token file
        token_path = get_user_token_path()
        with open(token_path, 'w') as token:
            token.write(credentials.to_json())

        # Clean up session
        session.pop('oauth_flow_state', None)
        session.pop('oauth_flow_data', None)

        return _AUTH_SUCCESS_PAGE

    except Exception as e:
        logger.error(f"Error in auth callback: {str(e)}")
        return _AUTH_FAILURE_PAGE.format(message=escape(str(e))), 500
@app.route('/download', methods=['POST'])
def download_reel():
    """Download a reel into the download folder and report the result as JSON.

    The URL is read from the JSON body, the form data or the query string
    (in that order). Every outcome, including failures, is returned as a
    JSON object with a ``success`` flag.
    """
    try:
        payload = request.get_json(silent=True) or {}
        reel_url = payload.get('url') or request.form.get('url') or request.args.get('url')
        if not reel_url:
            return jsonify({'success': False, 'error': 'URL is required'})

        # The Instagram session id comes from the environment
        ig_session = os.getenv("IG_SESSIONID")
        if not ig_session:
            return jsonify({
                'success': False,
                'error': 'IG_SESSIONID not configured. Please add it to your .env file.'
            })

        # Basic format validation: minimum length, alphanumeric apart from '%' and ':'
        stripped_session = ig_session.replace('%', '').replace(':', '')
        if len(ig_session) < 20 or not stripped_session.isalnum():
            return jsonify({
                'success': False,
                'error': 'IG_SESSIONID appears to be invalid. Please update it in the .env file.'
            })

        try:
            saved_path = download_reel_with_audio(reel_url, DOWNLOAD_FOLDER, ig_session)
        except Exception as download_error:
            reason = str(download_error)
            # Classify the failure by the text of the error
            session_rejected = (
                "403" in reason or "Forbidden" in reason or "metadata failed" in reason.lower()
            )
            login_needed = "Login required" in reason or "Not logged in" in reason

            if session_rejected:
                failure = {
                    'success': False,
                    'error': 'Instagram session expired or invalid',
                    'details': 'Please update your IG_SESSIONID:\n1. Log into Instagram\n2. Copy sessionid cookie from browser\n3. Update .env file\n4. Restart app'
                }
            elif login_needed:
                failure = {
                    'success': False,
                    'error': 'Instagram login required. Session expired.',
                    'details': 'Please update IG_SESSIONID in .env file'
                }
            else:
                failure = {'success': False, 'error': f'Download failed: {reason}'}
            return jsonify(failure)

        if not saved_path or not os.path.exists(saved_path):
            return jsonify({'success': False, 'error': 'Failed to download video'})

        return jsonify({
            'success': True,
            'message': 'Download completed',
            'filename': os.path.basename(saved_path),
            'filepath': saved_path
        })
    except Exception as exc:
        logger.error(f"Download error: {str(exc)}")
        return jsonify({'success': False, 'error': str(exc)})
@app.route('/auto-upload-async', methods=['POST'])
def auto_upload_async():
    """Start the download/edit/upload pipeline in a background thread.

    Requires the current user to be authenticated with YouTube (401
    otherwise). Returns JSON containing the new ``task_id`` on success.
    """
    try:
        # Authentication is checked against the user's own token file
        user_token = get_user_token_path()
        if not check_authentication(user_token):
            return jsonify({'success': False, 'error': 'Not authenticated with YouTube'}), 401

        payload = request.get_json(silent=True) or {}
        reel_url = payload.get('url') or request.form.get('url') or request.args.get('url')
        editing_options = payload.get('editing')
        if not reel_url:
            return jsonify({'success': False, 'error': 'URL is required'})

        new_task_id = str(uuid.uuid4())
        tasks[new_task_id] = TaskStatus(new_task_id)

        # The worker receives the user's token path because it runs outside
        # the request
        worker = threading.Thread(
            target=background_upload_task,
            args=(new_task_id, reel_url, editing_options, user_token),
            daemon=True
        )
        worker.start()

        return jsonify({
            'success': True,
            'task_id': new_task_id,
            'message': 'Upload process started'
        })
    except Exception as exc:
        logger.error(f"Auto upload error: {str(exc)}")
        return jsonify({'success': False, 'error': str(exc)})
@app.route('/task-status/<task_id>')
def get_task_status(task_id):
    """Return the stored state of a task as JSON, or an error when it is unknown."""
    # (response key, TaskStatus attribute) pairs, in response order
    exported_fields = (
        ('id', 'task_id'),
        ('status', 'status'),
        ('message', 'message'),
        ('progress', 'progress'),
        ('error', 'error'),
        ('result', 'result'),
        ('metadata', 'metadata'),
        ('youtube_url', 'youtube_url'),
    )
    try:
        if task_id not in tasks:
            return jsonify({'success': False, 'error': 'Task not found'})

        record = tasks[task_id]
        snapshot = {key: getattr(record, attr) for key, attr in exported_fields}
        return jsonify({'success': True, 'task': snapshot})
    except Exception as exc:
        logger.error(f"Error getting task status: {str(exc)}")
        return jsonify({'success': False, 'error': str(exc)})
@app.route('/get-video/<filename>')
def get_video(filename):
    """Send a video from the download folder as an attachment.

    Returns 404 (with the list of available files) when the file does not
    exist, 400 when its extension is not a known video type, 500 on errors.
    """
    video_suffixes = ('.mp4', '.mov', '.avi', '.mkv', '.webm')
    try:
        # Security: only the base name is used, so only files directly inside
        # the downloads folder can be addressed
        safe_filename = os.path.basename(filename)
        file_path = os.path.join(DOWNLOAD_FOLDER, safe_filename)

        for line in (
            f"Serving video request for: {safe_filename}",
            f"Full path: {file_path}",
            f"File exists: {os.path.exists(file_path)}",
        ):
            logger.info(line)

        if not os.path.exists(file_path):
            logger.error(f"File not found: {file_path}")
            # List available files for debugging
            if os.path.exists(DOWNLOAD_FOLDER):
                available_files = os.listdir(DOWNLOAD_FOLDER)
            else:
                available_files = []
            logger.error(f"Available files: {available_files}")
            not_found_body = {
                'error': 'File not found',
                'requested': safe_filename,
                'available': available_files
            }
            return jsonify(not_found_body), 404

        # Verify it's a video file
        is_video = file_path.lower().endswith(video_suffixes)
        if not is_video:
            logger.error(f"Invalid file type: {file_path}")
            return jsonify({'error': 'Invalid file type'}), 400

        # File size is only used for logging
        size_in_bytes = os.path.getsize(file_path)
        logger.info(f"Serving file: {safe_filename} ({size_in_bytes} bytes)")

        return send_file(
            file_path,
            mimetype='video/mp4',
            as_attachment=True,
            download_name=safe_filename
        )
    except Exception as exc:
        logger.error(f"Error serving video: {str(exc)}")
        import traceback
        logger.error(traceback.format_exc())
        return jsonify({'error': str(exc)}), 500
@app.route('/cleanup/<filename>', methods=['POST'])
def cleanup_file(filename):
    """Delete a file from the download folder once the user has fetched it."""
    try:
        # Only the base name is honoured, keeping the target inside the folder
        target = os.path.join(DOWNLOAD_FOLDER, os.path.basename(filename))
        if not os.path.exists(target):
            return jsonify({'success': False, 'error': 'File not found'})

        os.remove(target)
        logger.info(f"Cleaned up file: {target}")
        return jsonify({'success': True, 'message': 'File cleaned up'})
    except Exception as exc:
        logger.error(f"Error cleaning up file: {str(exc)}")
        return jsonify({'success': False, 'error': str(exc)})
@app.route('/list-downloads')
def list_downloads():
    """List name, size and existence of every entry in the download folder (for debugging)."""
    try:
        entries = [
            (entry, os.path.join(DOWNLOAD_FOLDER, entry))
            for entry in os.listdir(DOWNLOAD_FOLDER)
        ]
        files_info = [
            {
                'name': entry,
                'size': os.path.getsize(full_path),
                'exists': os.path.exists(full_path)
            }
            for entry, full_path in entries
        ]
        return jsonify({'files': files_info})
    except Exception as exc:
        return jsonify({'error': str(exc)})
@app.route('/generate-preview', methods=['POST'])
def generate_preview():
    """Generate a metadata preview by downloading and analyzing the video.

    The URL is read from the JSON body, the form data or the query string.
    When the download (other than a recognised session error) or the AI
    analysis fails, static fallback metadata is returned with
    ``success: True``. The downloaded file is removed afterwards; a failure
    to remove it is logged instead of being silently ignored.
    """
    try:
        data = request.get_json(silent=True) or {}
        reel_url = data.get('url') or request.form.get('url') or request.args.get('url')
        if not reel_url:
            return jsonify({'success': False, 'error': 'URL is required'})

        # Get sessionid from environment
        sessionid = os.getenv("IG_SESSIONID")
        if not sessionid:
            return jsonify({
                'success': False,
                'error': 'IG_SESSIONID not configured in .env file'
            })

        # Validate sessionid format
        if len(sessionid) < 20:
            return jsonify({
                'success': False,
                'error': 'IG_SESSIONID appears to be invalid. Please update it in .env file.'
            })

        # Download video temporarily for analysis
        temp_video_path = None
        try:
            try:
                temp_video_path = download_reel_with_audio(reel_url, DOWNLOAD_FOLDER, sessionid)
            except Exception as download_error:
                error_msg = str(download_error)
                if "403" in error_msg or "Forbidden" in error_msg or "metadata failed" in error_msg.lower():
                    return jsonify({
                        'success': False,
                        'error': 'Instagram session expired',
                        'details': 'Please update your IG_SESSIONID in the .env file and restart the app'
                    })
                # Any other download failure is handled by the fallback below;
                # a bare raise keeps the original traceback.
                raise

            # Create AI Metadata Generator instance
            ai_generator = AIMetadataGenerator(GEMINI_API_KEY)

            # Generate metadata based on actual video content
            generated_metadata = ai_generator.generate_complete_metadata(
                video_path=temp_video_path,
                target_audience="social media users"
            )

            return jsonify({
                'success': True,
                'title': generated_metadata['title'],
                'description': generated_metadata['description'],
                'tags': generated_metadata['tags'],
                'hashtags': generated_metadata['hashtags'],
                'video_analysis': generated_metadata.get('video_analysis', 'Analysis unavailable')
            })
        except Exception as e:
            logger.warning(f"AI metadata generation preview failed: {str(e)}")
            # Fallback metadata
            return jsonify({
                'success': True,
                'title': 'Amazing Social Media Content',
                'description': f'Check out this amazing content from social media!\n\nSource: {reel_url}\n\n#SocialMedia #Viral #Content',
                'tags': ['social media', 'viral', 'entertainment', 'content'],
                'hashtags': ['#SocialMedia', '#Viral', '#Content', '#Entertainment']
            })
        finally:
            # Clean up temporary file (best effort, but leave a trace on failure)
            if temp_video_path and os.path.exists(temp_video_path):
                try:
                    os.remove(temp_video_path)
                except OSError as cleanup_error:
                    logger.warning(
                        f"Could not remove temporary preview file {temp_video_path}: {cleanup_error}"
                    )
    except Exception as e:
        logger.error(f"Preview generation error: {str(e)}")
        return jsonify({'success': False, 'error': str(e)})
@app.route('/get-channel-info')
def channel_info():
    """Return the connected YouTube channel of the current user as JSON.

    ``channel`` is None when the user is authenticated but no channel data
    was obtained; on errors ``authenticated`` is False and ``error`` is set.
    """
    try:
        user_token = get_user_token_path()
        if not check_authentication(user_token):
            return jsonify({'authenticated': False})

        details = get_channel_info(user_token)
        return jsonify({
            'authenticated': True,
            'channel': details if details else None
        })
    except Exception as exc:
        logger.error(f"Error getting channel info: {str(exc)}")
        return jsonify({'authenticated': False, 'error': str(exc)})
@app.route('/logout', methods=['POST'])
def logout():
    """Log the current user out of YouTube and forget the session's user id."""
    try:
        user_token = get_user_token_path()
        logged_out = logout_youtube(user_token)

        # Remove a token file that is still present after a successful logout
        token_left_behind = logged_out and os.path.exists(user_token)
        if token_left_behind:
            os.remove(user_token)

        # Clear the user id from the session
        session.pop('user_id', None)
        return jsonify({'success': logged_out})
    except Exception as exc:
        logger.error(f"Error during logout: {str(exc)}")
        return jsonify({'success': False, 'error': str(exc)})
@app.route('/privacy-policy')
def privacy_policy():
    """Render the privacy policy page with today's date (e.g. 'January 01, 2025')."""
    from datetime import datetime

    today_label = datetime.now().strftime('%B %d, %Y')
    return render_template('privacy_policy.html', current_date=today_label)
@app.route('/terms')
def terms():
    """Serve the terms of service page (terms.html)."""
    page = render_template('terms.html')
    return page
# Environment-specific configuration (Hugging Face Spaces / production / development)

# Session settings applied in every environment
_COMMON_SESSION_CONFIG = {
    'SESSION_TYPE': 'filesystem',
    'SESSION_PERMANENT': False,
    'SESSION_USE_SIGNER': True,
    'PERMANENT_SESSION_LIFETIME': 3600,
}
# Session file directory used by the two hosted environments
_SESSION_FILE_DIR = '/tmp/flask_session'

if os.getenv('SPACE_ID'):  # Running on Hugging Face Spaces
    print("🌐 Running on Hugging Face Spaces")
    os.environ['ENVIRONMENT'] = 'production'

    # Allow insecure transport for Hugging Face internal routing
    os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1'

    app.config.update(
        SESSION_COOKIE_SECURE=False,  # HF uses internal HTTP routing
        SESSION_COOKIE_HTTPONLY=True,
        SESSION_COOKIE_SAMESITE='Lax',
        SESSION_FILE_DIR=_SESSION_FILE_DIR,
        **_COMMON_SESSION_CONFIG,
    )

    # Create session directory
    os.makedirs(_SESSION_FILE_DIR, exist_ok=True)

    PORT = 7860  # Hugging Face Spaces port
    app.debug = False
    print(f"πŸ“‘ Port: {PORT}")
    print("πŸ” Secure sessions enabled")
elif os.getenv('ENVIRONMENT') == 'production':
    # Disable insecure transport in production
    os.environ.pop('OAUTHLIB_INSECURE_TRANSPORT', None)

    app.config.update(
        SESSION_COOKIE_SECURE=True,
        SESSION_COOKIE_HTTPONLY=True,
        SESSION_COOKIE_SAMESITE='Lax',
        SESSION_FILE_DIR=_SESSION_FILE_DIR,
        **_COMMON_SESSION_CONFIG,
    )

    # Create session directory
    os.makedirs(_SESSION_FILE_DIR, exist_ok=True)

    # Use PORT from the environment when provided, 5000 otherwise
    PORT = int(os.getenv('PORT', 5000))

    # Disable debug mode
    app.debug = False
else:
    # Development mode
    os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1'
    PORT = int(os.getenv('PORT', 5000))

    # Session configuration for development
    app.config.update(**_COMMON_SESSION_CONFIG)

# Secret key for sessions - MUST be set for production.
# ``or`` (rather than a getenv default) also covers SECRET_KEY being set to an
# empty string, which would otherwise leave the app without a usable key.
# Without SECRET_KEY a fresh random key is generated on every start.
app.secret_key = os.getenv('SECRET_KEY') or secrets.token_hex(32)
# Global error handlers
@app.errorhandler(404)
def not_found(error):
    """Answer 404s with JSON for paths under /api/ and with the 404 page otherwise."""
    is_api_request = request.path.startswith('/api/')
    if not is_api_request:
        return render_template('404.html'), 404
    return jsonify({'error': 'Endpoint not found'}), 404
@app.errorhandler(500)
def internal_error(error):
    """Log a 500 and answer with JSON for paths under /api/, the 500 page otherwise."""
    logger.error(f"Internal server error: {str(error)}")
    is_api_request = request.path.startswith('/api/')
    if not is_api_request:
        return render_template('500.html'), 500
    api_body = {'error': 'Internal server error', 'message': 'Please try again later'}
    return jsonify(api_body), 500
@app.errorhandler(Exception)
def handle_exception(e):
    """Catch-all handler for unexpected exceptions.

    HTTP errors (e.g. 400, 405) are returned unchanged so they keep their
    own status code instead of being reported as 500. Everything else is
    logged with its traceback and answered with JSON for paths under /api/
    or with the 500 page otherwise.
    """
    # werkzeug ships with Flask; imported locally to keep the block self-contained
    from werkzeug.exceptions import HTTPException

    # A handler registered for Exception also receives HTTP errors; pass
    # them through so they are not converted into a 500.
    if isinstance(e, HTTPException):
        return e

    logger.error(f"Unhandled exception: {str(e)}")
    import traceback
    logger.error(traceback.format_exc())

    if request.path.startswith('/api/'):
        return jsonify({
            'error': 'An unexpected error occurred',
            # The exception text is exposed only in debug mode
            'message': str(e) if app.debug else 'Please try again later'
        }), 500
    return render_template('500.html'), 500
# Health check with detailed status
@app.route('/health')
def health_check():
    """Health check endpoint for monitoring.

    Returns 200 with status 'healthy' when every check passes, 503 with
    status 'degraded' when any check fails, and 503 with status 'unhealthy'
    when the check itself raises.
    """
    try:
        ig_session = os.getenv('IG_SESSIONID')
        gemini_ready = GEMINI_API_KEY and GEMINI_API_KEY != 'your-gemini-api-key-here'

        checks = {
            'downloads_folder': os.path.exists(DOWNLOAD_FOLDER),
            'user_tokens_folder': os.path.exists(USER_TOKENS_FOLDER),
            'gemini_configured': bool(gemini_ready),
            'ig_session_configured': bool(ig_session and len(ig_session) > 20)
        }
        everything_ok = all(checks.values())

        report = {
            'status': 'healthy' if everything_ok else 'degraded',
            'version': '1.0.0',
            'environment': os.getenv('ENVIRONMENT', 'development'),
            'checks': checks
        }
        return jsonify(report), (200 if everything_ok else 503)
    except Exception as exc:
        logger.error(f"Health check failed: {str(exc)}")
        return jsonify({
            'status': 'unhealthy',
            'error': str(exc)
        }), 503
# Request logging middleware
@app.before_request
def log_request():
    """Log method, path and client address of every non-static request."""
    if request.path.startswith('/static/'):
        return
    logger.info(f"{request.method} {request.path} - {request.remote_addr}")
@app.after_request
def log_response(response):
    """Log method, path and status code of every non-static response; return it unchanged."""
    is_static = request.path.startswith('/static/')
    if not is_static:
        logger.info(f"{request.method} {request.path} - {response.status_code}")
    return response
# Startup validation
def validate_environment():
    """Log a warning for each missing piece of configuration, or a single pass message."""
    # (problem detected?, message) pairs, reported in this order
    findings = (
        (
            not os.getenv('SECRET_KEY') and os.getenv('ENVIRONMENT') == 'production',
            "SECRET_KEY not set (will use generated key)",
        ),
        (
            not os.getenv('GEMINI_API_KEY'),
            "GEMINI_API_KEY not set - AI features will use fallback",
        ),
        (
            not os.getenv('IG_SESSIONID'),
            "IG_SESSIONID not set - Instagram downloads will fail",
        ),
        (
            not os.path.exists('client_secret.json'),
            "client_secret.json not found - YouTube authentication will fail",
        ),
    )
    issues = [text for detected, text in findings if detected]

    if not issues:
        logger.info("βœ… Environment validation passed")
        return

    logger.warning("⚠️ Environment validation issues:")
    for issue in issues:
        logger.warning(f" - {issue}")
if __name__ == "__main__":
    # Startup banner
    gemini_ready = GEMINI_API_KEY and GEMINI_API_KEY != 'your-gemini-api-key-here'
    gemini_label = 'Configured' if gemini_ready else 'Not configured (using fallback)'
    print("πŸš€ YouTube Automation Machine Starting...")
    print(f"πŸ“ Downloads folder: {DOWNLOAD_FOLDER}")
    print(f"πŸ“ User tokens folder: {USER_TOKENS_FOLDER}")
    print(f"πŸ€– Gemini AI: {gemini_label}")

    # Report whether an Instagram session id of plausible length is present
    ig_session = os.getenv("IG_SESSIONID")
    session_configured = bool(ig_session) and len(ig_session) > 20
    if session_configured:
        print(f"πŸ“Έ Instagram Session: Configured (length: {len(ig_session)})")
    else:
        print("⚠️ Instagram Session: NOT CONFIGURED or INVALID")
        print(" Please add IG_SESSIONID to your environment variables")

    # Report the environment the server runs in
    environment = os.getenv('ENVIRONMENT', 'development')
    in_production = environment == 'production'
    if in_production:
        banner_lines = (
            "🌐 Running in PRODUCTION mode (Render)",
            f"πŸ“‘ Port: {PORT}",
            "πŸ”’ HTTPS enabled",
            "πŸ” Secure sessions enabled",
        )
    else:
        banner_lines = (
            "⚠️ Running in DEVELOPMENT mode",
            f"πŸ“‘ Port: {PORT}",
        )
    for banner_line in banner_lines:
        print(banner_line)
    print()

    # Validate environment
    validate_environment()

    # Bind to all interfaces on the PORT chosen by the configuration above;
    # debug mode is enabled whenever the environment is not 'production'.
    # NOTE(review): outside production this serves in debug mode on 0.0.0.0 -
    # confirm this is only ever run on a trusted network.
    app.run(host='0.0.0.0', port=PORT, debug=not in_production, threaded=True)