# app.py -- last change: "Update app.py" (commit 80598b3, verified),
# author: SyedZaid-Bin-Haris.
import gradio as gr
import os
import time
import json
from datetime import datetime
import random
import string
import boto3
from botocore.exceptions import NoCredentialsError
from dotenv import load_dotenv
# Load environment variables with detailed error handling
def load_environment_variables():
    """Load a local .env file and report which AWS settings are missing.

    Prints a warning listing every unset variable among AWS_ACCESS_KEY_ID,
    AWS_SECRET_ACCESS_KEY and AWS_REGION. When only the region is absent it
    is defaulted to 'us-east-1' so development runs can proceed.
    """
    load_dotenv()

    absent = []
    for name in ('AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY', 'AWS_REGION'):
        if not os.getenv(name):
            absent.append(name)

    if not absent:
        print("All required environment variables are set")
        return

    print(f"WARNING: Missing required environment variables: {', '.join(absent)}")
    # Development fallback only (DO NOT USE IN PRODUCTION).
    if 'AWS_REGION' in absent:
        os.environ['AWS_REGION'] = 'us-east-1'
        print("Set default AWS_REGION to 'us-east-1' for testing purposes")
# Load environment variables once, as a module-level statement, so values
# from .env are in os.environ before main() builds any S3 client.
load_environment_variables()
def main():
# Initialize AWS client with more robust error handling
def init_aws_client():
    """Create an S3 client from environment credentials and verify it works.

    Reads AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_REGION (default
    'us-east-1') from the environment, builds a boto3 S3 client and calls
    ``list_buckets`` as a connectivity check.

    Returns:
        A ``(client, message)`` tuple. ``client`` is ``None`` when a setting
        is missing, the client cannot be built, or the connectivity check
        fails; ``message`` then carries the reason.
    """
    aws_access_key_id = os.getenv('AWS_ACCESS_KEY_ID')
    aws_secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY')
    # Strip so a padded value is not handed to boto3 as the region name.
    aws_region = os.getenv('AWS_REGION', 'us-east-1').strip()

    # Debug output, redacted: only the last four characters of the key id.
    print(f"AWS_ACCESS_KEY_ID: {'*' * 5 + aws_access_key_id[-4:] if aws_access_key_id else 'Not set'}")
    print(f"AWS_SECRET_ACCESS_KEY: {'*' * 10 if aws_secret_access_key else 'Not set'}")
    print(f"AWS_REGION: {aws_region or 'Not set'}")

    settings = {
        "AWS_ACCESS_KEY_ID": aws_access_key_id,
        "AWS_SECRET_ACCESS_KEY": aws_secret_access_key,
        "AWS_REGION": aws_region,
    }
    missing = [name for name, value in settings.items() if not value]
    if missing:
        return None, f"AWS credentials not found in environment variables: Missing {', '.join(missing)}"

    try:
        # No endpoint_url: boto3 derives the regional endpoint from
        # region_name, so nothing is built (or logged) by hand here.
        s3_client = boto3.client(
            's3',
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            region_name=aws_region,
        )
    except Exception as e:
        print(f"Error initializing AWS client: {str(e)}")
        return None, f"Error initializing AWS client: {str(e)}"

    # Cheap round trip to surface bad credentials before the first real read.
    try:
        s3_client.list_buckets()
    except Exception as e:
        print(f"Warning: AWS credentials may be invalid: {str(e)}")
        return None, f"AWS connection error: {str(e)}"

    print("Successfully connected to AWS S3")
    return s3_client, "AWS client initialized successfully"
