Spaces:
Running
Running
| from typing import Dict, Optional, List | |
| from datetime import datetime, timezone | |
| import logging | |
| from .models import Robot, RobotStatus, DriverJointState | |
| logger = logging.getLogger(__name__) | |
| class RobotManager: | |
| """Manages robot lifecycle and state""" | |
| def __init__(self): | |
| self.robots: Dict[str, Robot] = {} | |
| async def create_robot(self, robot_id: str, robot_type: str, name: str) -> Robot: | |
| """Create a new robot""" | |
| if robot_id in self.robots: | |
| raise ValueError(f"Robot {robot_id} already exists") | |
| # Create demo joint configuration for so-arm100 | |
| joints = self._create_demo_joints(robot_type) | |
| robot = Robot( | |
| id=robot_id, | |
| name=name, | |
| robot_type=robot_type, | |
| created_at=datetime.now(timezone.utc), | |
| joints=joints | |
| ) | |
| self.robots[robot_id] = robot | |
| logger.info(f"Created robot {robot_id} ({robot_type}) with {len(joints)} joints") | |
| return robot | |
| def get_robot(self, robot_id: str) -> Optional[Robot]: | |
| """Get robot by ID""" | |
| return self.robots.get(robot_id) | |
| async def delete_robot(self, robot_id: str): | |
| """Delete a robot""" | |
| if robot_id in self.robots: | |
| del self.robots[robot_id] | |
| logger.info(f"Deleted robot {robot_id}") | |
| async def set_master_connected(self, robot_id: str, connection_id: str): | |
| """Mark robot as having a master connection""" | |
| robot = self.robots.get(robot_id) | |
| if robot: | |
| robot.master_connected = True | |
| robot.master_connection_id = connection_id | |
| robot.last_command_source = "master" | |
| robot.last_command_time = datetime.now(timezone.utc) | |
| async def set_master_disconnected(self, robot_id: str): | |
| """Mark robot as having no master connection""" | |
| robot = self.robots.get(robot_id) | |
| if robot: | |
| robot.master_connected = False | |
| robot.master_connection_id = None | |
| robot.last_command_source = "none" | |
| async def add_slave_connection(self, robot_id: str, connection_id: str): | |
| """Add a slave connection to robot""" | |
| robot = self.robots.get(robot_id) | |
| if robot: | |
| if connection_id not in robot.slave_connections: | |
| robot.slave_connections.append(connection_id) | |
| logger.info(f"Added slave {connection_id} to robot {robot_id} ({len(robot.slave_connections)} total)") | |
| async def remove_slave_connection(self, robot_id: str, connection_id: str): | |
| """Remove a slave connection from robot""" | |
| robot = self.robots.get(robot_id) | |
| if robot: | |
| try: | |
| robot.slave_connections.remove(connection_id) | |
| logger.info(f"Removed slave {connection_id} from robot {robot_id} ({len(robot.slave_connections)} remaining)") | |
| except ValueError: | |
| logger.warning(f"Slave {connection_id} not found in robot {robot_id} connections") | |
| def get_robot_status(self, robot_id: str) -> Optional[RobotStatus]: | |
| """Get robot connection status""" | |
| robot = self.robots.get(robot_id) | |
| if not robot: | |
| return None | |
| return RobotStatus( | |
| robot_id=robot_id, | |
| master_connected=robot.master_connected, | |
| slave_count=len(robot.slave_connections), | |
| last_command_source=robot.last_command_source, | |
| last_command_time=robot.last_command_time, | |
| last_seen=datetime.now(timezone.utc) | |
| ) | |
| def list_robots_with_masters(self) -> List[Robot]: | |
| """Get all robots that have master connections""" | |
| return [robot for robot in self.robots.values() if robot.master_connected] | |
| def list_robots_with_slaves(self) -> List[Robot]: | |
| """Get all robots that have slave connections""" | |
| return [robot for robot in self.robots.values() if robot.slave_connections] | |
| def _create_demo_joints(self, robot_type: str) -> List[DriverJointState]: | |
| """Create demo joint configuration based on robot type""" | |
| if robot_type == "so-arm100": | |
| return [ | |
| DriverJointState( | |
| name="Rotation", | |
| servo_id=1, | |
| type="revolute", | |
| virtual_value=0.0, | |
| real_value=0.0 | |
| ), | |
| DriverJointState( | |
| name="Pitch", | |
| servo_id=2, | |
| type="revolute", | |
| virtual_value=0.0, | |
| real_value=0.0 | |
| ), | |
| DriverJointState( | |
| name="Elbow", | |
| servo_id=3, | |
| type="revolute", | |
| virtual_value=0.0, | |
| real_value=0.0 | |
| ), | |
| DriverJointState( | |
| name="Wrist_Pitch", | |
| servo_id=4, | |
| type="revolute", | |
| virtual_value=0.0, | |
| real_value=0.0 | |
| ), | |
| DriverJointState( | |
| name="Wrist_Roll", | |
| servo_id=5, | |
| type="revolute", | |
| virtual_value=0.0, | |
| real_value=0.0 | |
| ), | |
| DriverJointState( | |
| name="Jaw", | |
| servo_id=6, | |
| type="revolute", | |
| virtual_value=0.0, | |
| real_value=0.0 | |
| ) | |
| ] | |
| else: | |
| # Default generic robot | |
| return [ | |
| DriverJointState( | |
| name="joint_1", | |
| servo_id=1, | |
| type="revolute", | |
| virtual_value=0.0, | |
| real_value=0.0 | |
| ), | |
| DriverJointState( | |
| name="joint_2", | |
| servo_id=2, | |
| type="revolute", | |
| virtual_value=0.0, | |
| real_value=0.0 | |
| ) | |
| ] |