"""Reachy Mini motion control integration.

This module provides a high-level motion API that delegates to the
MovementManager for unified 100Hz control.
"""
import logging
import math
from typing import Optional
from .movement_manager import MovementManager, RobotState, PendingAction
# Module-level logger, named after this module's import path.
_LOGGER = logging.getLogger(__name__)
class ReachyMiniMotion:
    """Reachy Mini motion controller for voice assistant.

    All public motion methods (on_*) are non-blocking. They send commands
    to the MovementManager which handles them in its 100Hz control loop.

    While no MovementManager exists (no robot has been supplied yet) every
    motion method returns without doing anything.
    """

    # Head yaw limit, in degrees either side of centre, applied to the
    # direction-of-arrival angle before turning towards a sound source.
    MAX_WAKEUP_YAW_DEG = 60

    def __init__(self, reachy_mini=None):
        """Create the controller, optionally bound to a robot.

        Args:
            reachy_mini: Robot instance handed to MovementManager, or None
                to defer binding until set_reachy_mini() is called.
        """
        self.reachy_mini = reachy_mini
        self._movement_manager: Optional[MovementManager] = None
        self._is_speaking = False

        # Initialize movement manager if robot is available
        if reachy_mini is not None:
            self._movement_manager = MovementManager(reachy_mini)

    def set_reachy_mini(self, reachy_mini):
        """Set the Reachy Mini instance.

        Creates the MovementManager on first use. If a manager already
        exists, its ``robot`` attribute is re-pointed at the new instance.
        Passing None only updates ``self.reachy_mini``; an existing manager
        is left untouched.
        """
        self.reachy_mini = reachy_mini
        if reachy_mini is None:
            return

        if self._movement_manager is None:
            self._movement_manager = MovementManager(reachy_mini)
        else:
            self._movement_manager.robot = reachy_mini

    def start(self):
        """Start the movement manager control loop."""
        if self._movement_manager is not None:
            self._movement_manager.start()
            _LOGGER.info("Motion control started")

    def shutdown(self):
        """Shutdown the motion controller."""
        if self._movement_manager is not None:
            self._movement_manager.stop()
            _LOGGER.info("Motion control stopped")

    @property
    def movement_manager(self) -> Optional[MovementManager]:
        """Get the movement manager instance (None if no robot is set)."""
        return self._movement_manager

    # -------------------------------------------------------------------------
    # Public non-blocking motion methods
    # -------------------------------------------------------------------------

    def on_wakeup(self, doa_angle_deg: Optional[float] = None):
        """Called when wake word is detected - turn to sound source.

        Non-blocking: command sent to MovementManager.

        Args:
            doa_angle_deg: Direction of arrival angle in degrees
                (0=front, positive=right, negative=left). None or NaN
                means no direction is available; the head then looks
                forward instead of turning.
        """
        _LOGGER.debug("on_wakeup called with doa_angle_deg=%s", doa_angle_deg)

        if self._movement_manager is None:
            _LOGGER.warning("on_wakeup: movement_manager is None, skipping motion")
            return

        # NaN compares false against everything, so min()/max() clamping
        # would silently turn it into the +limit. Treat it as "no DOA".
        if doa_angle_deg is not None and math.isnan(doa_angle_deg):
            _LOGGER.debug("DOA angle is NaN, treating it as unavailable")
            doa_angle_deg = None

        # Turn to sound source if DOA available
        if doa_angle_deg is not None:
            # Clamp to reasonable head rotation limits
            limit = self.MAX_WAKEUP_YAW_DEG
            yaw_deg = max(-limit, min(limit, doa_angle_deg))
            self._movement_manager.turn_to_angle(yaw_deg, duration=0.8)
            _LOGGER.info("Turning to sound source at %.1f degrees", yaw_deg)
        else:
            # Look forward
            self._movement_manager.reset_to_neutral(duration=0.3)
            _LOGGER.debug("DOA angle is None, looking forward")

        # Set listening state
        self._movement_manager.set_state(RobotState.LISTENING)

    def on_listening(self):
        """Called when listening for speech - attentive pose.

        Non-blocking: command sent to MovementManager.
        """
        if self._movement_manager is None:
            return

        self._movement_manager.set_state(RobotState.LISTENING)
        _LOGGER.debug("Reachy Mini: Listening pose")

    def on_thinking(self):
        """Called when processing speech - thinking pose.

        Non-blocking: command sent to MovementManager.
        """
        if self._movement_manager is None:
            return

        self._movement_manager.set_state(RobotState.THINKING)

        # Look up slightly (thinking gesture)
        action = PendingAction(
            name="thinking",
            target_pitch=math.radians(-10),  # Look up
            target_yaw=math.radians(5),  # Slight turn
            duration=0.4,
        )
        self._movement_manager.queue_action(action)
        _LOGGER.debug("Reachy Mini: Thinking pose")

    def on_speaking_start(self):
        """Called when TTS starts - start speech-reactive motion.

        Non-blocking: command sent to MovementManager.
        """
        if self._movement_manager is None:
            return

        self._is_speaking = True
        self._movement_manager.set_state(RobotState.SPEAKING)

        # Gentle nod to indicate speaking
        action = PendingAction(
            name="speaking_start",
            target_pitch=math.radians(5),  # Slight nod down
            duration=0.3,
        )
        self._movement_manager.queue_action(action)
        _LOGGER.debug("Reachy Mini: Speaking started")

    def on_speaking_end(self):
        """Called when TTS ends - stop speech-reactive motion.

        Only clears the internal speaking flag; no command is sent to the
        MovementManager here.
        """
        if self._movement_manager is None:
            return

        self._is_speaking = False
        # Don't change state yet - let on_idle handle that
        _LOGGER.debug("Reachy Mini: Speaking ended")

    def on_idle(self):
        """Called when returning to idle state.

        Non-blocking: command sent to MovementManager.
        """
        if self._movement_manager is None:
            return

        self._is_speaking = False
        self._movement_manager.set_state(RobotState.IDLE)
        self._movement_manager.reset_to_neutral(duration=0.5)
        _LOGGER.debug("Reachy Mini: Idle pose")

    def on_timer_finished(self):
        """Called when a timer finishes - alert animation.

        Non-blocking: command sent to MovementManager.
        """
        if self._movement_manager is None:
            return

        # Quick shake to alert
        self._movement_manager.shake(amplitude_deg=15, duration=0.4)
        _LOGGER.debug("Reachy Mini: Timer finished animation")

    def on_error(self):
        """Called on error - shake head.

        Non-blocking: command sent to MovementManager.
        """
        if self._movement_manager is None:
            return

        self._movement_manager.shake(amplitude_deg=10, duration=0.3)
        _LOGGER.debug("Reachy Mini: Error animation")

    def wiggle_antennas(self, happy: bool = True):
        """Wiggle antennas to show emotion.

        Non-blocking: command sent to MovementManager.

        Args:
            happy: True queues the "antenna_happy" action, False queues
                "antenna_sad".
        """
        if self._movement_manager is None:
            return

        # Queue antenna wiggle action
        # Note: antenna control is handled in MovementManager state
        action = PendingAction(
            name="antenna_happy" if happy else "antenna_sad",
            duration=0.2,
        )
        self._movement_manager.queue_action(action)
        _LOGGER.debug("Reachy Mini: Antenna wiggle (%s)", "happy" if happy else "sad")

    def update_audio_loudness(self, loudness_db: float):
        """Update audio loudness for speech-driven sway.

        Call this periodically during TTS playback to enable
        natural head movements synchronized with speech.

        Args:
            loudness_db: Audio loudness in dBFS (typically -60 to 0)
        """
        if self._movement_manager is not None:
            self._movement_manager.update_audio_loudness(loudness_db)

    # -------------------------------------------------------------------------
    # Legacy compatibility methods (deprecated, use MovementManager directly)
    # -------------------------------------------------------------------------

    def _nod(self, count: int = 1, amplitude: float = 15, duration: float = 0.5):
        """Nod head up and down (legacy).

        Calls MovementManager.nod() ``count`` times with the given
        amplitude (passed as ``amplitude_deg``) and duration.
        """
        if self._movement_manager is None:
            return

        for _ in range(count):
            self._movement_manager.nod(amplitude_deg=amplitude, duration=duration)

    def _shake(self, count: int = 1, amplitude: float = 20, duration: float = 0.5):
        """Shake head left and right (legacy).

        Calls MovementManager.shake() ``count`` times with the given
        amplitude (passed as ``amplitude_deg``) and duration.
        """
        if self._movement_manager is None:
            return

        for _ in range(count):
            self._movement_manager.shake(amplitude_deg=amplitude, duration=duration)

    def _look_at_user(self):
        """Look at user (legacy): reset to neutral over 0.3 s."""
        if self._movement_manager is None:
            return

        self._movement_manager.reset_to_neutral(duration=0.3)

    def _return_to_neutral(self):
        """Return to neutral position (legacy): reset over 0.5 s."""
        if self._movement_manager is None:
            return

        self._movement_manager.reset_to_neutral(duration=0.5)
|