Desmond-Dong's picture
fix: pass current body_yaw to SDK for proper automatic body yaw calculation
9181fcd
Raw
History Blame
34.2 kB
"""Reachy Mini controller wrapper for ESPHome entities."""
import logging
import time
from typing import Any, Dict, Optional, TYPE_CHECKING
import math
import numpy as np
from scipy.spatial.transform import Rotation as R
import requests
if TYPE_CHECKING:
from reachy_mini import ReachyMini
logger = logging.getLogger(__name__)
class _ReSpeakerContext:
"""Context manager for thread-safe ReSpeaker access."""
def __init__(self, respeaker, lock):
self._respeaker = respeaker
self._lock = lock
def __enter__(self):
self._lock.acquire()
return self._respeaker
def __exit__(self, exc_type, exc_val, exc_tb):
self._lock.release()
return False
class ReachyController:
"""
Wrapper class for Reachy Mini control operations.
Provides safe access to Reachy Mini SDK functions with error handling
and fallback for standalone mode (when robot is not available).
"""
def __init__(self, reachy_mini: Optional["ReachyMini"] = None):
"""
Initialize the controller.
Args:
reachy_mini: ReachyMini instance, or None for standalone mode
"""
self.reachy = reachy_mini
self._speaker_volume = 100 # Default volume
self._movement_manager = None # Set later via set_movement_manager()
# Status caching - only for get_status() which may trigger I/O
# Note: get_current_head_pose() and get_current_joint_positions() are
# non-blocking in the SDK (they return cached Zenoh data), so no caching needed
self._state_cache: Dict[str, Any] = {}
self._cache_ttl = 2.0 # 2 second cache TTL for status queries (increased from 1s)
self._last_status_query = 0.0
# Thread lock for ReSpeaker USB access to prevent conflicts with GStreamer audio pipeline
self._respeaker_lock = __import__('threading').Lock()
def set_movement_manager(self, movement_manager) -> None:
"""Set the MovementManager instance for pose control.
Args:
movement_manager: MovementManager instance
"""
self._movement_manager = movement_manager
logger.info("MovementManager set for ReachyController")
@property
def is_available(self) -> bool:
"""Check if robot is available."""
return self.reachy is not None
# ========== Phase 1: Basic Status & Volume ==========
def _get_cached_status(self) -> Optional[Dict]:
"""Get cached daemon status to reduce query frequency.
Note: get_status() may trigger I/O, so we cache it.
Unlike get_current_head_pose() and get_current_joint_positions()
which are non-blocking in the SDK.
"""
now = time.time()
if now - self._last_status_query < self._cache_ttl:
return self._state_cache.get('status')
if not self.is_available:
return None
try:
status = self.reachy.client.get_status(wait=False)
self._state_cache['status'] = status
self._last_status_query = now
return status
except Exception as e:
logger.error(f"Error getting status: {e}")
return self._state_cache.get('status') # Return stale cache on error
def get_daemon_state(self) -> str:
"""Get daemon state with caching."""
status = self._get_cached_status()
if status is None:
return "not_available"
return status.get('state', 'unknown')
def get_backend_ready(self) -> bool:
"""Check if backend is ready with caching."""
status = self._get_cached_status()
if status is None:
return False
return status.get('state') == 'running'
def get_error_message(self) -> str:
"""Get current error message with caching."""
status = self._get_cached_status()
if status is None:
return "Robot not available"
return status.get('error') or ""
def get_speaker_volume(self) -> float:
"""Get speaker volume (0-100) with caching."""
if not self.is_available:
return self._speaker_volume
try:
# Get volume from daemon API (use cached status for IP)
status = self._get_cached_status()
if status is None:
return self._speaker_volume
wlan_ip = status.get('wlan_ip', 'localhost')
response = requests.get(f"http://{wlan_ip}:8000/api/volume/current", timeout=2)
if response.status_code == 200:
data = response.json()
self._speaker_volume = float(data.get('volume', self._speaker_volume))
except Exception as e:
logger.debug(f"Could not get volume from API: {e}")
return self._speaker_volume
def set_speaker_volume(self, volume: float) -> None:
"""
Set speaker volume (0-100) with cached status.
Args:
volume: Volume level 0-100
"""
volume = max(0.0, min(100.0, volume))
self._speaker_volume = volume
if not self.is_available:
logger.warning("Cannot set volume: robot not available")
return
try:
# Set volume via daemon API (use cached status for IP)
status = self._get_cached_status()
if status is None:
logger.error("Cannot get daemon status for volume control")
return
wlan_ip = status.get('wlan_ip', 'localhost')
response = requests.post(
f"http://{wlan_ip}:8000/api/volume/set",
json={"volume": int(volume)},
timeout=5
)
if response.status_code == 200:
logger.info(f"Speaker volume set to {volume}%")
else:
logger.error(f"Failed to set volume: {response.status_code} {response.text}")
except Exception as e:
logger.error(f"Error setting speaker volume: {e}")
def get_microphone_volume(self) -> float:
"""Get microphone volume (0-100) using daemon HTTP API."""
if not self.is_available:
return getattr(self, '_microphone_volume', 50.0)
try:
# Get WLAN IP from cached daemon status
status = self._get_cached_status()
if status is None:
return getattr(self, '_microphone_volume', 50.0)
wlan_ip = status.get('wlan_ip', 'localhost')
# Call the daemon API to get microphone volume
response = requests.get(
f"http://{wlan_ip}:8000/api/volume/microphone/current",
timeout=2
)
if response.status_code == 200:
data = response.json()
self._microphone_volume = float(data.get('volume', 50))
return self._microphone_volume
except Exception as e:
logger.debug(f"Could not get microphone volume from API: {e}")
return getattr(self, '_microphone_volume', 50.0)
def set_microphone_volume(self, volume: float) -> None:
"""
Set microphone volume (0-100) using daemon HTTP API.
Args:
volume: Volume level 0-100
"""
volume = max(0.0, min(100.0, volume))
self._microphone_volume = volume
if not self.is_available:
logger.warning("Cannot set microphone volume: robot not available")
return
try:
# Get WLAN IP from cached daemon status
status = self._get_cached_status()
if status is None:
logger.error("Cannot get daemon status for microphone volume control")
return
wlan_ip = status.get('wlan_ip', 'localhost')
# Call the daemon API to set microphone volume
response = requests.post(
f"http://{wlan_ip}:8000/api/volume/microphone/set",
json={"volume": int(volume)},
timeout=5
)
if response.status_code == 200:
logger.info(f"Microphone volume set to {volume}%")
else:
logger.error(f"Failed to set microphone volume: {response.status_code} {response.text}")
except Exception as e:
logger.error(f"Error setting microphone volume: {e}")
# ========== Phase 2: Motor Control ==========
def get_motors_enabled(self) -> bool:
"""Check if motors are enabled with caching."""
status = self._get_cached_status()
if status is None:
return False
try:
backend_status = status.get('backend_status')
if backend_status and isinstance(backend_status, dict):
motor_mode = backend_status.get('motor_control_mode', 'disabled')
return motor_mode == 'enabled'
return status.get('state') == 'running'
except Exception as e:
logger.error(f"Error getting motor state: {e}")
return False
def set_motors_enabled(self, enabled: bool) -> None:
"""
Enable or disable motors.
Args:
enabled: True to enable, False to disable
"""
if not self.is_available:
logger.warning("Cannot control motors: robot not available")
return
try:
if enabled:
self.reachy.enable_motors()
logger.info("Motors enabled")
else:
self.reachy.disable_motors()
logger.info("Motors disabled")
except Exception as e:
logger.error(f"Error setting motor state: {e}")
def get_motor_mode(self) -> str:
"""Get current motor control mode with caching."""
status = self._get_cached_status()
if status is None:
return "disabled"
try:
backend_status = status.get('backend_status')
if backend_status and isinstance(backend_status, dict):
motor_mode = backend_status.get('motor_control_mode', 'disabled')
return motor_mode
if status.get('state') == 'running':
return "enabled"
return "disabled"
except Exception as e:
logger.error(f"Error getting motor mode: {e}")
return "error"
def set_motor_mode(self, mode: str) -> None:
"""
Set motor control mode.
Args:
mode: One of "enabled", "disabled", "gravity_compensation"
"""
if not self.is_available:
logger.warning("Cannot set motor mode: robot not available")
return
try:
if mode == "enabled":
self.reachy.enable_motors()
elif mode == "disabled":
self.reachy.disable_motors()
elif mode == "gravity_compensation":
self.reachy.enable_gravity_compensation()
else:
logger.warning(f"Invalid motor mode: {mode}")
return
logger.info(f"Motor mode set to {mode}")
except Exception as e:
logger.error(f"Error setting motor mode: {e}")
def wake_up(self) -> None:
"""Execute wake up animation."""
if not self.is_available:
logger.warning("Cannot wake up: robot not available")
return
try:
self.reachy.wake_up()
logger.info("Wake up animation executed")
except Exception as e:
logger.error(f"Error executing wake up: {e}")
def go_to_sleep(self) -> None:
"""Execute sleep animation."""
if not self.is_available:
logger.warning("Cannot sleep: robot not available")
return
try:
self.reachy.goto_sleep()
logger.info("Sleep animation executed")
except Exception as e:
logger.error(f"Error executing sleep: {e}")
# ========== Phase 3: Pose Control ==========
def _get_head_pose(self) -> Optional[np.ndarray]:
"""Get current head pose from SDK.
Note: SDK's get_current_head_pose() is non-blocking - it returns
cached data from Zenoh subscriptions, so no throttling needed.
"""
if not self.is_available:
return None
try:
return self.reachy.get_current_head_pose()
except Exception as e:
logger.error(f"Error getting head pose: {e}")
return None
def _get_joint_positions(self) -> Optional[tuple]:
"""Get current joint positions from SDK.
Note: SDK's get_current_joint_positions() is non-blocking - it returns
cached data from Zenoh subscriptions, so no throttling needed.
"""
if not self.is_available:
return None
try:
return self.reachy.get_current_joint_positions()
except Exception as e:
logger.error(f"Error getting joint positions: {e}")
return None
def _extract_pose_from_matrix(self, pose_matrix: np.ndarray) -> tuple:
"""
Extract position (x, y, z) and rotation (roll, pitch, yaw) from 4x4 pose matrix.
Args:
pose_matrix: 4x4 homogeneous transformation matrix
Returns:
tuple: (x, y, z, roll, pitch, yaw) where position is in meters and angles in radians
"""
# Extract position from the last column
x = pose_matrix[0, 3]
y = pose_matrix[1, 3]
z = pose_matrix[2, 3]
# Extract rotation matrix and convert to euler angles
rotation_matrix = pose_matrix[:3, :3]
rotation = R.from_matrix(rotation_matrix)
# Use 'xyz' convention for roll, pitch, yaw
roll, pitch, yaw = rotation.as_euler('xyz')
return x, y, z, roll, pitch, yaw
def _get_head_pose_component(self, component: str) -> float:
"""Get a specific component from head pose.
Args:
component: One of 'x', 'y', 'z' (mm), 'roll', 'pitch', 'yaw' (degrees)
Returns:
The component value, or 0.0 on error
"""
pose = self._get_head_pose()
if pose is None:
return 0.0
try:
x, y, z, roll, pitch, yaw = self._extract_pose_from_matrix(pose)
components = {
'x': x * 1000, # m to mm
'y': y * 1000,
'z': z * 1000,
'roll': math.degrees(roll),
'pitch': math.degrees(pitch),
'yaw': math.degrees(yaw),
}
return components.get(component, 0.0)
except Exception as e:
logger.error(f"Error getting head {component}: {e}")
return 0.0
def _disabled_pose_setter(self, name: str) -> None:
"""Log warning when MovementManager is not available."""
logger.warning(f"set_{name} failed - MovementManager not set")
def _set_pose_via_manager(self, **kwargs) -> bool:
"""Set pose via MovementManager if available.
Returns True if successful, False if MovementManager not available.
"""
if self._movement_manager is None:
return False
self._movement_manager.set_target_pose(**kwargs)
return True
# Head position getters and setters
def get_head_x(self) -> float:
"""Get head X position in mm."""
return self._get_head_pose_component('x')
def set_head_x(self, x_mm: float) -> None:
"""Set head X position in mm via MovementManager."""
if not self._set_pose_via_manager(x=x_mm / 1000.0): # mm to m
self._disabled_pose_setter('head_x')
def get_head_y(self) -> float:
"""Get head Y position in mm."""
return self._get_head_pose_component('y')
def set_head_y(self, y_mm: float) -> None:
"""Set head Y position in mm via MovementManager."""
if not self._set_pose_via_manager(y=y_mm / 1000.0): # mm to m
self._disabled_pose_setter('head_y')
def get_head_z(self) -> float:
"""Get head Z position in mm."""
return self._get_head_pose_component('z')
def set_head_z(self, z_mm: float) -> None:
"""Set head Z position in mm via MovementManager."""
if not self._set_pose_via_manager(z=z_mm / 1000.0): # mm to m
self._disabled_pose_setter('head_z')
# Head orientation getters and setters
def get_head_roll(self) -> float:
"""Get head roll angle in degrees."""
return self._get_head_pose_component('roll')
def set_head_roll(self, roll_deg: float) -> None:
"""Set head roll angle in degrees via MovementManager."""
if not self._set_pose_via_manager(roll=math.radians(roll_deg)):
self._disabled_pose_setter('head_roll')
def get_head_pitch(self) -> float:
"""Get head pitch angle in degrees."""
return self._get_head_pose_component('pitch')
def set_head_pitch(self, pitch_deg: float) -> None:
"""Set head pitch angle in degrees via MovementManager."""
if not self._set_pose_via_manager(pitch=math.radians(pitch_deg)):
self._disabled_pose_setter('head_pitch')
def get_head_yaw(self) -> float:
"""Get head yaw angle in degrees."""
return self._get_head_pose_component('yaw')
def set_head_yaw(self, yaw_deg: float) -> None:
"""Set head yaw angle in degrees via MovementManager."""
if not self._set_pose_via_manager(yaw=math.radians(yaw_deg)):
self._disabled_pose_setter('head_yaw')
def get_body_yaw(self) -> float:
"""Get body yaw angle in degrees."""
joints = self._get_joint_positions()
if joints is None:
return 0.0
try:
head_joints, _ = joints
return math.degrees(head_joints[0])
except Exception as e:
logger.error(f"Error getting body yaw: {e}")
return 0.0
def set_body_yaw(self, yaw_deg: float) -> None:
"""Set body yaw angle in degrees.
Note: This directly calls SDK's set_target_body_yaw since automatic body yaw
is enabled. Manual control will temporarily override automatic mode.
"""
if self.reachy is None:
self._disabled_pose_setter('body_yaw')
return
try:
self.reachy.set_target_body_yaw(math.radians(yaw_deg))
except Exception as e:
logger.error(f"Error setting body yaw: {e}")
def get_antenna_left(self) -> float:
"""Get left antenna angle in degrees."""
joints = self._get_joint_positions()
if joints is None:
return 0.0
try:
_, antennas = joints
return math.degrees(antennas[1]) # left is index 1
except Exception as e:
logger.error(f"Error getting left antenna: {e}")
return 0.0
def set_antenna_left(self, angle_deg: float) -> None:
"""Set left antenna angle in degrees via MovementManager."""
if not self._set_pose_via_manager(antenna_left=math.radians(angle_deg)):
self._disabled_pose_setter('antenna_left')
def get_antenna_right(self) -> float:
"""Get right antenna angle in degrees."""
joints = self._get_joint_positions()
if joints is None:
return 0.0
try:
_, antennas = joints
return math.degrees(antennas[0]) # right is index 0
except Exception as e:
logger.error(f"Error getting right antenna: {e}")
return 0.0
def set_antenna_right(self, angle_deg: float) -> None:
"""Set right antenna angle in degrees via MovementManager."""
if not self._set_pose_via_manager(antenna_right=math.radians(angle_deg)):
self._disabled_pose_setter('antenna_right')
# ========== Phase 4: Look At Control ==========
def get_look_at_x(self) -> float:
"""Get look at target X coordinate in world frame (meters)."""
# This is a target position, not a current state
# We'll store it internally
return getattr(self, '_look_at_x', 0.0)
def set_look_at_x(self, x: float) -> None:
"""Set look at target X coordinate."""
self._look_at_x = x
self._update_look_at()
def get_look_at_y(self) -> float:
"""Get look at target Y coordinate in world frame (meters)."""
return getattr(self, '_look_at_y', 0.0)
def set_look_at_y(self, y: float) -> None:
"""Set look at target Y coordinate."""
self._look_at_y = y
self._update_look_at()
def get_look_at_z(self) -> float:
"""Get look at target Z coordinate in world frame (meters)."""
return getattr(self, '_look_at_z', 0.0)
def set_look_at_z(self, z: float) -> None:
"""Set look at target Z coordinate."""
self._look_at_z = z
self._update_look_at()
def _update_look_at(self) -> None:
"""Update robot to look at the target coordinates.
NOTE: Disabled to prevent conflict with MovementManager's control loop.
"""
logger.warning("_update_look_at is disabled - MovementManager controls head pose")
# if not self.is_available:
# return
# try:
# x = getattr(self, '_look_at_x', 0.0)
# y = getattr(self, '_look_at_y', 0.0)
# z = getattr(self, '_look_at_z', 0.0)
# self.reachy.look_at_world(x, y, z)
# logger.info(f"Looking at world coordinates: ({x}, {y}, {z})")
# except Exception as e:
# logger.error(f"Error updating look at: {e}")
# ========== Phase 6: Diagnostic Information ==========
def get_control_loop_frequency(self) -> float:
"""Get control loop frequency in Hz with caching."""
status = self._get_cached_status()
if status is None:
return 0.0
try:
backend_status = status.get('backend_status')
if backend_status and isinstance(backend_status, dict):
control_loop_stats = backend_status.get('control_loop_stats', {})
return control_loop_stats.get('mean_control_loop_frequency', 0.0)
return 0.0
except Exception as e:
logger.error(f"Error getting control loop frequency: {e}")
return 0.0
def get_sdk_version(self) -> str:
"""Get SDK version with caching."""
status = self._get_cached_status()
if status is None:
return "N/A"
return status.get('version') or "unknown"
def get_robot_name(self) -> str:
"""Get robot name with caching."""
status = self._get_cached_status()
if status is None:
return "N/A"
return status.get('robot_name') or "unknown"
def get_wireless_version(self) -> bool:
"""Check if this is a wireless version with caching."""
status = self._get_cached_status()
if status is None:
return False
return status.get('wireless_version', False)
def get_simulation_mode(self) -> bool:
"""Check if simulation mode is enabled with caching."""
status = self._get_cached_status()
if status is None:
return False
return status.get('simulation_enabled', False)
def get_wlan_ip(self) -> str:
"""Get WLAN IP address with caching."""
status = self._get_cached_status()
if status is None:
return "N/A"
return status.get('wlan_ip') or "N/A"
# ========== Phase 7: IMU Sensors (Wireless only) ==========
def _get_imu_value(self, sensor_type: str, index: int) -> float:
"""Get a specific IMU sensor value.
Args:
sensor_type: 'accelerometer', 'gyroscope', or 'temperature'
index: Array index (0=x, 1=y, 2=z) or -1 for scalar values
Returns:
The sensor value, or 0.0 on error
"""
if not self.is_available:
return 0.0
try:
imu_data = self.reachy.imu
if imu_data is None or sensor_type not in imu_data:
return 0.0
value = imu_data[sensor_type]
return float(value[index]) if index >= 0 else float(value)
except Exception as e:
logger.debug(f"Error getting IMU {sensor_type}: {e}")
return 0.0
def get_imu_accel_x(self) -> float:
"""Get IMU X-axis acceleration in m/s²."""
return self._get_imu_value('accelerometer', 0)
def get_imu_accel_y(self) -> float:
"""Get IMU Y-axis acceleration in m/s²."""
return self._get_imu_value('accelerometer', 1)
def get_imu_accel_z(self) -> float:
"""Get IMU Z-axis acceleration in m/s²."""
return self._get_imu_value('accelerometer', 2)
def get_imu_gyro_x(self) -> float:
"""Get IMU X-axis angular velocity in rad/s."""
return self._get_imu_value('gyroscope', 0)
def get_imu_gyro_y(self) -> float:
"""Get IMU Y-axis angular velocity in rad/s."""
return self._get_imu_value('gyroscope', 1)
def get_imu_gyro_z(self) -> float:
"""Get IMU Z-axis angular velocity in rad/s."""
return self._get_imu_value('gyroscope', 2)
def get_imu_temperature(self) -> float:
"""Get IMU temperature in °C."""
return self._get_imu_value('temperature', -1)
# ========== Phase 11: LED Control (DISABLED) ==========
# LED control is disabled because LEDs are hidden inside the robot.
# See PROJECT_PLAN.md principle 8.
def _get_respeaker(self):
"""Get ReSpeaker device from media manager with thread-safe access.
Returns a context manager that holds the lock during ReSpeaker operations.
Usage:
with self._get_respeaker() as respeaker:
if respeaker:
respeaker.read("...")
"""
if not self.is_available:
return _ReSpeakerContext(None, self._respeaker_lock)
try:
if not self.reachy.media or not self.reachy.media.audio:
return _ReSpeakerContext(None, self._respeaker_lock)
respeaker = self.reachy.media.audio._respeaker
return _ReSpeakerContext(respeaker, self._respeaker_lock)
except Exception:
return _ReSpeakerContext(None, self._respeaker_lock)
# ========== Phase 12: Audio Processing (via local SDK with thread-safe access) ==========
def get_agc_enabled(self) -> bool:
"""Get AGC (Automatic Gain Control) enabled status."""
with self._get_respeaker() as respeaker:
if respeaker is None:
return getattr(self, '_agc_enabled', True) # Default to enabled
try:
result = respeaker.read("PP_AGCONOFF")
if result is not None:
self._agc_enabled = bool(result[1])
return self._agc_enabled
except Exception as e:
logger.debug(f"Error getting AGC status: {e}")
return getattr(self, '_agc_enabled', True)
def set_agc_enabled(self, enabled: bool) -> None:
"""Set AGC (Automatic Gain Control) enabled status."""
self._agc_enabled = enabled
with self._get_respeaker() as respeaker:
if respeaker is None:
return
try:
respeaker.write("PP_AGCONOFF", [1 if enabled else 0])
logger.info(f"AGC {'enabled' if enabled else 'disabled'}")
except Exception as e:
logger.error(f"Error setting AGC status: {e}")
def get_agc_max_gain(self) -> float:
"""Get AGC maximum gain in dB (0-40 dB range)."""
with self._get_respeaker() as respeaker:
if respeaker is None:
return getattr(self, '_agc_max_gain', 30.0) # Default to optimized value
try:
result = respeaker.read("PP_AGCMAXGAIN")
if result is not None:
self._agc_max_gain = float(result[0])
return self._agc_max_gain
except Exception as e:
logger.debug(f"Error getting AGC max gain: {e}")
return getattr(self, '_agc_max_gain', 30.0)
def set_agc_max_gain(self, gain: float) -> None:
"""Set AGC maximum gain in dB (0-40 dB range)."""
gain = max(0.0, min(40.0, gain)) # XVF3800 supports up to 40dB
self._agc_max_gain = gain
with self._get_respeaker() as respeaker:
if respeaker is None:
return
try:
respeaker.write("PP_AGCMAXGAIN", [gain])
logger.info(f"AGC max gain set to {gain} dB")
except Exception as e:
logger.error(f"Error setting AGC max gain: {e}")
def get_noise_suppression(self) -> float:
"""Get noise suppression level (0-100%).
PP_MIN_NS represents "minimum signal preservation ratio":
- PP_MIN_NS = 0.85 means "keep at least 85% of signal" = 15% suppression
- PP_MIN_NS = 0.15 means "keep at least 15% of signal" = 85% suppression
We display "noise suppression strength" to user, so:
- suppression_percent = (1.0 - PP_MIN_NS) * 100
"""
with self._get_respeaker() as respeaker:
if respeaker is None:
return getattr(self, '_noise_suppression', 15.0)
try:
result = respeaker.read("PP_MIN_NS")
if result is not None:
raw_value = result[0]
# Convert: PP_MIN_NS=0.85 -> 15% suppression, PP_MIN_NS=0.15 -> 85% suppression
self._noise_suppression = max(0.0, min(100.0, (1.0 - raw_value) * 100.0))
logger.debug(f"Noise suppression: PP_MIN_NS={raw_value:.2f} -> {self._noise_suppression:.1f}%")
return self._noise_suppression
except Exception as e:
logger.debug(f"Error getting noise suppression: {e}")
return getattr(self, '_noise_suppression', 15.0)
def set_noise_suppression(self, level: float) -> None:
"""Set noise suppression level (0-100%)."""
level = max(0.0, min(100.0, level))
self._noise_suppression = level
with self._get_respeaker() as respeaker:
if respeaker is None:
return
try:
# Convert percentage to PP_MIN_NS value (inverted)
value = 1.0 - (level / 100.0)
respeaker.write("PP_MIN_NS", [value])
logger.info(f"Noise suppression set to {level}%")
except Exception as e:
logger.error(f"Error setting noise suppression: {e}")
def get_echo_cancellation_converged(self) -> bool:
"""Check if echo cancellation has converged."""
with self._get_respeaker() as respeaker:
if respeaker is None:
return False
try:
result = respeaker.read("AEC_AECCONVERGED")
if result is not None:
return bool(result[1])
except Exception as e:
logger.debug(f"Error getting AEC converged status: {e}")
return False
# ========== DOA (Direction of Arrival) ==========
def get_doa_angle(self) -> tuple[float, bool] | None:
"""Get Direction of Arrival angle from microphone array.
The DOA angle indicates the direction of the sound source relative to the robot.
Angle is in radians: 0 = left, π/2 = front/back, π = right.
Returns:
Tuple of (angle_radians, speech_detected), or None if unavailable.
- angle_radians: Sound source direction in radians
- speech_detected: Whether speech is currently detected
"""
if not self.is_available:
return None
try:
if self.reachy.media and self.reachy.media.audio:
return self.reachy.media.audio.get_DoA()
except Exception as e:
logger.debug(f"Error getting DOA: {e}")
return None
def get_doa_angle_degrees(self) -> float:
"""Get DOA angle in degrees for Home Assistant entity.
Returns the raw DOA angle in degrees (0-180°).
SDK convention: 0° = left, 90° = front, 180° = right
"""
doa = self.get_doa_angle()
if doa is None:
return 0.0
angle_rad, _ = doa
# Return raw angle in degrees (0-180°)
angle_deg = math.degrees(angle_rad)
return angle_deg
def get_speech_detected(self) -> bool:
"""Get speech detection status from DOA.
Returns True if speech is currently detected.
"""
doa = self.get_doa_angle()
if doa is None:
return False
_, speech_detected = doa
return speech_detected