""" Unified Movement Manager for Reachy Mini. This module provides a centralized control system for robot movements, inspired by the reachy_mini_conversation_app architecture. Key features: - Single 10Hz control loop (balanced between responsiveness and stability) - Command queue pattern (thread-safe external API) - Error throttling (prevents log explosion) - JSON-driven animation system (conversation state animations) - Graceful shutdown - Pose change detection (skip sending if no significant change) - Robust connection recovery (faster reconnection attempts) - Proper pose composition using SDK's compose_world_offset (same as conversation_app) - Antenna freeze during listening mode with smooth blend back """ import logging import math import threading import time from dataclasses import dataclass, field from enum import Enum from queue import Queue, Empty from typing import Any, Callable, Dict, List, Optional, Tuple, TYPE_CHECKING import numpy as np from scipy.spatial.transform import Rotation as R if TYPE_CHECKING: from reachy_mini import ReachyMini logger = logging.getLogger(__name__) # Import SDK utilities for pose composition (same as conversation_app) try: from reachy_mini.utils import create_head_pose from reachy_mini.utils.interpolation import compose_world_offset SDK_UTILS_AVAILABLE = True except ImportError: SDK_UTILS_AVAILABLE = False logger.warning("SDK utils not available, using fallback pose composition") # Import animation player from .animation_player import AnimationPlayer # ============================================================================= # Constants # ============================================================================= # Control loop frequency - daemon now supports higher rates CONTROL_LOOP_FREQUENCY_HZ = 100 # 100Hz control loop (same as conversation_app) TARGET_PERIOD = 1.0 / CONTROL_LOOP_FREQUENCY_HZ # Antenna freeze parameters (listening mode) ANTENNA_BLEND_DURATION = 0.5 # Seconds to blend back from frozen state # State to animation mapping STATE_ANIMATION_MAP = { "idle": "idle", "listening": "listening", "thinking": "thinking", "speaking": "speaking", } class RobotState(Enum): """Robot state machine states.""" IDLE = "idle" LISTENING = "listening" THINKING = "thinking" SPEAKING = "speaking" @dataclass class MovementState: """Internal movement state (only modified by control loop).""" # Current robot state robot_state: RobotState = RobotState.IDLE # Animation offsets (from AnimationPlayer) anim_pitch: float = 0.0 anim_yaw: float = 0.0 anim_roll: float = 0.0 anim_x: float = 0.0 anim_y: float = 0.0 anim_z: float = 0.0 anim_antenna_left: float = 0.0 anim_antenna_right: float = 0.0 # Speech sway offsets (from audio analysis) sway_pitch: float = 0.0 sway_yaw: float = 0.0 sway_roll: float = 0.0 sway_x: float = 0.0 sway_y: float = 0.0 sway_z: float = 0.0 # Target pose (from actions) target_pitch: float = 0.0 target_yaw: float = 0.0 target_roll: float = 0.0 target_x: float = 0.0 target_y: float = 0.0 target_z: float = 0.0 target_antenna_left: float = 0.0 target_antenna_right: float = 0.0 target_body_yaw: float = 0.0 # Timing last_activity_time: float = 0.0 idle_start_time: float = 0.0 # Antenna freeze state (listening mode) antenna_frozen: bool = False frozen_antenna_left: float = 0.0 frozen_antenna_right: float = 0.0 antenna_blend: float = 1.0 # 0=frozen, 1=normal antenna_blend_start_time: float = 0.0 @dataclass class PendingAction: """A pending motion action.""" name: str target_pitch: float = 0.0 target_yaw: float = 0.0 target_roll: float = 0.0 target_x: float = 0.0 target_y: float = 0.0 target_z: float = 0.0 duration: float = 0.5 callback: Optional[Callable] = None class MovementManager: """ Unified movement manager with 10Hz control loop. All external interactions go through the command queue, ensuring thread safety and preventing race conditions. Note: Frequency reduced from 100Hz to 10Hz to prevent daemon crashes caused by excessive Zenoh message traffic. """ def __init__(self, reachy_mini: Optional["ReachyMini"] = None): self.robot = reachy_mini self._now = time.monotonic # Command queue - all external threads communicate through this self._command_queue: Queue[Tuple[str, Any]] = Queue() # Internal state (only modified by control loop) self.state = MovementState() self.state.last_activity_time = self._now() self.state.idle_start_time = self._now() # Animation player (JSON-driven animations) self._animation_player = AnimationPlayer() # Thread control self._stop_event = threading.Event() self._thread: Optional[threading.Thread] = None # Error throttling self._last_error_time = 0.0 self._error_interval = 1.0 # Log at most once per second self._suppressed_errors = 0 # Connection health tracking self._connection_lost = False self._last_successful_command = self._now() self._connection_timeout = 3.0 self._reconnect_attempt_interval = 2.0 self._last_reconnect_attempt = 0.0 self._consecutive_errors = 0 self._max_consecutive_errors = 5 # Pending action self._pending_action: Optional[PendingAction] = None self._action_start_time: float = 0.0 self._action_start_pose: Dict[str, float] = {} # Pose change detection threshold self._last_sent_pose: Optional[Dict[str, float]] = None self._pose_change_threshold = 0.005 # Face tracking offsets (from camera worker) self._face_tracking_offsets: Tuple[float, float, float, float, float, float] = (0.0, 0.0, 0.0, 0.0, 0.0, 0.0) self._face_tracking_lock = threading.Lock() # Camera server reference for face tracking self._camera_server = None # Face tracking smoothing (exponential moving average) self._smoothed_face_offsets: List[float] = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0] self._face_smoothing_factor = 0.3 logger.info("MovementManager initialized with AnimationPlayer") # ========================================================================= # Thread-safe public API (called from any thread) # ========================================================================= def set_state(self, new_state: RobotState) -> None: """Thread-safe: Set robot state.""" self._command_queue.put(("set_state", new_state)) def set_listening(self, listening: bool) -> None: """Thread-safe: Set listening state.""" state = RobotState.LISTENING if listening else RobotState.IDLE self._command_queue.put(("set_state", state)) def set_thinking(self) -> None: """Thread-safe: Set thinking state.""" self._command_queue.put(("set_state", RobotState.THINKING)) def set_speaking(self, speaking: bool) -> None: """Thread-safe: Set speaking state.""" state = RobotState.SPEAKING if speaking else RobotState.IDLE self._command_queue.put(("set_state", state)) def set_idle(self) -> None: """Thread-safe: Return to idle state.""" self._command_queue.put(("set_state", RobotState.IDLE)) def queue_action(self, action: PendingAction) -> None: """Thread-safe: Queue a motion action.""" self._command_queue.put(("action", action)) def turn_to_angle(self, yaw_deg: float, duration: float = 0.8) -> None: """Thread-safe: Turn head to face a direction.""" action = PendingAction( name="turn_to", target_yaw=math.radians(yaw_deg), duration=duration, ) self._command_queue.put(("action", action)) def nod(self, amplitude_deg: float = 15, duration: float = 0.5) -> None: """Thread-safe: Perform a nod gesture.""" self._command_queue.put(("nod", (amplitude_deg, duration))) def shake(self, amplitude_deg: float = 20, duration: float = 0.5) -> None: """Thread-safe: Perform a head shake gesture.""" self._command_queue.put(("shake", (amplitude_deg, duration))) def set_speech_sway( self, x: float, y: float, z: float, roll: float, pitch: float, yaw: float ) -> None: """Thread-safe: Set speech-driven sway offsets. These offsets are applied on top of the current animation to create audio-synchronized head motion during TTS playback. Args: x, y, z: Position offsets in meters roll, pitch, yaw: Orientation offsets in radians """ self._command_queue.put(("speech_sway", (x, y, z, roll, pitch, yaw))) def reset_to_neutral(self, duration: float = 0.5) -> None: """Thread-safe: Reset to neutral position.""" action = PendingAction( name="neutral", target_pitch=0.0, target_yaw=0.0, target_roll=0.0, target_x=0.0, target_y=0.0, target_z=0.0, duration=duration, ) self._command_queue.put(("action", action)) def set_camera_server(self, camera_server) -> None: """Set the camera server for face tracking offsets. Args: camera_server: MJPEGCameraServer instance with face tracking """ self._camera_server = camera_server logger.info("Camera server set for face tracking") def set_face_tracking_offsets(self, offsets: Tuple[float, float, float, float, float, float]) -> None: """Thread-safe: Update face tracking offsets manually. Args: offsets: Tuple of (x, y, z, roll, pitch, yaw) in meters/radians """ with self._face_tracking_lock: self._face_tracking_offsets = offsets def set_target_pose( self, x: Optional[float] = None, y: Optional[float] = None, z: Optional[float] = None, roll: Optional[float] = None, pitch: Optional[float] = None, yaw: Optional[float] = None, body_yaw: Optional[float] = None, antenna_left: Optional[float] = None, antenna_right: Optional[float] = None, ) -> None: """Thread-safe: Set target pose components. Only provided values will be updated. Values are in meters for position and radians for angles. Args: x, y, z: Head position in meters roll, pitch, yaw: Head orientation in radians body_yaw: Body yaw in radians antenna_left, antenna_right: Antenna angles in radians """ self._command_queue.put(("set_pose", { "x": x, "y": y, "z": z, "roll": roll, "pitch": pitch, "yaw": yaw, "body_yaw": body_yaw, "antenna_left": antenna_left, "antenna_right": antenna_right, })) # ========================================================================= # Internal: Command processing (runs in control loop) # ========================================================================= def _poll_commands(self) -> None: """Process all pending commands from the queue.""" while True: try: cmd, payload = self._command_queue.get_nowait() except Empty: break self._handle_command(cmd, payload) def _handle_command(self, cmd: str, payload: Any) -> None: """Handle a single command.""" if cmd == "set_state": old_state = self.state.robot_state self.state.robot_state = payload self.state.last_activity_time = self._now() # Update animation based on state animation_name = STATE_ANIMATION_MAP.get(payload.value, "idle") self._animation_player.set_animation(animation_name) # State transition logic if payload == RobotState.IDLE and old_state != RobotState.IDLE: self.state.idle_start_time = self._now() # Unfreeze antennas when returning to idle self._start_antenna_unfreeze() # Freeze antennas when entering listening mode if payload == RobotState.LISTENING: self._freeze_antennas() elif old_state == RobotState.LISTENING and payload != RobotState.LISTENING: # Start unfreezing when leaving listening mode self._start_antenna_unfreeze() logger.debug("State changed: %s -> %s, animation: %s", old_state.value, payload.value, animation_name) elif cmd == "action": self._start_action(payload) elif cmd == "nod": amplitude_deg, duration = payload self._do_nod(amplitude_deg, duration) elif cmd == "shake": amplitude_deg, duration = payload self._do_shake(amplitude_deg, duration) elif cmd == "set_pose": # Update target pose from external control (e.g., Home Assistant) if payload.get("x") is not None: self.state.target_x = payload["x"] if payload.get("y") is not None: self.state.target_y = payload["y"] if payload.get("z") is not None: self.state.target_z = payload["z"] if payload.get("roll") is not None: self.state.target_roll = payload["roll"] if payload.get("pitch") is not None: self.state.target_pitch = payload["pitch"] if payload.get("yaw") is not None: self.state.target_yaw = payload["yaw"] if payload.get("body_yaw") is not None: self.state.target_body_yaw = payload["body_yaw"] if payload.get("antenna_left") is not None: self.state.target_antenna_left = payload["antenna_left"] if payload.get("antenna_right") is not None: self.state.target_antenna_right = payload["antenna_right"] logger.debug("External pose update: %s", payload) elif cmd == "speech_sway": # Update speech-driven sway offsets x, y, z, roll, pitch, yaw = payload self.state.sway_x = x self.state.sway_y = y self.state.sway_z = z self.state.sway_roll = roll self.state.sway_pitch = pitch self.state.sway_yaw = yaw def _start_action(self, action: PendingAction) -> None: """Start a new motion action.""" self._pending_action = action self._action_start_time = self._now() self._action_start_pose = { "pitch": self.state.target_pitch, "yaw": self.state.target_yaw, "roll": self.state.target_roll, "x": self.state.target_x, "y": self.state.target_y, "z": self.state.target_z, } logger.debug("Starting action: %s", action.name) def _do_nod(self, amplitude_deg: float, duration: float) -> None: """Execute nod gesture (blocking in control loop context).""" # This is simplified - in production, use action queue amplitude_rad = math.radians(amplitude_deg) half_duration = duration / 2 # Nod down action_down = PendingAction( name="nod_down", target_pitch=amplitude_rad, duration=half_duration, ) self._start_action(action_down) def _do_shake(self, amplitude_deg: float, duration: float) -> None: """Execute shake gesture (blocking in control loop context).""" amplitude_rad = math.radians(amplitude_deg) half_duration = duration / 2 # Shake left action_left = PendingAction( name="shake_left", target_yaw=-amplitude_rad, duration=half_duration, ) self._start_action(action_left) # ========================================================================= # Internal: Motion updates (runs in control loop) # ========================================================================= def _update_action(self, dt: float) -> None: """Update pending action interpolation.""" if self._pending_action is None: return elapsed = self._now() - self._action_start_time progress = min(1.0, elapsed / self._pending_action.duration) # Smooth interpolation (ease in-out) t = progress * progress * (3 - 2 * progress) # Interpolate pose start = self._action_start_pose action = self._pending_action self.state.target_pitch = start["pitch"] + t * (action.target_pitch - start["pitch"]) self.state.target_yaw = start["yaw"] + t * (action.target_yaw - start["yaw"]) self.state.target_roll = start["roll"] + t * (action.target_roll - start["roll"]) self.state.target_x = start["x"] + t * (action.target_x - start["x"]) self.state.target_y = start["y"] + t * (action.target_y - start["y"]) self.state.target_z = start["z"] + t * (action.target_z - start["z"]) # Action complete if progress >= 1.0: if self._pending_action.callback: try: self._pending_action.callback() except Exception as e: logger.error("Action callback error: %s", e) self._pending_action = None def _update_animation(self, dt: float) -> None: """Update animation offsets from AnimationPlayer.""" offsets = self._animation_player.get_offsets(dt) self.state.anim_pitch = offsets["pitch"] self.state.anim_yaw = offsets["yaw"] self.state.anim_roll = offsets["roll"] self.state.anim_x = offsets["x"] self.state.anim_y = offsets["y"] self.state.anim_z = offsets["z"] self.state.anim_antenna_left = offsets["antenna_left"] self.state.anim_antenna_right = offsets["antenna_right"] def _freeze_antennas(self) -> None: """Freeze antennas at current position (for listening mode).""" # Capture current antenna positions current_left = self.state.target_antenna_left + self.state.anim_antenna_left current_right = self.state.target_antenna_right + self.state.anim_antenna_right self.state.antenna_frozen = True self.state.frozen_antenna_left = current_left self.state.frozen_antenna_right = current_right self.state.antenna_blend = 0.0 # Fully frozen logger.debug("Antennas frozen at left=%.2f, right=%.2f", math.degrees(current_left), math.degrees(current_right)) def _start_antenna_unfreeze(self) -> None: """Start unfreezing antennas (smooth blend back to normal).""" if not self.state.antenna_frozen: return self.state.antenna_blend_start_time = self._now() logger.debug("Starting antenna unfreeze") def _update_antenna_blend(self, dt: float) -> None: """Update antenna blend state for smooth unfreezing.""" if not self.state.antenna_frozen: return if self.state.antenna_blend >= 1.0: # Fully unfrozen self.state.antenna_frozen = False return # Calculate blend progress elapsed = self._now() - self.state.antenna_blend_start_time if elapsed > 0: self.state.antenna_blend = min(1.0, elapsed / ANTENNA_BLEND_DURATION) if self.state.antenna_blend >= 1.0: self.state.antenna_frozen = False logger.debug("Antennas unfrozen") def _update_face_tracking(self) -> None: """Get face tracking offsets from camera server with smoothing.""" if self._camera_server is not None: try: raw_offsets = self._camera_server.get_face_tracking_offsets() # Apply exponential moving average smoothing alpha = self._face_smoothing_factor for i in range(6): self._smoothed_face_offsets[i] = ( alpha * raw_offsets[i] + (1 - alpha) * self._smoothed_face_offsets[i] ) with self._face_tracking_lock: self._face_tracking_offsets = tuple(self._smoothed_face_offsets) except Exception as e: logger.debug("Error getting face tracking offsets: %s", e) def _compose_final_pose(self) -> Tuple[np.ndarray, Tuple[float, float], float]: """Compose final pose from all sources using SDK's compose_world_offset. Returns: Tuple of (head_pose_4x4, (antenna_right, antenna_left), body_yaw) """ # Build primary head pose from target state if SDK_UTILS_AVAILABLE: primary_head = create_head_pose( x=self.state.target_x, y=self.state.target_y, z=self.state.target_z, roll=self.state.target_roll, pitch=self.state.target_pitch, yaw=self.state.target_yaw, degrees=False, mm=False, ) else: # Fallback: build matrix manually rotation = R.from_euler('xyz', [ self.state.target_roll, self.state.target_pitch, self.state.target_yaw, ]) primary_head = np.eye(4) primary_head[:3, :3] = rotation.as_matrix() primary_head[0, 3] = self.state.target_x primary_head[1, 3] = self.state.target_y primary_head[2, 3] = self.state.target_z # Build secondary pose from animation + face tracking + speech sway with self._face_tracking_lock: face_offsets = self._face_tracking_offsets secondary_x = self.state.anim_x + self.state.sway_x + face_offsets[0] secondary_y = self.state.anim_y + self.state.sway_y + face_offsets[1] secondary_z = self.state.anim_z + self.state.sway_z + face_offsets[2] secondary_roll = self.state.anim_roll + self.state.sway_roll + face_offsets[3] secondary_pitch = self.state.anim_pitch + self.state.sway_pitch + face_offsets[4] secondary_yaw = self.state.anim_yaw + self.state.sway_yaw + face_offsets[5] if SDK_UTILS_AVAILABLE: secondary_head = create_head_pose( x=secondary_x, y=secondary_y, z=secondary_z, roll=secondary_roll, pitch=secondary_pitch, yaw=secondary_yaw, degrees=False, mm=False, ) # Compose using SDK's compose_world_offset (same as conversation_app) final_head = compose_world_offset(primary_head, secondary_head, reorthonormalize=True) else: # Fallback: simple addition (less accurate but works) secondary_rotation = R.from_euler('xyz', [secondary_roll, secondary_pitch, secondary_yaw]) secondary_head = np.eye(4) secondary_head[:3, :3] = secondary_rotation.as_matrix() secondary_head[0, 3] = secondary_x secondary_head[1, 3] = secondary_y secondary_head[2, 3] = secondary_z # Simple composition: R_final = R_secondary @ R_primary, t_final = t_primary + t_secondary final_head = np.eye(4) final_head[:3, :3] = secondary_head[:3, :3] @ primary_head[:3, :3] final_head[:3, 3] = primary_head[:3, 3] + secondary_head[:3, 3] # Antenna pose with freeze blending target_antenna_left = self.state.target_antenna_left + self.state.anim_antenna_left target_antenna_right = self.state.target_antenna_right + self.state.anim_antenna_right # Apply antenna freeze blending (listening mode) blend = self.state.antenna_blend if blend < 1.0: # Blend between frozen position and target position antenna_left = (self.state.frozen_antenna_left * (1.0 - blend) + target_antenna_left * blend) antenna_right = (self.state.frozen_antenna_right * (1.0 - blend) + target_antenna_right * blend) else: antenna_left = target_antenna_left antenna_right = target_antenna_right return final_head, (antenna_right, antenna_left), self.state.target_body_yaw # ========================================================================= # Internal: Robot control (runs in control loop) # ========================================================================= def _issue_control_command(self, head_pose: np.ndarray, antennas: Tuple[float, float], body_yaw: float) -> None: """Send control command to robot with error throttling and connection health tracking.""" if self.robot is None: return # Check if pose changed significantly (prevent unnecessary commands) # Extract euler angles for comparison rotation = R.from_matrix(head_pose[:3, :3]) euler = rotation.as_euler('xyz') # [roll, pitch, yaw] current_pose = { "x": head_pose[0, 3], "y": head_pose[1, 3], "z": head_pose[2, 3], "roll": euler[0], "pitch": euler[1], "yaw": euler[2], "antenna_right": antennas[0], "antenna_left": antennas[1], "body_yaw": body_yaw, } if self._last_sent_pose is not None: max_diff = max( abs(current_pose[k] - self._last_sent_pose.get(k, 0.0)) for k in current_pose.keys() ) if max_diff < self._pose_change_threshold: # No significant change, skip sending command return now = self._now() # Check if we should skip due to connection loss (but always try periodically) if self._connection_lost: if now - self._last_reconnect_attempt < self._reconnect_attempt_interval: # Skip sending commands to reduce error spam return # Time to try reconnecting self._last_reconnect_attempt = now logger.debug("Attempting to send command after connection loss...") try: # Send to robot (single control point!) # head_pose is already a 4x4 matrix from _compose_final_pose self.robot.set_target( head=head_pose, antennas=list(antennas), body_yaw=body_yaw, ) # Command succeeded - update connection health and cache self._last_successful_command = now self._last_sent_pose = current_pose.copy() # Cache sent pose self._consecutive_errors = 0 # Reset error counter if self._connection_lost: logger.info("✓ Connection to robot restored") self._connection_lost = False self._suppressed_errors = 0 except Exception as e: error_msg = str(e) self._consecutive_errors += 1 # Check if this is a connection error is_connection_error = "Lost connection" in error_msg or "ZError" in error_msg if is_connection_error: if not self._connection_lost: # First time detecting connection loss if self._consecutive_errors >= self._max_consecutive_errors: logger.warning(f"Connection unstable after {self._consecutive_errors} errors: {error_msg}") logger.warning(" Will retry connection every %.1fs...", self._reconnect_attempt_interval) self._connection_lost = True self._last_reconnect_attempt = now else: # Transient error, log but don't mark as lost yet self._log_error_throttled(f"Transient connection error ({self._consecutive_errors}/{self._max_consecutive_errors}): {error_msg}") else: # Already in lost state, use throttled logging self._log_error_throttled(f"Connection still lost: {error_msg}") else: # Non-connection error - log but don't affect connection state self._log_error_throttled(f"Failed to set robot target: {error_msg}") def _log_error_throttled(self, message: str) -> None: """Log error with throttling to prevent log explosion.""" now = self._now() if now - self._last_error_time >= self._error_interval: if self._suppressed_errors > 0: message += f" (suppressed {self._suppressed_errors} repeats)" self._suppressed_errors = 0 logger.error(message) self._last_error_time = now else: self._suppressed_errors += 1 # ========================================================================= # Control loop # ========================================================================= def _control_loop(self) -> None: """Main 10Hz control loop.""" logger.info("Movement manager control loop started (%.0f Hz)", CONTROL_LOOP_FREQUENCY_HZ) last_time = self._now() while not self._stop_event.is_set(): loop_start = self._now() dt = loop_start - last_time last_time = loop_start try: # 1. Process commands from queue self._poll_commands() # 2. Update action interpolation self._update_action(dt) # 3. Update animation offsets (JSON-driven) self._update_animation(dt) # 4. Update antenna blend (listening mode freeze/unfreeze) self._update_antenna_blend(dt) # 5. Update face tracking offsets from camera server self._update_face_tracking() # 6. Compose final pose (returns head_pose matrix, antennas tuple, body_yaw) head_pose, antennas, body_yaw = self._compose_final_pose() # 7. Send to robot (single control point!) self._issue_control_command(head_pose, antennas, body_yaw) except Exception as e: self._log_error_throttled(f"Control loop error: {e}") # Adaptive sleep elapsed = self._now() - loop_start sleep_time = max(0.0, TARGET_PERIOD - elapsed) if sleep_time > 0: time.sleep(sleep_time) logger.info("Movement manager control loop stopped") # ========================================================================= # Lifecycle # ========================================================================= def start(self) -> None: """Start the control loop.""" if self._thread is not None and self._thread.is_alive(): logger.warning("Movement manager already running") return self._stop_event.clear() self._thread = threading.Thread( target=self._control_loop, daemon=True, name="MovementManager", ) self._thread.start() logger.info("Movement manager started") def stop(self) -> None: """Stop the control loop and reset robot.""" if self._thread is None or not self._thread.is_alive(): return logger.info("Stopping movement manager...") # Signal stop self._stop_event.set() # Wait for thread with shorter timeout self._thread.join(timeout=0.5) if self._thread.is_alive(): logger.warning("Movement manager thread did not stop in time") # Skip reset to neutral - let the app manager handle it # This speeds up shutdown significantly logger.info("Movement manager stopped") def _reset_to_neutral_blocking(self) -> None: """Reset robot to neutral position (blocking).""" if self.robot is None: return try: neutral_pose = np.eye(4) self.robot.goto_target( head=neutral_pose, antennas=[0.0, 0.0], body_yaw=0.0, duration=0.3, # Faster reset ) logger.info("Robot reset to neutral position") except Exception as e: logger.error("Failed to reset robot: %s", e) @property def is_running(self) -> bool: """Check if control loop is running.""" return self._thread is not None and self._thread.is_alive()