"""Reachy Mini controller wrapper for ESPHome entities.""" import logging from typing import Optional, TYPE_CHECKING import math import numpy as np from scipy.spatial.transform import Rotation as R import requests if TYPE_CHECKING: from reachy_mini import ReachyMini logger = logging.getLogger(__name__) class ReachyController: """ Wrapper class for Reachy Mini control operations. Provides safe access to Reachy Mini SDK functions with error handling and fallback for standalone mode (when robot is not available). """ def __init__(self, reachy_mini: Optional["ReachyMini"] = None): """ Initialize the controller. Args: reachy_mini: ReachyMini instance, or None for standalone mode """ self.reachy = reachy_mini self._speaker_volume = 100 # Default volume @property def is_available(self) -> bool: """Check if robot is available.""" return self.reachy is not None # ========== Phase 1: Basic Status & Volume ========== def get_daemon_state(self) -> str: """Get daemon state.""" if not self.is_available: return "not_available" try: # client.get_status() returns a dict with 'state' key status = self.reachy.client.get_status(wait=False) return status.get('state', 'unknown') except Exception as e: logger.error(f"Error getting daemon state: {e}") return "error" def get_backend_ready(self) -> bool: """Check if backend is ready.""" if not self.is_available: return False try: # Check if daemon state is 'running' status = self.reachy.client.get_status(wait=False) return status.get('state') == 'running' except Exception as e: logger.error(f"Error getting backend status: {e}") return False def get_error_message(self) -> str: """Get current error message.""" if not self.is_available: return "Robot not available" try: status = self.reachy.client.get_status(wait=False) return status.get('error') or "" except Exception as e: logger.error(f"Error getting error message: {e}") return str(e) def get_speaker_volume(self) -> float: """Get speaker volume (0-100).""" if not self.is_available: return self._speaker_volume try: # Get volume from daemon API status = self.reachy.client.get_status(wait=False) wlan_ip = status.get('wlan_ip', 'localhost') response = requests.get(f"http://{wlan_ip}:8000/api/volume/current", timeout=2) if response.status_code == 200: data = response.json() self._speaker_volume = float(data.get('volume', self._speaker_volume)) except Exception as e: logger.debug(f"Could not get volume from API: {e}") return self._speaker_volume def set_speaker_volume(self, volume: float) -> None: """ Set speaker volume (0-100). Args: volume: Volume level 0-100 """ volume = max(0.0, min(100.0, volume)) self._speaker_volume = volume if not self.is_available: logger.warning("Cannot set volume: robot not available") return try: # Set volume via daemon API status = self.reachy.client.get_status(wait=False) wlan_ip = status.get('wlan_ip', 'localhost') response = requests.post( f"http://{wlan_ip}:8000/api/volume/set", json={"volume": int(volume)}, timeout=5 ) if response.status_code == 200: logger.info(f"Speaker volume set to {volume}%") else: logger.error(f"Failed to set volume: {response.status_code} {response.text}") except Exception as e: logger.error(f"Error setting speaker volume: {e}") def get_microphone_volume(self) -> float: """Get microphone volume (0-100) using local SDK audio interface.""" respeaker = self._get_respeaker() if respeaker is None: return getattr(self, '_microphone_volume', 50.0) # Try different gain parameters (varies by ReSpeaker model) gain_params = [ ("AGCGAIN", 31.0), # AGC target level (0-31) ("MICGAIN", 31.0), # Microphone gain ] for param_name, max_val in gain_params: try: result = respeaker.read(param_name) if result is not None: self._microphone_volume = (result[0] / max_val) * 100.0 logger.debug(f"Read microphone volume: {self._microphone_volume}% ({param_name}: {result[0]})") return self._microphone_volume except Exception as e: logger.debug(f"Could not read {param_name}: {e}") return getattr(self, '_microphone_volume', 50.0) def set_microphone_volume(self, volume: float) -> None: """ Set microphone volume (0-100) using local SDK audio interface. Args: volume: Volume level 0-100 """ volume = max(0.0, min(100.0, volume)) self._microphone_volume = volume respeaker = self._get_respeaker() if respeaker is None: logger.warning("Cannot set microphone volume: ReSpeaker not available") return # Try different gain parameters (varies by ReSpeaker model) gain_params = [ ("AGCGAIN", 31.0), # AGC target level (0-31) ("MICGAIN", 31.0), # Microphone gain ] for param_name, max_val in gain_params: try: gain = int((volume / 100.0) * max_val) respeaker.write(param_name, [gain]) logger.info(f"Microphone volume set to {volume}% ({param_name}: {gain})") return # Success, stop trying other params except Exception as e: logger.debug(f"Could not write {param_name}: {e}") logger.error("Failed to set microphone volume: no supported parameter found") # ========== Phase 2: Motor Control ========== def get_motors_enabled(self) -> bool: """Check if motors are enabled.""" if not self.is_available: return False try: # Get motor control mode from backend status status = self.reachy.client.get_status(wait=False) backend_status = status.get('backend_status') if backend_status and isinstance(backend_status, dict): motor_mode = backend_status.get('motor_control_mode', 'disabled') return motor_mode == 'enabled' return status.get('state') == 'running' except Exception as e: logger.error(f"Error getting motor state: {e}") return False def set_motors_enabled(self, enabled: bool) -> None: """ Enable or disable motors. Args: enabled: True to enable, False to disable """ if not self.is_available: logger.warning("Cannot control motors: robot not available") return try: if enabled: self.reachy.enable_motors() logger.info("Motors enabled") else: self.reachy.disable_motors() logger.info("Motors disabled") except Exception as e: logger.error(f"Error setting motor state: {e}") def get_motor_mode(self) -> str: """Get current motor control mode.""" if not self.is_available: return "disabled" try: # Get motor control mode from backend status status = self.reachy.client.get_status(wait=False) backend_status = status.get('backend_status') if backend_status and isinstance(backend_status, dict): motor_mode = backend_status.get('motor_control_mode', 'disabled') return motor_mode if status.get('state') == 'running': return "enabled" return "disabled" except Exception as e: logger.error(f"Error getting motor mode: {e}") return "error" def set_motor_mode(self, mode: str) -> None: """ Set motor control mode. Args: mode: One of "enabled", "disabled", "gravity_compensation" """ if not self.is_available: logger.warning("Cannot set motor mode: robot not available") return try: if mode == "enabled": self.reachy.enable_motors() elif mode == "disabled": self.reachy.disable_motors() elif mode == "gravity_compensation": self.reachy.enable_gravity_compensation() else: logger.warning(f"Invalid motor mode: {mode}") return logger.info(f"Motor mode set to {mode}") except Exception as e: logger.error(f"Error setting motor mode: {e}") def wake_up(self) -> None: """Execute wake up animation.""" if not self.is_available: logger.warning("Cannot wake up: robot not available") return try: self.reachy.wake_up() logger.info("Wake up animation executed") except Exception as e: logger.error(f"Error executing wake up: {e}") def go_to_sleep(self) -> None: """Execute sleep animation.""" if not self.is_available: logger.warning("Cannot sleep: robot not available") return try: self.reachy.goto_sleep() logger.info("Sleep animation executed") except Exception as e: logger.error(f"Error executing sleep: {e}") # ========== Phase 3: Pose Control ========== def _extract_pose_from_matrix(self, pose_matrix: np.ndarray) -> tuple: """ Extract position (x, y, z) and rotation (roll, pitch, yaw) from 4x4 pose matrix. Args: pose_matrix: 4x4 homogeneous transformation matrix Returns: tuple: (x, y, z, roll, pitch, yaw) where position is in meters and angles in radians """ # Extract position from the last column x = pose_matrix[0, 3] y = pose_matrix[1, 3] z = pose_matrix[2, 3] # Extract rotation matrix and convert to euler angles rotation_matrix = pose_matrix[:3, :3] rotation = R.from_matrix(rotation_matrix) # Use 'xyz' convention for roll, pitch, yaw roll, pitch, yaw = rotation.as_euler('xyz') return x, y, z, roll, pitch, yaw def get_head_x(self) -> float: """Get head X position in mm.""" if not self.is_available: return 0.0 try: pose = self.reachy.get_current_head_pose() x, y, z, roll, pitch, yaw = self._extract_pose_from_matrix(pose) return x * 1000 # Convert m to mm except Exception as e: logger.error(f"Error getting head X: {e}") return 0.0 def set_head_x(self, x_mm: float) -> None: """Set head X position in mm.""" if not self.is_available: return try: pose = self.reachy.get_current_head_pose() # Modify the X position in the matrix new_pose = pose.copy() new_pose[0, 3] = x_mm / 1000 # Convert mm to m self.reachy.goto_target(head=new_pose) except Exception as e: logger.error(f"Error setting head X: {e}") def get_head_y(self) -> float: """Get head Y position in mm.""" if not self.is_available: return 0.0 try: pose = self.reachy.get_current_head_pose() x, y, z, roll, pitch, yaw = self._extract_pose_from_matrix(pose) return y * 1000 except Exception as e: logger.error(f"Error getting head Y: {e}") return 0.0 def set_head_y(self, y_mm: float) -> None: """Set head Y position in mm.""" if not self.is_available: return try: pose = self.reachy.get_current_head_pose() new_pose = pose.copy() new_pose[1, 3] = y_mm / 1000 self.reachy.goto_target(head=new_pose) except Exception as e: logger.error(f"Error setting head Y: {e}") def get_head_z(self) -> float: """Get head Z position in mm.""" if not self.is_available: return 0.0 try: pose = self.reachy.get_current_head_pose() x, y, z, roll, pitch, yaw = self._extract_pose_from_matrix(pose) return z * 1000 except Exception as e: logger.error(f"Error getting head Z: {e}") return 0.0 def set_head_z(self, z_mm: float) -> None: """Set head Z position in mm.""" if not self.is_available: return try: pose = self.reachy.get_current_head_pose() new_pose = pose.copy() new_pose[2, 3] = z_mm / 1000 self.reachy.goto_target(head=new_pose) except Exception as e: logger.error(f"Error setting head Z: {e}") def get_head_roll(self) -> float: """Get head roll angle in degrees.""" if not self.is_available: return 0.0 try: pose = self.reachy.get_current_head_pose() x, y, z, roll, pitch, yaw = self._extract_pose_from_matrix(pose) return math.degrees(roll) except Exception as e: logger.error(f"Error getting head roll: {e}") return 0.0 def set_head_roll(self, roll_deg: float) -> None: """Set head roll angle in degrees.""" if not self.is_available: return try: pose = self.reachy.get_current_head_pose() x, y, z, roll, pitch, yaw = self._extract_pose_from_matrix(pose) # Create new rotation with updated roll new_rotation = R.from_euler('xyz', [math.radians(roll_deg), pitch, yaw]) new_pose = pose.copy() new_pose[:3, :3] = new_rotation.as_matrix() self.reachy.goto_target(head=new_pose) except Exception as e: logger.error(f"Error setting head roll: {e}") def get_head_pitch(self) -> float: """Get head pitch angle in degrees.""" if not self.is_available: return 0.0 try: pose = self.reachy.get_current_head_pose() x, y, z, roll, pitch, yaw = self._extract_pose_from_matrix(pose) return math.degrees(pitch) except Exception as e: logger.error(f"Error getting head pitch: {e}") return 0.0 def set_head_pitch(self, pitch_deg: float) -> None: """Set head pitch angle in degrees.""" if not self.is_available: return try: pose = self.reachy.get_current_head_pose() x, y, z, roll, pitch, yaw = self._extract_pose_from_matrix(pose) new_rotation = R.from_euler('xyz', [roll, math.radians(pitch_deg), yaw]) new_pose = pose.copy() new_pose[:3, :3] = new_rotation.as_matrix() self.reachy.goto_target(head=new_pose) except Exception as e: logger.error(f"Error setting head pitch: {e}") def get_head_yaw(self) -> float: """Get head yaw angle in degrees.""" if not self.is_available: return 0.0 try: pose = self.reachy.get_current_head_pose() x, y, z, roll, pitch, yaw = self._extract_pose_from_matrix(pose) return math.degrees(yaw) except Exception as e: logger.error(f"Error getting head yaw: {e}") return 0.0 def set_head_yaw(self, yaw_deg: float) -> None: """Set head yaw angle in degrees.""" if not self.is_available: return try: pose = self.reachy.get_current_head_pose() x, y, z, roll, pitch, yaw = self._extract_pose_from_matrix(pose) new_rotation = R.from_euler('xyz', [roll, pitch, math.radians(yaw_deg)]) new_pose = pose.copy() new_pose[:3, :3] = new_rotation.as_matrix() self.reachy.goto_target(head=new_pose) except Exception as e: logger.error(f"Error setting head yaw: {e}") def get_body_yaw(self) -> float: """Get body yaw angle in degrees.""" if not self.is_available: return 0.0 try: # Body yaw is the first element of head joint positions head_joints, _ = self.reachy.get_current_joint_positions() return math.degrees(head_joints[0]) except Exception as e: logger.error(f"Error getting body yaw: {e}") return 0.0 def set_body_yaw(self, yaw_deg: float) -> None: """Set body yaw angle in degrees.""" if not self.is_available: return try: self.reachy.goto_target(body_yaw=math.radians(yaw_deg)) except Exception as e: logger.error(f"Error setting body yaw: {e}") def get_antenna_left(self) -> float: """Get left antenna angle in degrees.""" if not self.is_available: return 0.0 try: # get_current_joint_positions() returns (head_joints, antenna_joints) # antenna_joints is [right, left] _, antennas = self.reachy.get_current_joint_positions() return math.degrees(antennas[1]) # left is index 1 except Exception as e: logger.error(f"Error getting left antenna: {e}") return 0.0 def set_antenna_left(self, angle_deg: float) -> None: """Set left antenna angle in degrees.""" if not self.is_available: return try: _, antennas = self.reachy.get_current_joint_positions() right = antennas[0] self.reachy.goto_target(antennas=[right, math.radians(angle_deg)]) except Exception as e: logger.error(f"Error setting left antenna: {e}") def get_antenna_right(self) -> float: """Get right antenna angle in degrees.""" if not self.is_available: return 0.0 try: _, antennas = self.reachy.get_current_joint_positions() return math.degrees(antennas[0]) # right is index 0 except Exception as e: logger.error(f"Error getting right antenna: {e}") return 0.0 def set_antenna_right(self, angle_deg: float) -> None: """Set right antenna angle in degrees.""" if not self.is_available: return try: _, antennas = self.reachy.get_current_joint_positions() left = antennas[1] self.reachy.goto_target(antennas=[math.radians(angle_deg), left]) except Exception as e: logger.error(f"Error setting right antenna: {e}") # ========== Phase 4: Look At Control ========== def get_look_at_x(self) -> float: """Get look at target X coordinate in world frame (meters).""" # This is a target position, not a current state # We'll store it internally return getattr(self, '_look_at_x', 0.0) def set_look_at_x(self, x: float) -> None: """Set look at target X coordinate.""" self._look_at_x = x self._update_look_at() def get_look_at_y(self) -> float: """Get look at target Y coordinate in world frame (meters).""" return getattr(self, '_look_at_y', 0.0) def set_look_at_y(self, y: float) -> None: """Set look at target Y coordinate.""" self._look_at_y = y self._update_look_at() def get_look_at_z(self) -> float: """Get look at target Z coordinate in world frame (meters).""" return getattr(self, '_look_at_z', 0.0) def set_look_at_z(self, z: float) -> None: """Set look at target Z coordinate.""" self._look_at_z = z self._update_look_at() def _update_look_at(self) -> None: """Update robot to look at the target coordinates.""" if not self.is_available: return try: x = getattr(self, '_look_at_x', 0.0) y = getattr(self, '_look_at_y', 0.0) z = getattr(self, '_look_at_z', 0.0) self.reachy.look_at_world(x, y, z) logger.info(f"Looking at world coordinates: ({x}, {y}, {z})") except Exception as e: logger.error(f"Error updating look at: {e}") # ========== Phase 5: Audio Sensors ========== def get_doa_angle(self) -> float: """Get direction of arrival angle in degrees.""" if not self.is_available: return 0.0 try: # Access DOA through media_manager doa_result = self.reachy.media.get_DoA() if doa_result is not None: # Convert radians to degrees return math.degrees(doa_result[0]) return 0.0 except Exception as e: logger.error(f"Error getting DOA angle: {e}") return 0.0 def get_speech_detected(self) -> bool: """Check if speech is detected.""" if not self.is_available: return False try: # Access speech detection through media_manager doa_result = self.reachy.media.get_DoA() if doa_result is not None: return doa_result[1] return False except Exception as e: logger.error(f"Error getting speech detection: {e}") return False # ========== Phase 6: Diagnostic Information ========== def get_control_loop_frequency(self) -> float: """Get control loop frequency in Hz.""" if not self.is_available: return 0.0 try: # Get control loop stats from backend status status = self.reachy.client.get_status(wait=False) backend_status = status.get('backend_status') if backend_status and isinstance(backend_status, dict): control_loop_stats = backend_status.get('control_loop_stats', {}) return control_loop_stats.get('mean_control_loop_frequency', 0.0) return 0.0 except Exception as e: logger.error(f"Error getting control loop frequency: {e}") return 0.0 def get_sdk_version(self) -> str: """Get SDK version.""" if not self.is_available: return "N/A" try: status = self.reachy.client.get_status(wait=False) return status.get('version') or "unknown" except Exception as e: logger.error(f"Error getting SDK version: {e}") return "error" def get_robot_name(self) -> str: """Get robot name.""" if not self.is_available: return "N/A" try: status = self.reachy.client.get_status(wait=False) return status.get('robot_name') or "unknown" except Exception as e: logger.error(f"Error getting robot name: {e}") return "error" def get_wireless_version(self) -> bool: """Check if this is a wireless version.""" if not self.is_available: return False try: status = self.reachy.client.get_status(wait=False) return status.get('wireless_version', False) except Exception as e: logger.error(f"Error getting wireless version: {e}") return False def get_simulation_mode(self) -> bool: """Check if simulation mode is enabled.""" if not self.is_available: return False try: status = self.reachy.client.get_status(wait=False) return status.get('simulation_enabled', False) except Exception as e: logger.error(f"Error getting simulation mode: {e}") return False def get_wlan_ip(self) -> str: """Get WLAN IP address.""" if not self.is_available: return "N/A" try: status = self.reachy.client.get_status(wait=False) return status.get('wlan_ip') or "N/A" except Exception as e: logger.error(f"Error getting WLAN IP: {e}") return "error" # ========== Phase 7: IMU Sensors (Wireless only) ========== def get_imu_accel_x(self) -> float: """Get IMU X-axis acceleration in m/s².""" if not self.is_available: return 0.0 try: imu_data = self.reachy.imu if imu_data is not None and 'accelerometer' in imu_data: return float(imu_data['accelerometer'][0]) return 0.0 except Exception as e: logger.error(f"Error getting IMU accel X: {e}") return 0.0 def get_imu_accel_y(self) -> float: """Get IMU Y-axis acceleration in m/s².""" if not self.is_available: return 0.0 try: imu_data = self.reachy.imu if imu_data is not None and 'accelerometer' in imu_data: return float(imu_data['accelerometer'][1]) return 0.0 except Exception as e: logger.error(f"Error getting IMU accel Y: {e}") return 0.0 def get_imu_accel_z(self) -> float: """Get IMU Z-axis acceleration in m/s².""" if not self.is_available: return 0.0 try: imu_data = self.reachy.imu if imu_data is not None and 'accelerometer' in imu_data: return float(imu_data['accelerometer'][2]) return 0.0 except Exception as e: logger.error(f"Error getting IMU accel Z: {e}") return 0.0 def get_imu_gyro_x(self) -> float: """Get IMU X-axis angular velocity in rad/s.""" if not self.is_available: return 0.0 try: imu_data = self.reachy.imu if imu_data is not None and 'gyroscope' in imu_data: return float(imu_data['gyroscope'][0]) return 0.0 except Exception as e: logger.error(f"Error getting IMU gyro X: {e}") return 0.0 def get_imu_gyro_y(self) -> float: """Get IMU Y-axis angular velocity in rad/s.""" if not self.is_available: return 0.0 try: imu_data = self.reachy.imu if imu_data is not None and 'gyroscope' in imu_data: return float(imu_data['gyroscope'][1]) return 0.0 except Exception as e: logger.error(f"Error getting IMU gyro Y: {e}") return 0.0 def get_imu_gyro_z(self) -> float: """Get IMU Z-axis angular velocity in rad/s.""" if not self.is_available: return 0.0 try: imu_data = self.reachy.imu if imu_data is not None and 'gyroscope' in imu_data: return float(imu_data['gyroscope'][2]) return 0.0 except Exception as e: logger.error(f"Error getting IMU gyro Z: {e}") return 0.0 def get_imu_temperature(self) -> float: """Get IMU temperature in °C.""" if not self.is_available: return 0.0 try: imu_data = self.reachy.imu if imu_data is not None and 'temperature' in imu_data: return float(imu_data['temperature']) return 0.0 except Exception as e: logger.error(f"Error getting IMU temperature: {e}") return 0.0 # ========== Phase 11: LED Control (via local SDK) ========== def _get_respeaker(self): """Get ReSpeaker device from media manager.""" if not self.is_available: logger.debug("ReSpeaker not available: robot not connected") return None try: if not self.reachy.media: logger.debug("ReSpeaker not available: media manager is None") return None if not self.reachy.media.audio: logger.debug("ReSpeaker not available: audio is None") return None respeaker = self.reachy.media.audio._respeaker if respeaker is None: logger.debug("ReSpeaker not available: _respeaker is None (USB device not found)") return respeaker except Exception as e: logger.debug(f"ReSpeaker not available: {e}") return None def get_led_brightness(self) -> float: """Get LED brightness (0-100).""" respeaker = self._get_respeaker() if respeaker is None: return getattr(self, '_led_brightness', 50.0) try: result = respeaker.read("LED_BRIGHTNESS") if result is not None: # LED_BRIGHTNESS is 0-255, convert to 0-100 self._led_brightness = (result[1] / 255.0) * 100.0 return self._led_brightness except Exception as e: logger.debug(f"Error getting LED brightness: {e}") return getattr(self, '_led_brightness', 50.0) def set_led_brightness(self, brightness: float) -> None: """Set LED brightness (0-100).""" brightness = max(0.0, min(100.0, brightness)) self._led_brightness = brightness respeaker = self._get_respeaker() if respeaker is None: return try: # Convert 0-100 to 0-255 value = int((brightness / 100.0) * 255) respeaker.write("LED_BRIGHTNESS", [value]) logger.info(f"LED brightness set to {brightness}%") except Exception as e: logger.error(f"Error setting LED brightness: {e}") def get_led_effect(self) -> str: """Get current LED effect.""" respeaker = self._get_respeaker() if respeaker is None: return getattr(self, '_led_effect', 'off') try: result = respeaker.read("LED_EFFECT") if result is not None: effect_map = {0: 'off', 1: 'solid', 2: 'breathing', 3: 'rainbow', 4: 'doa'} self._led_effect = effect_map.get(result[1], 'off') return self._led_effect except Exception as e: logger.debug(f"Error getting LED effect: {e}") return getattr(self, '_led_effect', 'off') def set_led_effect(self, effect: str) -> None: """Set LED effect.""" self._led_effect = effect respeaker = self._get_respeaker() if respeaker is None: return try: effect_map = {'off': 0, 'solid': 1, 'breathing': 2, 'rainbow': 3, 'doa': 4} value = effect_map.get(effect, 0) respeaker.write("LED_EFFECT", [value]) logger.info(f"LED effect set to {effect}") except Exception as e: logger.error(f"Error setting LED effect: {e}") def get_led_color_r(self) -> float: """Get LED red color component (0-255).""" respeaker = self._get_respeaker() if respeaker is None: return getattr(self, '_led_color_r', 0.0) try: result = respeaker.read("LED_COLOR") if result is not None: # LED_COLOR is a 32-bit value: 0x00RRGGBB color = result[1] if len(result) > 1 else 0 self._led_color_r = float((color >> 16) & 0xFF) return self._led_color_r except Exception as e: logger.debug(f"Error getting LED color R: {e}") return getattr(self, '_led_color_r', 0.0) def set_led_color_r(self, value: float) -> None: """Set LED red color component (0-255).""" self._led_color_r = max(0.0, min(255.0, value)) self._update_led_color() def get_led_color_g(self) -> float: """Get LED green color component (0-255).""" respeaker = self._get_respeaker() if respeaker is None: return getattr(self, '_led_color_g', 0.0) try: result = respeaker.read("LED_COLOR") if result is not None: color = result[1] if len(result) > 1 else 0 self._led_color_g = float((color >> 8) & 0xFF) return self._led_color_g except Exception as e: logger.debug(f"Error getting LED color G: {e}") return getattr(self, '_led_color_g', 0.0) def set_led_color_g(self, value: float) -> None: """Set LED green color component (0-255).""" self._led_color_g = max(0.0, min(255.0, value)) self._update_led_color() def get_led_color_b(self) -> float: """Get LED blue color component (0-255).""" respeaker = self._get_respeaker() if respeaker is None: return getattr(self, '_led_color_b', 0.0) try: result = respeaker.read("LED_COLOR") if result is not None: color = result[1] if len(result) > 1 else 0 self._led_color_b = float(color & 0xFF) return self._led_color_b except Exception as e: logger.debug(f"Error getting LED color B: {e}") return getattr(self, '_led_color_b', 0.0) def set_led_color_b(self, value: float) -> None: """Set LED blue color component (0-255).""" self._led_color_b = max(0.0, min(255.0, value)) self._update_led_color() def _update_led_color(self) -> None: """Update LED color from R, G, B components.""" respeaker = self._get_respeaker() if respeaker is None: return try: r = int(getattr(self, '_led_color_r', 0)) g = int(getattr(self, '_led_color_g', 0)) b = int(getattr(self, '_led_color_b', 0)) color = (r << 16) | (g << 8) | b respeaker.write("LED_COLOR", [color]) logger.info(f"LED color set to RGB({r}, {g}, {b})") except Exception as e: logger.error(f"Error setting LED color: {e}") # ========== Phase 12: Audio Processing (via local SDK) ========== def get_agc_enabled(self) -> bool: """Get AGC (Automatic Gain Control) enabled status.""" respeaker = self._get_respeaker() if respeaker is None: return getattr(self, '_agc_enabled', False) try: result = respeaker.read("PP_AGCONOFF") if result is not None: self._agc_enabled = bool(result[1]) return self._agc_enabled except Exception as e: logger.debug(f"Error getting AGC status: {e}") return getattr(self, '_agc_enabled', False) def set_agc_enabled(self, enabled: bool) -> None: """Set AGC (Automatic Gain Control) enabled status.""" self._agc_enabled = enabled respeaker = self._get_respeaker() if respeaker is None: return try: respeaker.write("PP_AGCONOFF", [1 if enabled else 0]) logger.info(f"AGC {'enabled' if enabled else 'disabled'}") except Exception as e: logger.error(f"Error setting AGC status: {e}") def get_agc_max_gain(self) -> float: """Get AGC maximum gain in dB.""" respeaker = self._get_respeaker() if respeaker is None: return getattr(self, '_agc_max_gain', 15.0) try: result = respeaker.read("PP_AGCMAXGAIN") if result is not None: self._agc_max_gain = float(result[0]) return self._agc_max_gain except Exception as e: logger.debug(f"Error getting AGC max gain: {e}") return getattr(self, '_agc_max_gain', 15.0) def set_agc_max_gain(self, gain: float) -> None: """Set AGC maximum gain in dB.""" gain = max(0.0, min(30.0, gain)) self._agc_max_gain = gain respeaker = self._get_respeaker() if respeaker is None: return try: respeaker.write("PP_AGCMAXGAIN", [gain]) logger.info(f"AGC max gain set to {gain} dB") except Exception as e: logger.error(f"Error setting AGC max gain: {e}") def get_noise_suppression(self) -> float: """Get noise suppression level (0-100%).""" respeaker = self._get_respeaker() if respeaker is None: return getattr(self, '_noise_suppression', 50.0) try: result = respeaker.read("PP_MIN_NS") if result is not None: # PP_MIN_NS is typically a float value, convert to percentage # Lower values = more suppression self._noise_suppression = max(0.0, min(100.0, (1.0 - result[0]) * 100.0)) return self._noise_suppression except Exception as e: logger.debug(f"Error getting noise suppression: {e}") return getattr(self, '_noise_suppression', 50.0) def set_noise_suppression(self, level: float) -> None: """Set noise suppression level (0-100%).""" level = max(0.0, min(100.0, level)) self._noise_suppression = level respeaker = self._get_respeaker() if respeaker is None: return try: # Convert percentage to PP_MIN_NS value (inverted) value = 1.0 - (level / 100.0) respeaker.write("PP_MIN_NS", [value]) logger.info(f"Noise suppression set to {level}%") except Exception as e: logger.error(f"Error setting noise suppression: {e}") def get_echo_cancellation_converged(self) -> bool: """Check if echo cancellation has converged.""" respeaker = self._get_respeaker() if respeaker is None: return False try: result = respeaker.read("AEC_AECCONVERGED") if result is not None: return bool(result[1]) except Exception as e: logger.debug(f"Error getting AEC converged status: {e}") return False # ========== Phase 13: Robot Joints ========== def get_head_joints_json(self) -> str: """ Get head joints as JSON string. Returns: JSON string: "[yaw_body, stewart_1, stewart_2, stewart_3, stewart_4, stewart_5, stewart_6]" Values in radians """ if not self.is_available: return "[]" try: import json head_joints, _ = self.reachy.get_current_joint_positions() if head_joints and len(head_joints) >= 7: # Convert radians to list joints_list = [float(j) for j in head_joints[:7]] return json.dumps(joints_list) return "[]" except Exception as e: logger.error(f"Error getting head joints JSON: {e}") return "[]" def get_passive_joints_json(self) -> str: """ Get passive joints as JSON string. Returns: JSON string: "[passive_1_x, passive_1_y, passive_1_z, ..., passive_7_z]" Values in radians (21 values total) """ if not self.is_available: return "[]" try: import json # Get WLAN IP from daemon status status = self.reachy.client.get_status(wait=False) wlan_ip = status.get('wlan_ip', 'localhost') # Call the backend API to get passive joints backend_url = f"http://{wlan_ip}:8000/api/state/full?with_passive_joints=true" response = requests.get(backend_url, timeout=0.5) if response.status_code == 200: data = response.json() passive_joints = data.get("passive_joints") if passive_joints and len(passive_joints) >= 21: return json.dumps(passive_joints) return "[]" except Exception as e: logger.error(f"Error getting passive joints JSON: {e}") return "[]"