# Function to fetch outline data from AWS
def fetch_outline_data(outline_id, bucket_name="dediro-backup"):
    """Fetch the stored outline JSON for ``outline_id`` from S3.

    Tries ``Outline-Agent/agent_steps/{outline_id}_outline.json`` first. If
    that key does not exist, falls back to the first listed key that starts
    with the same ID prefix and ends in ``_outline.json``.

    Args:
        outline_id: Outline identifier; surrounding whitespace is ignored.
        bucket_name: S3 bucket to read from.

    Returns:
        ``(data, message)`` with the decoded JSON and a status message, or
        ``(None, reason)`` when the ID is blank, no client could be created,
        nothing matched, or the request failed.
    """
    outline_id = str(outline_id or "").strip()
    if not outline_id:
        # A blank ID would make the fallback prefix the whole folder and
        # return some unrelated outline, so reject it before touching S3.
        return None, "No outline ID provided"

    s3_client, message = init_aws_client()
    if not s3_client:
        return None, message

    key_prefix = f"Outline-Agent/agent_steps/{outline_id}"
    s3_key = f"{key_prefix}_outline.json"
    print(f"Looking for file at: {bucket_name}/{s3_key}")

    try:
        try:
            response = s3_client.get_object(Bucket=bucket_name, Key=s3_key)
        except s3_client.exceptions.NoSuchKey:
            pass  # fall through to the prefix search below
        else:
            outline_data = json.loads(response['Body'].read().decode('utf-8'))
            return outline_data, f"Successfully retrieved outline data for ID: {outline_id}"

        listing = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=key_prefix)
        # Match on the suffix so keys that merely contain the marker
        # (e.g. backups of the file) are not picked up.
        outline_key = next(
            (obj['Key'] for obj in listing.get('Contents', [])
             if obj['Key'].endswith('_outline.json')),
            None,
        )
        if outline_key is None:
            return None, f"No outline data found for ID: {outline_id}"

        print(f"Found alternative file: {outline_key}")
        response = s3_client.get_object(Bucket=bucket_name, Key=outline_key)
        outline_data = json.loads(response['Body'].read().decode('utf-8'))
        return outline_data, f"Found similar outline: {outline_key}"
    except NoCredentialsError:
        return None, "AWS credentials not available"
    except Exception as e:
        return None, f"Error retrieving outline data: {str(e)}"
# Function to fetch research data from AWS
def fetch_research_data(outline_id, bucket_name="dediro-backup"):
    """Fetch the stored research results JSON for ``outline_id`` from S3.

    Tries ``Research-Agent/agent_steps/{outline_id}_results.json`` first. If
    that key does not exist, falls back to the first listed key that starts
    with the same ID prefix and ends in ``_results.json``.

    Args:
        outline_id: Identifier shared with the outline; surrounding
            whitespace is ignored.
        bucket_name: S3 bucket to read from.

    Returns:
        ``(data, message)`` with the decoded JSON and a status message, or
        ``(None, reason)`` when the ID is blank, no client could be created,
        nothing matched, or the request failed.
    """
    outline_id = str(outline_id or "").strip()
    if not outline_id:
        # A blank ID would make the fallback prefix the whole folder and
        # return some unrelated research file, so reject it up front.
        return None, "No research ID provided"

    s3_client, message = init_aws_client()
    if not s3_client:
        return None, message

    key_prefix = f"Research-Agent/agent_steps/{outline_id}"
    s3_key = f"{key_prefix}_results.json"
    print(f"Looking for research file at: {bucket_name}/{s3_key}")

    try:
        try:
            response = s3_client.get_object(Bucket=bucket_name, Key=s3_key)
        except s3_client.exceptions.NoSuchKey:
            pass  # fall through to the prefix search below
        else:
            research_data = json.loads(response['Body'].read().decode('utf-8'))
            return research_data, f"Successfully retrieved research data for ID: {outline_id}"

        listing = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=key_prefix)
        # Match on the suffix so keys that merely contain the marker are
        # not picked up.
        research_key = next(
            (obj['Key'] for obj in listing.get('Contents', [])
             if obj['Key'].endswith('_results.json')),
            None,
        )
        if research_key is None:
            return None, f"No research data found for ID: {outline_id}"

        print(f"Found alternative file: {research_key}")
        response = s3_client.get_object(Bucket=bucket_name, Key=research_key)
        research_data = json.loads(response['Body'].read().decode('utf-8'))
        return research_data, f"Found similar research results: {research_key}"
    except NoCredentialsError:
        return None, "AWS credentials not available"
    except Exception as e:
        return None, f"Error retrieving research data: {str(e)}"
