reachy_mini_home_assistant / ARCHITECTURE.md
Desmond-Dong's picture
docs: update architecture to reflect Home Assistant STT/TTS
6f4787d
|
raw
history blame
37.4 kB

Reachy Mini Home Assistant Voice Assistant - 架构设计

1. 系统架构概览

┌─────────────────────────────────────────────────────────────────┐
│                        应用层 (Application Layer)                │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐           │
│  │ Home         │  │ Web UI       │  │ Console      │           │
│  │ Assistant    │  │ (Gradio)     │  │ Interface    │           │
│  └──────────────┘  └──────────────┘  └──────────────┘           │
└─────────────────────────────────────────────────────────────────┘
                              ↓
┌─────────────────────────────────────────────────────────────────┐
│                      业务逻辑层 (Business Logic)                 │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐           │
│  │ Voice        │  │ Motion       │  │ State        │           │
│  │ Manager      │  │ Controller   │  │ Manager      │           │
│  └──────────────┘  └──────────────┘  └──────────────┘           │
│  ┌──────────────┐  ┌──────────────┐                           │
│  │ ESPHome      │  │ Event        │                           │
│  │ Handler      │  │ Dispatcher   │                           │
│  └──────────────┘  └──────────────┘                           │
└─────────────────────────────────────────────────────────────────┘
                              ↓
┌─────────────────────────────────────────────────────────────────┐
│                        服务层 (Services)                         │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐           │
│  │ Wake Word    │  │ Audio        │  │ Motion       │           │
│  │ Detector     │  │ Processor    │  │ Queue        │           │
│  └──────────────┘  └──────────────┘  └──────────────┘           │
│  ┌──────────────────────────────────────────────────────┐       │
│  │ ESPHome Protocol (Audio Streaming to/from HA)       │       │
│  └──────────────────────────────────────────────────────┘       │
└─────────────────────────────────────────────────────────────────┘
                              ↓
┌─────────────────────────────────────────────────────────────────┐
│                      硬件抽象层 (HAL)                            │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐           │
│  │ Audio        │  │ Motion       │  │ Camera       │           │
│  │ Adapter      │  │ Adapter      │  │ Adapter      │           │
│  └──────────────┘  └──────────────┘  └──────────────┘           │
│  ┌──────────────┐  ┌──────────────┐                           │
│  │ Reachy Mini  │  │ ESPHome      │                           │
│  │ SDK Wrapper  │  │ Protocol     │                           │
│  └──────────────┘  └──────────────┘                           │
└─────────────────────────────────────────────────────────────────┘
                              ↓
┌─────────────────────────────────────────────────────────────────┐
│              Reachy Mini Hardware + Home Assistant              │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐           │
│  │ Microphone   │  │ Head Motors  │  │ Camera       │           │
│  │ Array (4)    │  │ (6 DOF)      │  │ (Wide)       │           │
│  └──────────────┘  └──────────────┘  └──────────────┘           │
│  ┌──────────────┐  ┌──────────────┐                           │
│  │ Speaker      │  │ Antennas     │                           │
│  │ (5W)         │  │ (2)          │                           │
│  └──────────────┘  └──────────────┘                           │
│                                                               │
│  ┌──────────────────────────────────────────────────────┐    │
│  │ Home Assistant (STT/TTS Processing)                 │    │
│  └──────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────────┘

2. 核心设计原则

2.1 基于 linux-voice-assistant

本项目基于 OHF-Voice/linux-voice-assistant 的架构设计,主要特点:

  • STT/TTS 由 Home Assistant 处理:音频数据通过 ESPHome 协议传输到 Home Assistant,由 HA 进行语音识别和合成
  • 本地唤醒词检测:使用 microWakeWord 或 openWakeWord 进行离线唤醒词检测
  • ESPHome 协议通信:通过 ESPHome 协议与 Home Assistant 通信
  • 运动控制增强:集成 Reachy Mini 的运动控制能力

