"""Gradio UI for creating and inspecting agent outline/research results stored in S3.

Outline JSON is read from ``Outline-Agent/agent_steps/<id>_outline.json`` and
research JSON from ``Research-Agent/agent_steps/<id>_results.json``. AWS
credentials and region are taken from environment variables (optionally loaded
from a ``.env`` file).
"""

import json
import os
import random
import string
import time
import traceback
from datetime import datetime

import boto3
import gradio as gr
from botocore.exceptions import NoCredentialsError
from dotenv import load_dotenv

REQUIRED_ENV_VARS = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_REGION")
DEFAULT_AWS_REGION = "us-east-1"

LOG_TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
DISPLAY_TIME_FORMAT = "%Y-%m-%d %H:%M"


def load_environment_variables():
    """Load ``.env`` and warn about missing AWS settings.

    When ``AWS_REGION`` is missing it is defaulted to ``us-east-1`` so the app
    can still start during testing/development (DO NOT rely on this in
    production).
    """
    load_dotenv()

    missing_vars = [var for var in REQUIRED_ENV_VARS if not os.getenv(var)]
    if not missing_vars:
        print("All required environment variables are set")
        return

    print(f"WARNING: Missing required environment variables: {', '.join(missing_vars)}")
    if "AWS_REGION" in missing_vars:
        os.environ["AWS_REGION"] = DEFAULT_AWS_REGION
        print("Set default AWS_REGION to 'us-east-1' for testing purposes")


# Load environment variables at startup
load_environment_variables()


def init_aws_client():
    """Create an S3 client from environment variables and verify it works.

    Returns:
        ``(client, message)``. ``client`` is ``None`` when a required variable
        is missing, the client cannot be constructed, or the ``list_buckets``
        connectivity check fails; ``message`` then explains why.
    """
    aws_access_key_id = os.getenv("AWS_ACCESS_KEY_ID")
    aws_secret_access_key = os.getenv("AWS_SECRET_ACCESS_KEY")
    # Strip so stray whitespace in the .env value does not break the region name.
    aws_region = os.getenv("AWS_REGION", DEFAULT_AWS_REGION).strip()

    # Debug output; the secret is never printed and the key ID is masked.
    masked_key = "*" * 5 + aws_access_key_id[-4:] if aws_access_key_id else "Not set"
    print(f"AWS_ACCESS_KEY_ID: {masked_key}")
    print(f"AWS_SECRET_ACCESS_KEY: {'*' * 10 if aws_secret_access_key else 'Not set'}")
    print(f"AWS_REGION: {aws_region or 'Not set'}")

    settings = (
        ("AWS_ACCESS_KEY_ID", aws_access_key_id),
        ("AWS_SECRET_ACCESS_KEY", aws_secret_access_key),
        ("AWS_REGION", aws_region),
    )
    missing = [name for name, value in settings if not value]
    if missing:
        return None, f"AWS credentials not found in environment variables: Missing {', '.join(missing)}"

    try:
        # boto3 derives the endpoint from the region; no explicit endpoint_url needed.
        s3_client = boto3.client(
            "s3",
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            region_name=aws_region,
        )
    except Exception as e:
        print(f"Error initializing AWS client: {e}")
        return None, f"Error initializing AWS client: {e}"

    # Test the connection with a simple operation
    try:
        s3_client.list_buckets()
    except Exception as e:
        print(f"Warning: AWS credentials may be invalid: {e}")
        return None, f"AWS connection error: {e}"

    print("Successfully connected to AWS S3")
    return s3_client, "AWS client initialized successfully"


def _read_json_object(s3_client, bucket_name, key):
    """Download ``key`` from ``bucket_name`` and parse it as UTF-8 JSON."""
    response = s3_client.get_object(Bucket=bucket_name, Key=key)
    return json.loads(response["Body"].read().decode("utf-8"))


def _first_key_with_suffix(s3_client, bucket_name, prefix, suffix):
    """Return the first listed key under ``prefix`` ending in ``suffix``, or ``None``."""
    response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=prefix)
    return next(
        (obj["Key"] for obj in response.get("Contents", []) if obj["Key"].endswith(suffix)),
        None,
    )