# Helper function to extract outline from potentially nested structure
def extract_outline_from_data(data):
    """Return the outline payload from a possibly wrapped document.

    Recognises ``{"outline": ...}`` and ``{"data": {"outline": ...}}``.
    Any other non-empty value, including a bare outline with ``title`` and
    ``themes``, is returned unchanged. Falsy input yields ``None``.
    """
    if not data:
        return None

    if isinstance(data, dict):
        if "outline" in data:
            return data["outline"]
        nested = data.get("data")
        # Only look inside "data" when it is a mapping: a None, string or
        # list there would otherwise raise TypeError on the lookup.
        if isinstance(nested, dict) and "outline" in nested:
            return nested["outline"]

    # Direct outlines and unrecognised shapes are passed through as-is.
    return data
# Helper function to extract research results from potentially nested structure
def extract_research_from_data(data):
    """Return the research payload from a possibly wrapped document.

    Recognises ``{"results": ...}`` and ``{"data": {"results": ...}}``.
    Any other non-empty value, including a bare result with ``theme`` or
    ``articles``, is returned unchanged. Falsy input yields ``None``.
    """
    if not data:
        return None

    if isinstance(data, dict):
        if "results" in data:
            return data["results"]
        nested = data.get("data")
        # Only look inside "data" when it is a mapping: a None, string or
        # list there would otherwise raise TypeError on the lookup.
        if isinstance(nested, dict) and "results" in nested:
            return nested["results"]

    # Direct results and unrecognised shapes are passed through as-is.
    return data