2.2 架构特点

  • 模块化设计:音频、语音、运动、ESPHome 各模块独立
  • 异步处理:使用 asyncio 实现高性能异步处理
  • 状态管理:集中的状态管理(ServerState)
  • 事件驱动:基于事件的通信机制

3. 模块设计

3.1 音频模块 (audio/)

职责

  • 音频设备管理(麦克风、扬声器)
  • 音频录制和播放
  • 音频格式转换(16KHz 单声道 PCM)

接口

class AudioAdapter(ABC):
    """音频设备适配器抽象基类"""
    
    @abstractmethod
    async def list_input_devices(self) -> List[AudioDevice]:
        """列出可用的音频输入设备"""
        pass
    
    @abstractmethod
    async def start_recording(
        self,
        device_id: str,
        callback: Callable[[bytes], None],
        sample_rate: int = 16000,
        channels: int = 1,
        block_size: int = 1024
    ):
        """开始录制音频"""
        pass
    
    @abstractmethod
    async def play_audio(
        self,
        audio_data: bytes,
        device_id: str,
        sample_rate: int = 16000,
        channels: int = 1
    ):
        """播放音频"""
        pass


class MicrophoneArray(AudioAdapter):
    """麦克风阵列适配器(Reachy Mini 的 4 麦克风阵列)"""
    
    def __init__(self, sample_rate: int = 16000, channels: int = 1):
        self.sample_rate = sample_rate
        self.channels = channels
        self._stream = None
        self._is_recording = False
        self._callback = None
        self._loop = None


class Speaker(AudioAdapter):
    """扬声器适配器(Reachy Mini 的 5W 扬声器)"""
    
    def __init__(self, sample_rate: int = 16000):
        self.sample_rate = sample_rate

音频处理器

class AudioProcessor:
    """处理音频块,用于唤醒词检测和流式传输"""
    
    def __init__(
        self,
        sample_rate: int = 16000,
        channels: int = 1,
        block_size: int = 1024
    ):
        self.sample_rate = sample_rate
        self.channels = channels
        self.block_size = block_size
        
        self._wake_word_callbacks: list[Callable[[bytes], None]] = []
        self._stream_callbacks: list[Callable[[bytes], None]] = []
    
    def add_wake_word_callback(self, callback: Callable[[bytes], None]):
        """添加唤醒词检测回调"""
        self._wake_word_callbacks.append(callback)
    
    def add_stream_callback(self, callback: Callable[[bytes], None]):
        """添加音频流回调(发送到 Home Assistant)"""
        self._stream_callbacks.append(callback)
    
    async def process_audio_chunk(self, audio_chunk: bytes):
        """处理音频块"""
        # 调用唤醒词检测回调
        for callback in self._wake_word_callbacks:
            callback(audio_chunk)
        
        # 调用流式传输回调
        for callback in self._stream_callbacks:
            callback(audio_chunk)

3.2 语音模块 (voice/)

职责

  • 唤醒词检测(本地离线)
  • STT/TTS 由 Home Assistant 处理(不在此模块)

接口

class WakeWordDetector(ABC):
    """唤醒词检测器抽象基类"""
    
    @abstractmethod
    async def load_model(self, model_path: str):
        """加载唤醒词模型"""
        pass
    
    @abstractmethod
    async def process_audio(self, audio_chunk: bytes) -> bool:
        """处理音频块,返回是否检测到唤醒词"""
        pass


