# bp_phi/runner.py import os os.environ["CUBLAS_WORKSPACE_CONFIG"] = ":4096:8" import torch import random import numpy as np import statistics import time import re # <-- FIX: Added missing import import json # <-- FIX: Added missing import from transformers import set_seed from typing import Dict, Any, List from .workspace import Workspace, RandomWorkspace from .llm_iface import LLM from .prompts_en import SINGLE_STEP_TASKS, MULTI_STEP_SCENARIOS, HALTING_PROMPTS, SHOCK_TEST_STIMULI from .runner_utils import dbg, SYSTEM_META, step_user_prompt, parse_meta # --- Experiment 1: Workspace & Ablations Runner --- def run_workspace_suite(model_id: str, trials: int, seed: int, temperature: float, ablation: str or None) -> Dict[str, Any]: random.seed(seed) np.random.seed(seed) torch.manual_seed(seed) if torch.cuda.is_available(): torch.cuda.manual_seed_all(seed) try: torch.use_deterministic_algorithms(True, warn_only=True) except Exception: pass set_seed(seed) llm = LLM(model_id=model_id, device="auto", seed=seed) task_pool = SINGLE_STEP_TASKS + MULTI_STEP_SCENARIOS random.shuffle(task_pool) all_results = [] recall_verifications = [] for i in range(trials): task = task_pool[i % len(task_pool)] if task.get("type") == "multi_step": dbg(f"\n--- SCENARIO: {task['name']} ---") ws = Workspace(max_slots=7) if ablation != "workspace_unlimited" else Workspace(max_slots=999) if ablation == "random_workspace": ws = RandomWorkspace(max_slots=7) for step in task["steps"]: if ablation == "recurrence_off": ws.clear() if step["type"] == "verify": continue user_prompt = step_user_prompt(step["prompt"], ws.snapshot()) raw_response = llm.generate_json(SYSTEM_META, user_prompt, temperature=temperature)[0] parsed_response = parse_meta(raw_response) if parsed_response.get("answer"): ws.commit(f"S{len(ws.history)+1}", parsed_response["answer"], parsed_response["confidence"]) res = {"step": step, "response": parsed_response} if step["type"] == "recall": verify_step = next((s for s in task["steps"] if s["type"] == "verify"), None) if verify_step: correct = verify_step["expected_answer_fragment"] in parsed_response.get("answer", "").lower() recall_verifications.append(correct) res["correct_recall"] = correct dbg(f"VERIFY: Correct={correct}") all_results.append(res) else: # Single-step tasks ws = Workspace(max_slots=7) user_prompt = step_user_prompt(task["base_prompt"], ws.snapshot()) raw_response = llm.generate_json(SYSTEM_META, user_prompt, temperature=temperature)[0] parsed_response = parse_meta(raw_response) all_results.append({"step": task, "response": parsed_response}) recall_accuracy = statistics.mean(recall_verifications) if recall_verifications else 0.0 pcs = 0.6 * recall_accuracy return {"PCS": pcs, "Recall_Accuracy": recall_accuracy, "results": all_results} # --- Experiment 2: Computational Dynamics & Halting Runner (Version 2.4) --- def run_halting_test(model_id: str, master_seed: int, prompt_type: str, num_runs: int, max_steps: int, timeout: int) -> Dict[str, Any]: all_runs_details = [] seed_generator = random.Random(master_seed) HALT_SYSTEM_PROMPT = """You are a precise state-machine simulator. Your only task is to compute the next state. First, reason step-by-step what the next state should be based on the rule. Then, provide ONLY a valid JSON object with the final computed state, like this: {"state": } """ for i in range(num_runs): current_seed = seed_generator.randint(0, 2**32 - 1) dbg(f"\n--- HALT TEST RUN {i+1}/{num_runs} (Master Seed: {master_seed}, Current Seed: {current_seed}) ---") set_seed(current_seed) llm = LLM(model_id=model_id, device="auto", seed=current_seed) prompt_config = HALTING_PROMPTS[prompt_type] rules = prompt_config["rules"] state = prompt_config["initial_state"] step_durations = [] step_outputs = [] total_start_time = time.time() for step_num in range(max_steps): step_start_time = time.time() prompt = f"Rule: '{rules}'.\nCurrent state is: {state}. Reason step-by-step and then provide the JSON for the next state." dbg(f"Step {step_num+1} Input: {state}") raw_response = llm.generate_json(HALT_SYSTEM_PROMPT, prompt, max_new_tokens=100)[0] try: dbg(f"RAW HALT OUTPUT: {raw_response}") match = re.search(r'\{.*?\}', raw_response, re.DOTALL) if not match: raise ValueError("No JSON found in the model's output") parsed = json.loads(match.group(0)) new_state = int(parsed["state"]) except (json.JSONDecodeError, ValueError, KeyError, TypeError) as e: dbg(f"❌ Step {step_num+1} failed to parse state. Error: {e}. Halting run.") break step_end_time = time.time() step_duration = step_end_time - step_start_time step_durations.append(step_duration) dbg(f"Step {step_num+1} Output: {new_state} (took {step_duration:.3f}s)") step_outputs.append(new_state) if state == new_state: dbg("State did not change. Model is stuck. Halting.") break state = new_state if state == 1 and prompt_type == "collatz_sequence": dbg("Sequence reached 1. Halting normally.") break if (time.time() - total_start_time) > timeout: dbg(f"❌ Timeout of {timeout}s exceeded. Halting.") break total_duration = time.time() - total_start_time all_runs_details.append({ "run_index": i + 1, "seed": current_seed, "total_duration_s": total_duration, "steps_taken": len(step_durations), "final_state": state, "timed_out": total_duration >= timeout, "mean_step_time_s": statistics.mean(step_durations) if step_durations else 0, "stdev_step_time_s": statistics.stdev(step_durations) if len(step_durations) > 1 else 0, "sequence": step_outputs }) mean_stdev_step_time = statistics.mean([run["stdev_step_time_s"] for run in all_runs_details]) total_timeouts = sum(1 for run in all_runs_details if run["timed_out"]) if total_timeouts > 0: verdict = (f"### ⚠️ Cognitive Jamming Detected!\n{total_timeouts}/{num_runs} runs exceeded the timeout.") elif mean_stdev_step_time > 0.5: verdict = (f"### 🤔 Unstable Computation Detected\nThe high standard deviation in step time ({mean_stdev_step_time:.3f}s) indicates computational stress.") else: verdict = (f"### ✅ Process Halted Normally & Stably\nAll runs completed with consistent processing speed.") return {"verdict": verdict, "details": all_runs_details} # --- Experiment 3: Cognitive Seismograph Runner --- def run_seismograph_suite(model_id: str, seed: int) -> Dict[str, Any]: set_seed(seed) llm = LLM(model_id=model_id, device="auto", seed=seed) scenario = next(s for s in MULTI_STEP_SCENARIOS if s["name"] == "Key Location Memory") activations = {} def get_activation(name): def hook(model, input, output): activations[name] = output[0].detach().cpu().mean(dim=1).squeeze() return hook target_layer_index = llm.model.config.num_hidden_layers // 2 hook = llm.model.model.layers[target_layer_index].register_forward_hook(get_activation('capture')) ws = Workspace(max_slots=7) for step in scenario["steps"]: if step["type"] == "verify": continue user_prompt = step_user_prompt(step["prompt"], ws.snapshot()) llm.generate_json(SYSTEM_META, user_prompt, max_new_tokens=20) activations[step["type"]] = activations.pop('capture') ws.commit(f"S{len(ws.history)+1}", f"Output for {step['type']}", 0.9) hook.remove() cos = torch.nn.CosineSimilarity(dim=0) sim_recall_encode = float(cos(activations["recall"], activations["encode"])) sim_recall_distract = float(cos(activations["recall"], activations["distractor"])) verdict = ("✅ Evidence of Memory Reactivation Found." if sim_recall_encode > (sim_recall_distract + 0.05) else "⚠️ No Clear Evidence.") return {"verdict": verdict, "similarity_recall_vs_encode": sim_recall_encode, "similarity_recall_vs_distractor": sim_recall_distract} # --- Experiment 4: Symbolic Shock Test Runner --- def run_shock_test_suite(model_id: str, seed: int) -> Dict[str, Any]: set_seed(seed) llm = LLM(model_id=model_id, device="auto", seed=seed) results = [] for stimulus in SHOCK_TEST_STIMULI: dbg(f"--- SHOCK TEST: {stimulus['id']} ---") start_time = time.time() inputs = llm.tokenizer(stimulus["sentence"], return_tensors="pt").to(llm.model.device) with torch.no_grad(): outputs = llm.model(**inputs, output_hidden_states=True) latency = (time.time() - start_time) * 1000 all_activations = torch.cat([h.cpu().flatten() for h in outputs.hidden_states]) sparsity = (all_activations == 0).float().mean().item() results.append({"type": stimulus["type"], "latency_ms": latency, "sparsity": sparsity}) def safe_mean(data): return statistics.mean(data) if data else 0.0 avg_latency = {t: safe_mean([r['latency_ms'] for r in results if r['type'] == t]) for t in ['expected', 'shock']} avg_sparsity = {t: safe_mean([r['sparsity'] for r in results if r['type'] == t]) for t in ['expected', 'shock']} verdict = ("✅ Evidence of Symbolic Shock Found." if avg_latency.get('shock', 0) > avg_latency.get('expected', 0) and avg_sparsity.get('shock', 1) < avg_sparsity.get('expected', 1) else "⚠️ No Clear Evidence.") return {"verdict": verdict, "average_latency_ms": avg_latency, "average_sparsity": avg_sparsity, "results": results}