# Build the Gradio UI. Components created here are referenced by the event
# handlers and click wiring further down in main().
# NOTE(review): indentation was lost in this copy of the file, so the exact
# nesting of the `with` contexts below cannot be confirmed from the text.
with gr.Blocks(theme=gr.themes.Soft(primary_hue="orange")) as demo:
# Header
gr.Markdown("# AI Outline Generator")
# ID-based process handling: create a new outline or load an existing one
with gr.Row():
with gr.Column(scale=2):
# New synthesis section
gr.Markdown("## Create New Outline")
query_input = gr.Textbox(
label="Enter Query:",
placeholder="What topic would you like to generate an outline for?",
lines=2
)
with gr.Row():
create_button = gr.Button("Generate New Outline", variant="primary")
# Receives the first value returned by generate_new_outline
synthesis_id = gr.Textbox(label="Outline ID", placeholder="ID will appear here...")
with gr.Column(scale=2):
# Continue existing synthesis section
gr.Markdown("## Continue Existing Outline")
existing_id_input = gr.Textbox(
label="Enter Existing Outline ID:",
placeholder="Enter ID to continue from a previous session..."
)
with gr.Row():
load_button = gr.Button("Load Outline")
# New: Research section
with gr.Row():
with gr.Column(scale=2):
# Research Agent section
gr.Markdown("## Generate Research")
research_query_input = gr.Textbox(
label="Enter Outline ID for Research:",
placeholder="Enter the Outline ID to generate research for...",
lines=1
)
with gr.Row():
generate_research_button = gr.Button("Generate Research", variant="primary")
with gr.Column(scale=2):
# Continue existing research section
gr.Markdown("## Continue Existing Research")
existing_research_id_input = gr.Textbox(
label="Enter Existing Research ID:",
placeholder="Enter ID to view existing research..."
)
with gr.Row():
load_research_button = gr.Button("Load Research")
# Synthesis status display; created hidden (visible=False) and revealed by
# handlers that return gr.update(visible=True) for status_panel
with gr.Row(visible=False) as status_panel:
gr.Markdown("## Current Project Status")
with gr.Column():
status_query = gr.Textbox(label="Query")
# status_id is the input of the retry/view/download/refresh handlers
status_id = gr.Textbox(label="Outline ID")
with gr.Row():
status_created = gr.Textbox(label="Created", scale=1)
status_updated = gr.Textbox(label="Last Updated", scale=1)
with gr.Row():
status_cost = gr.Number(label="Cost ($)", value=0.00)
# Step Status Card; created hidden like status_panel
steps_container = gr.Row(visible=False)
with steps_container:
gr.Markdown("## Process Status")
with gr.Group():
gr.Markdown("### Outline Generator")
outline_status = gr.Markdown("⏳ **In Progress**...")
with gr.Row():
outline_retry = gr.Button("Retry", size="sm", variant="primary")
outline_view = gr.Button("View Results", size="sm")
outline_download = gr.Button("Download", size="sm")
# Add Research Agent status section
with gr.Group():
gr.Markdown("### Research Agent")
research_status = gr.Markdown("⏳ **Waiting**...")
with gr.Row():
research_retry = gr.Button("Retry", size="sm", variant="primary")
research_view = gr.Button("View Results", size="sm")
research_download = gr.Button("Download", size="sm")
# Result Preview for Outline; created hidden
with gr.Row(visible=False) as result_panel:
gr.Markdown("## Outline Preview")
with gr.Column():
# Add a text area to display any errors or messages (starts hidden and empty)
result_message = gr.Textbox(
label="Status",
value="",
visible=False,
lines=2
)
# Keep the JSON component for the outline data
result_json = gr.JSON(
label="Outline Results",
value={}
)
# Add buttons for refresh and download
with gr.Row():
refresh_results = gr.Button("Refresh Results")
download_results = gr.Button("Download Results")
# Result Preview for Research; created hidden
with gr.Row(visible=False) as research_panel:
gr.Markdown("## Research Results")
with gr.Column():
# Add a text area to display any research errors or messages (starts hidden and empty)
research_message = gr.Textbox(
label="Status",
value="",
visible=False,
lines=2
)
# JSON component for the research data
research_json = gr.JSON(
label="Research Results",
value={}
)
# Add buttons for refresh and download
with gr.Row():
refresh_research = gr.Button("Refresh Research")
download_research_btn = gr.Button("Download Research")
# Log panel; created hidden, log_output is the textbox the handlers write to
with gr.Row(visible=False) as log_panel:
gr.Markdown("## Process Log")
log_output = gr.Textbox(
label="",
lines=8,
max_lines=15,
placeholder="Process logs will appear here..."
)
# Function for generating a new outline
def generate_new_outline(query):
    """Create an outline ID for ``query`` and reveal the status widgets.

    Returns a 12-tuple matching the ``create_button`` outputs, in order:
    synthesis_id, status_panel, steps_container, log_panel, log_output,
    status_query, status_id, status_created, status_updated, status_cost,
    outline_status, research_status.
    """
    # Collapse every run of whitespace (including newlines from the two-line
    # textbox) so the ID never starts or ends with "_" and never spans lines.
    words = (query or "").split()
    if not words:
        return (
            "Please enter a query first",
            gr.update(visible=False),
            gr.update(visible=False),
            gr.update(visible=False),
            "", "", "", "", "", 0.00, "", "",
        )
    query = " ".join(words)

    # Read the clock once so the ID, the log line and the Created/Updated
    # fields all describe the same instant.
    now = datetime.now()
    # ID format: "<unix timestamp>_<query with underscores>".
    synthesis_id = f"{int(now.timestamp())}_{'_'.join(words)}"
    log_timestamp = now.strftime("%Y-%m-%d %H:%M:%S")
    shown_time = now.strftime("%Y-%m-%d %H:%M")

    return (
        synthesis_id,
        gr.update(visible=True),
        gr.update(visible=True),
        gr.update(visible=True),
        f"[{log_timestamp}] Created new outline with ID: {synthesis_id}\n[{log_timestamp}] Starting outline generation for query: {query}...\n",
        query,
        synthesis_id,
        shown_time,
        shown_time,
        0.00,
        "🟡 **In Progress**...",
        "⏳ **Waiting**...",  # research has not been requested yet
    )
# Function for generating research
def generate_research(outline_id):
    """Check that the outline exists and log the start of research.

    Returns a 3-tuple matching the ``generate_research_button`` outputs:
    log_panel update, log_output text, steps_container update.
    """
    if outline_id.strip() == "":
        return gr.update(visible=True), "Please enter a valid outline ID", gr.update(visible=False)

    # Research can only be generated for an outline that is already stored.
    found, lookup_message = fetch_outline_data(outline_id)
    if not found:
        return gr.update(visible=True), f"Error: {lookup_message}", gr.update(visible=False)

    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    log_lines = (
        f"[{stamp}] Starting research generation for outline ID: {outline_id}\n",
        f"[{stamp}] This would trigger the ResearchAgent in a real implementation...\n",
    )
    return gr.update(visible=True), "".join(log_lines), gr.update(visible=True)
