| """ |
| Unified Movement Manager for Reachy Mini. |
| |
| This module provides a centralized control system for robot movements, |
| inspired by the reachy_mini_conversation_app architecture. |
| |
| Key features: |
| - Single 100Hz control loop (prevents race conditions) |
| - Command queue pattern (thread-safe external API) |
| - Error throttling (prevents log explosion) |
| - Speech-driven head sway |
| - Breathing animation during idle |
| - Graceful shutdown |
| """ |
|
|
| import logging |
| import math |
| import threading |
| import time |
| from dataclasses import dataclass, field |
| from enum import Enum |
| from queue import Queue, Empty |
| from typing import Any, Callable, Dict, List, Optional, Tuple, TYPE_CHECKING |
|
|
| import numpy as np |
| from scipy.spatial.transform import Rotation as R |
|
|
| if TYPE_CHECKING: |
| from reachy_mini import ReachyMini |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| |
| |
| |
|
|
| CONTROL_LOOP_FREQUENCY_HZ = 100 |
| TARGET_PERIOD = 1.0 / CONTROL_LOOP_FREQUENCY_HZ |
|
|
| |
| |
| SWAY_A_PITCH_DEG = 4.5 |
| SWAY_A_YAW_DEG = 7.5 |
| SWAY_A_ROLL_DEG = 2.25 |
| |
| SWAY_F_PITCH = 2.2 |
| SWAY_F_YAW = 0.6 |
| SWAY_F_ROLL = 1.3 |
| |
| SWAY_A_X_MM = 4.5 |
| SWAY_A_Y_MM = 3.75 |
| SWAY_A_Z_MM = 2.25 |
| |
| SWAY_F_X = 0.35 |
| SWAY_F_Y = 0.45 |
| SWAY_F_Z = 0.25 |
| |
| SWAY_MASTER = 1.5 |
|
|
| |
| BREATHING_Z_AMPLITUDE = 0.005 |
| BREATHING_FREQUENCY = 0.1 |
| ANTENNA_SWAY_AMPLITUDE_DEG = 15.0 |
| ANTENNA_FREQUENCY = 0.5 |
|
|
| |
| VAD_DB_ON = -35 |
| VAD_DB_OFF = -45 |
|
|
| |
| ANTENNA_BLEND_DURATION = 0.5 |
|
|
|
|
| class RobotState(Enum): |
| """Robot state machine states.""" |
| IDLE = "idle" |
| LISTENING = "listening" |
| THINKING = "thinking" |
| SPEAKING = "speaking" |
|
|
|
|
| @dataclass |
| class MovementState: |
| """Internal movement state (only modified by control loop).""" |
| |
| robot_state: RobotState = RobotState.IDLE |
|
|
| |
| speech_pitch: float = 0.0 |
| speech_yaw: float = 0.0 |
| speech_roll: float = 0.0 |
| speech_x: float = 0.0 |
| speech_y: float = 0.0 |
| speech_z: float = 0.0 |
|
|
| |
| breathing_z: float = 0.0 |
| breathing_antenna_left: float = 0.0 |
| breathing_antenna_right: float = 0.0 |
|
|
| |
| target_pitch: float = 0.0 |
| target_yaw: float = 0.0 |
| target_roll: float = 0.0 |
| target_x: float = 0.0 |
| target_y: float = 0.0 |
| target_z: float = 0.0 |
| target_antenna_left: float = 0.0 |
| target_antenna_right: float = 0.0 |
| target_body_yaw: float = 0.0 |
|
|
| |
| last_activity_time: float = 0.0 |
| idle_start_time: float = 0.0 |
|
|
| |
| sway_time: float = 0.0 |
| sway_envelope: float = 0.0 |
| sway_phase_pitch: float = 0.0 |
| sway_phase_yaw: float = 0.0 |
| sway_phase_roll: float = 0.0 |
| sway_phase_x: float = 0.0 |
| sway_phase_y: float = 0.0 |
| sway_phase_z: float = 0.0 |
|
|
| |
| breathing_time: float = 0.0 |
| breathing_active: bool = False |
|
|
| |
| antenna_frozen: bool = False |
| frozen_antenna_left: float = 0.0 |
| frozen_antenna_right: float = 0.0 |
| antenna_blend: float = 1.0 |
| antenna_blend_start_time: float = 0.0 |
|
|
|
|
| @dataclass |
| class PendingAction: |
| """A pending motion action.""" |
| name: str |
| target_pitch: float = 0.0 |
| target_yaw: float = 0.0 |
| target_roll: float = 0.0 |
| target_x: float = 0.0 |
| target_y: float = 0.0 |
| target_z: float = 0.0 |
| duration: float = 0.5 |
| callback: Optional[Callable] = None |
|
|
|
|
| class SpeechSwayGenerator: |
| """ |
| Generates speech-driven head sway based on audio loudness. |
| |
| Uses multiple sine wave oscillators at different frequencies |
| to create natural-looking "Lissajous" motion. |
| Includes both rotation (pitch/yaw/roll) and translation (x/y/z). |
| """ |
|
|
| def __init__(self, seed: int = 7): |
| |
| rng = np.random.default_rng(seed) |
| self.phase_pitch = float(rng.random() * 2 * math.pi) |
| self.phase_yaw = float(rng.random() * 2 * math.pi) |
| self.phase_roll = float(rng.random() * 2 * math.pi) |
| self.phase_x = float(rng.random() * 2 * math.pi) |
| self.phase_y = float(rng.random() * 2 * math.pi) |
| self.phase_z = float(rng.random() * 2 * math.pi) |
|
|
| |
| self.t = 0.0 |
| self.vad_on = False |
| self.envelope = 0.0 |
| self.last_db = -100.0 |
|
|
| def reset(self): |
| """Reset state.""" |
| self.t = 0.0 |
| self.vad_on = False |
| self.envelope = 0.0 |
| self.last_db = -100.0 |
|
|
| def update(self, dt: float, loudness_db: float) -> Tuple[float, float, float, float, float, float]: |
| """ |
| Update sway based on audio loudness. |
| |
| Args: |
| dt: Time delta in seconds |
| loudness_db: Audio loudness in dBFS |
| |
| Returns: |
| Tuple of (pitch_rad, yaw_rad, roll_rad, x_m, y_m, z_m) offsets |
| """ |
| self.t += dt |
| self.last_db = loudness_db |
|
|
| |
| if loudness_db >= VAD_DB_ON: |
| self.vad_on = True |
| elif loudness_db <= VAD_DB_OFF: |
| self.vad_on = False |
|
|
| |
| target = 1.0 if self.vad_on else 0.0 |
| self.envelope += 0.1 * (target - self.envelope) |
| self.envelope = max(0.0, min(1.0, self.envelope)) |
|
|
| |
| loud = max(0.0, min(1.0, (loudness_db + 50) / 30)) |
|
|
| |
| env = self.envelope * SWAY_MASTER |
|
|
| |
| pitch = (math.radians(SWAY_A_PITCH_DEG) * loud * env * |
| math.sin(2 * math.pi * SWAY_F_PITCH * self.t + self.phase_pitch)) |
| yaw = (math.radians(SWAY_A_YAW_DEG) * loud * env * |
| math.sin(2 * math.pi * SWAY_F_YAW * self.t + self.phase_yaw)) |
| roll = (math.radians(SWAY_A_ROLL_DEG) * loud * env * |
| math.sin(2 * math.pi * SWAY_F_ROLL * self.t + self.phase_roll)) |
|
|
| |
| x = (SWAY_A_X_MM / 1000.0 * loud * env * |
| math.sin(2 * math.pi * SWAY_F_X * self.t + self.phase_x)) |
| y = (SWAY_A_Y_MM / 1000.0 * loud * env * |
| math.sin(2 * math.pi * SWAY_F_Y * self.t + self.phase_y)) |
| z = (SWAY_A_Z_MM / 1000.0 * loud * env * |
| math.sin(2 * math.pi * SWAY_F_Z * self.t + self.phase_z)) |
|
|
| return pitch, yaw, roll, x, y, z |
|
|
|
|
| class BreathingAnimation: |
| """ |
| Generates idle breathing animation. |
| |
| Creates subtle Z-axis movement and antenna sway to make |
| the robot appear more alive when idle. |
| """ |
|
|
| def __init__(self): |
| self.t = 0.0 |
| self.active = False |
| self.blend = 0.0 |
|
|
| def reset(self): |
| """Reset animation.""" |
| self.t = 0.0 |
| self.blend = 0.0 |
|
|
| def set_active(self, active: bool): |
| """Set whether breathing is active.""" |
| self.active = active |
|
|
| def update(self, dt: float) -> Tuple[float, float, float]: |
| """ |
| Update breathing animation. |
| |
| Args: |
| dt: Time delta in seconds |
| |
| Returns: |
| Tuple of (z_offset_m, antenna_left_rad, antenna_right_rad) |
| """ |
| self.t += dt |
|
|
| |
| target_blend = 1.0 if self.active else 0.0 |
| blend_speed = 0.5 |
| self.blend += blend_speed * dt * (target_blend - self.blend) |
| self.blend = max(0.0, min(1.0, self.blend)) |
|
|
| if self.blend < 0.001: |
| return 0.0, 0.0, 0.0 |
|
|
| |
| z_offset = (BREATHING_Z_AMPLITUDE * self.blend * |
| math.sin(2 * math.pi * BREATHING_FREQUENCY * self.t)) |
|
|
| |
| antenna_angle = (math.radians(ANTENNA_SWAY_AMPLITUDE_DEG) * self.blend * |
| math.sin(2 * math.pi * ANTENNA_FREQUENCY * self.t)) |
|
|
| return z_offset, antenna_angle, -antenna_angle |
|
|
|
|
| class MovementManager: |
| """ |
| Unified movement manager with 100Hz control loop. |
| |
| All external interactions go through the command queue, |
| ensuring thread safety and preventing race conditions. |
| """ |
|
|
| def __init__(self, reachy_mini: Optional["ReachyMini"] = None): |
| self.robot = reachy_mini |
| self._now = time.monotonic |
|
|
| |
| self._command_queue: Queue[Tuple[str, Any]] = Queue() |
|
|
| |
| self.state = MovementState() |
| self.state.last_activity_time = self._now() |
| self.state.idle_start_time = self._now() |
|
|
| |
| rng = np.random.default_rng(42) |
| self.state.sway_phase_pitch = float(rng.random() * 2 * math.pi) |
| self.state.sway_phase_yaw = float(rng.random() * 2 * math.pi) |
| self.state.sway_phase_roll = float(rng.random() * 2 * math.pi) |
|
|
| |
| self._speech_sway = SpeechSwayGenerator() |
| self._breathing = BreathingAnimation() |
|
|
| |
| self._stop_event = threading.Event() |
| self._thread: Optional[threading.Thread] = None |
|
|
| |
| self._last_error_time = 0.0 |
| self._error_interval = 1.0 |
| self._suppressed_errors = 0 |
|
|
| |
| self._connection_lost = False |
| self._last_successful_command = self._now() |
| self._connection_timeout = 5.0 |
| self._reconnect_attempt_interval = 10.0 |
| self._last_reconnect_attempt = 0.0 |
|
|
| |
| self._pending_action: Optional[PendingAction] = None |
| self._action_start_time: float = 0.0 |
| self._action_start_pose: Dict[str, float] = {} |
|
|
| |
| self._audio_loudness_db: float = -100.0 |
| self._audio_lock = threading.Lock() |
|
|
| logger.info("MovementManager initialized") |
|
|
| |
| |
| |
|
|
| def set_state(self, new_state: RobotState) -> None: |
| """Thread-safe: Set robot state.""" |
| self._command_queue.put(("set_state", new_state)) |
|
|
| def set_listening(self, listening: bool) -> None: |
| """Thread-safe: Set listening state.""" |
| state = RobotState.LISTENING if listening else RobotState.IDLE |
| self._command_queue.put(("set_state", state)) |
|
|
| def set_thinking(self) -> None: |
| """Thread-safe: Set thinking state.""" |
| self._command_queue.put(("set_state", RobotState.THINKING)) |
|
|
| def set_speaking(self, speaking: bool) -> None: |
| """Thread-safe: Set speaking state.""" |
| state = RobotState.SPEAKING if speaking else RobotState.IDLE |
| self._command_queue.put(("set_state", state)) |
|
|
| def set_idle(self) -> None: |
| """Thread-safe: Return to idle state.""" |
| self._command_queue.put(("set_state", RobotState.IDLE)) |
|
|
| def queue_action(self, action: PendingAction) -> None: |
| """Thread-safe: Queue a motion action.""" |
| self._command_queue.put(("action", action)) |
|
|
| def turn_to_angle(self, yaw_deg: float, duration: float = 0.8) -> None: |
| """Thread-safe: Turn head to face a direction.""" |
| action = PendingAction( |
| name="turn_to", |
| target_yaw=math.radians(yaw_deg), |
| duration=duration, |
| ) |
| self._command_queue.put(("action", action)) |
|
|
| def nod(self, amplitude_deg: float = 15, duration: float = 0.5) -> None: |
| """Thread-safe: Perform a nod gesture.""" |
| self._command_queue.put(("nod", (amplitude_deg, duration))) |
|
|
| def shake(self, amplitude_deg: float = 20, duration: float = 0.5) -> None: |
| """Thread-safe: Perform a head shake gesture.""" |
| self._command_queue.put(("shake", (amplitude_deg, duration))) |
|
|
| def update_audio_loudness(self, loudness_db: float) -> None: |
| """Thread-safe: Update audio loudness for speech sway.""" |
| with self._audio_lock: |
| self._audio_loudness_db = loudness_db |
|
|
| def reset_to_neutral(self, duration: float = 0.5) -> None: |
| """Thread-safe: Reset to neutral position.""" |
| action = PendingAction( |
| name="neutral", |
| target_pitch=0.0, |
| target_yaw=0.0, |
| target_roll=0.0, |
| target_x=0.0, |
| target_y=0.0, |
| target_z=0.0, |
| duration=duration, |
| ) |
| self._command_queue.put(("action", action)) |
|
|
| |
| |
| |
|
|
| def _poll_commands(self) -> None: |
| """Process all pending commands from the queue.""" |
| while True: |
| try: |
| cmd, payload = self._command_queue.get_nowait() |
| except Empty: |
| break |
|
|
| self._handle_command(cmd, payload) |
|
|
| def _handle_command(self, cmd: str, payload: Any) -> None: |
| """Handle a single command.""" |
| if cmd == "set_state": |
| old_state = self.state.robot_state |
| self.state.robot_state = payload |
| self.state.last_activity_time = self._now() |
|
|
| |
| if payload == RobotState.IDLE and old_state != RobotState.IDLE: |
| self.state.idle_start_time = self._now() |
| self._breathing.set_active(True) |
| self._speech_sway.reset() |
| |
| self._start_antenna_unfreeze() |
| elif payload != RobotState.IDLE: |
| self._breathing.set_active(False) |
|
|
| if payload == RobotState.SPEAKING: |
| self._speech_sway.reset() |
|
|
| |
| if payload == RobotState.LISTENING: |
| self._freeze_antennas() |
| elif old_state == RobotState.LISTENING and payload != RobotState.LISTENING: |
| |
| self._start_antenna_unfreeze() |
|
|
| logger.debug("State changed: %s -> %s", old_state.value, payload.value) |
|
|
| elif cmd == "action": |
| self._start_action(payload) |
|
|
| elif cmd == "nod": |
| amplitude_deg, duration = payload |
| self._do_nod(amplitude_deg, duration) |
|
|
| elif cmd == "shake": |
| amplitude_deg, duration = payload |
| self._do_shake(amplitude_deg, duration) |
|
|
| def _start_action(self, action: PendingAction) -> None: |
| """Start a new motion action.""" |
| self._pending_action = action |
| self._action_start_time = self._now() |
| self._action_start_pose = { |
| "pitch": self.state.target_pitch, |
| "yaw": self.state.target_yaw, |
| "roll": self.state.target_roll, |
| "x": self.state.target_x, |
| "y": self.state.target_y, |
| "z": self.state.target_z, |
| } |
| logger.debug("Starting action: %s", action.name) |
|
|
| def _do_nod(self, amplitude_deg: float, duration: float) -> None: |
| """Execute nod gesture (blocking in control loop context).""" |
| |
| amplitude_rad = math.radians(amplitude_deg) |
| half_duration = duration / 2 |
|
|
| |
| action_down = PendingAction( |
| name="nod_down", |
| target_pitch=amplitude_rad, |
| duration=half_duration, |
| ) |
| self._start_action(action_down) |
|
|
| def _do_shake(self, amplitude_deg: float, duration: float) -> None: |
| """Execute shake gesture (blocking in control loop context).""" |
| amplitude_rad = math.radians(amplitude_deg) |
| half_duration = duration / 2 |
|
|
| |
| action_left = PendingAction( |
| name="shake_left", |
| target_yaw=-amplitude_rad, |
| duration=half_duration, |
| ) |
| self._start_action(action_left) |
|
|
| |
| |
| |
|
|
| def _update_action(self, dt: float) -> None: |
| """Update pending action interpolation.""" |
| if self._pending_action is None: |
| return |
|
|
| elapsed = self._now() - self._action_start_time |
| progress = min(1.0, elapsed / self._pending_action.duration) |
|
|
| |
| t = progress * progress * (3 - 2 * progress) |
|
|
| |
| start = self._action_start_pose |
| action = self._pending_action |
|
|
| self.state.target_pitch = start["pitch"] + t * (action.target_pitch - start["pitch"]) |
| self.state.target_yaw = start["yaw"] + t * (action.target_yaw - start["yaw"]) |
| self.state.target_roll = start["roll"] + t * (action.target_roll - start["roll"]) |
| self.state.target_x = start["x"] + t * (action.target_x - start["x"]) |
| self.state.target_y = start["y"] + t * (action.target_y - start["y"]) |
| self.state.target_z = start["z"] + t * (action.target_z - start["z"]) |
|
|
| |
| if progress >= 1.0: |
| if self._pending_action.callback: |
| try: |
| self._pending_action.callback() |
| except Exception as e: |
| logger.error("Action callback error: %s", e) |
| self._pending_action = None |
|
|
| def _update_speech_sway(self, dt: float) -> None: |
| """Update speech-driven head sway.""" |
| if self.state.robot_state != RobotState.SPEAKING: |
| |
| self.state.speech_pitch *= 0.9 |
| self.state.speech_yaw *= 0.9 |
| self.state.speech_roll *= 0.9 |
| self.state.speech_x *= 0.9 |
| self.state.speech_y *= 0.9 |
| self.state.speech_z *= 0.9 |
| return |
|
|
| |
| with self._audio_lock: |
| loudness_db = self._audio_loudness_db |
|
|
| |
| pitch, yaw, roll, x, y, z = self._speech_sway.update(dt, loudness_db) |
|
|
| self.state.speech_pitch = pitch |
| self.state.speech_yaw = yaw |
| self.state.speech_roll = roll |
| self.state.speech_x = x |
| self.state.speech_y = y |
| self.state.speech_z = z |
|
|
| def _update_breathing(self, dt: float) -> None: |
| """Update breathing animation.""" |
| z_offset, antenna_left, antenna_right = self._breathing.update(dt) |
|
|
| self.state.breathing_z = z_offset |
| self.state.breathing_antenna_left = antenna_left |
| self.state.breathing_antenna_right = antenna_right |
|
|
| def _freeze_antennas(self) -> None: |
| """Freeze antennas at current position (for listening mode).""" |
| |
| current_left = self.state.target_antenna_left + self.state.breathing_antenna_left |
| current_right = self.state.target_antenna_right + self.state.breathing_antenna_right |
|
|
| self.state.antenna_frozen = True |
| self.state.frozen_antenna_left = current_left |
| self.state.frozen_antenna_right = current_right |
| self.state.antenna_blend = 0.0 |
| logger.debug("Antennas frozen at left=%.2f, right=%.2f", |
| math.degrees(current_left), math.degrees(current_right)) |
|
|
| def _start_antenna_unfreeze(self) -> None: |
| """Start unfreezing antennas (smooth blend back to normal).""" |
| if not self.state.antenna_frozen: |
| return |
|
|
| self.state.antenna_blend_start_time = self._now() |
| logger.debug("Starting antenna unfreeze") |
|
|
| def _update_antenna_blend(self, dt: float) -> None: |
| """Update antenna blend state for smooth unfreezing.""" |
| if not self.state.antenna_frozen: |
| return |
|
|
| if self.state.antenna_blend >= 1.0: |
| |
| self.state.antenna_frozen = False |
| return |
|
|
| |
| elapsed = self._now() - self.state.antenna_blend_start_time |
| if elapsed > 0: |
| self.state.antenna_blend = min(1.0, elapsed / ANTENNA_BLEND_DURATION) |
|
|
| if self.state.antenna_blend >= 1.0: |
| self.state.antenna_frozen = False |
| logger.debug("Antennas unfrozen") |
|
|
| def _compose_final_pose(self) -> Dict[str, float]: |
| """Compose final pose from all sources.""" |
| |
| pitch = self.state.target_pitch |
| yaw = self.state.target_yaw |
| roll = self.state.target_roll |
| x = self.state.target_x |
| y = self.state.target_y |
| z = self.state.target_z |
|
|
| |
| pitch += self.state.speech_pitch |
| yaw += self.state.speech_yaw |
| roll += self.state.speech_roll |
| x += self.state.speech_x |
| y += self.state.speech_y |
| z += self.state.speech_z |
|
|
| |
| z += self.state.breathing_z |
|
|
| |
| target_antenna_left = self.state.target_antenna_left + self.state.breathing_antenna_left |
| target_antenna_right = self.state.target_antenna_right + self.state.breathing_antenna_right |
|
|
| |
| blend = self.state.antenna_blend |
| if blend < 1.0: |
| |
| antenna_left = (self.state.frozen_antenna_left * (1.0 - blend) + |
| target_antenna_left * blend) |
| antenna_right = (self.state.frozen_antenna_right * (1.0 - blend) + |
| target_antenna_right * blend) |
| else: |
| antenna_left = target_antenna_left |
| antenna_right = target_antenna_right |
|
|
| return { |
| "pitch": pitch, |
| "yaw": yaw, |
| "roll": roll, |
| "x": x, |
| "y": y, |
| "z": z, |
| "antenna_left": antenna_left, |
| "antenna_right": antenna_right, |
| "body_yaw": self.state.target_body_yaw, |
| } |
|
|
| |
| |
| |
|
|
| def _issue_control_command(self, pose: Dict[str, float]) -> None: |
| """Send control command to robot with error throttling and connection health tracking.""" |
| if self.robot is None: |
| return |
|
|
| |
| now = self._now() |
| if self._connection_lost: |
| time_since_last_success = now - self._last_successful_command |
|
|
| |
| if now - self._last_reconnect_attempt >= self._reconnect_attempt_interval: |
| self._last_reconnect_attempt = now |
| logger.info("Attempting to send command after connection loss...") |
| else: |
| |
| return |
|
|
| try: |
| |
| rotation = R.from_euler('xyz', [ |
| pose["pitch"], |
| pose["roll"], |
| pose["yaw"], |
| ]) |
|
|
| head_pose = np.eye(4) |
| head_pose[:3, :3] = rotation.as_matrix() |
| head_pose[0, 3] = pose["x"] |
| head_pose[1, 3] = pose["y"] |
| head_pose[2, 3] = pose["z"] |
|
|
| |
| self.robot.set_target( |
| head=head_pose, |
| antennas=[pose["antenna_right"], pose["antenna_left"]], |
| body_yaw=pose["body_yaw"], |
| ) |
|
|
| |
| self._last_successful_command = now |
| if self._connection_lost: |
| logger.info("✓ Connection to robot restored") |
| self._connection_lost = False |
| self._suppressed_errors = 0 |
|
|
| except Exception as e: |
| error_msg = str(e) |
|
|
| |
| if "Lost connection" in error_msg or "ZError" in error_msg: |
| time_since_last_success = now - self._last_successful_command |
|
|
| if not self._connection_lost and time_since_last_success > self._connection_timeout: |
| |
| logger.error(f"✗ Lost connection to robot daemon: {error_msg}") |
| logger.error(" Troubleshooting steps:") |
| logger.error(" 1. Check if Reachy Mini Daemon is running: sudo systemctl status reachy-mini-daemon") |
| logger.error(" 2. Verify Zenoh service on port 7447: netstat -tlnp | grep 7447") |
| logger.error(" 3. Check robot hardware connections") |
| logger.error(" 4. Review daemon logs: sudo journalctl -u reachy-mini-daemon -n 50") |
| logger.error(f" Will retry connection every {self._reconnect_attempt_interval}s...") |
| self._connection_lost = True |
| self._last_reconnect_attempt = now |
| elif self._connection_lost: |
| |
| self._log_error_throttled(f"Connection still lost: {error_msg}") |
| else: |
| |
| self._log_error_throttled(f"Failed to set robot target: {error_msg}") |
| else: |
| |
| self._log_error_throttled(f"Failed to set robot target: {error_msg}") |
|
|
| def _log_error_throttled(self, message: str) -> None: |
| """Log error with throttling to prevent log explosion.""" |
| now = self._now() |
| if now - self._last_error_time >= self._error_interval: |
| if self._suppressed_errors > 0: |
| message += f" (suppressed {self._suppressed_errors} repeats)" |
| self._suppressed_errors = 0 |
| logger.error(message) |
| self._last_error_time = now |
| else: |
| self._suppressed_errors += 1 |
|
|
| |
| |
| |
|
|
| def _control_loop(self) -> None: |
| """Main 100Hz control loop.""" |
| logger.info("Movement manager control loop started (%.0f Hz)", CONTROL_LOOP_FREQUENCY_HZ) |
|
|
| last_time = self._now() |
|
|
| while not self._stop_event.is_set(): |
| loop_start = self._now() |
| dt = loop_start - last_time |
| last_time = loop_start |
|
|
| try: |
| |
| self._poll_commands() |
|
|
| |
| self._update_action(dt) |
|
|
| |
| self._update_speech_sway(dt) |
|
|
| |
| self._update_breathing(dt) |
|
|
| |
| self._update_antenna_blend(dt) |
|
|
| |
| pose = self._compose_final_pose() |
|
|
| |
| self._issue_control_command(pose) |
|
|
| except Exception as e: |
| self._log_error_throttled(f"Control loop error: {e}") |
|
|
| |
| elapsed = self._now() - loop_start |
| sleep_time = max(0.0, TARGET_PERIOD - elapsed) |
| if sleep_time > 0: |
| time.sleep(sleep_time) |
|
|
| logger.info("Movement manager control loop stopped") |
|
|
| |
| |
| |
|
|
| def start(self) -> None: |
| """Start the control loop.""" |
| if self._thread is not None and self._thread.is_alive(): |
| logger.warning("Movement manager already running") |
| return |
|
|
| self._stop_event.clear() |
| self._thread = threading.Thread( |
| target=self._control_loop, |
| daemon=True, |
| name="MovementManager", |
| ) |
| self._thread.start() |
| logger.info("Movement manager started") |
|
|
| def stop(self) -> None: |
| """Stop the control loop and reset robot.""" |
| if self._thread is None or not self._thread.is_alive(): |
| return |
|
|
| logger.info("Stopping movement manager...") |
|
|
| |
| self._stop_event.set() |
|
|
| |
| self._thread.join(timeout=2.0) |
| if self._thread.is_alive(): |
| logger.warning("Movement manager thread did not stop in time") |
|
|
| |
| self._reset_to_neutral_blocking() |
|
|
| logger.info("Movement manager stopped") |
|
|
| def _reset_to_neutral_blocking(self) -> None: |
| """Reset robot to neutral position (blocking).""" |
| if self.robot is None: |
| return |
|
|
| try: |
| neutral_pose = np.eye(4) |
| self.robot.goto_target( |
| head=neutral_pose, |
| antennas=[0.0, 0.0], |
| body_yaw=0.0, |
| duration=1.0, |
| ) |
| logger.info("Robot reset to neutral position") |
| except Exception as e: |
| logger.error("Failed to reset robot: %s", e) |
|
|
| @property |
| def is_running(self) -> bool: |
| """Check if control loop is running.""" |
| return self._thread is not None and self._thread.is_alive() |
|
|