def fetch_outline_data(outline_id, bucket_name="dediro-backup"):
    """Fetch the outline JSON for ``outline_id`` from S3.

    Tries the exact key first; if it does not exist, falls back to the first
    ``*_outline.json`` object whose key starts with the ID.

    Returns:
        ``(data, message)``; ``data`` is ``None`` on any failure.
    """
    outline_id = str(outline_id or "").strip()
    if not outline_id:
        # A blank ID would make the prefix fallback match an arbitrary outline.
        return None, "No outline ID provided"

    s3_client, message = init_aws_client()
    if not s3_client:
        return None, message

    s3_key = f"Outline-Agent/agent_steps/{outline_id}_outline.json"
    print(f"Looking for file at: {bucket_name}/{s3_key}")

    try:
        try:
            outline_data = _read_json_object(s3_client, bucket_name, s3_key)
        except s3_client.exceptions.NoSuchKey:
            # Exact key not found: look for a similarly prefixed outline file.
            outline_key = _first_key_with_suffix(
                s3_client,
                bucket_name,
                f"Outline-Agent/agent_steps/{outline_id}",
                "_outline.json",
            )
            if outline_key is None:
                return None, f"No outline data found for ID: {outline_id}"
            print(f"Found alternative file: {outline_key}")
            outline_data = _read_json_object(s3_client, bucket_name, outline_key)
            return outline_data, f"Found similar outline: {outline_key}"
        return outline_data, f"Successfully retrieved outline data for ID: {outline_id}"
    except NoCredentialsError:
        return None, "AWS credentials not available"
    except Exception as e:
        return None, f"Error retrieving outline data: {e}"


def fetch_research_data(outline_id, bucket_name="dediro-backup"):
    """Fetch the research results JSON for ``outline_id`` from S3.

    Tries the exact key first; if it does not exist, falls back to the first
    ``*_results.json`` object whose key starts with the ID.

    Returns:
        ``(data, message)``; ``data`` is ``None`` on any failure.
    """
    outline_id = str(outline_id or "").strip()
    if not outline_id:
        # A blank ID would make the prefix fallback match arbitrary results.
        return None, "No outline ID provided"

    s3_client, message = init_aws_client()
    if not s3_client:
        return None, message

    s3_key = f"Research-Agent/agent_steps/{outline_id}_results.json"
    print(f"Looking for research file at: {bucket_name}/{s3_key}")

    try:
        try:
            research_data = _read_json_object(s3_client, bucket_name, s3_key)
        except s3_client.exceptions.NoSuchKey:
            # Exact key not found: look for a similarly prefixed results file.
            research_key = _first_key_with_suffix(
                s3_client,
                bucket_name,
                f"Research-Agent/agent_steps/{outline_id}",
                "_results.json",
            )
            if research_key is None:
                return None, f"No research data found for ID: {outline_id}"
            print(f"Found alternative file: {research_key}")
            research_data = _read_json_object(s3_client, bucket_name, research_key)
            return research_data, f"Found similar research results: {research_key}"
        return research_data, f"Successfully retrieved research data for ID: {outline_id}"
    except NoCredentialsError:
        return None, "AWS credentials not available"
    except Exception as e:
        return None, f"Error retrieving research data: {e}"


def _extract_nested(data, key):
    """Return ``data[key]`` or ``data["data"][key]`` when present, else ``data``.

    Returns ``None`` for empty input. The nested lookup is only attempted when
    ``data["data"]`` is itself a dict, so ``None``/string payloads cannot raise
    or be substring-matched.
    """
    if not data:
        return None
    if isinstance(data, dict):
        if key in data:
            return data[key]
        nested = data.get("data")
        if isinstance(nested, dict) and key in nested:
            return nested[key]
    # Unknown structure (or already the bare payload): hand it back unchanged.
    return data


def extract_outline_from_data(data):
    """Extract outline data from potentially nested structures."""
    return _extract_nested(data, "outline")


def extract_research_from_data(data):
    """Extract research data from potentially nested structures."""
    return _extract_nested(data, "results")


