| """Reachy Mini controller wrapper for ESPHome entities."""
|
|
|
| import logging
|
| import time
|
| from typing import Any, Dict, Optional, TYPE_CHECKING
|
| import math
|
| import numpy as np
|
| from scipy.spatial.transform import Rotation as R
|
| import requests
|
|
|
| if TYPE_CHECKING:
|
| from reachy_mini import ReachyMini
|
|
|
| logger = logging.getLogger(__name__)
|
|
|
|
|
| class _ReSpeakerContext:
|
| """Context manager for thread-safe ReSpeaker access."""
|
|
|
| def __init__(self, respeaker, lock):
|
| self._respeaker = respeaker
|
| self._lock = lock
|
|
|
| def __enter__(self):
|
| self._lock.acquire()
|
| return self._respeaker
|
|
|
| def __exit__(self, exc_type, exc_val, exc_tb):
|
| self._lock.release()
|
| return False
|
|
|
|
|
| class ReachyController:
|
| """
|
| Wrapper class for Reachy Mini control operations.
|
|
|
| Provides safe access to Reachy Mini SDK functions with error handling
|
| and fallback for standalone mode (when robot is not available).
|
| """
|
|
|
| def __init__(self, reachy_mini: Optional["ReachyMini"] = None):
|
| """
|
| Initialize the controller.
|
|
|
| Args:
|
| reachy_mini: ReachyMini instance, or None for standalone mode
|
| """
|
| self.reachy = reachy_mini
|
| self._speaker_volume = 100
|
| self._movement_manager = None
|
|
|
|
|
|
|
|
|
| self._state_cache: Dict[str, Any] = {}
|
| self._cache_ttl = 2.0
|
| self._last_status_query = 0.0
|
|
|
|
|
| self._respeaker_lock = __import__('threading').Lock()
|
|
|
| def set_movement_manager(self, movement_manager) -> None:
|
| """Set the MovementManager instance for pose control.
|
|
|
| Args:
|
| movement_manager: MovementManager instance
|
| """
|
| self._movement_manager = movement_manager
|
| logger.info("MovementManager set for ReachyController")
|
|
|
| @property
|
| def is_available(self) -> bool:
|
| """Check if robot is available."""
|
| return self.reachy is not None
|
|
|
|
|
|
|
| def _get_cached_status(self) -> Optional[Dict]:
|
| """Get cached daemon status to reduce query frequency.
|
|
|
| Note: get_status() may trigger I/O, so we cache it.
|
| Unlike get_current_head_pose() and get_current_joint_positions()
|
| which are non-blocking in the SDK.
|
| """
|
| now = time.time()
|
| if now - self._last_status_query < self._cache_ttl:
|
| return self._state_cache.get('status')
|
|
|
| if not self.is_available:
|
| return None
|
|
|
| try:
|
| status = self.reachy.client.get_status(wait=False)
|
| self._state_cache['status'] = status
|
| self._last_status_query = now
|
| return status
|
| except Exception as e:
|
| logger.error(f"Error getting status: {e}")
|
| return self._state_cache.get('status')
|
|
|
| def get_daemon_state(self) -> str:
|
| """Get daemon state with caching."""
|
| status = self._get_cached_status()
|
| if status is None:
|
| return "not_available"
|
| return status.get('state', 'unknown')
|
|
|
| def get_backend_ready(self) -> bool:
|
| """Check if backend is ready with caching."""
|
| status = self._get_cached_status()
|
| if status is None:
|
| return False
|
| return status.get('state') == 'running'
|
|
|
| def get_error_message(self) -> str:
|
| """Get current error message with caching."""
|
| status = self._get_cached_status()
|
| if status is None:
|
| return "Robot not available"
|
| return status.get('error') or ""
|
|
|
| def get_speaker_volume(self) -> float:
|
| """Get speaker volume (0-100) with caching."""
|
| if not self.is_available:
|
| return self._speaker_volume
|
| try:
|
|
|
| status = self._get_cached_status()
|
| if status is None:
|
| return self._speaker_volume
|
| wlan_ip = status.get('wlan_ip', 'localhost')
|
| response = requests.get(f"http://{wlan_ip}:8000/api/volume/current", timeout=2)
|
| if response.status_code == 200:
|
| data = response.json()
|
| self._speaker_volume = float(data.get('volume', self._speaker_volume))
|
| except Exception as e:
|
| logger.debug(f"Could not get volume from API: {e}")
|
| return self._speaker_volume
|
|
|
| def set_speaker_volume(self, volume: float) -> None:
|
| """
|
| Set speaker volume (0-100) with cached status.
|
|
|
| Args:
|
| volume: Volume level 0-100
|
| """
|
| volume = max(0.0, min(100.0, volume))
|
| self._speaker_volume = volume
|
|
|
| if not self.is_available:
|
| logger.warning("Cannot set volume: robot not available")
|
| return
|
|
|
| try:
|
|
|
| status = self._get_cached_status()
|
| if status is None:
|
| logger.error("Cannot get daemon status for volume control")
|
| return
|
| wlan_ip = status.get('wlan_ip', 'localhost')
|
| response = requests.post(
|
| f"http://{wlan_ip}:8000/api/volume/set",
|
| json={"volume": int(volume)},
|
| timeout=5
|
| )
|
| if response.status_code == 200:
|
| logger.info(f"Speaker volume set to {volume}%")
|
| else:
|
| logger.error(f"Failed to set volume: {response.status_code} {response.text}")
|
| except Exception as e:
|
| logger.error(f"Error setting speaker volume: {e}")
|
|
|
| def get_microphone_volume(self) -> float:
|
| """Get microphone volume (0-100) using daemon HTTP API."""
|
| if not self.is_available:
|
| return getattr(self, '_microphone_volume', 50.0)
|
|
|
| try:
|
|
|
| status = self._get_cached_status()
|
| if status is None:
|
| return getattr(self, '_microphone_volume', 50.0)
|
| wlan_ip = status.get('wlan_ip', 'localhost')
|
|
|
|
|
| response = requests.get(
|
| f"http://{wlan_ip}:8000/api/volume/microphone/current",
|
| timeout=2
|
| )
|
| if response.status_code == 200:
|
| data = response.json()
|
| self._microphone_volume = float(data.get('volume', 50))
|
| return self._microphone_volume
|
| except Exception as e:
|
| logger.debug(f"Could not get microphone volume from API: {e}")
|
|
|
| return getattr(self, '_microphone_volume', 50.0)
|
|
|
| def set_microphone_volume(self, volume: float) -> None:
|
| """
|
| Set microphone volume (0-100) using daemon HTTP API.
|
|
|
| Args:
|
| volume: Volume level 0-100
|
| """
|
| volume = max(0.0, min(100.0, volume))
|
| self._microphone_volume = volume
|
|
|
| if not self.is_available:
|
| logger.warning("Cannot set microphone volume: robot not available")
|
| return
|
|
|
| try:
|
|
|
| status = self._get_cached_status()
|
| if status is None:
|
| logger.error("Cannot get daemon status for microphone volume control")
|
| return
|
| wlan_ip = status.get('wlan_ip', 'localhost')
|
|
|
|
|
| response = requests.post(
|
| f"http://{wlan_ip}:8000/api/volume/microphone/set",
|
| json={"volume": int(volume)},
|
| timeout=5
|
| )
|
| if response.status_code == 200:
|
| logger.info(f"Microphone volume set to {volume}%")
|
| else:
|
| logger.error(f"Failed to set microphone volume: {response.status_code} {response.text}")
|
| except Exception as e:
|
| logger.error(f"Error setting microphone volume: {e}")
|
|
|
|
|
|
|
| def get_motors_enabled(self) -> bool:
|
| """Check if motors are enabled with caching."""
|
| status = self._get_cached_status()
|
| if status is None:
|
| return False
|
| try:
|
| backend_status = status.get('backend_status')
|
| if backend_status and isinstance(backend_status, dict):
|
| motor_mode = backend_status.get('motor_control_mode', 'disabled')
|
| return motor_mode == 'enabled'
|
| return status.get('state') == 'running'
|
| except Exception as e:
|
| logger.error(f"Error getting motor state: {e}")
|
| return False
|
|
|
| def set_motors_enabled(self, enabled: bool) -> None:
|
| """
|
| Enable or disable motors.
|
|
|
| Args:
|
| enabled: True to enable, False to disable
|
| """
|
| if not self.is_available:
|
| logger.warning("Cannot control motors: robot not available")
|
| return
|
|
|
| try:
|
| if enabled:
|
| self.reachy.enable_motors()
|
| logger.info("Motors enabled")
|
| else:
|
| self.reachy.disable_motors()
|
| logger.info("Motors disabled")
|
| except Exception as e:
|
| logger.error(f"Error setting motor state: {e}")
|
|
|
| def get_motor_mode(self) -> str:
|
| """Get current motor control mode with caching."""
|
| status = self._get_cached_status()
|
| if status is None:
|
| return "disabled"
|
| try:
|
| backend_status = status.get('backend_status')
|
| if backend_status and isinstance(backend_status, dict):
|
| motor_mode = backend_status.get('motor_control_mode', 'disabled')
|
| return motor_mode
|
| if status.get('state') == 'running':
|
| return "enabled"
|
| return "disabled"
|
| except Exception as e:
|
| logger.error(f"Error getting motor mode: {e}")
|
| return "error"
|
|
|
| def set_motor_mode(self, mode: str) -> None:
|
| """
|
| Set motor control mode.
|
|
|
| Args:
|
| mode: One of "enabled", "disabled", "gravity_compensation"
|
| """
|
| if not self.is_available:
|
| logger.warning("Cannot set motor mode: robot not available")
|
| return
|
|
|
| try:
|
| if mode == "enabled":
|
| self.reachy.enable_motors()
|
| elif mode == "disabled":
|
| self.reachy.disable_motors()
|
| elif mode == "gravity_compensation":
|
| self.reachy.enable_gravity_compensation()
|
| else:
|
| logger.warning(f"Invalid motor mode: {mode}")
|
| return
|
| logger.info(f"Motor mode set to {mode}")
|
| except Exception as e:
|
| logger.error(f"Error setting motor mode: {e}")
|
|
|
| def wake_up(self) -> None:
|
| """Execute wake up animation."""
|
| if not self.is_available:
|
| logger.warning("Cannot wake up: robot not available")
|
| return
|
|
|
| try:
|
| self.reachy.wake_up()
|
| logger.info("Wake up animation executed")
|
| except Exception as e:
|
| logger.error(f"Error executing wake up: {e}")
|
|
|
| def go_to_sleep(self) -> None:
|
| """Execute sleep animation."""
|
| if not self.is_available:
|
| logger.warning("Cannot sleep: robot not available")
|
| return
|
|
|
| try:
|
| self.reachy.goto_sleep()
|
| logger.info("Sleep animation executed")
|
| except Exception as e:
|
| logger.error(f"Error executing sleep: {e}")
|
|
|
|
|
|
|
| def _get_head_pose(self) -> Optional[np.ndarray]:
|
| """Get current head pose from SDK.
|
|
|
| Note: SDK's get_current_head_pose() is non-blocking - it returns
|
| cached data from Zenoh subscriptions, so no throttling needed.
|
| """
|
| if not self.is_available:
|
| return None
|
|
|
| try:
|
| return self.reachy.get_current_head_pose()
|
| except Exception as e:
|
| logger.error(f"Error getting head pose: {e}")
|
| return None
|
|
|
| def _get_joint_positions(self) -> Optional[tuple]:
|
| """Get current joint positions from SDK.
|
|
|
| Note: SDK's get_current_joint_positions() is non-blocking - it returns
|
| cached data from Zenoh subscriptions, so no throttling needed.
|
| """
|
| if not self.is_available:
|
| return None
|
|
|
| try:
|
| return self.reachy.get_current_joint_positions()
|
| except Exception as e:
|
| logger.error(f"Error getting joint positions: {e}")
|
| return None
|
|
|
| def _extract_pose_from_matrix(self, pose_matrix: np.ndarray) -> tuple:
|
| """
|
| Extract position (x, y, z) and rotation (roll, pitch, yaw) from 4x4 pose matrix.
|
|
|
| Args:
|
| pose_matrix: 4x4 homogeneous transformation matrix
|
|
|
| Returns:
|
| tuple: (x, y, z, roll, pitch, yaw) where position is in meters and angles in radians
|
| """
|
|
|
| x = pose_matrix[0, 3]
|
| y = pose_matrix[1, 3]
|
| z = pose_matrix[2, 3]
|
|
|
|
|
| rotation_matrix = pose_matrix[:3, :3]
|
| rotation = R.from_matrix(rotation_matrix)
|
|
|
| roll, pitch, yaw = rotation.as_euler('xyz')
|
|
|
| return x, y, z, roll, pitch, yaw
|
|
|
| def _get_head_pose_component(self, component: str) -> float:
|
| """Get a specific component from head pose.
|
|
|
| Args:
|
| component: One of 'x', 'y', 'z' (mm), 'roll', 'pitch', 'yaw' (degrees)
|
|
|
| Returns:
|
| The component value, or 0.0 on error
|
| """
|
| pose = self._get_head_pose()
|
| if pose is None:
|
| return 0.0
|
| try:
|
| x, y, z, roll, pitch, yaw = self._extract_pose_from_matrix(pose)
|
| components = {
|
| 'x': x * 1000,
|
| 'y': y * 1000,
|
| 'z': z * 1000,
|
| 'roll': math.degrees(roll),
|
| 'pitch': math.degrees(pitch),
|
| 'yaw': math.degrees(yaw),
|
| }
|
| return components.get(component, 0.0)
|
| except Exception as e:
|
| logger.error(f"Error getting head {component}: {e}")
|
| return 0.0
|
|
|
| def _disabled_pose_setter(self, name: str) -> None:
|
| """Log warning when MovementManager is not available."""
|
| logger.warning(f"set_{name} failed - MovementManager not set")
|
|
|
| def _set_pose_via_manager(self, **kwargs) -> bool:
|
| """Set pose via MovementManager if available.
|
|
|
| Returns True if successful, False if MovementManager not available.
|
| """
|
| if self._movement_manager is None:
|
| return False
|
| self._movement_manager.set_target_pose(**kwargs)
|
| return True
|
|
|
|
|
| def get_head_x(self) -> float:
|
| """Get head X position in mm."""
|
| return self._get_head_pose_component('x')
|
|
|
| def set_head_x(self, x_mm: float) -> None:
|
| """Set head X position in mm via MovementManager."""
|
| if not self._set_pose_via_manager(x=x_mm / 1000.0):
|
| self._disabled_pose_setter('head_x')
|
|
|
| def get_head_y(self) -> float:
|
| """Get head Y position in mm."""
|
| return self._get_head_pose_component('y')
|
|
|
| def set_head_y(self, y_mm: float) -> None:
|
| """Set head Y position in mm via MovementManager."""
|
| if not self._set_pose_via_manager(y=y_mm / 1000.0):
|
| self._disabled_pose_setter('head_y')
|
|
|
| def get_head_z(self) -> float:
|
| """Get head Z position in mm."""
|
| return self._get_head_pose_component('z')
|
|
|
| def set_head_z(self, z_mm: float) -> None:
|
| """Set head Z position in mm via MovementManager."""
|
| if not self._set_pose_via_manager(z=z_mm / 1000.0):
|
| self._disabled_pose_setter('head_z')
|
|
|
|
|
| def get_head_roll(self) -> float:
|
| """Get head roll angle in degrees."""
|
| return self._get_head_pose_component('roll')
|
|
|
| def set_head_roll(self, roll_deg: float) -> None:
|
| """Set head roll angle in degrees via MovementManager."""
|
| if not self._set_pose_via_manager(roll=math.radians(roll_deg)):
|
| self._disabled_pose_setter('head_roll')
|
|
|
| def get_head_pitch(self) -> float:
|
| """Get head pitch angle in degrees."""
|
| return self._get_head_pose_component('pitch')
|
|
|
| def set_head_pitch(self, pitch_deg: float) -> None:
|
| """Set head pitch angle in degrees via MovementManager."""
|
| if not self._set_pose_via_manager(pitch=math.radians(pitch_deg)):
|
| self._disabled_pose_setter('head_pitch')
|
|
|
| def get_head_yaw(self) -> float:
|
| """Get head yaw angle in degrees."""
|
| return self._get_head_pose_component('yaw')
|
|
|
| def set_head_yaw(self, yaw_deg: float) -> None:
|
| """Set head yaw angle in degrees via MovementManager."""
|
| if not self._set_pose_via_manager(yaw=math.radians(yaw_deg)):
|
| self._disabled_pose_setter('head_yaw')
|
|
|
| def get_body_yaw(self) -> float:
|
| """Get body yaw angle in degrees."""
|
| joints = self._get_joint_positions()
|
| if joints is None:
|
| return 0.0
|
| try:
|
| head_joints, _ = joints
|
| return math.degrees(head_joints[0])
|
| except Exception as e:
|
| logger.error(f"Error getting body yaw: {e}")
|
| return 0.0
|
|
|
| def set_body_yaw(self, yaw_deg: float) -> None:
|
| """Set body yaw angle in degrees via MovementManager."""
|
| if not self._set_pose_via_manager(body_yaw=math.radians(yaw_deg)):
|
| self._disabled_pose_setter('body_yaw')
|
|
|
| def get_antenna_left(self) -> float:
|
| """Get left antenna angle in degrees."""
|
| joints = self._get_joint_positions()
|
| if joints is None:
|
| return 0.0
|
| try:
|
| _, antennas = joints
|
| return math.degrees(antennas[1])
|
| except Exception as e:
|
| logger.error(f"Error getting left antenna: {e}")
|
| return 0.0
|
|
|
| def set_antenna_left(self, angle_deg: float) -> None:
|
| """Set left antenna angle in degrees via MovementManager."""
|
| if not self._set_pose_via_manager(antenna_left=math.radians(angle_deg)):
|
| self._disabled_pose_setter('antenna_left')
|
|
|
| def get_antenna_right(self) -> float:
|
| """Get right antenna angle in degrees."""
|
| joints = self._get_joint_positions()
|
| if joints is None:
|
| return 0.0
|
| try:
|
| _, antennas = joints
|
| return math.degrees(antennas[0])
|
| except Exception as e:
|
| logger.error(f"Error getting right antenna: {e}")
|
| return 0.0
|
|
|
| def set_antenna_right(self, angle_deg: float) -> None:
|
| """Set right antenna angle in degrees via MovementManager."""
|
| if not self._set_pose_via_manager(antenna_right=math.radians(angle_deg)):
|
| self._disabled_pose_setter('antenna_right')
|
|
|
|
|
|
|
| def get_look_at_x(self) -> float:
|
| """Get look at target X coordinate in world frame (meters)."""
|
|
|
|
|
| return getattr(self, '_look_at_x', 0.0)
|
|
|
| def set_look_at_x(self, x: float) -> None:
|
| """Set look at target X coordinate."""
|
| self._look_at_x = x
|
| self._update_look_at()
|
|
|
| def get_look_at_y(self) -> float:
|
| """Get look at target Y coordinate in world frame (meters)."""
|
| return getattr(self, '_look_at_y', 0.0)
|
|
|
| def set_look_at_y(self, y: float) -> None:
|
| """Set look at target Y coordinate."""
|
| self._look_at_y = y
|
| self._update_look_at()
|
|
|
| def get_look_at_z(self) -> float:
|
| """Get look at target Z coordinate in world frame (meters)."""
|
| return getattr(self, '_look_at_z', 0.0)
|
|
|
| def set_look_at_z(self, z: float) -> None:
|
| """Set look at target Z coordinate."""
|
| self._look_at_z = z
|
| self._update_look_at()
|
|
|
| def _update_look_at(self) -> None:
|
| """Update robot to look at the target coordinates.
|
|
|
| NOTE: Disabled to prevent conflict with MovementManager's control loop.
|
| """
|
| logger.warning("_update_look_at is disabled - MovementManager controls head pose")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| def get_control_loop_frequency(self) -> float:
|
| """Get control loop frequency in Hz with caching."""
|
| status = self._get_cached_status()
|
| if status is None:
|
| return 0.0
|
| try:
|
| backend_status = status.get('backend_status')
|
| if backend_status and isinstance(backend_status, dict):
|
| control_loop_stats = backend_status.get('control_loop_stats', {})
|
| return control_loop_stats.get('mean_control_loop_frequency', 0.0)
|
| return 0.0
|
| except Exception as e:
|
| logger.error(f"Error getting control loop frequency: {e}")
|
| return 0.0
|
|
|
| def get_sdk_version(self) -> str:
|
| """Get SDK version with caching."""
|
| status = self._get_cached_status()
|
| if status is None:
|
| return "N/A"
|
| return status.get('version') or "unknown"
|
|
|
| def get_robot_name(self) -> str:
|
| """Get robot name with caching."""
|
| status = self._get_cached_status()
|
| if status is None:
|
| return "N/A"
|
| return status.get('robot_name') or "unknown"
|
|
|
| def get_wireless_version(self) -> bool:
|
| """Check if this is a wireless version with caching."""
|
| status = self._get_cached_status()
|
| if status is None:
|
| return False
|
| return status.get('wireless_version', False)
|
|
|
| def get_simulation_mode(self) -> bool:
|
| """Check if simulation mode is enabled with caching."""
|
| status = self._get_cached_status()
|
| if status is None:
|
| return False
|
| return status.get('simulation_enabled', False)
|
|
|
| def get_wlan_ip(self) -> str:
|
| """Get WLAN IP address with caching."""
|
| status = self._get_cached_status()
|
| if status is None:
|
| return "N/A"
|
| return status.get('wlan_ip') or "N/A"
|
|
|
|
|
|
|
| def _get_imu_value(self, sensor_type: str, index: int) -> float:
|
| """Get a specific IMU sensor value.
|
|
|
| Args:
|
| sensor_type: 'accelerometer', 'gyroscope', or 'temperature'
|
| index: Array index (0=x, 1=y, 2=z) or -1 for scalar values
|
|
|
| Returns:
|
| The sensor value, or 0.0 on error
|
| """
|
| if not self.is_available:
|
| return 0.0
|
| try:
|
| imu_data = self.reachy.imu
|
| if imu_data is None or sensor_type not in imu_data:
|
| return 0.0
|
| value = imu_data[sensor_type]
|
| return float(value[index]) if index >= 0 else float(value)
|
| except Exception as e:
|
| logger.debug(f"Error getting IMU {sensor_type}: {e}")
|
| return 0.0
|
|
|
| def get_imu_accel_x(self) -> float:
|
| """Get IMU X-axis acceleration in m/s²."""
|
| return self._get_imu_value('accelerometer', 0)
|
|
|
| def get_imu_accel_y(self) -> float:
|
| """Get IMU Y-axis acceleration in m/s²."""
|
| return self._get_imu_value('accelerometer', 1)
|
|
|
| def get_imu_accel_z(self) -> float:
|
| """Get IMU Z-axis acceleration in m/s²."""
|
| return self._get_imu_value('accelerometer', 2)
|
|
|
| def get_imu_gyro_x(self) -> float:
|
| """Get IMU X-axis angular velocity in rad/s."""
|
| return self._get_imu_value('gyroscope', 0)
|
|
|
| def get_imu_gyro_y(self) -> float:
|
| """Get IMU Y-axis angular velocity in rad/s."""
|
| return self._get_imu_value('gyroscope', 1)
|
|
|
| def get_imu_gyro_z(self) -> float:
|
| """Get IMU Z-axis angular velocity in rad/s."""
|
| return self._get_imu_value('gyroscope', 2)
|
|
|
| def get_imu_temperature(self) -> float:
|
| """Get IMU temperature in °C."""
|
| return self._get_imu_value('temperature', -1)
|
|
|
|
|
|
|
|
|
|
|
| def _get_respeaker(self):
|
| """Get ReSpeaker device from media manager with thread-safe access.
|
|
|
| Returns a context manager that holds the lock during ReSpeaker operations.
|
| Usage:
|
| with self._get_respeaker() as respeaker:
|
| if respeaker:
|
| respeaker.read("...")
|
| """
|
| if not self.is_available:
|
| return _ReSpeakerContext(None, self._respeaker_lock)
|
| try:
|
| if not self.reachy.media or not self.reachy.media.audio:
|
| return _ReSpeakerContext(None, self._respeaker_lock)
|
| respeaker = self.reachy.media.audio._respeaker
|
| return _ReSpeakerContext(respeaker, self._respeaker_lock)
|
| except Exception:
|
| return _ReSpeakerContext(None, self._respeaker_lock)
|
|
|
|
|
|
|
| def get_agc_enabled(self) -> bool:
|
| """Get AGC (Automatic Gain Control) enabled status."""
|
| with self._get_respeaker() as respeaker:
|
| if respeaker is None:
|
| return getattr(self, '_agc_enabled', True)
|
| try:
|
| result = respeaker.read("PP_AGCONOFF")
|
| if result is not None:
|
| self._agc_enabled = bool(result[1])
|
| return self._agc_enabled
|
| except Exception as e:
|
| logger.debug(f"Error getting AGC status: {e}")
|
| return getattr(self, '_agc_enabled', True)
|
|
|
| def set_agc_enabled(self, enabled: bool) -> None:
|
| """Set AGC (Automatic Gain Control) enabled status."""
|
| self._agc_enabled = enabled
|
| with self._get_respeaker() as respeaker:
|
| if respeaker is None:
|
| return
|
| try:
|
| respeaker.write("PP_AGCONOFF", [1 if enabled else 0])
|
| logger.info(f"AGC {'enabled' if enabled else 'disabled'}")
|
| except Exception as e:
|
| logger.error(f"Error setting AGC status: {e}")
|
|
|
| def get_agc_max_gain(self) -> float:
|
| """Get AGC maximum gain in dB (0-40 dB range)."""
|
| with self._get_respeaker() as respeaker:
|
| if respeaker is None:
|
| return getattr(self, '_agc_max_gain', 30.0)
|
| try:
|
| result = respeaker.read("PP_AGCMAXGAIN")
|
| if result is not None:
|
| self._agc_max_gain = float(result[0])
|
| return self._agc_max_gain
|
| except Exception as e:
|
| logger.debug(f"Error getting AGC max gain: {e}")
|
| return getattr(self, '_agc_max_gain', 30.0)
|
|
|
| def set_agc_max_gain(self, gain: float) -> None:
|
| """Set AGC maximum gain in dB (0-40 dB range)."""
|
| gain = max(0.0, min(40.0, gain))
|
| self._agc_max_gain = gain
|
| with self._get_respeaker() as respeaker:
|
| if respeaker is None:
|
| return
|
| try:
|
| respeaker.write("PP_AGCMAXGAIN", [gain])
|
| logger.info(f"AGC max gain set to {gain} dB")
|
| except Exception as e:
|
| logger.error(f"Error setting AGC max gain: {e}")
|
|
|
| def get_noise_suppression(self) -> float:
|
| """Get noise suppression level (0-100%).
|
|
|
| PP_MIN_NS represents "minimum signal preservation ratio":
|
| - PP_MIN_NS = 0.85 means "keep at least 85% of signal" = 15% suppression
|
| - PP_MIN_NS = 0.15 means "keep at least 15% of signal" = 85% suppression
|
|
|
| We display "noise suppression strength" to user, so:
|
| - suppression_percent = (1.0 - PP_MIN_NS) * 100
|
| """
|
| with self._get_respeaker() as respeaker:
|
| if respeaker is None:
|
| return getattr(self, '_noise_suppression', 15.0)
|
| try:
|
| result = respeaker.read("PP_MIN_NS")
|
| if result is not None:
|
| raw_value = result[0]
|
|
|
| self._noise_suppression = max(0.0, min(100.0, (1.0 - raw_value) * 100.0))
|
| logger.debug(f"Noise suppression: PP_MIN_NS={raw_value:.2f} -> {self._noise_suppression:.1f}%")
|
| return self._noise_suppression
|
| except Exception as e:
|
| logger.debug(f"Error getting noise suppression: {e}")
|
| return getattr(self, '_noise_suppression', 15.0)
|
|
|
| def set_noise_suppression(self, level: float) -> None:
|
| """Set noise suppression level (0-100%)."""
|
| level = max(0.0, min(100.0, level))
|
| self._noise_suppression = level
|
| with self._get_respeaker() as respeaker:
|
| if respeaker is None:
|
| return
|
| try:
|
|
|
| value = 1.0 - (level / 100.0)
|
| respeaker.write("PP_MIN_NS", [value])
|
| logger.info(f"Noise suppression set to {level}%")
|
| except Exception as e:
|
| logger.error(f"Error setting noise suppression: {e}")
|
|
|
| def get_echo_cancellation_converged(self) -> bool:
|
| """Check if echo cancellation has converged."""
|
| with self._get_respeaker() as respeaker:
|
| if respeaker is None:
|
| return False
|
| try:
|
| result = respeaker.read("AEC_AECCONVERGED")
|
| if result is not None:
|
| return bool(result[1])
|
| except Exception as e:
|
| logger.debug(f"Error getting AEC converged status: {e}")
|
| return False
|
|
|
|
|
|
|
| def get_doa_angle(self) -> tuple[float, bool] | None:
|
| """Get Direction of Arrival angle from microphone array.
|
|
|
| The DOA angle indicates the direction of the sound source relative to the robot.
|
| Angle is in radians: 0 = left, π/2 = front/back, π = right.
|
|
|
| Returns:
|
| Tuple of (angle_radians, speech_detected), or None if unavailable.
|
| - angle_radians: Sound source direction in radians
|
| - speech_detected: Whether speech is currently detected
|
| """
|
| if not self.is_available:
|
| return None
|
| try:
|
| if self.reachy.media and self.reachy.media.audio:
|
| return self.reachy.media.audio.get_DoA()
|
| except Exception as e:
|
| logger.debug(f"Error getting DOA: {e}")
|
| return None
|
|
|
| def get_doa_angle_degrees(self) -> float:
|
| """Get DOA angle in degrees for Home Assistant entity.
|
|
|
| Returns the raw DOA angle in degrees (0-180°).
|
| SDK convention: 0° = left, 90° = front, 180° = right
|
| """
|
| doa = self.get_doa_angle()
|
| if doa is None:
|
| return 0.0
|
| angle_rad, _ = doa
|
|
|
| angle_deg = math.degrees(angle_rad)
|
| return angle_deg
|
|
|
| def get_speech_detected(self) -> bool:
|
| """Get speech detection status from DOA.
|
|
|
| Returns True if speech is currently detected.
|
| """
|
| doa = self.get_doa_angle()
|
| if doa is None:
|
| return False
|
| _, speech_detected = doa
|
| return speech_detected
|
|
|