class MicroWakeWordDetector(WakeWordDetector):
    """microWakeWord 检测器(轻量级,适合 Raspberry Pi)"""
    
    def __init__(self, model_path: str):
        self.model = None
        self.features = None
        self.model_path = Path(model_path)
        self._confidence = 0.0
        self._loaded = False
    
    async def load_model(self, model_path: str):
        """加载 microWakeWord 模型"""
        from pymicro_wakeword import MicroWakeWord, MicroWakeWordFeatures
        
        self.features = MicroWakeWordFeatures()
        self.model = MicroWakeWord.from_config(model_path)
        self._loaded = True
    
    async def process_audio(self, audio_chunk: bytes) -> bool:
        """处理音频块"""
        import numpy as np
        audio_array = np.frombuffer(audio_chunk, dtype=np.int16).astype(np.float32) / 32768.0
        
        features = self.features.process_streaming(audio_array)
        for feature in features:
            score = self.model.process_streaming(feature)
            if score is not None and score >= 0.5:
                return True
        return False


class OpenWakeWordDetector(WakeWordDetector):
    """openWakeWord 检测器(更多唤醒词选择)"""
    
    def __init__(self, model_path: str):
        self.model = None
        self.features = None
        self.model_path = Path(model_path)
        self._confidence = 0.0
        self._loaded = False
    
    async def load_model(self, model_path: str):
        """加载 openWakeWord 模型"""
        from pyopen_wakeword import OpenWakeWord, OpenWakeWordFeatures
        
        self.features = OpenWakeWordFeatures.from_builtin()
        self.model = OpenWakeWord(model_path)
        self._loaded = True
    
    async def process_audio(self, audio_chunk: bytes) -> bool:
        """处理音频块"""
        import numpy as np
        audio_array = np.frombuffer(audio_chunk, dtype=np.int16).astype(np.float32) / 32768.0
        
        features = self.features.process_streaming(audio_array)
        for feature in features:
            scores = self.model.process_streaming(feature)
            for score in scores:
                if score >= 0.5:
                    return True
        return False

3.3 运动模块 (motion/)

职责

  • 头部运动控制(6 自由度)
  • 天线控制(2 个天线)
  • 运动队列管理
  • 语音反应性运动

接口

class MotionController(ABC):
    """运动控制器抽象基类"""
    
    @abstractmethod
    async def connect(self, host: str = 'localhost'):
        """连接到机器人"""
        pass
    
    @abstractmethod
    async def wake_up(self):
        """唤醒机器人"""
        pass
    
    @abstractmethod
    async def turn_off(self):
        """关闭机器人"""
        pass
    
    @abstractmethod
    async def move_head(self, pose: np.ndarray, duration: float = 1.0):
        """移动头部到姿态"""
        pass
    
    @abstractmethod
    async def move_antennas(self, left: float, right: float, duration: float = 1.0):
        """移动天线"""
        pass
    
    @abstractmethod
    async def nod(self, count: int = 1, duration: float = 0.5):
        """点头"""
        pass
    
    @abstractmethod
    async def shake(self, count: int = 1, duration: float = 0.5):
        """摇头"""
        pass
    
    @abstractmethod
    async def start_speech_reactive_motion(self):
        """开始语音反应性运动"""
        pass
    
    @abstractmethod
    async def stop_speech_reactive_motion(self):
        """停止语音反应性运动"""
        pass