def generate_new_outline(query):
    """Create a new outline ID for ``query`` and reveal the status panels.

    Returns a 12-tuple matching the ``create_button`` outputs: synthesis ID,
    three panel visibility updates, log text, query, status ID, created,
    updated, cost, outline status and research status.
    """
    if not (query or "").strip():
        return (
            "Please enter a query first",
            gr.update(visible=False),
            gr.update(visible=False),
            gr.update(visible=False),
            "", "", "", "", "", 0.00, "", "",
        )

    # Timestamp-based ID (in a real implementation this would match the AWS naming pattern)
    timestamp = str(int(time.time()))
    formatted_query = query.replace(" ", "_")
    synthesis_id = f"{timestamp}_{formatted_query}"

    now = datetime.now()
    log_timestamp = now.strftime(LOG_TIME_FORMAT)
    display_time = now.strftime(DISPLAY_TIME_FORMAT)
    return (
        synthesis_id,
        gr.update(visible=True),
        gr.update(visible=True),
        gr.update(visible=True),
        f"[{log_timestamp}] Created new outline with ID: {synthesis_id}\n"
        f"[{log_timestamp}] Starting outline generation for query: {query}...\n",
        query,
        synthesis_id,
        display_time,
        display_time,
        0.00,
        "🟡 **In Progress**...",
        "⏳ **Waiting**...",  # Research status starts as waiting
    )


def generate_research(outline_id):
    """Validate that the outline exists and log the start of research generation.

    Returns ``(log_panel update, log text, steps_container update)``.
    """
    outline_id = (outline_id or "").strip()
    if not outline_id:
        return gr.update(visible=True), "Please enter a valid outline ID", gr.update(visible=False)

    # First check if the outline exists
    outline_data, outline_message = fetch_outline_data(outline_id)
    if not outline_data:
        return gr.update(visible=True), f"Error: {outline_message}", gr.update(visible=False)

    log_timestamp = datetime.now().strftime(LOG_TIME_FORMAT)
    log_message = (
        f"[{log_timestamp}] Starting research generation for outline ID: {outline_id}\n"
        f"[{log_timestamp}] This would trigger the ResearchAgent in a real implementation...\n"
    )
    return gr.update(visible=True), log_message, gr.update(visible=True)


def _load_failure(message, status_id="", research_status="❌ **Error**"):
    """Build the 11-value ``load_existing`` result for a failed load."""
    return (
        gr.update(visible=False),  # status_panel
        gr.update(visible=False),  # steps_container
        gr.update(visible=False),  # log_panel
        message,                   # log_output
        "",                        # status_query
        status_id,                 # status_id
        "",                        # status_created
        "",                        # status_updated
        0.0,                       # status_cost
        "❌ **Error**",            # outline_status
        research_status,           # research_status
    )


def load_existing(existing_id):
    """Load an existing outline (and any research) and populate the status UI.

    The ID is expected to look like ``<unix timestamp>_<query with underscores>``.
    Returns an 11-tuple matching the ``load_button`` outputs.
    """
    existing_id = (existing_id or "").strip()
    if not existing_id:
        return _load_failure("Please enter a valid outline ID")

    outline_data, outline_message = fetch_outline_data(existing_id)
    if not outline_data:
        return _load_failure(f"Error: {outline_message}", existing_id, "❌ **Not Started**")

    id_timestamp, separator, query_part = existing_id.partition("_")
    if not separator:
        return _load_failure("Invalid outline ID format", existing_id)

    # Only look for research once the outline itself is known to be usable.
    research_data, research_message = fetch_research_data(existing_id)

    original_query = query_part.replace("_", " ")
    now = datetime.now()
    log_timestamp = now.strftime(LOG_TIME_FORMAT)

    # Outline data was found, so the outline step is treated as completed.
    outline_status_text = "✅ **Completed**"
    log_message = (
        f"[{log_timestamp}] Loaded outline with ID: {existing_id}\n"
        f"[{log_timestamp}] {outline_message}\n"
    )
    if research_data:
        research_status_text = "✅ **Completed**"
        log_message += f"[{log_timestamp}] Research data found: {research_message}\n"
    else:
        research_status_text = "⏳ **Not Started**"
        log_message += f"[{log_timestamp}] No research data found yet.\n"

    # Creation time comes from the timestamp prefix of the ID, when it is one.
    try:
        creation_time = datetime.fromtimestamp(int(id_timestamp)).strftime(DISPLAY_TIME_FORMAT)
    except (ValueError, OverflowError, OSError):
        creation_time = "Unknown"

    # Extract cost if available in the data
    cost = 0.00
    if isinstance(outline_data, dict):
        metrics = outline_data.get("metrics")
        if isinstance(metrics, dict):
            cost = metrics.get("cost", 0.00)

    return (
        gr.update(visible=True),
        gr.update(visible=True),
        gr.update(visible=True),
        log_message,
        original_query,
        existing_id,
        creation_time,
        now.strftime(DISPLAY_TIME_FORMAT),  # Last updated is now
        cost,
        outline_status_text,
        research_status_text,
    )


