File size: 8,837 Bytes
6563dc6
 
 
 
 
b8cfa60
 
6563dc6
b8cfa60
6563dc6
 
b8cfa60
 
 
 
 
58f00d3
 
6563dc6
 
58f00d3
b8cfa60
 
 
6563dc6
b8cfa60
6563dc6
 
 
 
b8cfa60
 
 
 
6563dc6
 
 
 
 
 
 
 
 
 
b8cfa60
58f00d3
6563dc6
 
 
 
 
 
 
 
 
58f00d3
 
 
 
 
cdc9926
6563dc6
cdc9926
6563dc6
58f00d3
cdc9926
6563dc6
 
cdc9926
6563dc6
 
 
b8cfa60
6563dc6
 
 
 
 
 
 
 
 
 
 
 
 
 
b8cfa60
58f00d3
6563dc6
cdc9926
6563dc6
58f00d3
6563dc6
58f00d3
6563dc6
 
 
b8cfa60
58f00d3
6563dc6
58f00d3
6563dc6
58f00d3
6563dc6
b8cfa60
6563dc6
 
 
 
 
 
 
 
 
 
 
 
b8cfa60
58f00d3
 
b8cfa60
6563dc6
58f00d3
6563dc6
b8cfa60
6563dc6
58f00d3
6563dc6
 
 
 
 
 
 
 
 
 
b8cfa60
58f00d3
 
b8cfa60
6563dc6
58f00d3
6563dc6
b8cfa60
6563dc6
58f00d3
6563dc6
 
b8cfa60
58f00d3
 
b8cfa60
6563dc6
58f00d3
6563dc6
b8cfa60
6563dc6
58f00d3
6563dc6
 
 
b8cfa60
58f00d3
 
b8cfa60
6563dc6
58f00d3
6563dc6
b8cfa60
6563dc6
 
 
 
b8cfa60
58f00d3
 
b8cfa60
6563dc6
58f00d3
6563dc6
b8cfa60
6563dc6
 
 
b8cfa60
58f00d3
 
b8cfa60
6563dc6
58f00d3
6563dc6
b8cfa60
58f00d3
6563dc6
 
 
 
 
 
 
 
 
 
 
 
58f00d3
6563dc6
 
b8cfa60
6563dc6
 
b8cfa60
6563dc6
 
89da2bd
6563dc6
 
89da2bd
6563dc6
 
58f00d3
 
6563dc6
b8cfa60
 
 
6563dc6
 
b8cfa60
6563dc6
 
b8cfa60
 
6563dc6
 
b8cfa60
6563dc6
 
b8cfa60
 
6563dc6
 
b8cfa60
6563dc6
b8cfa60
 
6563dc6
 
58f00d3
6563dc6
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
"""Reachy Mini motion control integration.

This module provides a high-level motion API that delegates to the
MovementManager for unified 100Hz control.
"""

import logging
import math
from typing import Optional

from .movement_manager import MovementManager, RobotState, PendingAction

_LOGGER = logging.getLogger(__name__)