class ReachyMiniMotionController(MotionController):
    """Reachy Mini 运动控制器"""
    
    def __init__(self):
        self.reachy_mini = None
        self._connected = False
        self._speech_reactive = False
        self._speech_task = None
    
    async def connect(self, host: str = 'localhost'):
        """连接到 Reachy Mini"""
        from reachy_mini import ReachyMini
        
        self.reachy_mini = ReachyMini(host=host)
        self._connected = True
    
    async def wake_up(self):
        """唤醒机器人"""
        self.reachy_mini.wake_up()
    
    async def turn_off(self):
        """关闭机器人"""
        self.reachy_mini.turn_off()
    
    async def move_head(self, pose: np.ndarray, duration: float = 1.0):
        """移动头部到姿态"""
        self.reachy_mini.goto_target(head=pose, duration=duration)
    
    async def move_antennas(self, left: float, right: float, duration: float = 1.0):
        """移动天线"""
        self.reachy_mini.goto_target(antennas=[left, right], duration=duration)
    
    async def nod(self, count: int = 1, duration: float = 0.5):
        """点头"""
        import numpy as np
        from scipy.spatial.transform import Rotation as R
        
        for _ in range(count):
            # 点头
            pose_down = np.eye(4)
            pose_down[:3, :3] = R.from_euler('xyz', [15, 0, 0], degrees=True).as_matrix()
            await self.move_head(pose_down, duration=duration / 2)
            
            pose_up = np.eye(4)
            pose_up[:3, :3] = R.from_euler('xyz', [-15, 0, 0], degrees=True).as_matrix()
            await self.move_head(pose_up, duration=duration / 2)
    
    async def shake(self, count: int = 1, duration: float = 0.5):
        """摇头"""
        import numpy as np
        from scipy.spatial.transform import Rotation as R
        
        for _ in range(count):
            # 摇头
            pose_left = np.eye(4)
            pose_left[:3, :3] = R.from_euler('xyz', [0, 0, -20], degrees=True).as_matrix()
            await self.move_head(pose_left, duration=duration / 2)
            
            pose_right = np.eye(4)
            pose_right[:3, :3] = R.from_euler('xyz', [0, 0, 20], degrees=True).as_matrix()
            await self.move_head(pose_right, duration=duration / 2)
    
    async def start_speech_reactive_motion(self):
        """开始语音反应性运动(说话时的微动)"""
        self._speech_reactive = True
        self._speech_task = asyncio.create_task(self._speech_reactive_loop())
    
    async def stop_speech_reactive_motion(self):
        """停止语音反应性运动"""
        self._speech_reactive = False
        if self._speech_task:
            self._speech_task.cancel()
    
    async def _speech_reactive_loop(self):
        """语音反应性运动循环"""
        import numpy as np
        from scipy.spatial.transform import Rotation as R
        
        while self._speech_reactive:
            # 生成微小的摆动
            roll = np.sin(asyncio.get_event_loop().time() * 2) * 3
            pose = np.eye(4)
            pose[:3, :3] = R.from_euler('xyz', [0, 0, roll], degrees=True).as_matrix()
            
            await self.move_head(pose, duration=0.1)
            await asyncio.sleep(0.1)

运动队列

class MotionQueue:
    """运动队列管理器"""
    
    def __init__(self):
        self.high_priority = asyncio.Queue()
        self.medium_priority = asyncio.Queue()
        self.low_priority = asyncio.Queue()
        self.is_running = False
        self._current_motion = None
        self._task = None
    
    async def add_motion(self, motion: Motion):
        """添加运动到队列"""
        if motion.priority == MotionPriority.HIGH:
            await self.high_priority.put(motion)
        elif motion.priority == MotionPriority.MEDIUM:
            await self.medium_priority.put(motion)
        elif motion.priority == MotionPriority.LOW:
            await self.low_priority.put(motion)
    
    async def start(self):
        """开始处理运动队列"""
        self.is_running = True
        self._task = asyncio.create_task(self._process_queue())
    
    async def stop(self):
        """停止处理运动队列"""
        self.is_running = False
        if self._task:
            self._task.cancel()
    
    async def _process_queue(self):
        """处理运动队列"""
        while self.is_running:
            # 优先级:HIGH > MEDIUM > LOW
            motion = await self._get_next_motion()
            
            if motion is None:
                await asyncio.sleep(0.01)
                continue
            
            self._current_motion = motion
            await motion.execute()
            self._current_motion = None
    
    async def _get_next_motion(self) -> Optional[Motion]:
        """获取下一个运动"""
        if not self.high_priority.empty():
            return await self.high_priority.get()
        elif not self.medium_priority.empty():
            return await self.medium_priority.get()
        elif not self.low_priority.empty():
            return await self.low_priority.get()
        else:
            return None

3.4 ESPHome 模块 (esphome/)