# Function for loading existing outline
def load_existing(existing_id):
    """Load a stored outline (and research, if any) into the status widgets.

    Returns an 11-tuple matching the ``load_button`` outputs, in order:
    status_panel, steps_container, log_panel, log_output, status_query,
    status_id, status_created, status_updated, status_cost, outline_status,
    research_status.
    """

    def failure(log_text, shown_id="", research_text="❌ **Error**"):
        # Hide the status widgets but show the log panel: the message is
        # written to log_output, which appears to sit inside that panel, so
        # hiding the panel would hide the error as well.
        return (
            gr.update(visible=False),  # status_panel
            gr.update(visible=False),  # steps_container
            gr.update(visible=True),   # log_panel
            log_text,                  # log_output
            "",                        # status_query
            shown_id,                  # status_id
            "",                        # status_created
            "",                        # status_updated
            0.0,                       # status_cost
            "❌ **Error**",            # outline_status
            research_text,             # research_status
        )

    if not (existing_id or "").strip():
        return failure("Please enter a valid outline ID")

    outline_data, outline_message = fetch_outline_data(existing_id)
    if not outline_data:
        return failure(f"Error: {outline_message}", existing_id, "❌ **Not Started**")

    # IDs look like "<unix timestamp>_<query with underscores>".
    stamp, separator, query_part = existing_id.partition('_')
    if not separator:
        return failure("Invalid outline ID format", existing_id)

    # Only query S3 for research once the outline itself is usable.
    research_data, research_message = fetch_research_data(existing_id)

    original_query = query_part.replace("_", " ")
    now = datetime.now()
    log_timestamp = now.strftime("%Y-%m-%d %H:%M:%S")

    log_lines = [f"Loaded outline with ID: {existing_id}", outline_message]
    if research_data:
        research_status_text = "✅ **Completed**"
        log_lines.append(f"Research data found: {research_message}")
    else:
        research_status_text = "⏳ **Not Started**"
        log_lines.append("No research data found yet.")
    log_message = "".join(f"[{log_timestamp}] {line}\n" for line in log_lines)

    # The ID prefix is expected to be a unix timestamp; anything else is
    # reported as unknown rather than failing the whole load.
    try:
        creation_time = datetime.fromtimestamp(int(stamp)).strftime("%Y-%m-%d %H:%M")
    except (ValueError, OverflowError, OSError):
        creation_time = "Unknown"

    # Cost is optional; tolerate a missing or malformed "metrics" entry.
    cost = 0.00
    metrics = outline_data.get("metrics") if isinstance(outline_data, dict) else None
    if isinstance(metrics, dict):
        try:
            cost = float(metrics.get("cost", 0.00))
        except (TypeError, ValueError):
            cost = 0.00

    return (
        gr.update(visible=True),
        gr.update(visible=True),
        gr.update(visible=True),
        log_message,
        original_query,
        existing_id,
        creation_time,
        now.strftime("%Y-%m-%d %H:%M"),  # "last updated" is the load time
        cost,
        "✅ **Completed**",  # outline data was found, so treat it as done
        research_status_text,
    )
# Function for loading existing research
def load_existing_research(existing_id):
    """Load stored research results into the research panel.

    Returns a 3-tuple matching the ``load_research_button`` outputs, in
    order: research_panel update, research_json value, log_output text.
    Failures are shown as ``{"error": ...}`` in the JSON component, the same
    shape ``view_research_results`` uses, and repeated in the log text.
    """
    if not (existing_id or "").strip():
        problem = "Please enter a valid research ID"
        # The JSON slot must receive JSON-compatible data and the log slot a
        # string; a plain string in the JSON slot is not a valid payload.
        return gr.update(visible=True), {"error": problem}, problem

    research_data, message = fetch_research_data(existing_id)
    if not research_data:
        return gr.update(visible=True), {"error": message}, f"Error: {message}"

    actual_research = extract_research_from_data(research_data)

    log_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    log_message = f"[{log_timestamp}] Loaded research with ID: {existing_id}\n[{log_timestamp}] {message}\n"
    return gr.update(visible=True), actual_research, log_message
