""" Unified Movement Manager for Reachy Mini. This module provides a centralized control system for robot movements, inspired by the reachy_mini_conversation_app architecture. Key features: - Single 100Hz control loop (prevents race conditions) - Command queue pattern (thread-safe external API) - Error throttling (prevents log explosion) - Speech-driven head sway - Breathing animation during idle - Graceful shutdown """ import logging import math import threading import time from dataclasses import dataclass, field from enum import Enum from queue import Queue, Empty from typing import Any, Callable, Dict, List, Optional, Tuple, TYPE_CHECKING import numpy as np from scipy.spatial.transform import Rotation as R if TYPE_CHECKING: from reachy_mini import ReachyMini logger = logging.getLogger(__name__) # ============================================================================= # Constants (borrowed from conversation_app) # ============================================================================= CONTROL_LOOP_FREQUENCY_HZ = 100 # 100Hz control loop TARGET_PERIOD = 1.0 / CONTROL_LOOP_FREQUENCY_HZ # Speech sway parameters (from conversation_app SwayRollRT) # Rotation amplitudes SWAY_A_PITCH_DEG = 4.5 # Pitch amplitude (degrees) SWAY_A_YAW_DEG = 7.5 # Yaw amplitude SWAY_A_ROLL_DEG = 2.25 # Roll amplitude # Rotation frequencies SWAY_F_PITCH = 2.2 # Pitch frequency Hz SWAY_F_YAW = 0.6 # Yaw frequency SWAY_F_ROLL = 1.3 # Roll frequency # Translation amplitudes (mm -> m) SWAY_A_X_MM = 4.5 # X amplitude (mm) SWAY_A_Y_MM = 3.75 # Y amplitude (mm) SWAY_A_Z_MM = 2.25 # Z amplitude (mm) # Translation frequencies SWAY_F_X = 0.35 # X frequency Hz SWAY_F_Y = 0.45 # Y frequency Hz SWAY_F_Z = 0.25 # Z frequency Hz # Master scale SWAY_MASTER = 1.5 # Overall sway intensity multiplier # Breathing parameters BREATHING_Z_AMPLITUDE = 0.005 # 5mm BREATHING_FREQUENCY = 0.1 # 0.1Hz (6 breaths per minute) ANTENNA_SWAY_AMPLITUDE_DEG = 15.0 # 15 degrees ANTENNA_FREQUENCY = 0.5 # 0.5Hz # VAD parameters for speech detection VAD_DB_ON = -35 # Start detection threshold VAD_DB_OFF = -45 # Stop detection threshold # Antenna freeze parameters (listening mode) ANTENNA_BLEND_DURATION = 0.5 # Seconds to blend back from frozen state class RobotState(Enum): """Robot state machine states.""" IDLE = "idle" LISTENING = "listening" THINKING = "thinking" SPEAKING = "speaking" @dataclass class MovementState: """Internal movement state (only modified by control loop).""" # Current robot state robot_state: RobotState = RobotState.IDLE # Speech sway offsets (radians for rotation, meters for translation) speech_pitch: float = 0.0 speech_yaw: float = 0.0 speech_roll: float = 0.0 speech_x: float = 0.0 speech_y: float = 0.0 speech_z: float = 0.0 # Breathing offsets breathing_z: float = 0.0 breathing_antenna_left: float = 0.0 breathing_antenna_right: float = 0.0 # Target pose (from actions) target_pitch: float = 0.0 target_yaw: float = 0.0 target_roll: float = 0.0 target_x: float = 0.0 target_y: float = 0.0 target_z: float = 0.0 target_antenna_left: float = 0.0 target_antenna_right: float = 0.0 target_body_yaw: float = 0.0 # Timing last_activity_time: float = 0.0 idle_start_time: float = 0.0 # Speech sway state sway_time: float = 0.0 sway_envelope: float = 0.0 # 0-1, smoothed VAD sway_phase_pitch: float = 0.0 sway_phase_yaw: float = 0.0 sway_phase_roll: float = 0.0 sway_phase_x: float = 0.0 sway_phase_y: float = 0.0 sway_phase_z: float = 0.0 # Breathing state breathing_time: float = 0.0 breathing_active: bool = False # Antenna freeze state (listening mode) antenna_frozen: bool = False frozen_antenna_left: float = 0.0 frozen_antenna_right: float = 0.0 antenna_blend: float = 1.0 # 0=frozen, 1=normal antenna_blend_start_time: float = 0.0 @dataclass class PendingAction: """A pending motion action.""" name: str target_pitch: float = 0.0 target_yaw: float = 0.0 target_roll: float = 0.0 target_x: float = 0.0 target_y: float = 0.0 target_z: float = 0.0 duration: float = 0.5 callback: Optional[Callable] = None class SpeechSwayGenerator: """ Generates speech-driven head sway based on audio loudness. Uses multiple sine wave oscillators at different frequencies to create natural-looking "Lissajous" motion. Includes both rotation (pitch/yaw/roll) and translation (x/y/z). """ def __init__(self, seed: int = 7): # Random initial phases (avoid mechanical feel) rng = np.random.default_rng(seed) self.phase_pitch = float(rng.random() * 2 * math.pi) self.phase_yaw = float(rng.random() * 2 * math.pi) self.phase_roll = float(rng.random() * 2 * math.pi) self.phase_x = float(rng.random() * 2 * math.pi) self.phase_y = float(rng.random() * 2 * math.pi) self.phase_z = float(rng.random() * 2 * math.pi) # State self.t = 0.0 self.vad_on = False self.envelope = 0.0 # Smoothed VAD [0, 1] self.last_db = -100.0 def reset(self): """Reset state.""" self.t = 0.0 self.vad_on = False self.envelope = 0.0 self.last_db = -100.0 def update(self, dt: float, loudness_db: float) -> Tuple[float, float, float, float, float, float]: """ Update sway based on audio loudness. Args: dt: Time delta in seconds loudness_db: Audio loudness in dBFS Returns: Tuple of (pitch_rad, yaw_rad, roll_rad, x_m, y_m, z_m) offsets """ self.t += dt self.last_db = loudness_db # VAD detection with hysteresis if loudness_db >= VAD_DB_ON: self.vad_on = True elif loudness_db <= VAD_DB_OFF: self.vad_on = False # Smooth envelope target = 1.0 if self.vad_on else 0.0 self.envelope += 0.1 * (target - self.envelope) self.envelope = max(0.0, min(1.0, self.envelope)) # Loudness mapping: -50dB -> 0, -20dB -> 1 loud = max(0.0, min(1.0, (loudness_db + 50) / 30)) # Apply master scale env = self.envelope * SWAY_MASTER # Generate rotation sway pitch = (math.radians(SWAY_A_PITCH_DEG) * loud * env * math.sin(2 * math.pi * SWAY_F_PITCH * self.t + self.phase_pitch)) yaw = (math.radians(SWAY_A_YAW_DEG) * loud * env * math.sin(2 * math.pi * SWAY_F_YAW * self.t + self.phase_yaw)) roll = (math.radians(SWAY_A_ROLL_DEG) * loud * env * math.sin(2 * math.pi * SWAY_F_ROLL * self.t + self.phase_roll)) # Generate translation sway (mm -> m) x = (SWAY_A_X_MM / 1000.0 * loud * env * math.sin(2 * math.pi * SWAY_F_X * self.t + self.phase_x)) y = (SWAY_A_Y_MM / 1000.0 * loud * env * math.sin(2 * math.pi * SWAY_F_Y * self.t + self.phase_y)) z = (SWAY_A_Z_MM / 1000.0 * loud * env * math.sin(2 * math.pi * SWAY_F_Z * self.t + self.phase_z)) return pitch, yaw, roll, x, y, z class BreathingAnimation: """ Generates idle breathing animation. Creates subtle Z-axis movement and antenna sway to make the robot appear more alive when idle. """ def __init__(self): self.t = 0.0 self.active = False self.blend = 0.0 # Blend factor for smooth transitions def reset(self): """Reset animation.""" self.t = 0.0 self.blend = 0.0 def set_active(self, active: bool): """Set whether breathing is active.""" self.active = active def update(self, dt: float) -> Tuple[float, float, float]: """ Update breathing animation. Args: dt: Time delta in seconds Returns: Tuple of (z_offset_m, antenna_left_rad, antenna_right_rad) """ self.t += dt # Smooth blend in/out target_blend = 1.0 if self.active else 0.0 blend_speed = 0.5 # Blend over ~2 seconds self.blend += blend_speed * dt * (target_blend - self.blend) self.blend = max(0.0, min(1.0, self.blend)) if self.blend < 0.001: return 0.0, 0.0, 0.0 # Z breathing z_offset = (BREATHING_Z_AMPLITUDE * self.blend * math.sin(2 * math.pi * BREATHING_FREQUENCY * self.t)) # Antenna sway (opposite directions for natural look) antenna_angle = (math.radians(ANTENNA_SWAY_AMPLITUDE_DEG) * self.blend * math.sin(2 * math.pi * ANTENNA_FREQUENCY * self.t)) return z_offset, antenna_angle, -antenna_angle class MovementManager: """ Unified movement manager with 100Hz control loop. All external interactions go through the command queue, ensuring thread safety and preventing race conditions. """ def __init__(self, reachy_mini: Optional["ReachyMini"] = None): self.robot = reachy_mini self._now = time.monotonic # Command queue - all external threads communicate through this self._command_queue: Queue[Tuple[str, Any]] = Queue() # Internal state (only modified by control loop) self.state = MovementState() self.state.last_activity_time = self._now() self.state.idle_start_time = self._now() # Initialize random phases for sway rng = np.random.default_rng(42) self.state.sway_phase_pitch = float(rng.random() * 2 * math.pi) self.state.sway_phase_yaw = float(rng.random() * 2 * math.pi) self.state.sway_phase_roll = float(rng.random() * 2 * math.pi) # Sub-modules self._speech_sway = SpeechSwayGenerator() self._breathing = BreathingAnimation() # Thread control self._stop_event = threading.Event() self._thread: Optional[threading.Thread] = None # Error throttling self._last_error_time = 0.0 self._error_interval = 1.0 # Log at most once per second self._suppressed_errors = 0 # Connection health tracking self._connection_lost = False self._last_successful_command = self._now() self._connection_timeout = 5.0 # 5 seconds without success = connection lost self._reconnect_attempt_interval = 10.0 # Try reconnecting every 10 seconds self._last_reconnect_attempt = 0.0 # Pending action self._pending_action: Optional[PendingAction] = None self._action_start_time: float = 0.0 self._action_start_pose: Dict[str, float] = {} # Audio loudness (updated externally) self._audio_loudness_db: float = -100.0 self._audio_lock = threading.Lock() logger.info("MovementManager initialized") # ========================================================================= # Thread-safe public API (called from any thread) # ========================================================================= def set_state(self, new_state: RobotState) -> None: """Thread-safe: Set robot state.""" self._command_queue.put(("set_state", new_state)) def set_listening(self, listening: bool) -> None: """Thread-safe: Set listening state.""" state = RobotState.LISTENING if listening else RobotState.IDLE self._command_queue.put(("set_state", state)) def set_thinking(self) -> None: """Thread-safe: Set thinking state.""" self._command_queue.put(("set_state", RobotState.THINKING)) def set_speaking(self, speaking: bool) -> None: """Thread-safe: Set speaking state.""" state = RobotState.SPEAKING if speaking else RobotState.IDLE self._command_queue.put(("set_state", state)) def set_idle(self) -> None: """Thread-safe: Return to idle state.""" self._command_queue.put(("set_state", RobotState.IDLE)) def queue_action(self, action: PendingAction) -> None: """Thread-safe: Queue a motion action.""" self._command_queue.put(("action", action)) def turn_to_angle(self, yaw_deg: float, duration: float = 0.8) -> None: """Thread-safe: Turn head to face a direction.""" action = PendingAction( name="turn_to", target_yaw=math.radians(yaw_deg), duration=duration, ) self._command_queue.put(("action", action)) def nod(self, amplitude_deg: float = 15, duration: float = 0.5) -> None: """Thread-safe: Perform a nod gesture.""" self._command_queue.put(("nod", (amplitude_deg, duration))) def shake(self, amplitude_deg: float = 20, duration: float = 0.5) -> None: """Thread-safe: Perform a head shake gesture.""" self._command_queue.put(("shake", (amplitude_deg, duration))) def update_audio_loudness(self, loudness_db: float) -> None: """Thread-safe: Update audio loudness for speech sway.""" with self._audio_lock: self._audio_loudness_db = loudness_db def reset_to_neutral(self, duration: float = 0.5) -> None: """Thread-safe: Reset to neutral position.""" action = PendingAction( name="neutral", target_pitch=0.0, target_yaw=0.0, target_roll=0.0, target_x=0.0, target_y=0.0, target_z=0.0, duration=duration, ) self._command_queue.put(("action", action)) # ========================================================================= # Internal: Command processing (runs in control loop) # ========================================================================= def _poll_commands(self) -> None: """Process all pending commands from the queue.""" while True: try: cmd, payload = self._command_queue.get_nowait() except Empty: break self._handle_command(cmd, payload) def _handle_command(self, cmd: str, payload: Any) -> None: """Handle a single command.""" if cmd == "set_state": old_state = self.state.robot_state self.state.robot_state = payload self.state.last_activity_time = self._now() # State transition logic if payload == RobotState.IDLE and old_state != RobotState.IDLE: self.state.idle_start_time = self._now() self._breathing.set_active(True) self._speech_sway.reset() # Unfreeze antennas when returning to idle self._start_antenna_unfreeze() elif payload != RobotState.IDLE: self._breathing.set_active(False) if payload == RobotState.SPEAKING: self._speech_sway.reset() # Freeze antennas when entering listening mode if payload == RobotState.LISTENING: self._freeze_antennas() elif old_state == RobotState.LISTENING and payload != RobotState.LISTENING: # Start unfreezing when leaving listening mode self._start_antenna_unfreeze() logger.debug("State changed: %s -> %s", old_state.value, payload.value) elif cmd == "action": self._start_action(payload) elif cmd == "nod": amplitude_deg, duration = payload self._do_nod(amplitude_deg, duration) elif cmd == "shake": amplitude_deg, duration = payload self._do_shake(amplitude_deg, duration) def _start_action(self, action: PendingAction) -> None: """Start a new motion action.""" self._pending_action = action self._action_start_time = self._now() self._action_start_pose = { "pitch": self.state.target_pitch, "yaw": self.state.target_yaw, "roll": self.state.target_roll, "x": self.state.target_x, "y": self.state.target_y, "z": self.state.target_z, } logger.debug("Starting action: %s", action.name) def _do_nod(self, amplitude_deg: float, duration: float) -> None: """Execute nod gesture (blocking in control loop context).""" # This is simplified - in production, use action queue amplitude_rad = math.radians(amplitude_deg) half_duration = duration / 2 # Nod down action_down = PendingAction( name="nod_down", target_pitch=amplitude_rad, duration=half_duration, ) self._start_action(action_down) def _do_shake(self, amplitude_deg: float, duration: float) -> None: """Execute shake gesture (blocking in control loop context).""" amplitude_rad = math.radians(amplitude_deg) half_duration = duration / 2 # Shake left action_left = PendingAction( name="shake_left", target_yaw=-amplitude_rad, duration=half_duration, ) self._start_action(action_left) # ========================================================================= # Internal: Motion updates (runs in control loop) # ========================================================================= def _update_action(self, dt: float) -> None: """Update pending action interpolation.""" if self._pending_action is None: return elapsed = self._now() - self._action_start_time progress = min(1.0, elapsed / self._pending_action.duration) # Smooth interpolation (ease in-out) t = progress * progress * (3 - 2 * progress) # Interpolate pose start = self._action_start_pose action = self._pending_action self.state.target_pitch = start["pitch"] + t * (action.target_pitch - start["pitch"]) self.state.target_yaw = start["yaw"] + t * (action.target_yaw - start["yaw"]) self.state.target_roll = start["roll"] + t * (action.target_roll - start["roll"]) self.state.target_x = start["x"] + t * (action.target_x - start["x"]) self.state.target_y = start["y"] + t * (action.target_y - start["y"]) self.state.target_z = start["z"] + t * (action.target_z - start["z"]) # Action complete if progress >= 1.0: if self._pending_action.callback: try: self._pending_action.callback() except Exception as e: logger.error("Action callback error: %s", e) self._pending_action = None def _update_speech_sway(self, dt: float) -> None: """Update speech-driven head sway.""" if self.state.robot_state != RobotState.SPEAKING: # Decay sway when not speaking self.state.speech_pitch *= 0.9 self.state.speech_yaw *= 0.9 self.state.speech_roll *= 0.9 self.state.speech_x *= 0.9 self.state.speech_y *= 0.9 self.state.speech_z *= 0.9 return # Get current audio loudness with self._audio_lock: loudness_db = self._audio_loudness_db # Update sway generator pitch, yaw, roll, x, y, z = self._speech_sway.update(dt, loudness_db) self.state.speech_pitch = pitch self.state.speech_yaw = yaw self.state.speech_roll = roll self.state.speech_x = x self.state.speech_y = y self.state.speech_z = z def _update_breathing(self, dt: float) -> None: """Update breathing animation.""" z_offset, antenna_left, antenna_right = self._breathing.update(dt) self.state.breathing_z = z_offset self.state.breathing_antenna_left = antenna_left self.state.breathing_antenna_right = antenna_right def _freeze_antennas(self) -> None: """Freeze antennas at current position (for listening mode).""" # Capture current antenna positions current_left = self.state.target_antenna_left + self.state.breathing_antenna_left current_right = self.state.target_antenna_right + self.state.breathing_antenna_right self.state.antenna_frozen = True self.state.frozen_antenna_left = current_left self.state.frozen_antenna_right = current_right self.state.antenna_blend = 0.0 # Fully frozen logger.debug("Antennas frozen at left=%.2f, right=%.2f", math.degrees(current_left), math.degrees(current_right)) def _start_antenna_unfreeze(self) -> None: """Start unfreezing antennas (smooth blend back to normal).""" if not self.state.antenna_frozen: return self.state.antenna_blend_start_time = self._now() logger.debug("Starting antenna unfreeze") def _update_antenna_blend(self, dt: float) -> None: """Update antenna blend state for smooth unfreezing.""" if not self.state.antenna_frozen: return if self.state.antenna_blend >= 1.0: # Fully unfrozen self.state.antenna_frozen = False return # Calculate blend progress elapsed = self._now() - self.state.antenna_blend_start_time if elapsed > 0: self.state.antenna_blend = min(1.0, elapsed / ANTENNA_BLEND_DURATION) if self.state.antenna_blend >= 1.0: self.state.antenna_frozen = False logger.debug("Antennas unfrozen") def _compose_final_pose(self) -> Dict[str, float]: """Compose final pose from all sources.""" # Primary pose (from actions) pitch = self.state.target_pitch yaw = self.state.target_yaw roll = self.state.target_roll x = self.state.target_x y = self.state.target_y z = self.state.target_z # Add speech sway (secondary) - rotation and translation pitch += self.state.speech_pitch yaw += self.state.speech_yaw roll += self.state.speech_roll x += self.state.speech_x y += self.state.speech_y z += self.state.speech_z # Add breathing z += self.state.breathing_z # Antenna pose with freeze blending target_antenna_left = self.state.target_antenna_left + self.state.breathing_antenna_left target_antenna_right = self.state.target_antenna_right + self.state.breathing_antenna_right # Apply antenna freeze blending (listening mode) blend = self.state.antenna_blend if blend < 1.0: # Blend between frozen position and target position antenna_left = (self.state.frozen_antenna_left * (1.0 - blend) + target_antenna_left * blend) antenna_right = (self.state.frozen_antenna_right * (1.0 - blend) + target_antenna_right * blend) else: antenna_left = target_antenna_left antenna_right = target_antenna_right return { "pitch": pitch, "yaw": yaw, "roll": roll, "x": x, "y": y, "z": z, "antenna_left": antenna_left, "antenna_right": antenna_right, "body_yaw": self.state.target_body_yaw, } # ========================================================================= # Internal: Robot control (runs in control loop) # ========================================================================= def _issue_control_command(self, pose: Dict[str, float]) -> None: """Send control command to robot with error throttling and connection health tracking.""" if self.robot is None: return # Check if connection is lost and we should skip sending commands now = self._now() if self._connection_lost: time_since_last_success = now - self._last_successful_command # Only attempt reconnection every N seconds if now - self._last_reconnect_attempt >= self._reconnect_attempt_interval: self._last_reconnect_attempt = now logger.info("Attempting to send command after connection loss...") else: # Skip sending commands to reduce error spam return try: # Build head pose matrix rotation = R.from_euler('xyz', [ pose["pitch"], pose["roll"], # Note: SDK uses different order pose["yaw"], ]) head_pose = np.eye(4) head_pose[:3, :3] = rotation.as_matrix() head_pose[0, 3] = pose["x"] head_pose[1, 3] = pose["y"] head_pose[2, 3] = pose["z"] # Send to robot (single control point!) self.robot.set_target( head=head_pose, antennas=[pose["antenna_right"], pose["antenna_left"]], body_yaw=pose["body_yaw"], ) # Command succeeded - update connection health self._last_successful_command = now if self._connection_lost: logger.info("✓ Connection to robot restored") self._connection_lost = False self._suppressed_errors = 0 # Reset error counter except Exception as e: error_msg = str(e) # Check if this is a connection error if "Lost connection" in error_msg or "ZError" in error_msg: time_since_last_success = now - self._last_successful_command if not self._connection_lost and time_since_last_success > self._connection_timeout: # First time detecting connection loss logger.error(f"✗ Lost connection to robot daemon: {error_msg}") logger.error(" Troubleshooting steps:") logger.error(" 1. Check if Reachy Mini Daemon is running: sudo systemctl status reachy-mini-daemon") logger.error(" 2. Verify Zenoh service on port 7447: netstat -tlnp | grep 7447") logger.error(" 3. Check robot hardware connections") logger.error(" 4. Review daemon logs: sudo journalctl -u reachy-mini-daemon -n 50") logger.error(f" Will retry connection every {self._reconnect_attempt_interval}s...") self._connection_lost = True self._last_reconnect_attempt = now elif self._connection_lost: # Already in lost state, use throttled logging self._log_error_throttled(f"Connection still lost: {error_msg}") else: # Transient error, not yet considered lost self._log_error_throttled(f"Failed to set robot target: {error_msg}") else: # Non-connection error self._log_error_throttled(f"Failed to set robot target: {error_msg}") def _log_error_throttled(self, message: str) -> None: """Log error with throttling to prevent log explosion.""" now = self._now() if now - self._last_error_time >= self._error_interval: if self._suppressed_errors > 0: message += f" (suppressed {self._suppressed_errors} repeats)" self._suppressed_errors = 0 logger.error(message) self._last_error_time = now else: self._suppressed_errors += 1 # ========================================================================= # Control loop # ========================================================================= def _control_loop(self) -> None: """Main 100Hz control loop.""" logger.info("Movement manager control loop started (%.0f Hz)", CONTROL_LOOP_FREQUENCY_HZ) last_time = self._now() while not self._stop_event.is_set(): loop_start = self._now() dt = loop_start - last_time last_time = loop_start try: # 1. Process commands from queue self._poll_commands() # 2. Update action interpolation self._update_action(dt) # 3. Update speech sway self._update_speech_sway(dt) # 4. Update breathing animation self._update_breathing(dt) # 5. Update antenna blend (listening mode freeze/unfreeze) self._update_antenna_blend(dt) # 6. Compose final pose pose = self._compose_final_pose() # 7. Send to robot (single control point!) self._issue_control_command(pose) except Exception as e: self._log_error_throttled(f"Control loop error: {e}") # Adaptive sleep elapsed = self._now() - loop_start sleep_time = max(0.0, TARGET_PERIOD - elapsed) if sleep_time > 0: time.sleep(sleep_time) logger.info("Movement manager control loop stopped") # ========================================================================= # Lifecycle # ========================================================================= def start(self) -> None: """Start the control loop.""" if self._thread is not None and self._thread.is_alive(): logger.warning("Movement manager already running") return self._stop_event.clear() self._thread = threading.Thread( target=self._control_loop, daemon=True, name="MovementManager", ) self._thread.start() logger.info("Movement manager started") def stop(self) -> None: """Stop the control loop and reset robot.""" if self._thread is None or not self._thread.is_alive(): return logger.info("Stopping movement manager...") # Signal stop self._stop_event.set() # Wait for thread self._thread.join(timeout=2.0) if self._thread.is_alive(): logger.warning("Movement manager thread did not stop in time") # Reset robot to neutral self._reset_to_neutral_blocking() logger.info("Movement manager stopped") def _reset_to_neutral_blocking(self) -> None: """Reset robot to neutral position (blocking).""" if self.robot is None: return try: neutral_pose = np.eye(4) self.robot.goto_target( head=neutral_pose, antennas=[0.0, 0.0], body_yaw=0.0, duration=1.0, ) logger.info("Robot reset to neutral position") except Exception as e: logger.error("Failed to reset robot: %s", e) @property def is_running(self) -> bool: """Check if control loop is running.""" return self._thread is not None and self._thread.is_alive()