职责

  • ESPHome 协议实现
  • 与 Home Assistant 通信
  • 音频流传输
  • 事件处理

接口

class ESPHomeServer:
    """ESPHome 协议服务器"""
    
    def __init__(self, host: str = "0.0.0.0", port: int = 6053):
        self.host = host
        self.port = port
        self._server = None
        self._is_running = False
        self._clients = []
        self._audio_callback = None
        self._event_callback = None
    
    async def start(self):
        """启动 ESPHome 服务器"""
        self._server = await asyncio.start_server(
            self._handle_client,
            self.host,
            self.port
        )
        self._is_running = True
    
    async def stop(self):
        """停止 ESPHome 服务器"""
        self._is_running = False
        
        for client in self._clients:
            client.close()
        self._clients.clear()
        
        if self._server:
            self._server.close()
            await self._server.wait_closed()
    
    def set_audio_callback(self, callback: Callable[[bytes], None]):
        """设置音频回调(接收来自 Home Assistant 的 TTS 音频)"""
        self._audio_callback = callback
    
    def set_event_callback(self, callback: Callable[[VoiceAssistantEventType, dict], None]):
        """设置事件回调(接收来自 Home Assistant 的事件)"""
        self._event_callback = callback
    
    async def send_audio(self, audio_data: bytes):
        """发送音频数据到 Home Assistant(STT 输入)"""
        for client in self._clients:
            try:
                client.write(audio_data)
                await client.drain()
            except Exception as e:
                logger.error(f"Error sending audio to client: {e}")
    
    async def send_event(self, event_type: VoiceAssistantEventType, data: dict):
        """发送事件到 Home Assistant"""
        if self._event_callback:
            self._event_callback(event_type, data)
    
    async def _handle_client(self, reader, writer):
        """处理客户端连接"""
        client_addr = writer.get_extra_info('peername')
        self._clients.append(writer)
        
        try:
            while self._is_running:
                data = await reader.read(4096)
                if not data:
                    break
                
                # 处理来自 Home Assistant 的数据
                await self._process_data(data)
        except Exception as e:
            logger.error(f"Error handling client {client_addr}: {e}")
        finally:
            self._clients.remove(writer)
            writer.close()
            await writer.wait_closed()


class VoiceSatelliteProtocol:
    """语音卫星协议处理器"""
    
    def __init__(self, state: ServerState):
        self.state = state
        self._is_streaming = False
        self._refractory_period = 2.0
        self._last_wake_word_time = 0.0
    
    async def handle_audio(self, audio_chunk: bytes):
        """处理音频块(发送到 Home Assistant)"""
        if self._is_streaming and self.state.esphome_server:
            await self.state.esphome_server.send_audio(audio_chunk)
    
    async def handle_wake_word(self):
        """处理唤醒词检测"""
        current_time = asyncio.get_event_loop().time()
        
        # 检查冷却期
        if current_time - self._last_wake_word_time < self._refractory_period:
            return
        
        self._last_wake_word_time = current_time
        
        # 发送唤醒词事件到 Home Assistant
        if self.state.esphome_server:
            await self.state.esphome_server.send_event(
                VoiceAssistantEventType.VOICE_ASSISTANT_WAKE_WORD_END,
                {"wake_word": "detected"}
            )
        
        # 开始流式传输
        self._is_streaming = True
    
    async def stop_streaming(self):
        """停止流式传输"""
        self._is_streaming = False


class VoiceAssistantEventType(Enum):
    """语音助手事件类型"""
    VOICE_ASSISTANT_START = 0
    VOICE_ASSISTANT_END = 1
    VOICE_ASSISTANT_ERROR = 2
    VOICE_ASSISTANT_STT_START = 3
    VOICE_ASSISTANT_STT_END = 4
    VOICE_ASSISTANT_TTS_START = 5
    VOICE_ASSISTANT_TTS_END = 6
    VOICE_ASSISTANT_WAKE_WORD_START = 9
    VOICE_ASSISTANT_WAKE_WORD_END = 10