# Function for retrying outline generation
def retry_outline(outline_id):
    """Report that outline generation is being retried.

    Placeholder: no agent is invoked here. Returns the new outline_status
    markdown and the text for log_output.
    """
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    log_lines = (
        f"[{stamp}] Retrying outline generation for ID: {outline_id}\n",
        f"[{stamp}] Connecting to AWS...\n",
    )
    return "🟡 **Retrying**...", "".join(log_lines)
# Function for retrying research generation
def retry_research(outline_id):
    """Report that research generation is being retried.

    Placeholder: no agent is invoked here. Returns the new research_status
    markdown and the text for log_output.
    """
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    log_lines = (
        f"[{stamp}] Retrying research generation for ID: {outline_id}\n",
        f"[{stamp}] Connecting to AWS...\n",
    )
    return "🟡 **Retrying**...", "".join(log_lines)
# Function to view outline results
def view_outline_results(outline_id):
    """Fetch an outline and prepare it for the outline preview panel.

    Returns a 2-tuple matching the ``outline_view`` / ``refresh_results``
    outputs: result_panel update and the value for result_json. Lookup and
    processing failures are returned as ``{"error": ...}`` payloads.
    """
    import traceback

    outline_data, message = fetch_outline_data(outline_id)
    if not outline_data:
        return gr.update(visible=True), {"error": message}

    print(f"Raw outline data type: {type(outline_data)}")

    try:
        is_mapping = isinstance(outline_data, dict)
        nested = outline_data.get("data") if is_mapping else None

        if is_mapping and "outline" in outline_data:
            print("Found 'outline' key at top level")
            actual_outline = outline_data["outline"]
        elif isinstance(nested, dict) and "outline" in nested:
            print("Found 'outline' key inside 'data'")
            actual_outline = nested["outline"]
        else:
            # Every remaining shape is displayed whole; the branches below
            # only record which shape was seen. (An "outline" key cannot be
            # present here: the first branch already handled it.)
            if is_mapping and all(key in outline_data for key in ("models", "prompts", "query")):
                print("Found OutlineAgent format with 'models', 'prompts', 'query'")
                print("Using complete OutlineAgent data")
            elif is_mapping and "title" in outline_data and "themes" in outline_data:
                print("Found direct outline structure with 'title' and 'themes'")
            else:
                print("Could not identify specific outline structure, using complete data")
            actual_outline = outline_data

        # The preview expects a non-empty dict; otherwise show a truncated
        # dump of what was actually stored.
        if not actual_outline or not isinstance(actual_outline, dict):
            print(f"Warning: Extracted outline is not a valid dictionary: {actual_outline}")
            actual_outline = {"warning": "Could not extract proper outline structure", "data": str(outline_data)[:1000]}

        return gr.update(visible=True), actual_outline
    except Exception as e:
        error_msg = f"Error processing outline data: {str(e)}"
        print(error_msg)
        print(f"Traceback: {traceback.format_exc()}")
        # Return both the error and a truncated raw dump for debugging.
        return gr.update(visible=True), {
            "error": error_msg,
            "raw_data": str(outline_data)[:1000]
        }