def load_existing_research(existing_id):
    """Load existing research results into the research panel.

    Returns ``(research_panel update, research_json value, log text)``. On
    failure the panel is shown with an ``{"error": ...}`` payload so the user
    can see what went wrong, and the same message is written to the log.
    """
    existing_id = (existing_id or "").strip()
    if not existing_id:
        message = "Please enter a valid research ID"
        return gr.update(visible=True), {"error": message}, message

    research_data, message = fetch_research_data(existing_id)
    if not research_data:
        return gr.update(visible=True), {"error": message}, f"Error: {message}"

    actual_research = extract_research_from_data(research_data)

    log_timestamp = datetime.now().strftime(LOG_TIME_FORMAT)
    log_message = (
        f"[{log_timestamp}] Loaded research with ID: {existing_id}\n"
        f"[{log_timestamp}] {message}\n"
    )
    return gr.update(visible=True), actual_research, log_message


def retry_outline(outline_id):
    """Log an outline retry request; returns ``(outline status, log text)``.

    Placeholder: a real implementation would call the OutlineAgent with
    ``force_rerun=True``.
    """
    log_timestamp = datetime.now().strftime(LOG_TIME_FORMAT)
    return (
        "🟡 **Retrying**...",
        f"[{log_timestamp}] Retrying outline generation for ID: {outline_id}\n"
        f"[{log_timestamp}] Connecting to AWS...\n",
    )


def retry_research(outline_id):
    """Log a research retry request; returns ``(research status, log text)``.

    Placeholder: a real implementation would call the ResearchAgent with
    ``force_rerun=True``.
    """
    log_timestamp = datetime.now().strftime(LOG_TIME_FORMAT)
    return (
        "🟡 **Retrying**...",
        f"[{log_timestamp}] Retrying research generation for ID: {outline_id}\n"
        f"[{log_timestamp}] Connecting to AWS...\n",
    )


def view_outline_results(outline_id):
    """Fetch the outline and return ``(result_panel update, JSON payload)``.

    Errors are reported inside the JSON payload rather than raised.
    """
    outline_data, message = fetch_outline_data(outline_id)
    if not outline_data:
        return gr.update(visible=True), {"error": message}

    print(f"Raw outline data type: {type(outline_data)}")

    try:
        actual_outline = extract_outline_from_data(outline_data)

        # The JSON preview expects a non-empty dictionary.
        if not actual_outline or not isinstance(actual_outline, dict):
            print(f"Warning: Extracted outline is not a valid dictionary: {actual_outline}")
            actual_outline = {
                "warning": "Could not extract proper outline structure",
                "data": str(outline_data)[:1000],
            }
        return gr.update(visible=True), actual_outline
    except Exception as e:
        # UI boundary: log the failure and show the raw data for debugging.
        error_msg = f"Error processing outline data: {e}"
        print(error_msg)
        print(f"Traceback: {traceback.format_exc()}")
        return gr.update(visible=True), {
            "error": error_msg,
            "raw_data": str(outline_data)[:1000],
        }