class ReachyMiniMotion:
    """Reachy Mini motion controller for voice assistant.

    All public motion methods (on_*) are non-blocking. They send commands
    to the MovementManager which handles them in its 100Hz control loop.
    """

    def __init__(self, reachy_mini=None):
        self.reachy_mini = reachy_mini
        self._movement_manager: Optional[MovementManager] = None
        self._is_speaking = False

        # Initialize movement manager if robot is available
        if reachy_mini is not None:
            self._movement_manager = MovementManager(reachy_mini)

    def set_reachy_mini(self, reachy_mini):
        """Set the Reachy Mini instance."""
        self.reachy_mini = reachy_mini
        if reachy_mini is not None and self._movement_manager is None:
            self._movement_manager = MovementManager(reachy_mini)
        elif reachy_mini is not None and self._movement_manager is not None:
            self._movement_manager.robot = reachy_mini

    def start(self):
        """Start the movement manager control loop."""
        if self._movement_manager is not None:
            self._movement_manager.start()
            _LOGGER.info("Motion control started")

    def shutdown(self):
        """Shutdown the motion controller."""
        if self._movement_manager is not None:
            self._movement_manager.stop()
            _LOGGER.info("Motion control stopped")

    @property
    def movement_manager(self) -> Optional[MovementManager]:
        """Get the movement manager instance."""
        return self._movement_manager

    # -------------------------------------------------------------------------
    # Public non-blocking motion methods
    # -------------------------------------------------------------------------

    def on_wakeup(self, doa_angle_deg: Optional[float] = None):
        """Called when wake word is detected - turn to sound source.

        Non-blocking: command sent to MovementManager.

        Args:
            doa_angle_deg: Direction of arrival angle in degrees
                          (0=front, positive=right, negative=left)
        """
        _LOGGER.debug("on_wakeup called with doa_angle_deg=%s", doa_angle_deg)
        if self._movement_manager is None:
            _LOGGER.warning("on_wakeup: movement_manager is None, skipping motion")
            return

        # Turn to sound source if DOA available
        if doa_angle_deg is not None:
            # Clamp to reasonable head rotation limits
            yaw_deg = max(-60, min(60, doa_angle_deg))
            self._movement_manager.turn_to_angle(yaw_deg, duration=0.8)
            _LOGGER.info("Turning to sound source at %.1f degrees", yaw_deg)
        else:
            # Look forward
            self._movement_manager.reset_to_neutral(duration=0.3)
            _LOGGER.debug("DOA angle is None, looking forward")

        # Set listening state
        self._movement_manager.set_state(RobotState.LISTENING)

    def on_listening(self):
        """Called when listening for speech - attentive pose.

        Non-blocking: command sent to MovementManager.
        """
        if self._movement_manager is None:
            return

        self._movement_manager.set_state(RobotState.LISTENING)
        _LOGGER.debug("Reachy Mini: Listening pose")

    def on_thinking(self):
        """Called when processing speech - thinking pose.

        Non-blocking: command sent to MovementManager.
        """
        if self._movement_manager is None:
            return

        self._movement_manager.set_state(RobotState.THINKING)

        # Look up slightly (thinking gesture)
        action = PendingAction(
            name="thinking",
            target_pitch=math.radians(-10),  # Look up
            target_yaw=math.radians(5),      # Slight turn
            duration=0.4,
        )
        self._movement_manager.queue_action(action)
        _LOGGER.debug("Reachy Mini: Thinking pose")

    def on_speaking_start(self):
        """Called when TTS starts - start speech-reactive motion.

        Non-blocking: command sent to MovementManager.
        """
        if self._movement_manager is None:
            return

        self._is_speaking = True
        self._movement_manager.set_state(RobotState.SPEAKING)

        # Gentle nod to indicate speaking
        action = PendingAction(
            name="speaking_start",
            target_pitch=math.radians(5),  # Slight nod down
            duration=0.3,
        )
        self._movement_manager.queue_action(action)
        _LOGGER.debug("Reachy Mini: Speaking started")

    def on_speaking_end(self):
        """Called when TTS ends - stop speech-reactive motion.

        Non-blocking: command sent to MovementManager.
        """
        if self._movement_manager is None:
            return

        self._is_speaking = False
        # Don't change state yet - let on_idle handle that
        _LOGGER.debug("Reachy Mini: Speaking ended")

    def on_idle(self):
        """Called when returning to idle state.

        Non-blocking: command sent to MovementManager.
        """
        if self._movement_manager is None:
            return

        self._is_speaking = False
        self._movement_manager.set_state(RobotState.IDLE)
        self._movement_manager.reset_to_neutral(duration=0.5)
        _LOGGER.debug("Reachy Mini: Idle pose")

    def on_timer_finished(self):
        """Called when a timer finishes - alert animation.

        Non-blocking: command sent to MovementManager.
        """
        if self._movement_manager is None:
            return

        # Quick shake to alert
        self._movement_manager.shake(amplitude_deg=15, duration=0.4)
        _LOGGER.debug("Reachy Mini: Timer finished animation")

    def on_error(self):
        """Called on error - shake head.

        Non-blocking: command sent to MovementManager.
        """
        if self._movement_manager is None:
            return

        self._movement_manager.shake(amplitude_deg=10, duration=0.3)
        _LOGGER.debug("Reachy Mini: Error animation")

    def wiggle_antennas(self, happy: bool = True):
        """Wiggle antennas to show emotion.

        Non-blocking: command sent to MovementManager.
        """
        if self._movement_manager is None:
            return

        # Queue antenna wiggle action
        if happy:
            action = PendingAction(
                name="antenna_happy",
                duration=0.2,
            )
            # Note: antenna control is handled in MovementManager state
        else:
            action = PendingAction(
                name="antenna_sad",
                duration=0.2,
            )

        self._movement_manager.queue_action(action)
        _LOGGER.debug("Reachy Mini: Antenna wiggle (%s)", "happy" if happy else "sad")

    def update_audio_loudness(self, loudness_db: float):
        """Update audio loudness for speech-driven sway.

        Call this periodically during TTS playback to enable
        natural head movements synchronized with speech.

        Args:
            loudness_db: Audio loudness in dBFS (typically -60 to 0)
        """
        if self._movement_manager is not None:
            self._movement_manager.update_audio_loudness(loudness_db)

    # -------------------------------------------------------------------------
    # Legacy compatibility methods (deprecated, use MovementManager directly)
    # -------------------------------------------------------------------------

    def _nod(self, count: int = 1, amplitude: float = 15, duration: float = 0.5):
        """Nod head up and down (legacy)."""
        if self._movement_manager is None:
            return
        for _ in range(count):
            self._movement_manager.nod(amplitude_deg=amplitude, duration=duration)

    def _shake(self, count: int = 1, amplitude: float = 20, duration: float = 0.5):
        """Shake head left and right (legacy)."""
        if self._movement_manager is None:
            return
        for _ in range(count):
            self._movement_manager.shake(amplitude_deg=amplitude, duration=duration)

    def _look_at_user(self):
        """Look at user (legacy)."""
        if self._movement_manager is None:
            return
        self._movement_manager.reset_to_neutral(duration=0.3)

    def _return_to_neutral(self):
        """Return to neutral position (legacy)."""
        if self._movement_manager is None:
            return
        self._movement_manager.reset_to_neutral(duration=0.5)