Spaces:
Sleeping
Sleeping
| # bp_phi/memory.py | |
| import random | |
| from typing import Dict, Any, List | |
class WorkspaceManager:
    """A stateful, external workspace that the LLM agent can interact with via tools.

    The workspace maps slot keys to text content and holds at most
    ``max_slots`` entries. Writing a new key to a full workspace evicts one
    existing slot: a randomly chosen one when ``is_random`` is true,
    otherwise the slot whose key was inserted first.
    """

    # Maximum number of content characters shown per slot in the snapshot.
    SNAPSHOT_PREVIEW_CHARS = 100

    def __init__(self, max_slots: int = 7, is_random: bool = False):
        """Create an empty workspace.

        Args:
            max_slots: Maximum number of slots held at the same time.
            is_random: If true, evict a random slot when full; otherwise
                evict the oldest inserted slot.
        """
        self.max_slots = max_slots
        self.is_random = is_random
        self.slots: Dict[str, str] = {}

    def write(self, key: str, content: str) -> str:
        """Write content to a slot, evicting another slot if at capacity.

        Overwriting an existing key never evicts anything and keeps the
        key's original position in the eviction order.

        Args:
            key: Name of the slot to write.
            content: Text to store in the slot.

        Returns:
            A success message, or an error message when the workspace has
            no capacity (``max_slots <= 0``) and nothing could be stored.
        """
        if key not in self.slots:
            if self.max_slots <= 0:
                # Nothing can be evicted from an empty workspace; report the
                # problem as a tool-style message instead of crashing in
                # random.choice([]) / next(iter({})).
                return (
                    f"Error: Workspace has no capacity; "
                    f"slot '{key}' was not written."
                )
            if len(self.slots) >= self.max_slots:
                if self.is_random:
                    evict_key = random.choice(list(self.slots))
                else:
                    # Simple FIFO eviction: dicts iterate in insertion
                    # order, so the first key is the oldest one.
                    evict_key = next(iter(self.slots))
                del self.slots[evict_key]
        self.slots[key] = content
        return f"Success: Wrote to slot '{key}'."

    def read(self, key: str) -> str:
        """Return the content of a slot, or an error message if it is unset."""
        return self.slots.get(key, f"Error: Slot '{key}' is empty.")

    def get_visible_snapshot(self) -> str:
        """Return a string representation of the workspace state for the prompt.

        Each slot is rendered on its own line. Content longer than
        ``SNAPSHOT_PREVIEW_CHARS`` characters is cut and marked with a
        trailing ``...``; shorter content is shown in full without the
        marker, so the ellipsis always means real truncation.
        """
        if not self.slots:
            return "Workspace is empty."
        limit = self.SNAPSHOT_PREVIEW_CHARS
        lines = []
        for slot_key, text in self.slots.items():
            preview = f"{text[:limit]}..." if len(text) > limit else text
            lines.append(f"- Slot '{slot_key}': '{preview}'")
        return "\n".join(lines)

    def clear(self) -> None:
        """Empty the entire workspace."""
        self.slots.clear()