3.5 配置模块 (config/)

职责

  • 配置文件管理
  • 用户偏好存储
  • 运行时配置

接口

class ConfigManager:
    """配置管理器"""
    
    def __init__(self, config_path: str = "config.json"):
        self.config_path = Path(config_path)
        self.config = self.load_config()
    
    def load_config(self) -> dict:
        """加载配置文件"""
        if self.config_path.exists():
            with open(self.config_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        return self.get_default_config()
    
    def save_config(self):
        """保存配置文件"""
        with open(self.config_path, 'w', encoding='utf-8') as f:
            json.dump(self.config, f, indent=2, ensure_ascii=False)
    
    def get_default_config(self) -> dict:
        """获取默认配置"""
        return {
            "audio": {
                "input_device": None,
                "output_device": None,
                "sample_rate": 16000,
                "channels": 1,
                "block_size": 1024
            },
            "voice": {
                "wake_word": "okay_nabu",
                "wake_word_dirs": ["wakewords"]
            },
            "motion": {
                "enabled": True,
                "speech_reactive": True
            },
            "esphome": {
                "host": "0.0.0.0",
                "port": 6053,
                "name": "Reachy Mini"
            },
            "robot": {
                "host": "localhost",
                "wireless": False
            }
        }
    
    def get(self, key: str, default: Any = None) -> Any:
        """获取配置值(支持嵌套键)"""
        keys = key.split('.')
        value = self.config
        for k in keys:
            if isinstance(value, dict):
                value = value.get(k, default)
            else:
                return default
        return value
    
    def set(self, key: str, value: Any):
        """设置配置值(支持嵌套键)"""
        keys = key.split('.')
        config = self.config
        for k in keys[:-1]:
            config = config.setdefault(k, {})
        config[keys[-1]] = value
        self.save_config()

3.6 状态管理 (state.py)

职责

  • 全局状态管理
  • 组件生命周期管理

接口

@dataclass
class ServerState:
    """全局服务器状态"""
    name: str
    
    # 配置
    config: Optional[ConfigManager] = None
    
    # 音频
    microphone: Optional[MicrophoneArray] = None
    speaker: Optional[Speaker] = None
    audio_queue: Queue = field(default_factory=Queue)
    
    # 语音
    wake_word_detector: Optional[WakeWordDetector] = None
    active_wake_words: list = field(default_factory=list)
    
    # 运动
    motion_controller: Optional[MotionController] = None
    motion_queue: Optional[MotionQueue] = None
    
    # ESPHome
    esphome_server: Optional[ESPHomeServer] = None
    voice_satellite: Optional[VoiceSatelliteProtocol] = None
    
    # 状态
    is_running: bool = False
    is_streaming: bool = False
    
    # 回调
    on_wake_word: Optional[callable] = None
    on_stt_result: Optional[callable] = None
    on_tts_audio: Optional[callable] = None
    
    async def cleanup(self):
        """清理资源"""
        if self.microphone:
            await self.microphone.stop_recording()
        
        if self.motion_controller:
            await self.motion_controller.stop_speech_reactive_motion()
            await self.motion_controller.turn_off()
            await self.motion_controller.disconnect()
        
        if self.motion_queue:
            await self.motion_queue.stop()
        
        if self.esphome_server:
            await self.esphome_server.stop()

3.7 主应用 (app.py)

职责

  • 应用生命周期管理
  • 组件初始化和协调
  • 事件处理

接口

class ReachyMiniVoiceApp:
    """主应用类"""
    
    def __init__(
        self,
        name: str,
        config: ConfigManager,
        audio_input_device: Optional[str] = None,
        audio_output_device: Optional[str] = None,
        wake_model: Optional[str] = None,
        wake_word_dirs: Optional[list] = None,
        host: str = "0.0.0.0",
        port: int = 6053,
        robot_host: str = "localhost",
        wireless: bool = False,
        gradio: bool = False
    ):
        self.name = name
        self.config = config
        self.audio_input_device = audio_input_device
        self.audio_output_device = audio_output_device
        self.wake_model = wake_model
        self.wake_word_dirs = wake_word_dirs
        self.host = host
        self.port = port
        self.robot_host = robot_host
        self.wireless = wireless
        self.gradio = gradio
        
        self.state = ServerState(name)
        self._is_running = False
    
    async def start(self):
        """启动应用"""
        # 初始化状态
        await self.state.initialize(self.config)
        
        # 设置回调
        self._setup_callbacks()
        
        # 启动音频录制
        await self.state.microphone.start_recording(
            self.audio_input_device,
            self._audio_callback,
            sample_rate=self.config.get("audio.sample_rate", 16000),
            channels=self.config.get("audio.channels", 1),
            block_size=self.config.get("audio.block_size", 1024)
        )
        
        # 启动 ESPHome 服务器
        await self.state.esphome_server.start()
        
        # 注册 mDNS 发现
        await self._register_mdns()
        
        self._is_running = True
        
        # 保持运行
        while self._is_running:
            await asyncio.sleep(1)
    
    async def stop(self):
        """停止应用"""
        self._is_running = False
        await self.state.cleanup()
    
    def _setup_callbacks(self):
        """设置回调"""
        self.state.audio_processor.add_wake_word_callback(self._on_audio_chunk)
        self.state.audio_processor.add_stream_callback(self._on_stream_audio)
    
    async def _audio_callback(self, audio_chunk: bytes):
        """音频录制回调"""
        await self.state.audio_processor.process_audio_chunk(audio_chunk)
    
    async def _on_audio_chunk(self, audio_chunk: bytes):
        """唤醒词检测回调"""
        if self.state.wake_word_detector:
            detected = await self.state.wake_word_detector.process_audio(audio_chunk)
            if detected:
                await self._on_wake_word_detected()
    
    async def _on_stream_audio(self, audio_chunk: bytes):
        """音频流传输回调(发送到 Home Assistant)"""
        if self.state.voice_satellite:
            await self.state.voice_satellite.handle_audio(audio_chunk)
    
    async def _on_wake_word_detected(self):
        """唤醒词检测回调"""
        # 点头确认
        if self.state.motion_controller:
            await self.state.motion_controller.nod(count=1, duration=0.3)
        
        # 触发语音卫星
        if self.state.voice_satellite:
            await self.state.voice_satellite.handle_wake_word()
    
    async def handle_tts_audio(self, audio_data: bytes):
        """处理来自 Home Assistant 的 TTS 音频"""
        # 播放音频
        if self.state.speaker:
            await self.state.speaker.play_audio(
                audio_data,
                self.audio_output_device,
                sample_rate=self.config.get("audio.sample_rate", 16000),
                channels=self.config.get("audio.channels", 1)
            )
    
    async def handle_stt_result(self, text: str):
        """处理来自 Home Assistant 的 STT 结果"""
        # 处理文本(添加自定义逻辑)
        pass
    
    async def _register_mdns(self):
        """注册 mDNS 服务发现"""
        from zeroconf import ServiceInfo, Zeroconf
        
        info = ServiceInfo(
            "_esphomelib._tcp.local.",
            f"{self.name}._esphomelib._tcp.local.",
            addresses=[],
            port=self.port,
            properties={
                "version": "1.0",
                "name": self.name,
                "platform": "reachy_mini"
            }
        )
        
        zeroconf = Zeroconf()
        zeroconf.register_service(info)

4. 数据流

4.1 音频输入流程

麦克风阵列 (4 麦克风)
    ↓ (16KHz PCM)
音频块 (1024 samples)
    ↓
┌─────────────────┐
│  唤醒词检测     │
│  (micro/oww)    │
└────────┬────────┘
         │
         ↓ (检测到唤醒词)
    触发唤醒事件
         │
         ↓
┌─────────────────┐
│  开始流式传输   │
│  (ESPHome)      │
└────────┬────────┘
         │
         ↓
┌─────────────────┐
│  发送到 HA      │
│  (STT 输入)     │
└─────────────────┘

4.2 音频输出流程

Home Assistant (TTS 输出)
    ↓
┌─────────────────┐
│  ESPHome 服务器  │
│  (接收音频)     │
└────────┬────────┘
         │
         ↓
┌─────────────────┐
│  播放音频       │
│  (扬声器)       │
└─────────────────┘

4.3 运动控制流程

唤醒词检测 / STT 结果 / TTS 事件
    ↓
┌─────────────────┐
│  运动队列管理   │
└────────┬────────┘
         │
         ↓
┌─────────────────┐
│  高优先级运动   │
│  (唤醒词确认)   │
└────────┬────────┘
         │
         ↓
┌─────────────────┐
│  中优先级运动   │
│  (用户命令)     │
└────────┬────────┘
         │
         ↓
┌─────────────────┐
│  低优先级运动   │
│  (语音反应)     │
└────────┬────────┘
         │
         ↓
    执行运动
         │
         ↓
┌─────────────────┐
│  Reachy Mini    │
│  SDK            │
└─────────────────┘

5. 依赖项

5.1 核心依赖

dependencies = [
    # Reachy Mini SDK
    "reachy-mini",
    
    # 音频处理
    "sounddevice>=0.4.6",
    "numpy>=1.24.0",
    
    # 语音处理
    "pymicro-wakeword>=2.0.0,<3.0.0",
    "pyopen-wakeword>=1.0.0,<2.0.0",
    
    # ESPHome
    "aioesphomeapi>=42.0.0",
    "zeroconf>=0.100.0",
    
    # 运动控制
    "scipy>=1.10.0",
    
    # Web UI (可选)
    "gradio>=4.0.0",
]

5.2 可选依赖

[project.optional-dependencies]
wireless = [
    "reachy-mini[wireless]",
]

vision = [
    "pollen-vision",
    "opencv-python>=4.8.0",
    "mediapipe>=0.10.0",
]

dev = [
    "pytest>=7.4.0",
    "pytest-asyncio>=0.21.0",
    "ruff>=0.1.0",
]

6. 性能优化

6.1 音频处理

  • 使用异步 I/O 减少阻塞
  • 音频块大小优化(1024 samples)
  • 使用 numpy 加速数值计算
  • 预分配缓冲区减少内存分配

6.2 运动控制

  • 运动队列优先级管理
  • 运动平滑插值
  • 批量运动命令合并
  • 延迟预算管理

6.3 网络

  • ESPHome 连接池
  • 消息批量发送
  • 压缩音频数据
  • 心跳检测

7. 安全考虑

  1. 音频隐私

    • 不存储用户音频(除非明确授权)
    • 本地处理优先
    • 加密传输
  2. 运动安全

    • 角度限制
    • 速度限制
    • 碰撞检测
    • 紧急停止
  3. 网络安全

    • ESPHome 认证
    • TLS 加密
    • 防火墙配置
    • 访问控制

8. 部署

8.1 安装步骤

# 创建虚拟环境
python -m venv .venv
source .venv/bin/activate

# 安装基础依赖
pip install -e .

# 安装可选依赖
pip install -e .[wireless,vision,dev]

8.2 运行

# 启动应用
python -m reachy_mini_ha_voice

# 启动 Web UI
python -m reachy_mini_ha_voice --gradio

# 启动无线版本
python -m reachy_mini_ha_voice --wireless

8.3 Home Assistant 集成

  1. 在 Home Assistant 中添加 ESPHome 集成
  2. 输入 Reachy Mini 的 IP 地址和端口(6053)
  3. 配置 STT/TTS 服务
  4. 创建自动化和脚本