import os
import json
import datetime
from pathlib import Path
from typing import Optional

from openai import OpenAI
from dotenv import load_dotenv
# --- Configuration & Initialization ---
load_dotenv()

# The script runs from the 'scripts/' directory, so the project root is one level up.
PROJECT_ROOT = Path(__file__).resolve().parent.parent

# File Paths
MASTER_PROMPT_PATH = PROJECT_ROOT / "data" / "master_prompt.json"
FEEDBACK_LOG_PATH = PROJECT_ROOT / "data" / "feedback_log.json"
STATUS_LOG_PATH = PROJECT_ROOT / "data" / "status_log.txt"

# LLM Model ID for Rewriting (Meta-LLM)
META_LLM_MODEL = "x-ai/grok-4-fast"

# The minimum number of negative feedback entries required to trigger an update
MIN_NEGATIVE_FEEDBACK = 3
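
# Expected on-disk shapes, inferred from how this script reads and writes the
# files below (illustrative sketches only; the real files may carry extra fields):
#   master_prompt.json : {"system_message": "...", "version": "1.0.0",
#                         "last_updated": "<ISO-8601 timestamp>"}
#   feedback_log.json  : [{"rating": 0, "original_prompt": "...",
#                          "optimized_prompt": "..."}, ...]
# OPENROUTER_API_KEY must be available in the environment (e.g. via a .env file).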
def load_data(path: Path):
    """Safely loads JSON data, handling missing files, empty content, and bad types."""
    # Default initial state for the master prompt file.
    default_master = {
        "system_message": "A critical error occurred.",
        "version": "1.0.0",
        "last_updated": datetime.datetime.now().isoformat(),
    }
    try:
        if path.exists():
            content = path.read_text().strip()
            # Handle empty content
            if not content:
                return default_master if path == MASTER_PROMPT_PATH else []
            data = json.loads(content)
            # Ensure correct type based on file (MASTER_PROMPT is a dict, FEEDBACK_LOG is a list)
            if path == MASTER_PROMPT_PATH and not isinstance(data, dict):
                return default_master
            if path == FEEDBACK_LOG_PATH and not isinstance(data, list):
                return []
            return data
        # Create initial state if the file doesn't exist
        return default_master if path == MASTER_PROMPT_PATH else []
    except Exception as e:
        print(f"Error loading {path}: {e}")
        # Fall back to a type-correct default so callers can still use the result.
        return default_master if path == MASTER_PROMPT_PATH else []
def aggregate_negative_feedback(feedback_data: list) -> Optional[str]:
    """
    Analyzes the feedback log and summarizes only the negative (rating == 0) feedback.
    Returns None when there is not enough negative feedback to act on.
    """
    negative_feedback = [entry for entry in feedback_data if entry.get("rating") == 0]
    if len(negative_feedback) < MIN_NEGATIVE_FEEDBACK:
        print(f"INFO: Only {len(negative_feedback)} negative entries found. Skipping optimization.")
        return None
    # Summarize the negative interactions that led to user dislike
    summary = []
    for entry in negative_feedback:
        summary.append(
            f"User disliked the response (Rating 0) after input: '{entry.get('original_prompt', '')}' "
            f"The resulting OPTIMIZED PROMPT was: '{entry.get('optimized_prompt', '')}'"
        )
    print(f"INFO: Aggregated {len(negative_feedback)} negative feedback entries.")
    return "\n---\n".join(summary)
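
# Example of the aggregated summary passed to the Meta-LLM (illustrative values):
#   User disliked the response (Rating 0) after input: 'plan my week' The resulting OPTIMIZED PROMPT was: '...'
#   ---
#   User disliked the response (Rating 0) after input: '...' The resulting OPTIMIZED PROMPT was: '...'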
def optimize_system_prompt(current_system_message: str, feedback_summary: str) -> str:
    """
    Calls the Meta-LLM to rewrite the system message based on negative feedback.
    """
    # We define a strict Meta-Prompt for the Grok model to follow
    meta_prompt = (
        "You are the **System Prompt Optimizing Agent**. Your goal is to analyze the 'FAILED FEEDBACK' and rewrite the 'CURRENT SYSTEM MESSAGE' "
        "to address the problems identified. The new system message must aim to improve the quality of future responses, making them more accurate, "
        "detailed, or strictly adherent to formatting rules, based on the failure patterns. "
        "You must output **ONLY** the new system message text, nothing else. Do not use markdown quotes."
    )
    # The user message feeds the current prompt and the negative data to the agent
    user_message = f"""
CURRENT SYSTEM MESSAGE:
---
{current_system_message}
---

FAILED FEEDBACK (You must incorporate lessons from this data):
---
{feedback_summary}
---

Based ONLY on the above, rewrite the CURRENT SYSTEM MESSAGE to improve it.

New System Message:
"""
    try:
        # Call the OpenRouter API (OpenAI-compatible endpoint) with the Meta-LLM
        client = OpenAI(
            base_url="https://openrouter.ai/api/v1",
            api_key=os.getenv("OPENROUTER_API_KEY"),
        )
        response = client.chat.completions.create(
            model=META_LLM_MODEL,
            messages=[
                {"role": "system", "content": meta_prompt},
                {"role": "user", "content": user_message},
            ],
            temperature=0.1,  # Low temperature for reliable instruction following
            max_tokens=512,
        )
        # message.content may be None; guard before stripping
        new_prompt = (response.choices[0].message.content or "").strip()
        if not new_prompt:
            print("WARNING: Meta-LLM returned an empty message. Keeping the current prompt.")
            return current_system_message
        print("SUCCESS: New System Prompt generated by Meta-LLM.")
        return new_prompt
    except Exception as e:
        print(f"CRITICAL ERROR: Meta-LLM API call failed: {e}")
        return current_system_message  # Return the original prompt on failure
def increment_version(version_str: str) -> str:
    """Safely increments the minor version (Y in X.Y.Z) of a version string."""
    try:
        # Assumes format X.Y.Z
        parts = version_str.split('.')
        if len(parts) < 2:
            return "1.0.0"
        # Increment the second part (the minor version)
        new_minor_version = int(parts[1]) + 1
        # Rebuild the version string
        # Uses parts[2:] for safety; handles a missing Z value if needed
        new_parts = [parts[0], str(new_minor_version)] + parts[2:]
        return ".".join(new_parts)
    except Exception:
        # If any part fails (non-integer, etc.), reset to a safe, known state.
        return "1.0.0"
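
# Illustrative behaviour, derived directly from the logic above:
#   increment_version("1.0.0") -> "1.1.0"
#   increment_version("1.2.3") -> "1.3.3"   # the patch component is preserved, not reset
#   increment_version("2")     -> "1.0.0"   # too few parts resets to the known-safe default
#   increment_version("a.b.c") -> "1.0.0"   # a non-integer minor falls back via the except branch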
def run_optimization():
    """Main function for the MLOps pipeline script."""
    print(f"--- Running Prompt Optimization Pipeline at {datetime.datetime.now()} ---")

    # 1. Load Data
    current_config = load_data(MASTER_PROMPT_PATH)
    feedback_data = load_data(FEEDBACK_LOG_PATH)
    current_system_message = current_config.get("system_message", "")

    if not feedback_data:
        print("INFO: Feedback log is empty. Exiting optimization.")
        # Update the status log to reflect that the scheduled job ran but was skipped.
        update_status_log("MLOps Optimization skipped (No new feedback).")
        return

    # 2. Aggregate Feedback
    feedback_summary = aggregate_negative_feedback(feedback_data)
    if feedback_summary is None:
        # Optimization was skipped because not enough negative feedback was found
        update_status_log("MLOps Optimization skipped (Insufficient negative feedback).")
        return

    # 3. Optimize Prompt
    new_system_message = optimize_system_prompt(current_system_message, feedback_summary)

    # 4. Check whether the prompt actually changed before committing
    if new_system_message != current_system_message:
        print("\n*** PROMPT UPDATED ***")

        # 5. Update Master Prompt File
        current_config["system_message"] = new_system_message
        current_config["version"] = increment_version(current_config.get("version", "1.0.0"))
        current_config["last_updated"] = datetime.datetime.now().isoformat()
        with open(MASTER_PROMPT_PATH, 'w') as f:
            json.dump(current_config, f, indent=4)
        print(f"Successfully wrote new prompt version {current_config['version']} to master_prompt.json")

        # 6. Clear Feedback Log (ready for the next cycle)
        with open(FEEDBACK_LOG_PATH, 'w') as f:
            json.dump([], f)
        print("Feedback log cleared.")

        # 7. Update the Status Log (rewrites the file with a new timestamp)
        update_status_log(f"MLOps Prompt Optimization deployed successfully. New version: {current_config['version']}")
    else:
        print("\nINFO: No significant change or API error. Master prompt remains the same.")
        update_status_log("MLOps Optimization not deployed (API error or no meaningful change detected).")
def update_status_log(status_message: str):
    """Writes the current status to the status log file, overwriting the previous entry."""
    # Use a timezone-aware timestamp rather than hard-coding a zone label onto local time.
    current_time = datetime.datetime.now().astimezone().strftime("%Y-%m-%d %H:%M:%S %Z")
    log_content = f"[{current_time}] {status_message}"
    try:
        with open(STATUS_LOG_PATH, 'w') as f:
            f.write(log_content)
        print(f"Status log updated: {log_content}")
    except Exception as e:
        print(f"CRITICAL ERROR: Failed to write to status log: {e}")
if __name__ == '__main__':
    run_optimization()
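
# How this script is expected to be invoked (illustrative; the filename and the
# scheduler are assumptions, not something this file encodes):
#   python scripts/optimize_prompt.py        # hypothetical filename, run from the repo root
# or on a schedule, e.g. a daily cron entry:
#   0 3 * * * cd /path/to/project && python scripts/optimize_prompt.py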