# Function to view research results
def view_research_results(outline_id):
    """Fetch research results and prepare them for the research panel.

    Returns a 3-tuple matching the ``research_view`` / ``refresh_research``
    outputs: research_panel update, research_message update, and the value
    for research_json. On failure the status textbox is shown with the
    error text; on success it is hidden and cleared.
    """
    import traceback

    research_data, message = fetch_research_data(outline_id)
    if not research_data:
        # Set the textbox value as well as its visibility; making it
        # visible alone would display an empty status box.
        return (
            gr.update(visible=True),
            gr.update(visible=True, value=message),
            {"error": message},
        )

    print(f"Raw research data type: {type(research_data)}")

    try:
        actual_research = extract_research_from_data(research_data)

        if not actual_research:
            print(f"Warning: Extracted research is not valid: {actual_research}")
            # Fall back to a truncated dump of what was actually stored.
            actual_research = {"warning": "Could not extract proper research structure", "data": str(research_data)[:1000]}

        return (
            gr.update(visible=True),
            gr.update(visible=False, value=""),
            actual_research,
        )
    except Exception as e:
        error_msg = f"Error processing research data: {str(e)}"
        print(error_msg)
        print(f"Traceback: {traceback.format_exc()}")
        # Return both the error and a truncated raw dump for debugging.
        return (
            gr.update(visible=True),
            gr.update(visible=True, value=error_msg),
            {
                "error": error_msg,
                "raw_data": str(research_data)[:1000]
            },
        )
# Function for downloading outline data
def download_outline_file(outline_id):
    """Return a log line announcing an outline download.

    Placeholder: nothing is downloaded; only the log text is produced.
    """
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    announcement = f"[{stamp}] Downloading outline with ID: {outline_id}...\n"
    return announcement
# Function for downloading research data
def download_research_file(outline_id):
    """Return a log line announcing a research download.

    Placeholder: nothing is downloaded; only the log text is produced.
    """
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    announcement = f"[{stamp}] Downloading research with ID: {outline_id}...\n"
    return announcement
# Connect the handlers
# Outputs shared by the create and load handlers, in the order those
# handlers return their values.
status_outputs = [
    status_panel,
    steps_container,
    log_panel,
    log_output,
    status_query,
    status_id,
    status_created,
    status_updated,
    status_cost,
    outline_status,
    research_status,
]
# generate_new_outline returns the new ID first, then the shared outputs.
create_button.click(
    fn=generate_new_outline,
    inputs=[query_input],
    outputs=[synthesis_id, *status_outputs],
)
load_button.click(
    fn=load_existing,
    inputs=[existing_id_input],
    outputs=status_outputs,
)
# Connect research generation button
generate_research_button.click(
    fn=generate_research,
    inputs=[research_query_input],
    outputs=[log_panel, log_output, steps_container],
)
# Connect load research button
load_research_button.click(
    fn=load_existing_research,
    inputs=[existing_research_id_input],
    outputs=[research_panel, research_json, log_output],
)
# Connect outline action buttons; all of them act on the ID in status_id
outline_retry.click(
    fn=retry_outline,
    inputs=[status_id],
    outputs=[outline_status, log_output],
)
outline_view.click(
    fn=view_outline_results,
    inputs=[status_id],
    outputs=[result_panel, result_json],
)
outline_download.click(
    fn=download_outline_file,
    inputs=[status_id],
    outputs=[log_output],
)
# Connect research action buttons
research_retry.click(
    fn=retry_research,
    inputs=[status_id],
    outputs=[research_status, log_output],
)
research_view.click(
    fn=view_research_results,
    inputs=[status_id],
    outputs=[research_panel, research_message, research_json],
)
research_download.click(
    fn=download_research_file,
    inputs=[status_id],
    outputs=[log_output],
)
# Connect refresh and download buttons of the outline preview
refresh_results.click(
    fn=view_outline_results,
    inputs=[status_id],
    outputs=[result_panel, result_json],
)
download_results.click(
    fn=download_outline_file,
    inputs=[status_id],
    outputs=[log_output],
)
# Connect refresh and download buttons of the research preview
refresh_research.click(
    fn=view_research_results,
    inputs=[status_id],
    outputs=[research_panel, research_message, research_json],
)
# The "Download Research" button previously had no handler attached.
download_research_btn.click(
    fn=download_research_file,
    inputs=[status_id],
    outputs=[log_output],
)
return demo
if __name__ == "__main__":
    # Warn when no local .env file exists; settings must then already be
    # present in the process environment.
    env_file_path = '.env'
    env_file_missing = not os.path.exists(env_file_path)
    if env_file_missing:
        rule = "=" * 80
        print("\n" + rule)
        print("WARNING: .env file not found!")
        print(rule + "\n")

    # Build the Blocks app and serve it with debug output enabled.
    demo = main()
    demo.launch(debug=True)