Reachy Mini Home Assistant Voice Assistant - 架构设计
1. 系统架构概览
┌─────────────────────────────────────────────────────────────────┐
│ 应用层 (Application Layer) │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Home │ │ Web UI │ │ Console │ │
│ │ Assistant │ │ (Gradio) │ │ Interface │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────┐
│ 业务逻辑层 (Business Logic) │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Voice │ │ Motion │ │ State │ │
│ │ Manager │ │ Controller │ │ Manager │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ ESPHome │ │ Event │ │
│ │ Handler │ │ Dispatcher │ │
│ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────┐
│ 服务层 (Services) │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Wake Word │ │ Audio │ │ Motion │ │
│ │ Detector │ │ Processor │ │ Queue │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ ESPHome Protocol (Audio Streaming to/from HA) │ │
│ └──────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────┐
│ 硬件抽象层 (HAL) │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Audio │ │ Motion │ │ Camera │ │
│ │ Adapter │ │ Adapter │ │ Adapter │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ Reachy Mini │ │ ESPHome │ │
│ │ SDK Wrapper │ │ Protocol │ │
│ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────┐
│ Reachy Mini Hardware + Home Assistant │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Microphone │ │ Head Motors │ │ Camera │ │
│ │ Array (4) │ │ (6 DOF) │ │ (Wide) │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ Speaker │ │ Antennas │ │
│ │ (5W) │ │ (2) │ │
│ └──────────────┘ └──────────────┘ │
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Home Assistant (STT/TTS Processing) │ │
│ └──────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
2. 核心设计原则
2.1 基于 linux-voice-assistant
本项目基于 OHF-Voice/linux-voice-assistant 的架构设计,主要特点:
- STT/TTS 由 Home Assistant 处理:音频数据通过 ESPHome 协议传输到 Home Assistant,由 HA 进行语音识别和合成
- 本地唤醒词检测:使用 microWakeWord 或 openWakeWord 进行离线唤醒词检测
- ESPHome 协议通信:通过 ESPHome 协议与 Home Assistant 通信
- 运动控制增强:集成 Reachy Mini 的运动控制能力
2.2 架构特点
- 模块化设计:音频、语音、运动、ESPHome 各模块独立
- 异步处理:使用 asyncio 实现高性能异步处理
- 状态管理:集中的状态管理(ServerState)
- 事件驱动:基于事件的通信机制
3. 模块设计
3.1 音频模块 (audio/)
职责:
- 音频设备管理(麦克风、扬声器)
- 音频录制和播放
- 音频格式转换(16KHz 单声道 PCM)
接口:
class AudioAdapter(ABC):
"""音频设备适配器抽象基类"""
@abstractmethod
async def list_input_devices(self) -> List[AudioDevice]:
"""列出可用的音频输入设备"""
pass
@abstractmethod
async def start_recording(
self,
device_id: str,
callback: Callable[[bytes], None],
sample_rate: int = 16000,
channels: int = 1,
block_size: int = 1024
):
"""开始录制音频"""
pass
@abstractmethod
async def play_audio(
self,
audio_data: bytes,
device_id: str,
sample_rate: int = 16000,
channels: int = 1
):
"""播放音频"""
pass
class MicrophoneArray(AudioAdapter):
"""麦克风阵列适配器(Reachy Mini 的 4 麦克风阵列)"""
def __init__(self, sample_rate: int = 16000, channels: int = 1):
self.sample_rate = sample_rate
self.channels = channels
self._stream = None
self._is_recording = False
self._callback = None
self._loop = None
class Speaker(AudioAdapter):
"""扬声器适配器(Reachy Mini 的 5W 扬声器)"""
def __init__(self, sample_rate: int = 16000):
self.sample_rate = sample_rate
音频处理器:
class AudioProcessor:
"""处理音频块,用于唤醒词检测和流式传输"""
def __init__(
self,
sample_rate: int = 16000,
channels: int = 1,
block_size: int = 1024
):
self.sample_rate = sample_rate
self.channels = channels
self.block_size = block_size
self._wake_word_callbacks: list[Callable[[bytes], None]] = []
self._stream_callbacks: list[Callable[[bytes], None]] = []
def add_wake_word_callback(self, callback: Callable[[bytes], None]):
"""添加唤醒词检测回调"""
self._wake_word_callbacks.append(callback)
def add_stream_callback(self, callback: Callable[[bytes], None]):
"""添加音频流回调(发送到 Home Assistant)"""
self._stream_callbacks.append(callback)
async def process_audio_chunk(self, audio_chunk: bytes):
"""处理音频块"""
# 调用唤醒词检测回调
for callback in self._wake_word_callbacks:
callback(audio_chunk)
# 调用流式传输回调
for callback in self._stream_callbacks:
callback(audio_chunk)
3.2 语音模块 (voice/)
职责:
- 唤醒词检测(本地离线)
- STT/TTS 由 Home Assistant 处理(不在此模块)
接口:
class WakeWordDetector(ABC):
"""唤醒词检测器抽象基类"""
@abstractmethod
async def load_model(self, model_path: str):
"""加载唤醒词模型"""
pass
@abstractmethod
async def process_audio(self, audio_chunk: bytes) -> bool:
"""处理音频块,返回是否检测到唤醒词"""
pass
class MicroWakeWordDetector(WakeWordDetector):
"""microWakeWord 检测器(轻量级,适合 Raspberry Pi)"""
def __init__(self, model_path: str):
self.model = None
self.features = None
self.model_path = Path(model_path)
self._confidence = 0.0
self._loaded = False
async def load_model(self, model_path: str):
"""加载 microWakeWord 模型"""
from pymicro_wakeword import MicroWakeWord, MicroWakeWordFeatures
self.features = MicroWakeWordFeatures()
self.model = MicroWakeWord.from_config(model_path)
self._loaded = True
async def process_audio(self, audio_chunk: bytes) -> bool:
"""处理音频块"""
import numpy as np
audio_array = np.frombuffer(audio_chunk, dtype=np.int16).astype(np.float32) / 32768.0
features = self.features.process_streaming(audio_array)
for feature in features:
score = self.model.process_streaming(feature)
if score is not None and score >= 0.5:
return True
return False
class OpenWakeWordDetector(WakeWordDetector):
"""openWakeWord 检测器(更多唤醒词选择)"""
def __init__(self, model_path: str):
self.model = None
self.features = None
self.model_path = Path(model_path)
self._confidence = 0.0
self._loaded = False
async def load_model(self, model_path: str):
"""加载 openWakeWord 模型"""
from pyopen_wakeword import OpenWakeWord, OpenWakeWordFeatures
self.features = OpenWakeWordFeatures.from_builtin()
self.model = OpenWakeWord(model_path)
self._loaded = True
async def process_audio(self, audio_chunk: bytes) -> bool:
"""处理音频块"""
import numpy as np
audio_array = np.frombuffer(audio_chunk, dtype=np.int16).astype(np.float32) / 32768.0
features = self.features.process_streaming(audio_array)
for feature in features:
scores = self.model.process_streaming(feature)
for score in scores:
if score >= 0.5:
return True
return False
3.3 运动模块 (motion/)
职责:
- 头部运动控制(6 自由度)
- 天线控制(2 个天线)
- 运动队列管理
- 语音反应性运动
接口:
class MotionController(ABC):
"""运动控制器抽象基类"""
@abstractmethod
async def connect(self, host: str = 'localhost'):
"""连接到机器人"""
pass
@abstractmethod
async def wake_up(self):
"""唤醒机器人"""
pass
@abstractmethod
async def turn_off(self):
"""关闭机器人"""
pass
@abstractmethod
async def move_head(self, pose: np.ndarray, duration: float = 1.0):
"""移动头部到姿态"""
pass
@abstractmethod
async def move_antennas(self, left: float, right: float, duration: float = 1.0):
"""移动天线"""
pass
@abstractmethod
async def nod(self, count: int = 1, duration: float = 0.5):
"""点头"""
pass
@abstractmethod
async def shake(self, count: int = 1, duration: float = 0.5):
"""摇头"""
pass
@abstractmethod
async def start_speech_reactive_motion(self):
"""开始语音反应性运动"""
pass
@abstractmethod
async def stop_speech_reactive_motion(self):
"""停止语音反应性运动"""
pass
class ReachyMiniMotionController(MotionController):
"""Reachy Mini 运动控制器"""
def __init__(self):
self.reachy_mini = None
self._connected = False
self._speech_reactive = False
self._speech_task = None
async def connect(self, host: str = 'localhost'):
"""连接到 Reachy Mini"""
from reachy_mini import ReachyMini
self.reachy_mini = ReachyMini(host=host)
self._connected = True
async def wake_up(self):
"""唤醒机器人"""
self.reachy_mini.wake_up()
async def turn_off(self):
"""关闭机器人"""
self.reachy_mini.turn_off()
async def move_head(self, pose: np.ndarray, duration: float = 1.0):
"""移动头部到姿态"""
self.reachy_mini.goto_target(head=pose, duration=duration)
async def move_antennas(self, left: float, right: float, duration: float = 1.0):
"""移动天线"""
self.reachy_mini.goto_target(antennas=[left, right], duration=duration)
async def nod(self, count: int = 1, duration: float = 0.5):
"""点头"""
import numpy as np
from scipy.spatial.transform import Rotation as R
for _ in range(count):
# 点头
pose_down = np.eye(4)
pose_down[:3, :3] = R.from_euler('xyz', [15, 0, 0], degrees=True).as_matrix()
await self.move_head(pose_down, duration=duration / 2)
pose_up = np.eye(4)
pose_up[:3, :3] = R.from_euler('xyz', [-15, 0, 0], degrees=True).as_matrix()
await self.move_head(pose_up, duration=duration / 2)
async def shake(self, count: int = 1, duration: float = 0.5):
"""摇头"""
import numpy as np
from scipy.spatial.transform import Rotation as R
for _ in range(count):
# 摇头
pose_left = np.eye(4)
pose_left[:3, :3] = R.from_euler('xyz', [0, 0, -20], degrees=True).as_matrix()
await self.move_head(pose_left, duration=duration / 2)
pose_right = np.eye(4)
pose_right[:3, :3] = R.from_euler('xyz', [0, 0, 20], degrees=True).as_matrix()
await self.move_head(pose_right, duration=duration / 2)
async def start_speech_reactive_motion(self):
"""开始语音反应性运动(说话时的微动)"""
self._speech_reactive = True
self._speech_task = asyncio.create_task(self._speech_reactive_loop())
async def stop_speech_reactive_motion(self):
"""停止语音反应性运动"""
self._speech_reactive = False
if self._speech_task:
self._speech_task.cancel()
async def _speech_reactive_loop(self):
"""语音反应性运动循环"""
import numpy as np
from scipy.spatial.transform import Rotation as R
while self._speech_reactive:
# 生成微小的摆动
roll = np.sin(asyncio.get_event_loop().time() * 2) * 3
pose = np.eye(4)
pose[:3, :3] = R.from_euler('xyz', [0, 0, roll], degrees=True).as_matrix()
await self.move_head(pose, duration=0.1)
await asyncio.sleep(0.1)
运动队列:
class MotionQueue:
"""运动队列管理器"""
def __init__(self):
self.high_priority = asyncio.Queue()
self.medium_priority = asyncio.Queue()
self.low_priority = asyncio.Queue()
self.is_running = False
self._current_motion = None
self._task = None
async def add_motion(self, motion: Motion):
"""添加运动到队列"""
if motion.priority == MotionPriority.HIGH:
await self.high_priority.put(motion)
elif motion.priority == MotionPriority.MEDIUM:
await self.medium_priority.put(motion)
elif motion.priority == MotionPriority.LOW:
await self.low_priority.put(motion)
async def start(self):
"""开始处理运动队列"""
self.is_running = True
self._task = asyncio.create_task(self._process_queue())
async def stop(self):
"""停止处理运动队列"""
self.is_running = False
if self._task:
self._task.cancel()
async def _process_queue(self):
"""处理运动队列"""
while self.is_running:
# 优先级:HIGH > MEDIUM > LOW
motion = await self._get_next_motion()
if motion is None:
await asyncio.sleep(0.01)
continue
self._current_motion = motion
await motion.execute()
self._current_motion = None
async def _get_next_motion(self) -> Optional[Motion]:
"""获取下一个运动"""
if not self.high_priority.empty():
return await self.high_priority.get()
elif not self.medium_priority.empty():
return await self.medium_priority.get()
elif not self.low_priority.empty():
return await self.low_priority.get()
else:
return None
3.4 ESPHome 模块 (esphome/)
职责:
- ESPHome 协议实现
- 与 Home Assistant 通信
- 音频流传输
- 事件处理
接口:
class ESPHomeServer:
"""ESPHome 协议服务器"""
def __init__(self, host: str = "0.0.0.0", port: int = 6053):
self.host = host
self.port = port
self._server = None
self._is_running = False
self._clients = []
self._audio_callback = None
self._event_callback = None
async def start(self):
"""启动 ESPHome 服务器"""
self._server = await asyncio.start_server(
self._handle_client,
self.host,
self.port
)
self._is_running = True
async def stop(self):
"""停止 ESPHome 服务器"""
self._is_running = False
for client in self._clients:
client.close()
self._clients.clear()
if self._server:
self._server.close()
await self._server.wait_closed()
def set_audio_callback(self, callback: Callable[[bytes], None]):
"""设置音频回调(接收来自 Home Assistant 的 TTS 音频)"""
self._audio_callback = callback
def set_event_callback(self, callback: Callable[[VoiceAssistantEventType, dict], None]):
"""设置事件回调(接收来自 Home Assistant 的事件)"""
self._event_callback = callback
async def send_audio(self, audio_data: bytes):
"""发送音频数据到 Home Assistant(STT 输入)"""
for client in self._clients:
try:
client.write(audio_data)
await client.drain()
except Exception as e:
logger.error(f"Error sending audio to client: {e}")
async def send_event(self, event_type: VoiceAssistantEventType, data: dict):
"""发送事件到 Home Assistant"""
if self._event_callback:
self._event_callback(event_type, data)
async def _handle_client(self, reader, writer):
"""处理客户端连接"""
client_addr = writer.get_extra_info('peername')
self._clients.append(writer)
try:
while self._is_running:
data = await reader.read(4096)
if not data:
break
# 处理来自 Home Assistant 的数据
await self._process_data(data)
except Exception as e:
logger.error(f"Error handling client {client_addr}: {e}")
finally:
self._clients.remove(writer)
writer.close()
await writer.wait_closed()
class VoiceSatelliteProtocol:
"""语音卫星协议处理器"""
def __init__(self, state: ServerState):
self.state = state
self._is_streaming = False
self._refractory_period = 2.0
self._last_wake_word_time = 0.0
async def handle_audio(self, audio_chunk: bytes):
"""处理音频块(发送到 Home Assistant)"""
if self._is_streaming and self.state.esphome_server:
await self.state.esphome_server.send_audio(audio_chunk)
async def handle_wake_word(self):
"""处理唤醒词检测"""
current_time = asyncio.get_event_loop().time()
# 检查冷却期
if current_time - self._last_wake_word_time < self._refractory_period:
return
self._last_wake_word_time = current_time
# 发送唤醒词事件到 Home Assistant
if self.state.esphome_server:
await self.state.esphome_server.send_event(
VoiceAssistantEventType.VOICE_ASSISTANT_WAKE_WORD_END,
{"wake_word": "detected"}
)
# 开始流式传输
self._is_streaming = True
async def stop_streaming(self):
"""停止流式传输"""
self._is_streaming = False
class VoiceAssistantEventType(Enum):
"""语音助手事件类型"""
VOICE_ASSISTANT_START = 0
VOICE_ASSISTANT_END = 1
VOICE_ASSISTANT_ERROR = 2
VOICE_ASSISTANT_STT_START = 3
VOICE_ASSISTANT_STT_END = 4
VOICE_ASSISTANT_TTS_START = 5
VOICE_ASSISTANT_TTS_END = 6
VOICE_ASSISTANT_WAKE_WORD_START = 9
VOICE_ASSISTANT_WAKE_WORD_END = 10
3.5 配置模块 (config/)
职责:
- 配置文件管理
- 用户偏好存储
- 运行时配置
接口:
class ConfigManager:
"""配置管理器"""
def __init__(self, config_path: str = "config.json"):
self.config_path = Path(config_path)
self.config = self.load_config()
def load_config(self) -> dict:
"""加载配置文件"""
if self.config_path.exists():
with open(self.config_path, 'r', encoding='utf-8') as f:
return json.load(f)
return self.get_default_config()
def save_config(self):
"""保存配置文件"""
with open(self.config_path, 'w', encoding='utf-8') as f:
json.dump(self.config, f, indent=2, ensure_ascii=False)
def get_default_config(self) -> dict:
"""获取默认配置"""
return {
"audio": {
"input_device": None,
"output_device": None,
"sample_rate": 16000,
"channels": 1,
"block_size": 1024
},
"voice": {
"wake_word": "okay_nabu",
"wake_word_dirs": ["wakewords"]
},
"motion": {
"enabled": True,
"speech_reactive": True
},
"esphome": {
"host": "0.0.0.0",
"port": 6053,
"name": "Reachy Mini"
},
"robot": {
"host": "localhost",
"wireless": False
}
}
def get(self, key: str, default: Any = None) -> Any:
"""获取配置值(支持嵌套键)"""
keys = key.split('.')
value = self.config
for k in keys:
if isinstance(value, dict):
value = value.get(k, default)
else:
return default
return value
def set(self, key: str, value: Any):
"""设置配置值(支持嵌套键)"""
keys = key.split('.')
config = self.config
for k in keys[:-1]:
config = config.setdefault(k, {})
config[keys[-1]] = value
self.save_config()
3.6 状态管理 (state.py)
职责:
- 全局状态管理
- 组件生命周期管理
接口:
@dataclass
class ServerState:
"""全局服务器状态"""
name: str
# 配置
config: Optional[ConfigManager] = None
# 音频
microphone: Optional[MicrophoneArray] = None
speaker: Optional[Speaker] = None
audio_queue: Queue = field(default_factory=Queue)
# 语音
wake_word_detector: Optional[WakeWordDetector] = None
active_wake_words: list = field(default_factory=list)
# 运动
motion_controller: Optional[MotionController] = None
motion_queue: Optional[MotionQueue] = None
# ESPHome
esphome_server: Optional[ESPHomeServer] = None
voice_satellite: Optional[VoiceSatelliteProtocol] = None
# 状态
is_running: bool = False
is_streaming: bool = False
# 回调
on_wake_word: Optional[callable] = None
on_stt_result: Optional[callable] = None
on_tts_audio: Optional[callable] = None
async def cleanup(self):
"""清理资源"""
if self.microphone:
await self.microphone.stop_recording()
if self.motion_controller:
await self.motion_controller.stop_speech_reactive_motion()
await self.motion_controller.turn_off()
await self.motion_controller.disconnect()
if self.motion_queue:
await self.motion_queue.stop()
if self.esphome_server:
await self.esphome_server.stop()
3.7 主应用 (app.py)
职责:
- 应用生命周期管理
- 组件初始化和协调
- 事件处理
接口:
class ReachyMiniVoiceApp:
"""主应用类"""
def __init__(
self,
name: str,
config: ConfigManager,
audio_input_device: Optional[str] = None,
audio_output_device: Optional[str] = None,
wake_model: Optional[str] = None,
wake_word_dirs: Optional[list] = None,
host: str = "0.0.0.0",
port: int = 6053,
robot_host: str = "localhost",
wireless: bool = False,
gradio: bool = False
):
self.name = name
self.config = config
self.audio_input_device = audio_input_device
self.audio_output_device = audio_output_device
self.wake_model = wake_model
self.wake_word_dirs = wake_word_dirs
self.host = host
self.port = port
self.robot_host = robot_host
self.wireless = wireless
self.gradio = gradio
self.state = ServerState(name)
self._is_running = False
async def start(self):
"""启动应用"""
# 初始化状态
await self.state.initialize(self.config)
# 设置回调
self._setup_callbacks()
# 启动音频录制
await self.state.microphone.start_recording(
self.audio_input_device,
self._audio_callback,
sample_rate=self.config.get("audio.sample_rate", 16000),
channels=self.config.get("audio.channels", 1),
block_size=self.config.get("audio.block_size", 1024)
)
# 启动 ESPHome 服务器
await self.state.esphome_server.start()
# 注册 mDNS 发现
await self._register_mdns()
self._is_running = True
# 保持运行
while self._is_running:
await asyncio.sleep(1)
async def stop(self):
"""停止应用"""
self._is_running = False
await self.state.cleanup()
def _setup_callbacks(self):
"""设置回调"""
self.state.audio_processor.add_wake_word_callback(self._on_audio_chunk)
self.state.audio_processor.add_stream_callback(self._on_stream_audio)
async def _audio_callback(self, audio_chunk: bytes):
"""音频录制回调"""
await self.state.audio_processor.process_audio_chunk(audio_chunk)
async def _on_audio_chunk(self, audio_chunk: bytes):
"""唤醒词检测回调"""
if self.state.wake_word_detector:
detected = await self.state.wake_word_detector.process_audio(audio_chunk)
if detected:
await self._on_wake_word_detected()
async def _on_stream_audio(self, audio_chunk: bytes):
"""音频流传输回调(发送到 Home Assistant)"""
if self.state.voice_satellite:
await self.state.voice_satellite.handle_audio(audio_chunk)
async def _on_wake_word_detected(self):
"""唤醒词检测回调"""
# 点头确认
if self.state.motion_controller:
await self.state.motion_controller.nod(count=1, duration=0.3)
# 触发语音卫星
if self.state.voice_satellite:
await self.state.voice_satellite.handle_wake_word()
async def handle_tts_audio(self, audio_data: bytes):
"""处理来自 Home Assistant 的 TTS 音频"""
# 播放音频
if self.state.speaker:
await self.state.speaker.play_audio(
audio_data,
self.audio_output_device,
sample_rate=self.config.get("audio.sample_rate", 16000),
channels=self.config.get("audio.channels", 1)
)
async def handle_stt_result(self, text: str):
"""处理来自 Home Assistant 的 STT 结果"""
# 处理文本(添加自定义逻辑)
pass
async def _register_mdns(self):
"""注册 mDNS 服务发现"""
from zeroconf import ServiceInfo, Zeroconf
info = ServiceInfo(
"_esphomelib._tcp.local.",
f"{self.name}._esphomelib._tcp.local.",
addresses=[],
port=self.port,
properties={
"version": "1.0",
"name": self.name,
"platform": "reachy_mini"
}
)
zeroconf = Zeroconf()
zeroconf.register_service(info)
4. 数据流
4.1 音频输入流程
麦克风阵列 (4 麦克风)
↓ (16KHz PCM)
音频块 (1024 samples)
↓
┌─────────────────┐
│ 唤醒词检测 │
│ (micro/oww) │
└────────┬────────┘
│
↓ (检测到唤醒词)
触发唤醒事件
│
↓
┌─────────────────┐
│ 开始流式传输 │
│ (ESPHome) │
└────────┬────────┘
│
↓
┌─────────────────┐
│ 发送到 HA │
│ (STT 输入) │
└─────────────────┘
4.2 音频输出流程
Home Assistant (TTS 输出)
↓
┌─────────────────┐
│ ESPHome 服务器 │
│ (接收音频) │
└────────┬────────┘
│
↓
┌─────────────────┐
│ 播放音频 │
│ (扬声器) │
└─────────────────┘
4.3 运动控制流程
唤醒词检测 / STT 结果 / TTS 事件
↓
┌─────────────────┐
│ 运动队列管理 │
└────────┬────────┘
│
↓
┌─────────────────┐
│ 高优先级运动 │
│ (唤醒词确认) │
└────────┬────────┘
│
↓
┌─────────────────┐
│ 中优先级运动 │
│ (用户命令) │
└────────┬────────┘
│
↓
┌─────────────────┐
│ 低优先级运动 │
│ (语音反应) │
└────────┬────────┘
│
↓
执行运动
│
↓
┌─────────────────┐
│ Reachy Mini │
│ SDK │
└─────────────────┘
5. 依赖项
5.1 核心依赖
dependencies = [
# Reachy Mini SDK
"reachy-mini",
# 音频处理
"sounddevice>=0.4.6",
"numpy>=1.24.0",
# 语音处理
"pymicro-wakeword>=2.0.0,<3.0.0",
"pyopen-wakeword>=1.0.0,<2.0.0",
# ESPHome
"aioesphomeapi>=42.0.0",
"zeroconf>=0.100.0",
# 运动控制
"scipy>=1.10.0",
# Web UI (可选)
"gradio>=4.0.0",
]
5.2 可选依赖
[project.optional-dependencies]
wireless = [
"reachy-mini[wireless]",
]
vision = [
"pollen-vision",
"opencv-python>=4.8.0",
"mediapipe>=0.10.0",
]
dev = [
"pytest>=7.4.0",
"pytest-asyncio>=0.21.0",
"ruff>=0.1.0",
]
6. 性能优化
6.1 音频处理
- 使用异步 I/O 减少阻塞
- 音频块大小优化(1024 samples)
- 使用 numpy 加速数值计算
- 预分配缓冲区减少内存分配
6.2 运动控制
- 运动队列优先级管理
- 运动平滑插值
- 批量运动命令合并
- 延迟预算管理
6.3 网络
- ESPHome 连接池
- 消息批量发送
- 压缩音频数据
- 心跳检测
7. 安全考虑
音频隐私:
- 不存储用户音频(除非明确授权)
- 本地处理优先
- 加密传输
运动安全:
- 角度限制
- 速度限制
- 碰撞检测
- 紧急停止
网络安全:
- ESPHome 认证
- TLS 加密
- 防火墙配置
- 访问控制
8. 部署
8.1 安装步骤
# 创建虚拟环境
python -m venv .venv
source .venv/bin/activate
# 安装基础依赖
pip install -e .
# 安装可选依赖
pip install -e .[wireless,vision,dev]
8.2 运行
# 启动应用
python -m reachy_mini_ha_voice
# 启动 Web UI
python -m reachy_mini_ha_voice --gradio
# 启动无线版本
python -m reachy_mini_ha_voice --wireless
8.3 Home Assistant 集成
- 在 Home Assistant 中添加 ESPHome 集成
- 输入 Reachy Mini 的 IP 地址和端口(6053)
- 配置 STT/TTS 服务
- 创建自动化和脚本