Desmond-Dong's picture
fix: Add debug logging for noise suppression, fix PP_MIN_NN value
9844c98
raw
history blame
33.9 kB
"""Reachy Mini controller wrapper for ESPHome entities."""
import logging
import time
from typing import Any, Dict, Optional, TYPE_CHECKING
import math
import numpy as np
from scipy.spatial.transform import Rotation as R
import requests
if TYPE_CHECKING:
from reachy_mini import ReachyMini
logger = logging.getLogger(__name__)
class _ReSpeakerContext:
"""Context manager for thread-safe ReSpeaker access."""
def __init__(self, respeaker, lock):
self._respeaker = respeaker
self._lock = lock
def __enter__(self):
self._lock.acquire()
return self._respeaker
def __exit__(self, exc_type, exc_val, exc_tb):
self._lock.release()
return False
class ReachyController:
"""
Wrapper class for Reachy Mini control operations.
Provides safe access to Reachy Mini SDK functions with error handling
and fallback for standalone mode (when robot is not available).
"""
def __init__(self, reachy_mini: Optional["ReachyMini"] = None):
"""
Initialize the controller.
Args:
reachy_mini: ReachyMini instance, or None for standalone mode
"""
self.reachy = reachy_mini
self._speaker_volume = 100 # Default volume
self._movement_manager = None # Set later via set_movement_manager()
# Status caching - only for get_status() which may trigger I/O
# Note: get_current_head_pose() and get_current_joint_positions() are
# non-blocking in the SDK (they return cached Zenoh data), so no caching needed
self._state_cache: Dict[str, Any] = {}
self._cache_ttl = 2.0 # 2 second cache TTL for status queries (increased from 1s)
self._last_status_query = 0.0
# Thread lock for ReSpeaker USB access to prevent conflicts with GStreamer audio pipeline
self._respeaker_lock = __import__('threading').Lock()
def set_movement_manager(self, movement_manager) -> None:
"""Set the MovementManager instance for pose control.
Args:
movement_manager: MovementManager instance
"""
self._movement_manager = movement_manager
logger.info("MovementManager set for ReachyController")
@property
def is_available(self) -> bool:
"""Check if robot is available."""
return self.reachy is not None
# ========== Phase 1: Basic Status & Volume ==========
def _get_cached_status(self) -> Optional[Dict]:
"""Get cached daemon status to reduce query frequency.
Note: get_status() may trigger I/O, so we cache it.
Unlike get_current_head_pose() and get_current_joint_positions()
which are non-blocking in the SDK.
"""
now = time.time()
if now - self._last_status_query < self._cache_ttl:
return self._state_cache.get('status')
if not self.is_available:
return None
try:
status = self.reachy.client.get_status(wait=False)
self._state_cache['status'] = status
self._last_status_query = now
return status
except Exception as e:
logger.error(f"Error getting status: {e}")
return self._state_cache.get('status') # Return stale cache on error
def get_daemon_state(self) -> str:
"""Get daemon state with caching."""
status = self._get_cached_status()
if status is None:
return "not_available"
return status.get('state', 'unknown')
def get_backend_ready(self) -> bool:
"""Check if backend is ready with caching."""
status = self._get_cached_status()
if status is None:
return False
return status.get('state') == 'running'
def get_error_message(self) -> str:
"""Get current error message with caching."""
status = self._get_cached_status()
if status is None:
return "Robot not available"
return status.get('error') or ""
def get_speaker_volume(self) -> float:
"""Get speaker volume (0-100) with caching."""
if not self.is_available:
return self._speaker_volume
try:
# Get volume from daemon API (use cached status for IP)
status = self._get_cached_status()
if status is None:
return self._speaker_volume
wlan_ip = status.get('wlan_ip', 'localhost')
response = requests.get(f"http://{wlan_ip}:8000/api/volume/current", timeout=2)
if response.status_code == 200:
data = response.json()
self._speaker_volume = float(data.get('volume', self._speaker_volume))
except Exception as e:
logger.debug(f"Could not get volume from API: {e}")
return self._speaker_volume
def set_speaker_volume(self, volume: float) -> None:
"""
Set speaker volume (0-100) with cached status.
Args:
volume: Volume level 0-100
"""
volume = max(0.0, min(100.0, volume))
self._speaker_volume = volume
if not self.is_available:
logger.warning("Cannot set volume: robot not available")
return
try:
# Set volume via daemon API (use cached status for IP)
status = self._get_cached_status()
if status is None:
logger.error("Cannot get daemon status for volume control")
return
wlan_ip = status.get('wlan_ip', 'localhost')
response = requests.post(
f"http://{wlan_ip}:8000/api/volume/set",
json={"volume": int(volume)},
timeout=5
)
if response.status_code == 200:
logger.info(f"Speaker volume set to {volume}%")
else:
logger.error(f"Failed to set volume: {response.status_code} {response.text}")
except Exception as e:
logger.error(f"Error setting speaker volume: {e}")
def get_microphone_volume(self) -> float:
"""Get microphone volume (0-100) using daemon HTTP API."""
if not self.is_available:
return getattr(self, '_microphone_volume', 50.0)
try:
# Get WLAN IP from cached daemon status
status = self._get_cached_status()
if status is None:
return getattr(self, '_microphone_volume', 50.0)
wlan_ip = status.get('wlan_ip', 'localhost')
# Call the daemon API to get microphone volume
response = requests.get(
f"http://{wlan_ip}:8000/api/volume/microphone/current",
timeout=2
)
if response.status_code == 200:
data = response.json()
self._microphone_volume = float(data.get('volume', 50))
return self._microphone_volume
except Exception as e:
logger.debug(f"Could not get microphone volume from API: {e}")
return getattr(self, '_microphone_volume', 50.0)
def set_microphone_volume(self, volume: float) -> None:
"""
Set microphone volume (0-100) using daemon HTTP API.
Args:
volume: Volume level 0-100
"""
volume = max(0.0, min(100.0, volume))
self._microphone_volume = volume
if not self.is_available:
logger.warning("Cannot set microphone volume: robot not available")
return
try:
# Get WLAN IP from cached daemon status
status = self._get_cached_status()
if status is None:
logger.error("Cannot get daemon status for microphone volume control")
return
wlan_ip = status.get('wlan_ip', 'localhost')
# Call the daemon API to set microphone volume
response = requests.post(
f"http://{wlan_ip}:8000/api/volume/microphone/set",
json={"volume": int(volume)},
timeout=5
)
if response.status_code == 200:
logger.info(f"Microphone volume set to {volume}%")
else:
logger.error(f"Failed to set microphone volume: {response.status_code} {response.text}")
except Exception as e:
logger.error(f"Error setting microphone volume: {e}")
# ========== Phase 2: Motor Control ==========
def get_motors_enabled(self) -> bool:
"""Check if motors are enabled with caching."""
status = self._get_cached_status()
if status is None:
return False
try:
backend_status = status.get('backend_status')
if backend_status and isinstance(backend_status, dict):
motor_mode = backend_status.get('motor_control_mode', 'disabled')
return motor_mode == 'enabled'
return status.get('state') == 'running'
except Exception as e:
logger.error(f"Error getting motor state: {e}")
return False
def set_motors_enabled(self, enabled: bool) -> None:
"""
Enable or disable motors.
Args:
enabled: True to enable, False to disable
"""
if not self.is_available:
logger.warning("Cannot control motors: robot not available")
return
try:
if enabled:
self.reachy.enable_motors()
logger.info("Motors enabled")
else:
self.reachy.disable_motors()
logger.info("Motors disabled")
except Exception as e:
logger.error(f"Error setting motor state: {e}")
def get_motor_mode(self) -> str:
"""Get current motor control mode with caching."""
status = self._get_cached_status()
if status is None:
return "disabled"
try:
backend_status = status.get('backend_status')
if backend_status and isinstance(backend_status, dict):
motor_mode = backend_status.get('motor_control_mode', 'disabled')
return motor_mode
if status.get('state') == 'running':
return "enabled"
return "disabled"
except Exception as e:
logger.error(f"Error getting motor mode: {e}")
return "error"
def set_motor_mode(self, mode: str) -> None:
"""
Set motor control mode.
Args:
mode: One of "enabled", "disabled", "gravity_compensation"
"""
if not self.is_available:
logger.warning("Cannot set motor mode: robot not available")
return
try:
if mode == "enabled":
self.reachy.enable_motors()
elif mode == "disabled":
self.reachy.disable_motors()
elif mode == "gravity_compensation":
self.reachy.enable_gravity_compensation()
else:
logger.warning(f"Invalid motor mode: {mode}")
return
logger.info(f"Motor mode set to {mode}")
except Exception as e:
logger.error(f"Error setting motor mode: {e}")
def wake_up(self) -> None:
"""Execute wake up animation."""
if not self.is_available:
logger.warning("Cannot wake up: robot not available")
return
try:
self.reachy.wake_up()
logger.info("Wake up animation executed")
except Exception as e:
logger.error(f"Error executing wake up: {e}")
def go_to_sleep(self) -> None:
"""Execute sleep animation."""
if not self.is_available:
logger.warning("Cannot sleep: robot not available")
return
try:
self.reachy.goto_sleep()
logger.info("Sleep animation executed")
except Exception as e:
logger.error(f"Error executing sleep: {e}")
# ========== Phase 3: Pose Control ==========
def _get_head_pose(self) -> Optional[np.ndarray]:
"""Get current head pose from SDK.
Note: SDK's get_current_head_pose() is non-blocking - it returns
cached data from Zenoh subscriptions, so no throttling needed.
"""
if not self.is_available:
return None
try:
return self.reachy.get_current_head_pose()
except Exception as e:
logger.error(f"Error getting head pose: {e}")
return None
def _get_joint_positions(self) -> Optional[tuple]:
"""Get current joint positions from SDK.
Note: SDK's get_current_joint_positions() is non-blocking - it returns
cached data from Zenoh subscriptions, so no throttling needed.
"""
if not self.is_available:
return None
try:
return self.reachy.get_current_joint_positions()
except Exception as e:
logger.error(f"Error getting joint positions: {e}")
return None
def _extract_pose_from_matrix(self, pose_matrix: np.ndarray) -> tuple:
"""
Extract position (x, y, z) and rotation (roll, pitch, yaw) from 4x4 pose matrix.
Args:
pose_matrix: 4x4 homogeneous transformation matrix
Returns:
tuple: (x, y, z, roll, pitch, yaw) where position is in meters and angles in radians
"""
# Extract position from the last column
x = pose_matrix[0, 3]
y = pose_matrix[1, 3]
z = pose_matrix[2, 3]
# Extract rotation matrix and convert to euler angles
rotation_matrix = pose_matrix[:3, :3]
rotation = R.from_matrix(rotation_matrix)
# Use 'xyz' convention for roll, pitch, yaw
roll, pitch, yaw = rotation.as_euler('xyz')
return x, y, z, roll, pitch, yaw
def _get_head_pose_component(self, component: str) -> float:
"""Get a specific component from head pose.
Args:
component: One of 'x', 'y', 'z' (mm), 'roll', 'pitch', 'yaw' (degrees)
Returns:
The component value, or 0.0 on error
"""
pose = self._get_head_pose()
if pose is None:
return 0.0
try:
x, y, z, roll, pitch, yaw = self._extract_pose_from_matrix(pose)
components = {
'x': x * 1000, # m to mm
'y': y * 1000,
'z': z * 1000,
'roll': math.degrees(roll),
'pitch': math.degrees(pitch),
'yaw': math.degrees(yaw),
}
return components.get(component, 0.0)
except Exception as e:
logger.error(f"Error getting head {component}: {e}")
return 0.0
def _disabled_pose_setter(self, name: str) -> None:
"""Log warning when MovementManager is not available."""
logger.warning(f"set_{name} failed - MovementManager not set")
def _set_pose_via_manager(self, **kwargs) -> bool:
"""Set pose via MovementManager if available.
Returns True if successful, False if MovementManager not available.
"""
if self._movement_manager is None:
return False
self._movement_manager.set_target_pose(**kwargs)
return True
# Head position getters and setters
def get_head_x(self) -> float:
"""Get head X position in mm."""
return self._get_head_pose_component('x')
def set_head_x(self, x_mm: float) -> None:
"""Set head X position in mm via MovementManager."""
if not self._set_pose_via_manager(x=x_mm / 1000.0): # mm to m
self._disabled_pose_setter('head_x')
def get_head_y(self) -> float:
"""Get head Y position in mm."""
return self._get_head_pose_component('y')
def set_head_y(self, y_mm: float) -> None:
"""Set head Y position in mm via MovementManager."""
if not self._set_pose_via_manager(y=y_mm / 1000.0): # mm to m
self._disabled_pose_setter('head_y')
def get_head_z(self) -> float:
"""Get head Z position in mm."""
return self._get_head_pose_component('z')
def set_head_z(self, z_mm: float) -> None:
"""Set head Z position in mm via MovementManager."""
if not self._set_pose_via_manager(z=z_mm / 1000.0): # mm to m
self._disabled_pose_setter('head_z')
# Head orientation getters and setters
def get_head_roll(self) -> float:
"""Get head roll angle in degrees."""
return self._get_head_pose_component('roll')
def set_head_roll(self, roll_deg: float) -> None:
"""Set head roll angle in degrees via MovementManager."""
if not self._set_pose_via_manager(roll=math.radians(roll_deg)):
self._disabled_pose_setter('head_roll')
def get_head_pitch(self) -> float:
"""Get head pitch angle in degrees."""
return self._get_head_pose_component('pitch')
def set_head_pitch(self, pitch_deg: float) -> None:
"""Set head pitch angle in degrees via MovementManager."""
if not self._set_pose_via_manager(pitch=math.radians(pitch_deg)):
self._disabled_pose_setter('head_pitch')
def get_head_yaw(self) -> float:
"""Get head yaw angle in degrees."""
return self._get_head_pose_component('yaw')
def set_head_yaw(self, yaw_deg: float) -> None:
"""Set head yaw angle in degrees via MovementManager."""
if not self._set_pose_via_manager(yaw=math.radians(yaw_deg)):
self._disabled_pose_setter('head_yaw')
def get_body_yaw(self) -> float:
"""Get body yaw angle in degrees."""
joints = self._get_joint_positions()
if joints is None:
return 0.0
try:
head_joints, _ = joints
return math.degrees(head_joints[0])
except Exception as e:
logger.error(f"Error getting body yaw: {e}")
return 0.0
def set_body_yaw(self, yaw_deg: float) -> None:
"""Set body yaw angle in degrees via MovementManager."""
if not self._set_pose_via_manager(body_yaw=math.radians(yaw_deg)):
self._disabled_pose_setter('body_yaw')
def get_antenna_left(self) -> float:
"""Get left antenna angle in degrees."""
joints = self._get_joint_positions()
if joints is None:
return 0.0
try:
_, antennas = joints
return math.degrees(antennas[1]) # left is index 1
except Exception as e:
logger.error(f"Error getting left antenna: {e}")
return 0.0
def set_antenna_left(self, angle_deg: float) -> None:
"""Set left antenna angle in degrees via MovementManager."""
if not self._set_pose_via_manager(antenna_left=math.radians(angle_deg)):
self._disabled_pose_setter('antenna_left')
def get_antenna_right(self) -> float:
"""Get right antenna angle in degrees."""
joints = self._get_joint_positions()
if joints is None:
return 0.0
try:
_, antennas = joints
return math.degrees(antennas[0]) # right is index 0
except Exception as e:
logger.error(f"Error getting right antenna: {e}")
return 0.0
def set_antenna_right(self, angle_deg: float) -> None:
"""Set right antenna angle in degrees via MovementManager."""
if not self._set_pose_via_manager(antenna_right=math.radians(angle_deg)):
self._disabled_pose_setter('antenna_right')
# ========== Phase 4: Look At Control ==========
def get_look_at_x(self) -> float:
"""Get look at target X coordinate in world frame (meters)."""
# This is a target position, not a current state
# We'll store it internally
return getattr(self, '_look_at_x', 0.0)
def set_look_at_x(self, x: float) -> None:
"""Set look at target X coordinate."""
self._look_at_x = x
self._update_look_at()
def get_look_at_y(self) -> float:
"""Get look at target Y coordinate in world frame (meters)."""
return getattr(self, '_look_at_y', 0.0)
def set_look_at_y(self, y: float) -> None:
"""Set look at target Y coordinate."""
self._look_at_y = y
self._update_look_at()
def get_look_at_z(self) -> float:
"""Get look at target Z coordinate in world frame (meters)."""
return getattr(self, '_look_at_z', 0.0)
def set_look_at_z(self, z: float) -> None:
"""Set look at target Z coordinate."""
self._look_at_z = z
self._update_look_at()
def _update_look_at(self) -> None:
"""Update robot to look at the target coordinates.
NOTE: Disabled to prevent conflict with MovementManager's control loop.
"""
logger.warning("_update_look_at is disabled - MovementManager controls head pose")
# if not self.is_available:
# return
# try:
# x = getattr(self, '_look_at_x', 0.0)
# y = getattr(self, '_look_at_y', 0.0)
# z = getattr(self, '_look_at_z', 0.0)
# self.reachy.look_at_world(x, y, z)
# logger.info(f"Looking at world coordinates: ({x}, {y}, {z})")
# except Exception as e:
# logger.error(f"Error updating look at: {e}")
# ========== Phase 6: Diagnostic Information ==========
def get_control_loop_frequency(self) -> float:
"""Get control loop frequency in Hz with caching."""
status = self._get_cached_status()
if status is None:
return 0.0
try:
backend_status = status.get('backend_status')
if backend_status and isinstance(backend_status, dict):
control_loop_stats = backend_status.get('control_loop_stats', {})
return control_loop_stats.get('mean_control_loop_frequency', 0.0)
return 0.0
except Exception as e:
logger.error(f"Error getting control loop frequency: {e}")
return 0.0
def get_sdk_version(self) -> str:
"""Get SDK version with caching."""
status = self._get_cached_status()
if status is None:
return "N/A"
return status.get('version') or "unknown"
def get_robot_name(self) -> str:
"""Get robot name with caching."""
status = self._get_cached_status()
if status is None:
return "N/A"
return status.get('robot_name') or "unknown"
def get_wireless_version(self) -> bool:
"""Check if this is a wireless version with caching."""
status = self._get_cached_status()
if status is None:
return False
return status.get('wireless_version', False)
def get_simulation_mode(self) -> bool:
"""Check if simulation mode is enabled with caching."""
status = self._get_cached_status()
if status is None:
return False
return status.get('simulation_enabled', False)
def get_wlan_ip(self) -> str:
"""Get WLAN IP address with caching."""
status = self._get_cached_status()
if status is None:
return "N/A"
return status.get('wlan_ip') or "N/A"
# ========== Phase 7: IMU Sensors (Wireless only) ==========
def _get_imu_value(self, sensor_type: str, index: int) -> float:
"""Get a specific IMU sensor value.
Args:
sensor_type: 'accelerometer', 'gyroscope', or 'temperature'
index: Array index (0=x, 1=y, 2=z) or -1 for scalar values
Returns:
The sensor value, or 0.0 on error
"""
if not self.is_available:
return 0.0
try:
imu_data = self.reachy.imu
if imu_data is None or sensor_type not in imu_data:
return 0.0
value = imu_data[sensor_type]
return float(value[index]) if index >= 0 else float(value)
except Exception as e:
logger.debug(f"Error getting IMU {sensor_type}: {e}")
return 0.0
def get_imu_accel_x(self) -> float:
"""Get IMU X-axis acceleration in m/s²."""
return self._get_imu_value('accelerometer', 0)
def get_imu_accel_y(self) -> float:
"""Get IMU Y-axis acceleration in m/s²."""
return self._get_imu_value('accelerometer', 1)
def get_imu_accel_z(self) -> float:
"""Get IMU Z-axis acceleration in m/s²."""
return self._get_imu_value('accelerometer', 2)
def get_imu_gyro_x(self) -> float:
"""Get IMU X-axis angular velocity in rad/s."""
return self._get_imu_value('gyroscope', 0)
def get_imu_gyro_y(self) -> float:
"""Get IMU Y-axis angular velocity in rad/s."""
return self._get_imu_value('gyroscope', 1)
def get_imu_gyro_z(self) -> float:
"""Get IMU Z-axis angular velocity in rad/s."""
return self._get_imu_value('gyroscope', 2)
def get_imu_temperature(self) -> float:
"""Get IMU temperature in °C."""
return self._get_imu_value('temperature', -1)
# ========== Phase 11: LED Control (DISABLED) ==========
# LED control is disabled because LEDs are hidden inside the robot.
# See PROJECT_PLAN.md principle 8.
def _get_respeaker(self):
"""Get ReSpeaker device from media manager with thread-safe access.
Returns a context manager that holds the lock during ReSpeaker operations.
Usage:
with self._get_respeaker() as respeaker:
if respeaker:
respeaker.read("...")
"""
if not self.is_available:
return _ReSpeakerContext(None, self._respeaker_lock)
try:
if not self.reachy.media or not self.reachy.media.audio:
return _ReSpeakerContext(None, self._respeaker_lock)
respeaker = self.reachy.media.audio._respeaker
return _ReSpeakerContext(respeaker, self._respeaker_lock)
except Exception:
return _ReSpeakerContext(None, self._respeaker_lock)
# ========== Phase 12: Audio Processing (via local SDK with thread-safe access) ==========
def get_agc_enabled(self) -> bool:
"""Get AGC (Automatic Gain Control) enabled status."""
with self._get_respeaker() as respeaker:
if respeaker is None:
return getattr(self, '_agc_enabled', True) # Default to enabled
try:
result = respeaker.read("PP_AGCONOFF")
if result is not None:
self._agc_enabled = bool(result[1])
return self._agc_enabled
except Exception as e:
logger.debug(f"Error getting AGC status: {e}")
return getattr(self, '_agc_enabled', True)
def set_agc_enabled(self, enabled: bool) -> None:
"""Set AGC (Automatic Gain Control) enabled status."""
self._agc_enabled = enabled
with self._get_respeaker() as respeaker:
if respeaker is None:
return
try:
respeaker.write("PP_AGCONOFF", [1 if enabled else 0])
logger.info(f"AGC {'enabled' if enabled else 'disabled'}")
except Exception as e:
logger.error(f"Error setting AGC status: {e}")
def get_agc_max_gain(self) -> float:
"""Get AGC maximum gain in dB (0-40 dB range)."""
with self._get_respeaker() as respeaker:
if respeaker is None:
return getattr(self, '_agc_max_gain', 30.0) # Default to optimized value
try:
result = respeaker.read("PP_AGCMAXGAIN")
if result is not None:
self._agc_max_gain = float(result[0])
return self._agc_max_gain
except Exception as e:
logger.debug(f"Error getting AGC max gain: {e}")
return getattr(self, '_agc_max_gain', 30.0)
def set_agc_max_gain(self, gain: float) -> None:
"""Set AGC maximum gain in dB (0-40 dB range)."""
gain = max(0.0, min(40.0, gain)) # XVF3800 supports up to 40dB
self._agc_max_gain = gain
with self._get_respeaker() as respeaker:
if respeaker is None:
return
try:
respeaker.write("PP_AGCMAXGAIN", [gain])
logger.info(f"AGC max gain set to {gain} dB")
except Exception as e:
logger.error(f"Error setting AGC max gain: {e}")
def get_noise_suppression(self) -> float:
"""Get noise suppression level (0-100%).
PP_MIN_NS represents "minimum signal preservation ratio":
- PP_MIN_NS = 0.85 means "keep at least 85% of signal" = 15% suppression
- PP_MIN_NS = 0.15 means "keep at least 15% of signal" = 85% suppression
We display "noise suppression strength" to user, so:
- suppression_percent = (1.0 - PP_MIN_NS) * 100
"""
with self._get_respeaker() as respeaker:
if respeaker is None:
return getattr(self, '_noise_suppression', 15.0)
try:
result = respeaker.read("PP_MIN_NS")
if result is not None:
raw_value = result[0]
# Convert: PP_MIN_NS=0.85 -> 15% suppression, PP_MIN_NS=0.15 -> 85% suppression
self._noise_suppression = max(0.0, min(100.0, (1.0 - raw_value) * 100.0))
logger.debug(f"Noise suppression: PP_MIN_NS={raw_value:.2f} -> {self._noise_suppression:.1f}%")
return self._noise_suppression
except Exception as e:
logger.debug(f"Error getting noise suppression: {e}")
return getattr(self, '_noise_suppression', 15.0)
def set_noise_suppression(self, level: float) -> None:
"""Set noise suppression level (0-100%)."""
level = max(0.0, min(100.0, level))
self._noise_suppression = level
with self._get_respeaker() as respeaker:
if respeaker is None:
return
try:
# Convert percentage to PP_MIN_NS value (inverted)
value = 1.0 - (level / 100.0)
respeaker.write("PP_MIN_NS", [value])
logger.info(f"Noise suppression set to {level}%")
except Exception as e:
logger.error(f"Error setting noise suppression: {e}")
def get_echo_cancellation_converged(self) -> bool:
"""Check if echo cancellation has converged."""
with self._get_respeaker() as respeaker:
if respeaker is None:
return False
try:
result = respeaker.read("AEC_AECCONVERGED")
if result is not None:
return bool(result[1])
except Exception as e:
logger.debug(f"Error getting AEC converged status: {e}")
return False
# ========== DOA (Direction of Arrival) ==========
def get_doa_angle(self) -> tuple[float, bool] | None:
"""Get Direction of Arrival angle from microphone array.
The DOA angle indicates the direction of the sound source relative to the robot.
Angle is in radians: 0 = left, π/2 = front/back, π = right.
Returns:
Tuple of (angle_radians, speech_detected), or None if unavailable.
- angle_radians: Sound source direction in radians
- speech_detected: Whether speech is currently detected
"""
if not self.is_available:
return None
try:
if self.reachy.media and self.reachy.media.audio:
return self.reachy.media.audio.get_DoA()
except Exception as e:
logger.debug(f"Error getting DOA: {e}")
return None
def get_doa_angle_degrees(self) -> float:
"""Get DOA angle in degrees for Home Assistant entity.
Returns the raw DOA angle in degrees (0-180°).
SDK convention: 0° = left, 90° = front, 180° = right
"""
doa = self.get_doa_angle()
if doa is None:
return 0.0
angle_rad, _ = doa
# Return raw angle in degrees (0-180°)
angle_deg = math.degrees(angle_rad)
return angle_deg
def get_speech_detected(self) -> bool:
"""Get speech detection status from DOA.
Returns True if speech is currently detected.
"""
doa = self.get_doa_angle()
if doa is None:
return False
_, speech_detected = doa
return speech_detected