File size: 7,703 Bytes
6563dc6
 
 
beb8e32
6563dc6
b8cfa60
 
 
6563dc6
581cea0
b8cfa60
 
 
 
 
58f00d3
 
6563dc6
beb8e32
58f00d3
b8cfa60
 
 
6563dc6
770da68
b8cfa60
581cea0
a33ba57
6563dc6
 
 
9ef4811
 
a33ba57
9ef4811
 
 
2c4f920
a33ba57
b8cfa60
 
 
 
6563dc6
 
 
 
 
beb8e32
 
581cea0
beb8e32
 
 
770da68
beb8e32
 
 
 
6563dc6
 
 
 
a33ba57
bf7c901
a33ba57
b8cfa60
58f00d3
6563dc6
 
 
 
 
 
 
 
 
58f00d3
 
 
 
 
beb8e32
 
cdc9926
6563dc6
51ad4f0
cdc9926
beb8e32
6563dc6
 
b8cfa60
6563dc6
51ad4f0
770da68
beb8e32
6563dc6
beb8e32
b8cfa60
58f00d3
6563dc6
cdc9926
6563dc6
58f00d3
6563dc6
58f00d3
6563dc6
 
634577b
b8cfa60
3f1b696
 
581cea0
3f1b696
 
 
 
 
 
581cea0
3f1b696
58f00d3
6563dc6
58f00d3
6563dc6
581cea0
58f00d3
6563dc6
b8cfa60
6563dc6
 
634577b
b8cfa60
58f00d3
 
b8cfa60
6563dc6
581cea0
58f00d3
6563dc6
0d8fe98
b8cfa60
6563dc6
58f00d3
6563dc6
581cea0
b8cfa60
58f00d3
 
b8cfa60
6563dc6
58f00d3
6563dc6
b8cfa60
6563dc6
58f00d3
6563dc6
 
b8cfa60
58f00d3
 
b8cfa60
6563dc6
51ad4f0
58f00d3
6563dc6
b8cfa60
6563dc6
58f00d3
6563dc6
634577b
770da68
51ad4f0
 
770da68
634577b
b8cfa60
58f00d3
 
b8cfa60
6563dc6
58f00d3
6563dc6
b8cfa60
6563dc6
 
 
 
b8cfa60
58f00d3
 
b8cfa60
6563dc6
58f00d3
6563dc6
b8cfa60
6563dc6
 
 
b8cfa60
58f00d3
 
b8cfa60
581cea0
58f00d3
6563dc6
b8cfa60
58f00d3
581cea0
 
6563dc6
581cea0
6563dc6
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
"""Reachy Mini motion control integration.

This module provides a high-level motion API that delegates to the
MovementManager for unified 5Hz control with face tracking.
"""

import logging
from typing import Optional

from .movement_manager import MovementManager, RobotState

_LOGGER = logging.getLogger(__name__)