def view_research_results(outline_id):
    """Fetch research results for the research panel.

    Returns ``(research_panel update, research_message update, JSON payload)``.
    The message box is shown (with the error text) only when something failed.
    """
    research_data, message = fetch_research_data(outline_id)
    if not research_data:
        return (
            gr.update(visible=True),
            gr.update(value=message, visible=True),
            {"error": message},
        )

    print(f"Raw research data type: {type(research_data)}")

    try:
        actual_research = extract_research_from_data(research_data)

        if not actual_research:
            print(f"Warning: Extracted research is not valid: {actual_research}")
            actual_research = {
                "warning": "Could not extract proper research structure",
                "data": str(research_data)[:1000],
            }
        return gr.update(visible=True), gr.update(visible=False), actual_research
    except Exception as e:
        # UI boundary: log the failure and show the raw data for debugging.
        error_msg = f"Error processing research data: {e}"
        print(error_msg)
        print(f"Traceback: {traceback.format_exc()}")
        return (
            gr.update(visible=True),
            gr.update(value=error_msg, visible=True),
            {"error": error_msg, "raw_data": str(research_data)[:1000]},
        )


def download_outline_file(outline_id):
    """Return a log line for an outline download (download itself is not implemented)."""
    log_timestamp = datetime.now().strftime(LOG_TIME_FORMAT)
    return f"[{log_timestamp}] Downloading outline with ID: {outline_id}...\n"


def download_research_file(outline_id):
    """Return a log line for a research download (download itself is not implemented)."""
    log_timestamp = datetime.now().strftime(LOG_TIME_FORMAT)
    return f"[{log_timestamp}] Downloading research with ID: {outline_id}...\n"


