"""
Reachy Mini Home Assistant Voice Assistant Application.

This is the main entry point for the Reachy Mini application, which integrates
with Home Assistant via the ESPHome protocol for voice control.
"""
import asyncio
import logging
import socket
import threading
import time
from typing import Optional
logger = logging.getLogger(__name__)
def _check_zenoh_available(
    timeout: float = 1.0,
    host: str = "127.0.0.1",
    port: int = 7447,
) -> bool:
    """Check whether a Zenoh service is accepting TCP connections.

    The check is a plain TCP connect that is closed again immediately; no
    data is exchanged.

    Args:
        timeout: Maximum time in seconds to wait for the connection.
        host: Address to probe. Defaults to the local machine.
        port: TCP port to probe. Defaults to 7447.

    Returns:
        True if the connection was established, False otherwise.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # socket.timeout and ConnectionRefusedError are both OSError
        # subclasses, so this one clause covers every connect failure.
        return False
# Use the Reachy Mini SDK when it is installed; otherwise fall back to a
# minimal stand-in so this module can still be imported and run standalone.
try:
    from reachy_mini import ReachyMini, ReachyMiniApp
    REACHY_MINI_AVAILABLE = True
except ImportError:
    REACHY_MINI_AVAILABLE = False
    ReachyMini = None

    class ReachyMiniApp:
        """Stand-in base class used when the reachy_mini package is missing.

        It only provides the members this module relies on: a ``stop_event``,
        a no-op ``wrapped_run`` and ``stop``.
        """

        custom_app_url = None

        def __init__(self, *args, **kwargs):
            # Arguments are accepted and ignored: ReachyMiniHaVoice forwards
            # whatever it receives to this constructor, and a zero-argument
            # signature would raise TypeError as soon as any were passed.
            self.stop_event = threading.Event()

        def wrapped_run(self, *args, **kwargs):
            """Do nothing; there is no SDK runtime to hand control to."""
            pass

        def stop(self):
            """Set ``stop_event`` to request shutdown."""
            self.stop_event.set()
from .voice_assistant import VoiceAssistantService
from .motion import ReachyMiniMotion
class ReachyMiniHaVoice(ReachyMiniApp):
    """
    Reachy Mini Home Assistant Voice Assistant Application.

    This app runs an ESPHome-compatible voice satellite that connects
    to Home Assistant for STT/TTS processing while providing local
    wake word detection and robot motion feedback.
    """

    # No custom web UI needed - configuration is automatic via Home Assistant
    custom_app_url: Optional[str] = None

    # Substrings of an exception message that wrapped_run() treats as
    # "could not reach the robot" rather than as a real error.
    _CONNECTION_ERROR_MARKERS = ("Unable to connect", "ZError", "Timeout")

    def __init__(self, *args, **kwargs):
        """Initialize the app and make sure a stop event exists."""
        super().__init__(*args, **kwargs)
        # The base class may or may not create stop_event; run() needs one.
        if not hasattr(self, 'stop_event'):
            self.stop_event = threading.Event()

    def wrapped_run(self, *args, **kwargs) -> None:
        """
        Start the app, falling back to standalone mode when needed.

        Standalone mode (no robot control) is used when the Zenoh port is
        not reachable, when the Reachy Mini SDK is not installed, or when
        the base class startup fails with a timeout / connection error.
        Any other exception from the base class startup is re-raised.
        """
        logger.info("Starting Reachy Mini HA Voice App...")

        # Check if Zenoh is available before trying to connect
        if not _check_zenoh_available():
            logger.warning("Zenoh service not available (port 7447)")
            logger.info("Running in standalone mode without robot control")
            self._run_standalone()
            return

        if not REACHY_MINI_AVAILABLE:
            logger.info("Reachy Mini SDK not available, running standalone")
            self._run_standalone()
            return

        # Zenoh is available, try normal startup with ReachyMini
        try:
            logger.info("Attempting to connect to Reachy Mini...")
            super().wrapped_run(*args, **kwargs)
        except TimeoutError as e:
            logger.warning(f"Timeout connecting to Reachy Mini: {e}")
            logger.info("Falling back to standalone mode")
            self._run_standalone()
        except Exception as e:
            message = str(e)
            if not any(marker in message for marker in self._CONNECTION_ERROR_MARKERS):
                raise
            logger.warning(f"Failed to connect to Reachy Mini: {e}")
            logger.info("Falling back to standalone mode")
            self._run_standalone()

    def _run_standalone(self) -> None:
        """Run in standalone mode without robot."""
        self.run(None, self.stop_event)

    def run(self, reachy_mini, stop_event: threading.Event) -> None:
        """
        Main application entry point.

        Starts the voice assistant service, blocks until ``stop_event`` is
        set (or Ctrl+C), then stops the service and releases the robot.

        Args:
            reachy_mini: The Reachy Mini robot instance (can be None)
            stop_event: Event to signal graceful shutdown

        Raises:
            Exception: Any error raised while starting or running the
                service is logged and re-raised after cleanup.
        """
        logger.info("Starting Home Assistant Voice Assistant...")

        # Create and run the voice assistant service
        service = VoiceAssistantService(reachy_mini)

        # Try to get existing event loop, create new one if needed.
        # owns_loop records whether this method is responsible for closing it.
        try:
            loop = asyncio.get_running_loop()
            owns_loop = False
            logger.debug("Using existing event loop")
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            owns_loop = True
            logger.debug("Created new event loop")

        try:
            loop.run_until_complete(service.start())
            self._log_startup_banner(reachy_mini)

            # Wait for the stop signal in short slices *on the event loop*.
            # A plain time.sleep() here would leave the loop stopped, so
            # nothing scheduled on it by service.start() would ever run.
            while not stop_event.is_set():
                loop.run_until_complete(asyncio.sleep(0.1))
        except KeyboardInterrupt:
            logger.info("Keyboard interruption in main thread... closing server.")
        except Exception as e:
            logger.error(f"Error running voice assistant: {e}")
            raise
        finally:
            logger.info("Shutting down voice assistant...")
            try:
                loop.run_until_complete(service.stop())
            except Exception as e:
                logger.error(f"Error stopping service: {e}")

            # Clean up robot connection if available
            if reachy_mini is not None:
                self._release_robot(reachy_mini)

            # Close the event loop, but only if it was created here; a loop
            # that was already running belongs to whoever started it.
            if owns_loop:
                try:
                    loop.close()
                except Exception as e:
                    logger.debug(f"Error closing event loop: {e}")

            logger.info("Voice assistant stopped.")

    @staticmethod
    def _log_startup_banner(reachy_mini) -> None:
        """Log the startup summary and the Home Assistant connection hints."""
        logger.info("=" * 50)
        logger.info("Home Assistant Voice Assistant Started!")
        logger.info("=" * 50)
        logger.info("ESPHome Server: 0.0.0.0:6053")
        logger.info("Camera Server: 0.0.0.0:8081")
        logger.info("Wake word: Okay Nabu")
        if reachy_mini:
            logger.info("Motion control: enabled")
            logger.info("Camera: enabled (Reachy Mini)")
        else:
            logger.info("Motion control: disabled (no robot)")
            logger.info("Camera: test pattern (no robot)")
        logger.info("=" * 50)
        logger.info("To connect from Home Assistant:")
        logger.info(" Settings -> Devices & Services -> Add Integration")
        logger.info(" -> ESPHome -> Enter this device's IP:6053")
        logger.info(" -> Generic Camera -> http://<ip>:8081/stream")
        logger.info("=" * 50)

    @staticmethod
    def _release_robot(reachy_mini) -> None:
        """Close the robot's media and disconnect its client, best effort."""
        try:
            # Ensure media is explicitly closed before disconnecting
            if hasattr(reachy_mini, 'media'):
                reachy_mini.media.close()
                logger.debug("Robot media closed")
        except Exception as e:
            logger.debug(f"Error closing media during shutdown: {e}")

        try:
            # Prevent connection from keeping threads alive
            reachy_mini.client.disconnect()
            logger.debug("Robot client disconnected")
        except Exception as e:
            logger.debug(f"Error disconnecting client during shutdown: {e}")
# Script entry point, used when invoked as: python -m reachy_mini_ha_voice.main
if __name__ == "__main__":
    log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    logging.basicConfig(level=logging.INFO, format=log_format)

    app = ReachyMiniHaVoice()
    try:
        app.wrapped_run()
    except KeyboardInterrupt:
        # Ctrl+C reached the top level: ask the app to stop.
        app.stop()
|