class ReachyMiniMotion:
    """Reachy Mini motion controller for voice assistant.

    All public motion methods (on_*) are non-blocking. They send commands
    to the MovementManager which handles them in its 5Hz control loop.
    """

    def __init__(self, reachy_mini=None):
        self.reachy_mini = reachy_mini
        self._movement_manager: Optional[MovementManager] = None
        self._camera_server = None  # Reference to camera server for face tracking control
        self._is_speaking = False

        _LOGGER.debug("ReachyMiniMotion.__init__ called with reachy_mini=%s", reachy_mini)

        # Initialize movement manager if robot is available
        if reachy_mini is not None:
            try:
                self._movement_manager = MovementManager(reachy_mini)
                _LOGGER.debug("MovementManager created successfully")
            except Exception as e:
                _LOGGER.error("Failed to create MovementManager: %s", e, exc_info=True)
                self._movement_manager = None
        else:
            _LOGGER.debug("reachy_mini is None, MovementManager not created")

    def set_reachy_mini(self, reachy_mini):
        """Set the Reachy Mini instance."""
        self.reachy_mini = reachy_mini
        if reachy_mini is not None and self._movement_manager is None:
            self._movement_manager = MovementManager(reachy_mini)
        elif reachy_mini is not None and self._movement_manager is not None:
            self._movement_manager.robot = reachy_mini

    def set_camera_server(self, camera_server):
        """Set the camera server for face tracking.

        Args:
            camera_server: MJPEGCameraServer instance with face tracking enabled
        """
        self._camera_server = camera_server
        if self._movement_manager is not None:
            self._movement_manager.set_camera_server(camera_server)
            _LOGGER.info("Camera server connected for face tracking")

    def start(self):
        """Start the movement manager control loop."""
        if self._movement_manager is not None:
            self._movement_manager.start()
            _LOGGER.info("Motion control started")
        else:
            _LOGGER.warning("Motion control not started: movement_manager is None")

    def shutdown(self):
        """Shutdown the motion controller."""
        if self._movement_manager is not None:
            self._movement_manager.stop()
            _LOGGER.info("Motion control stopped")

    @property
    def movement_manager(self) -> Optional[MovementManager]:
        """Get the movement manager instance."""
        return self._movement_manager

    # -------------------------------------------------------------------------
    # Public non-blocking motion methods
    # -------------------------------------------------------------------------

    def on_wakeup(self):
        """Called when wake word is detected.

        Non-blocking: command sent to MovementManager.
        Face tracking is always enabled, so robot will look at user automatically.
        """
        _LOGGER.debug("on_wakeup called")
        if self._movement_manager is None:
            _LOGGER.warning("on_wakeup: movement_manager is None, skipping motion")
            return

        # Face tracking is always enabled, no need to enable it here

        # Set listening state - face tracking will handle looking at user
        self._movement_manager.set_state(RobotState.LISTENING)
        _LOGGER.info("Wake word detected, entering listening state")

    def on_listening(self):
        """Called when listening for speech - attentive pose.

        Non-blocking: command sent to MovementManager.
        """
        if self._movement_manager is None:
            return

        self._movement_manager.set_state(RobotState.LISTENING)
        _LOGGER.debug("Reachy Mini: Listening pose")

    def on_continue_listening(self):
        """Called when continuing to listen in tap conversation mode.

        Non-blocking: command sent to MovementManager.
        """
        if self._movement_manager is None:
            return

        self._movement_manager.set_state(RobotState.LISTENING)
        _LOGGER.debug("Reachy Mini: Continue listening")

    def on_thinking(self):
        """Called when processing speech - thinking pose.

        Non-blocking: command sent to MovementManager.
        Animation offsets are defined in conversation_animations.json.
        """
        if self._movement_manager is None:
            return

        self._movement_manager.set_state(RobotState.THINKING)
        _LOGGER.debug("Reachy Mini: Thinking pose")

    def on_speaking_start(self):
        """Called when TTS starts - start speech-reactive motion.

        Non-blocking: command sent to MovementManager.
        Animation is defined in conversation_animations.json.
        """
        if self._movement_manager is None:
            _LOGGER.warning("MovementManager not initialized, skipping speaking animation")
            return

        self._is_speaking = True
        self._movement_manager.set_state(RobotState.SPEAKING)
        _LOGGER.info("Reachy Mini: Speaking animation started")

    def on_speaking_end(self):
        """Called when TTS ends - stop speech-reactive motion.

        Non-blocking: command sent to MovementManager.
        """
        if self._movement_manager is None:
            return

        self._is_speaking = False
        # Don't change state yet - let on_idle handle that
        _LOGGER.debug("Reachy Mini: Speaking ended")

    def on_idle(self):
        """Called when returning to idle state.

        Non-blocking: command sent to MovementManager.
        Face tracking remains enabled for continuous tracking.
        """
        if self._movement_manager is None:
            return

        self._is_speaking = False
        self._movement_manager.set_state(RobotState.IDLE)
        self._movement_manager.reset_to_neutral(duration=0.5)

        # Note: Face tracking remains enabled for continuous tracking
        # This allows the robot to always look at the user when they approach

        _LOGGER.debug("Reachy Mini: Idle pose")

    def on_timer_finished(self):
        """Called when a timer finishes - alert animation.

        Non-blocking: command sent to MovementManager.
        """
        if self._movement_manager is None:
            return

        # Quick shake to alert
        self._movement_manager.shake(amplitude_deg=15, duration=0.4)
        _LOGGER.debug("Reachy Mini: Timer finished animation")

    def on_error(self):
        """Called on error - shake head.

        Non-blocking: command sent to MovementManager.
        """
        if self._movement_manager is None:
            return

        self._movement_manager.shake(amplitude_deg=10, duration=0.3)
        _LOGGER.debug("Reachy Mini: Error animation")

    def wiggle_antennas(self, happy: bool = True):
        """Wiggle antennas to show emotion.

        Non-blocking: antenna movement is handled by animation system.
        """
        if self._movement_manager is None:
            return

        # Antenna movement is handled by animation system
        # Set appropriate animation state
        if happy:
            self._movement_manager.set_state(RobotState.SPEAKING)
        _LOGGER.debug("Reachy Mini: Antenna wiggle (%s)", "happy" if happy else "sad")