def main():
    """Build the Gradio Blocks app, wire its event handlers, and return it."""
    with gr.Blocks(theme=gr.themes.Soft(primary_hue="orange")) as demo:
        # Header
        gr.Markdown("# AI Outline Generator")

        # ID-based process handling
        with gr.Row():
            with gr.Column(scale=2):
                # New synthesis section
                gr.Markdown("## Create New Outline")
                query_input = gr.Textbox(
                    label="Enter Query:",
                    placeholder="What topic would you like to generate an outline for?",
                    lines=2,
                )
                with gr.Row():
                    create_button = gr.Button("Generate New Outline", variant="primary")
                    synthesis_id = gr.Textbox(label="Outline ID", placeholder="ID will appear here...")

            with gr.Column(scale=2):
                # Continue existing synthesis section
                gr.Markdown("## Continue Existing Outline")
                existing_id_input = gr.Textbox(
                    label="Enter Existing Outline ID:",
                    placeholder="Enter ID to continue from a previous session...",
                )
                with gr.Row():
                    load_button = gr.Button("Load Outline")

        # Research section
        with gr.Row():
            with gr.Column(scale=2):
                # Research Agent section
                gr.Markdown("## Generate Research")
                research_query_input = gr.Textbox(
                    label="Enter Outline ID for Research:",
                    placeholder="Enter the Outline ID to generate research for...",
                    lines=1,
                )
                with gr.Row():
                    generate_research_button = gr.Button("Generate Research", variant="primary")

            with gr.Column(scale=2):
                # Continue existing research section
                gr.Markdown("## Continue Existing Research")
                existing_research_id_input = gr.Textbox(
                    label="Enter Existing Research ID:",
                    placeholder="Enter ID to view existing research...",
                )
                with gr.Row():
                    load_research_button = gr.Button("Load Research")

        # Synthesis status display
        with gr.Row(visible=False) as status_panel:
            gr.Markdown("## Current Project Status")
            with gr.Column():
                status_query = gr.Textbox(label="Query")
                status_id = gr.Textbox(label="Outline ID")
                with gr.Row():
                    status_created = gr.Textbox(label="Created", scale=1)
                    status_updated = gr.Textbox(label="Last Updated", scale=1)
                with gr.Row():
                    status_cost = gr.Number(label="Cost ($)", value=0.00)

        # Step Status Card
        steps_container = gr.Row(visible=False)
        with steps_container:
            gr.Markdown("## Process Status")
            with gr.Group():
                gr.Markdown("### Outline Generator")
                outline_status = gr.Markdown("⏳ **In Progress**...")
                with gr.Row():
                    outline_retry = gr.Button("Retry", size="sm", variant="primary")
                    outline_view = gr.Button("View Results", size="sm")
                    outline_download = gr.Button("Download", size="sm")

            # Research Agent status section
            with gr.Group():
                gr.Markdown("### Research Agent")
                research_status = gr.Markdown("⏳ **Waiting**...")
                with gr.Row():
                    research_retry = gr.Button("Retry", size="sm", variant="primary")
                    research_view = gr.Button("View Results", size="sm")
                    research_download = gr.Button("Download", size="sm")

        # Result Preview for Outline
        with gr.Row(visible=False) as result_panel:
            gr.Markdown("## Outline Preview")
            with gr.Column():
                # Text area for errors or messages (not currently updated by any handler)
                result_message = gr.Textbox(
                    label="Status",
                    value="",
                    visible=False,
                    lines=2,
                )
                # JSON component for the outline data
                result_json = gr.JSON(
                    label="Outline Results",
                    value={},
                )
                # Buttons for refresh and download
                with gr.Row():
                    refresh_results = gr.Button("Refresh Results")
                    download_results = gr.Button("Download Results")

        # Result Preview for Research
        with gr.Row(visible=False) as research_panel:
            gr.Markdown("## Research Results")
            with gr.Column():
                # Text area to display any research errors or messages
                research_message = gr.Textbox(
                    label="Status",
                    value="",
                    visible=False,
                    lines=2,
                )
                # JSON component for the research data
                research_json = gr.JSON(
                    label="Research Results",
                    value={},
                )
                # Buttons for refresh and download
                with gr.Row():
                    refresh_research = gr.Button("Refresh Research")
                    download_research_btn = gr.Button("Download Research")

        # Log panel
        with gr.Row(visible=False) as log_panel:
            gr.Markdown("## Process Log")
            log_output = gr.Textbox(
                label="",
                lines=8,
                max_lines=15,
                placeholder="Process logs will appear here...",
            )

        # Output groups shared by several handlers; order must match handler return order.
        status_outputs = [
            status_panel,
            steps_container,
            log_panel,
            log_output,
            status_query,
            status_id,
            status_created,
            status_updated,
            status_cost,
            outline_status,
            research_status,
        ]
        outline_view_outputs = [result_panel, result_json]
        research_view_outputs = [research_panel, research_message, research_json]

        # Connect the handlers
        create_button.click(
            fn=generate_new_outline,
            inputs=[query_input],
            outputs=[synthesis_id, *status_outputs],
        )
        load_button.click(
            fn=load_existing,
            inputs=[existing_id_input],
            outputs=status_outputs,
        )

        # Research generation / loading
        generate_research_button.click(
            fn=generate_research,
            inputs=[research_query_input],
            outputs=[log_panel, log_output, steps_container],
        )
        load_research_button.click(
            fn=load_existing_research,
            inputs=[existing_research_id_input],
            outputs=[research_panel, research_json, log_output],
        )

        # Outline action buttons
        outline_retry.click(
            fn=retry_outline,
            inputs=[status_id],
            outputs=[outline_status, log_output],
        )
        outline_view.click(
            fn=view_outline_results,
            inputs=[status_id],
            outputs=outline_view_outputs,
        )
        outline_download.click(
            fn=download_outline_file,
            inputs=[status_id],
            outputs=[log_output],
        )

        # Research action buttons
        research_retry.click(
            fn=retry_research,
            inputs=[status_id],
            outputs=[research_status, log_output],
        )
        research_view.click(
            fn=view_research_results,
            inputs=[status_id],
            outputs=research_view_outputs,
        )
        research_download.click(
            fn=download_research_file,
            inputs=[status_id],
            outputs=[log_output],
        )

        # Outline preview panel buttons
        refresh_results.click(
            fn=view_outline_results,
            inputs=[status_id],
            outputs=outline_view_outputs,
        )
        download_results.click(
            fn=download_outline_file,
            inputs=[status_id],
            outputs=[log_output],
        )

        # Research preview panel buttons
        refresh_research.click(
            fn=view_research_results,
            inputs=[status_id],
            outputs=research_view_outputs,
        )
        # Previously unwired: mirrors the outline "Download Results" button.
        download_research_btn.click(
            fn=download_research_file,
            inputs=[status_id],
            outputs=[log_output],
        )

    return demo


if __name__ == "__main__":
    # Check for .env file existence and provide guidance if not found
    env_file_path = ".env"
    if not os.path.exists(env_file_path):
        print("\n" + "=" * 80)
        print("WARNING: .env file not found!")
        print("=" * 80 + "\n")

    # Initialize the demo
    demo = main()
    demo.launch(debug=True)