import os
import json
import datetime
from pathlib import Path

from openai import OpenAI
from dotenv import load_dotenv

# --- Configuration & Initialization ---
load_dotenv()

# The script runs from the 'scripts/' directory, so the project root is one level up.
PROJECT_ROOT = Path(__file__).resolve().parent.parent

# File paths
MASTER_PROMPT_PATH = PROJECT_ROOT / "data" / "master_prompt.json"
FEEDBACK_LOG_PATH = PROJECT_ROOT / "data" / "feedback_log.json"
STATUS_LOG_PATH = PROJECT_ROOT / "data" / "status_log.txt"

# LLM model ID for rewriting (the Meta-LLM)
META_LLM_MODEL = "x-ai/grok-4-fast"

# The minimum number of negative feedback entries required to trigger an update
MIN_NEGATIVE_FEEDBACK = 3


def load_data(path: Path):
    """Safely load JSON data, handling missing files and empty content."""
    try:
        if path.exists():
            content = path.read_text().strip()
            # Handle empty content
            if not content:
                return []
            data = json.loads(content)
            # Enforce the expected type per file (MASTER_PROMPT is a dict, FEEDBACK_LOG is a list)
            if path == MASTER_PROMPT_PATH and not isinstance(data, dict):
                return {}
            if path == FEEDBACK_LOG_PATH and not isinstance(data, list):
                return []
            return data
        # Create an initial state if the file doesn't exist
        if path == MASTER_PROMPT_PATH:
            return {
                "system_message": "A critical error occurred.",
                "version": "1.0.0",
                "last_updated": datetime.datetime.now().isoformat(),
            }
        return []
    except Exception as e:
        print(f"Error loading {path}: {e}")
        # Fall back to the type-appropriate default so callers never receive a
        # list where they expect a dict (run_optimization calls .get() on it).
        return {} if path == MASTER_PROMPT_PATH else []


def aggregate_negative_feedback(feedback_data: list) -> str | None:
    """Analyze the feedback log and summarize only the negative (rating=0) feedback."""
    negative_feedback = [entry for entry in feedback_data if entry.get("rating") == 0]

    if len(negative_feedback) < MIN_NEGATIVE_FEEDBACK:
        print(f"INFO: Only {len(negative_feedback)} negative entries found. Skipping optimization.")
        return None

    # Summarize the negative prompts that led to a user dislike
    summary = []
    for entry in negative_feedback:
        summary.append(
            f"User disliked the response (Rating 0) after input: '{entry['original_prompt']}' "
            f"The resulting OPTIMIZED PROMPT was: '{entry['optimized_prompt']}'"
        )

    print(f"INFO: Aggregated {len(negative_feedback)} negative feedback entries.")
    return "\n---\n".join(summary)
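# For reference, each entry in feedback_log.json is expected to carry at least
# the three fields read above: "rating" (0 marks a dislike), "original_prompt",
# and "optimized_prompt". An illustrative entry (the values are invented, not
# taken from real data):
#
#   {
#       "rating": 0,
#       "original_prompt": "summarize this email",
#       "optimized_prompt": "You are an executive assistant. Summarize..."
#   }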
def optimize_system_prompt(current_system_message: str, feedback_summary: str) -> str:
    """Call the Meta-LLM to rewrite the system message based on negative feedback."""
    # A strict meta-prompt for the Grok model to follow
    meta_prompt = (
        "You are the **System Prompt Optimizing Agent**. Your goal is to analyze the 'FAILED FEEDBACK' "
        "and rewrite the 'CURRENT SYSTEM MESSAGE' to address the problems identified. The new system "
        "message must aim to improve the quality of future responses, making them more accurate, "
        "detailed, or strictly adherent to formatting rules, based on the failure patterns. "
        "You must output **ONLY** the new system message text, nothing else. Do not use markdown quotes."
    )

    # The user message feeds the current prompt and the negative data to the agent
    user_message = f"""
CURRENT SYSTEM MESSAGE:
---
{current_system_message}
---

FAILED FEEDBACK (You must incorporate lessons from this data):
---
{feedback_summary}
---

Based ONLY on the above, rewrite the CURRENT SYSTEM MESSAGE to improve it.
New System Message:
"""

    try:
        # Call the OpenRouter API with the Meta-LLM
        client = OpenAI(
            base_url="https://openrouter.ai/api/v1",
            api_key=os.getenv("OPENROUTER_API_KEY"),
        )
        response = client.chat.completions.create(
            model=META_LLM_MODEL,
            messages=[
                {"role": "system", "content": meta_prompt},
                {"role": "user", "content": user_message}
            ],
            temperature=0.1,  # Low temperature for reliable instruction following
            max_tokens=512,
        )
        new_prompt = response.choices[0].message.content.strip()
        print("SUCCESS: New System Prompt generated by Meta-LLM.")
        return new_prompt
    except Exception as e:
        print(f"CRITICAL ERROR: Meta-LLM API call failed: {e}")
        return current_system_message  # Return the original prompt on failure


def increment_version(version_str: str) -> str:
    """Safely increment the minor version (Y in X.Y.Z) of a version string."""
    try:
        # Assumes the format X.Y.Z
        parts = version_str.split('.')
        if len(parts) < 2:
            return "1.0.0"
        # Increment the second part (the minor version)
        new_minor_version = int(parts[1]) + 1
        # Rebuild the version string.
        # parts[2:] keeps any remaining segments and tolerates a missing Z value.
        new_parts = [parts[0], str(new_minor_version)] + parts[2:]
        return ".".join(new_parts)
    except Exception:
        # If any part fails (non-integer, etc.), reset to a safe, known state.
        return "1.0.0"
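# Illustrative behavior of increment_version (these examples are added for
# clarity and are not part of the original script):
#
#   increment_version("1.0.0")        -> "1.1.0"
#   increment_version("2.9")          -> "2.10"   (missing Z is tolerated)
#   increment_version("not.a.number") -> "1.0.0"  (non-integer minor resets safely)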
def run_optimization():
    """Main function for the MLOps pipeline script."""
    print(f"--- Running Prompt Optimization Pipeline at {datetime.datetime.now()} ---")

    # 1. Load data
    current_config = load_data(MASTER_PROMPT_PATH)
    feedback_data = load_data(FEEDBACK_LOG_PATH)
    current_system_message = current_config.get("system_message", "")

    if not feedback_data:
        print("INFO: Feedback log is empty. Exiting optimization.")
        # Update the status log to reflect that the scheduled job ran but was skipped.
        update_status_log("MLOps Optimization skipped (No new feedback).")
        return

    # 2. Aggregate feedback
    feedback_summary = aggregate_negative_feedback(feedback_data)
    if feedback_summary is None:
        # Optimization was skipped because not enough negative feedback was found
        update_status_log("MLOps Optimization skipped (Insufficient negative feedback).")
        return

    # 3. Optimize the prompt
    new_system_message = optimize_system_prompt(current_system_message, feedback_summary)

    # 4. Check that the prompt actually changed before committing
    if new_system_message != current_system_message:
        print("\n*** PROMPT UPDATED ***")

        # 5. Update the master prompt file
        current_config["system_message"] = new_system_message
        current_config["version"] = increment_version(current_config.get("version", "1.0.0"))
        current_config["last_updated"] = datetime.datetime.now().isoformat()

        with open(MASTER_PROMPT_PATH, 'w') as f:
            json.dump(current_config, f, indent=4)
        print(f"Successfully wrote new prompt version {current_config['version']} to master_prompt.json")

        # 6. Clear the feedback log (ready for the next cycle)
        with open(FEEDBACK_LOG_PATH, 'w') as f:
            json.dump([], f)
        print("Feedback log cleared.")

        # 7. Update the status log (rewrites the file with a new timestamp)
        update_status_log(f"MLOps Prompt Optimization deployed successfully. New version: {current_config['version']}")
    else:
        print("\nINFO: No significant change or API error. Master prompt remains the same.")
        update_status_log("MLOps Optimization failed to deploy (API error or no meaningful change detected).")


def update_status_log(status_message: str):
    """Write the current status to the status log file, overwriting the previous entry."""
    # Note: 'IST' is a hardcoded label; datetime.now() returns naive local time.
    current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S IST")
    log_content = f"[{current_time}] {status_message}"
    try:
        with open(STATUS_LOG_PATH, 'w') as f:
            f.write(log_content)
        print(f"Status log updated: {log_content}")
    except Exception as e:
        print(f"CRITICAL ERROR: Failed to write to status log: {e}")


if __name__ == '__main__':
    run_optimization()
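# Deployment notes (illustrative, not part of the original script):
#
# The OpenRouter client above reads OPENROUTER_API_KEY from the environment,
# which load_dotenv() can populate from a .env file at the project root:
#
#   OPENROUTER_API_KEY=<your-openrouter-key>
#
# Since the status log is written as if by a scheduled job, a cron entry along
# these lines would run the pipeline nightly (the 2 AM slot and the script name
# optimize_prompt.py are assumptions):
#
#   0 2 * * * cd /path/to/project && python scripts/optimize_prompt.py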