| """Reachy Mini controller wrapper for ESPHome entities."""
|
|
|
| import logging
|
| from typing import Optional, TYPE_CHECKING
|
| import math
|
| import numpy as np
|
| from scipy.spatial.transform import Rotation as R
|
| import requests
|
|
|
| if TYPE_CHECKING:
|
| from reachy_mini import ReachyMini
|
|
|
| logger = logging.getLogger(__name__)
|
|
|
|
|
| class ReachyController:
|
| """
|
| Wrapper class for Reachy Mini control operations.
|
|
|
| Provides safe access to Reachy Mini SDK functions with error handling
|
| and fallback for standalone mode (when robot is not available).
|
| """
|
|
|
| def __init__(self, reachy_mini: Optional["ReachyMini"] = None):
|
| """
|
| Initialize the controller.
|
|
|
| Args:
|
| reachy_mini: ReachyMini instance, or None for standalone mode
|
| """
|
| self.reachy = reachy_mini
|
| self._speaker_volume = 100
|
|
|
| @property
|
| def is_available(self) -> bool:
|
| """Check if robot is available."""
|
| return self.reachy is not None
|
|
|
|
|
|
|
| def get_daemon_state(self) -> str:
|
| """Get daemon state."""
|
| if not self.is_available:
|
| return "not_available"
|
| try:
|
|
|
| status = self.reachy.client.get_status(wait=False)
|
| return status.get('state', 'unknown')
|
| except Exception as e:
|
| logger.error(f"Error getting daemon state: {e}")
|
| return "error"
|
|
|
| def get_backend_ready(self) -> bool:
|
| """Check if backend is ready."""
|
| if not self.is_available:
|
| return False
|
| try:
|
|
|
| status = self.reachy.client.get_status(wait=False)
|
| return status.get('state') == 'running'
|
| except Exception as e:
|
| logger.error(f"Error getting backend status: {e}")
|
| return False
|
|
|
| def get_error_message(self) -> str:
|
| """Get current error message."""
|
| if not self.is_available:
|
| return "Robot not available"
|
| try:
|
| status = self.reachy.client.get_status(wait=False)
|
| return status.get('error') or ""
|
| except Exception as e:
|
| logger.error(f"Error getting error message: {e}")
|
| return str(e)
|
|
|
| def get_speaker_volume(self) -> float:
|
| """Get speaker volume (0-100)."""
|
| if not self.is_available:
|
| return self._speaker_volume
|
| try:
|
|
|
| status = self.reachy.client.get_status(wait=False)
|
| wlan_ip = status.get('wlan_ip', 'localhost')
|
| response = requests.get(f"http://{wlan_ip}:8000/api/volume/current", timeout=2)
|
| if response.status_code == 200:
|
| data = response.json()
|
| self._speaker_volume = float(data.get('volume', self._speaker_volume))
|
| except Exception as e:
|
| logger.debug(f"Could not get volume from API: {e}")
|
| return self._speaker_volume
|
|
|
| def set_speaker_volume(self, volume: float) -> None:
|
| """
|
| Set speaker volume (0-100).
|
|
|
| Args:
|
| volume: Volume level 0-100
|
| """
|
| volume = max(0.0, min(100.0, volume))
|
| self._speaker_volume = volume
|
|
|
| if not self.is_available:
|
| logger.warning("Cannot set volume: robot not available")
|
| return
|
|
|
| try:
|
|
|
| status = self.reachy.client.get_status(wait=False)
|
| wlan_ip = status.get('wlan_ip', 'localhost')
|
| response = requests.post(
|
| f"http://{wlan_ip}:8000/api/volume/set",
|
| json={"volume": int(volume)},
|
| timeout=5
|
| )
|
| if response.status_code == 200:
|
| logger.info(f"Speaker volume set to {volume}%")
|
| else:
|
| logger.error(f"Failed to set volume: {response.status_code} {response.text}")
|
| except Exception as e:
|
| logger.error(f"Error setting speaker volume: {e}")
|
|
|
| def get_microphone_volume(self) -> float:
|
| """Get microphone volume (0-100) using local SDK audio interface."""
|
| respeaker = self._get_respeaker()
|
| if respeaker is None:
|
| return getattr(self, '_microphone_volume', 50.0)
|
|
|
|
|
| gain_params = [
|
| ("AGCGAIN", 31.0),
|
| ("MICGAIN", 31.0),
|
| ]
|
|
|
| for param_name, max_val in gain_params:
|
| try:
|
| result = respeaker.read(param_name)
|
| if result is not None:
|
| self._microphone_volume = (result[0] / max_val) * 100.0
|
| logger.debug(f"Read microphone volume: {self._microphone_volume}% ({param_name}: {result[0]})")
|
| return self._microphone_volume
|
| except Exception as e:
|
| logger.debug(f"Could not read {param_name}: {e}")
|
|
|
| return getattr(self, '_microphone_volume', 50.0)
|
|
|
| def set_microphone_volume(self, volume: float) -> None:
|
| """
|
| Set microphone volume (0-100) using local SDK audio interface.
|
|
|
| Args:
|
| volume: Volume level 0-100
|
| """
|
| volume = max(0.0, min(100.0, volume))
|
| self._microphone_volume = volume
|
|
|
| respeaker = self._get_respeaker()
|
| if respeaker is None:
|
| logger.warning("Cannot set microphone volume: ReSpeaker not available")
|
| return
|
|
|
|
|
| gain_params = [
|
| ("AGCGAIN", 31.0),
|
| ("MICGAIN", 31.0),
|
| ]
|
|
|
| for param_name, max_val in gain_params:
|
| try:
|
| gain = int((volume / 100.0) * max_val)
|
| respeaker.write(param_name, [gain])
|
| logger.info(f"Microphone volume set to {volume}% ({param_name}: {gain})")
|
| return
|
| except Exception as e:
|
| logger.debug(f"Could not write {param_name}: {e}")
|
|
|
| logger.error("Failed to set microphone volume: no supported parameter found")
|
|
|
|
|
|
|
| def get_motors_enabled(self) -> bool:
|
| """Check if motors are enabled."""
|
| if not self.is_available:
|
| return False
|
| try:
|
|
|
| status = self.reachy.client.get_status(wait=False)
|
| backend_status = status.get('backend_status')
|
| if backend_status and isinstance(backend_status, dict):
|
| motor_mode = backend_status.get('motor_control_mode', 'disabled')
|
| return motor_mode == 'enabled'
|
| return status.get('state') == 'running'
|
| except Exception as e:
|
| logger.error(f"Error getting motor state: {e}")
|
| return False
|
|
|
| def set_motors_enabled(self, enabled: bool) -> None:
|
| """
|
| Enable or disable motors.
|
|
|
| Args:
|
| enabled: True to enable, False to disable
|
| """
|
| if not self.is_available:
|
| logger.warning("Cannot control motors: robot not available")
|
| return
|
|
|
| try:
|
| if enabled:
|
| self.reachy.enable_motors()
|
| logger.info("Motors enabled")
|
| else:
|
| self.reachy.disable_motors()
|
| logger.info("Motors disabled")
|
| except Exception as e:
|
| logger.error(f"Error setting motor state: {e}")
|
|
|
| def get_motor_mode(self) -> str:
|
| """Get current motor control mode."""
|
| if not self.is_available:
|
| return "disabled"
|
| try:
|
|
|
| status = self.reachy.client.get_status(wait=False)
|
| backend_status = status.get('backend_status')
|
| if backend_status and isinstance(backend_status, dict):
|
| motor_mode = backend_status.get('motor_control_mode', 'disabled')
|
| return motor_mode
|
| if status.get('state') == 'running':
|
| return "enabled"
|
| return "disabled"
|
| except Exception as e:
|
| logger.error(f"Error getting motor mode: {e}")
|
| return "error"
|
|
|
| def set_motor_mode(self, mode: str) -> None:
|
| """
|
| Set motor control mode.
|
|
|
| Args:
|
| mode: One of "enabled", "disabled", "gravity_compensation"
|
| """
|
| if not self.is_available:
|
| logger.warning("Cannot set motor mode: robot not available")
|
| return
|
|
|
| try:
|
| if mode == "enabled":
|
| self.reachy.enable_motors()
|
| elif mode == "disabled":
|
| self.reachy.disable_motors()
|
| elif mode == "gravity_compensation":
|
| self.reachy.enable_gravity_compensation()
|
| else:
|
| logger.warning(f"Invalid motor mode: {mode}")
|
| return
|
| logger.info(f"Motor mode set to {mode}")
|
| except Exception as e:
|
| logger.error(f"Error setting motor mode: {e}")
|
|
|
| def wake_up(self) -> None:
|
| """Execute wake up animation."""
|
| if not self.is_available:
|
| logger.warning("Cannot wake up: robot not available")
|
| return
|
|
|
| try:
|
| self.reachy.wake_up()
|
| logger.info("Wake up animation executed")
|
| except Exception as e:
|
| logger.error(f"Error executing wake up: {e}")
|
|
|
| def go_to_sleep(self) -> None:
|
| """Execute sleep animation."""
|
| if not self.is_available:
|
| logger.warning("Cannot sleep: robot not available")
|
| return
|
|
|
| try:
|
| self.reachy.goto_sleep()
|
| logger.info("Sleep animation executed")
|
| except Exception as e:
|
| logger.error(f"Error executing sleep: {e}")
|
|
|
|
|
|
|
| def _extract_pose_from_matrix(self, pose_matrix: np.ndarray) -> tuple:
|
| """
|
| Extract position (x, y, z) and rotation (roll, pitch, yaw) from 4x4 pose matrix.
|
|
|
| Args:
|
| pose_matrix: 4x4 homogeneous transformation matrix
|
|
|
| Returns:
|
| tuple: (x, y, z, roll, pitch, yaw) where position is in meters and angles in radians
|
| """
|
|
|
| x = pose_matrix[0, 3]
|
| y = pose_matrix[1, 3]
|
| z = pose_matrix[2, 3]
|
|
|
|
|
| rotation_matrix = pose_matrix[:3, :3]
|
| rotation = R.from_matrix(rotation_matrix)
|
|
|
| roll, pitch, yaw = rotation.as_euler('xyz')
|
|
|
| return x, y, z, roll, pitch, yaw
|
|
|
| def get_head_x(self) -> float:
|
| """Get head X position in mm."""
|
| if not self.is_available:
|
| return 0.0
|
| try:
|
| pose = self.reachy.get_current_head_pose()
|
| x, y, z, roll, pitch, yaw = self._extract_pose_from_matrix(pose)
|
| return x * 1000
|
| except Exception as e:
|
| logger.error(f"Error getting head X: {e}")
|
| return 0.0
|
|
|
| def set_head_x(self, x_mm: float) -> None:
|
| """Set head X position in mm."""
|
| if not self.is_available:
|
| return
|
| try:
|
| pose = self.reachy.get_current_head_pose()
|
|
|
| new_pose = pose.copy()
|
| new_pose[0, 3] = x_mm / 1000
|
| self.reachy.goto_target(head=new_pose)
|
| except Exception as e:
|
| logger.error(f"Error setting head X: {e}")
|
|
|
| def get_head_y(self) -> float:
|
| """Get head Y position in mm."""
|
| if not self.is_available:
|
| return 0.0
|
| try:
|
| pose = self.reachy.get_current_head_pose()
|
| x, y, z, roll, pitch, yaw = self._extract_pose_from_matrix(pose)
|
| return y * 1000
|
| except Exception as e:
|
| logger.error(f"Error getting head Y: {e}")
|
| return 0.0
|
|
|
| def set_head_y(self, y_mm: float) -> None:
|
| """Set head Y position in mm."""
|
| if not self.is_available:
|
| return
|
| try:
|
| pose = self.reachy.get_current_head_pose()
|
| new_pose = pose.copy()
|
| new_pose[1, 3] = y_mm / 1000
|
| self.reachy.goto_target(head=new_pose)
|
| except Exception as e:
|
| logger.error(f"Error setting head Y: {e}")
|
|
|
| def get_head_z(self) -> float:
|
| """Get head Z position in mm."""
|
| if not self.is_available:
|
| return 0.0
|
| try:
|
| pose = self.reachy.get_current_head_pose()
|
| x, y, z, roll, pitch, yaw = self._extract_pose_from_matrix(pose)
|
| return z * 1000
|
| except Exception as e:
|
| logger.error(f"Error getting head Z: {e}")
|
| return 0.0
|
|
|
| def set_head_z(self, z_mm: float) -> None:
|
| """Set head Z position in mm."""
|
| if not self.is_available:
|
| return
|
| try:
|
| pose = self.reachy.get_current_head_pose()
|
| new_pose = pose.copy()
|
| new_pose[2, 3] = z_mm / 1000
|
| self.reachy.goto_target(head=new_pose)
|
| except Exception as e:
|
| logger.error(f"Error setting head Z: {e}")
|
|
|
| def get_head_roll(self) -> float:
|
| """Get head roll angle in degrees."""
|
| if not self.is_available:
|
| return 0.0
|
| try:
|
| pose = self.reachy.get_current_head_pose()
|
| x, y, z, roll, pitch, yaw = self._extract_pose_from_matrix(pose)
|
| return math.degrees(roll)
|
| except Exception as e:
|
| logger.error(f"Error getting head roll: {e}")
|
| return 0.0
|
|
|
| def set_head_roll(self, roll_deg: float) -> None:
|
| """Set head roll angle in degrees."""
|
| if not self.is_available:
|
| return
|
| try:
|
| pose = self.reachy.get_current_head_pose()
|
| x, y, z, roll, pitch, yaw = self._extract_pose_from_matrix(pose)
|
|
|
| new_rotation = R.from_euler('xyz', [math.radians(roll_deg), pitch, yaw])
|
| new_pose = pose.copy()
|
| new_pose[:3, :3] = new_rotation.as_matrix()
|
| self.reachy.goto_target(head=new_pose)
|
| except Exception as e:
|
| logger.error(f"Error setting head roll: {e}")
|
|
|
| def get_head_pitch(self) -> float:
|
| """Get head pitch angle in degrees."""
|
| if not self.is_available:
|
| return 0.0
|
| try:
|
| pose = self.reachy.get_current_head_pose()
|
| x, y, z, roll, pitch, yaw = self._extract_pose_from_matrix(pose)
|
| return math.degrees(pitch)
|
| except Exception as e:
|
| logger.error(f"Error getting head pitch: {e}")
|
| return 0.0
|
|
|
| def set_head_pitch(self, pitch_deg: float) -> None:
|
| """Set head pitch angle in degrees."""
|
| if not self.is_available:
|
| return
|
| try:
|
| pose = self.reachy.get_current_head_pose()
|
| x, y, z, roll, pitch, yaw = self._extract_pose_from_matrix(pose)
|
| new_rotation = R.from_euler('xyz', [roll, math.radians(pitch_deg), yaw])
|
| new_pose = pose.copy()
|
| new_pose[:3, :3] = new_rotation.as_matrix()
|
| self.reachy.goto_target(head=new_pose)
|
| except Exception as e:
|
| logger.error(f"Error setting head pitch: {e}")
|
|
|
| def get_head_yaw(self) -> float:
|
| """Get head yaw angle in degrees."""
|
| if not self.is_available:
|
| return 0.0
|
| try:
|
| pose = self.reachy.get_current_head_pose()
|
| x, y, z, roll, pitch, yaw = self._extract_pose_from_matrix(pose)
|
| return math.degrees(yaw)
|
| except Exception as e:
|
| logger.error(f"Error getting head yaw: {e}")
|
| return 0.0
|
|
|
| def set_head_yaw(self, yaw_deg: float) -> None:
|
| """Set head yaw angle in degrees."""
|
| if not self.is_available:
|
| return
|
| try:
|
| pose = self.reachy.get_current_head_pose()
|
| x, y, z, roll, pitch, yaw = self._extract_pose_from_matrix(pose)
|
| new_rotation = R.from_euler('xyz', [roll, pitch, math.radians(yaw_deg)])
|
| new_pose = pose.copy()
|
| new_pose[:3, :3] = new_rotation.as_matrix()
|
| self.reachy.goto_target(head=new_pose)
|
| except Exception as e:
|
| logger.error(f"Error setting head yaw: {e}")
|
|
|
| def get_body_yaw(self) -> float:
|
| """Get body yaw angle in degrees."""
|
| if not self.is_available:
|
| return 0.0
|
| try:
|
|
|
| head_joints, _ = self.reachy.get_current_joint_positions()
|
| return math.degrees(head_joints[0])
|
| except Exception as e:
|
| logger.error(f"Error getting body yaw: {e}")
|
| return 0.0
|
|
|
| def set_body_yaw(self, yaw_deg: float) -> None:
|
| """Set body yaw angle in degrees."""
|
| if not self.is_available:
|
| return
|
| try:
|
| self.reachy.goto_target(body_yaw=math.radians(yaw_deg))
|
| except Exception as e:
|
| logger.error(f"Error setting body yaw: {e}")
|
|
|
| def get_antenna_left(self) -> float:
|
| """Get left antenna angle in degrees."""
|
| if not self.is_available:
|
| return 0.0
|
| try:
|
|
|
|
|
| _, antennas = self.reachy.get_current_joint_positions()
|
| return math.degrees(antennas[1])
|
| except Exception as e:
|
| logger.error(f"Error getting left antenna: {e}")
|
| return 0.0
|
|
|
| def set_antenna_left(self, angle_deg: float) -> None:
|
| """Set left antenna angle in degrees."""
|
| if not self.is_available:
|
| return
|
| try:
|
| _, antennas = self.reachy.get_current_joint_positions()
|
| right = antennas[0]
|
| self.reachy.goto_target(antennas=[right, math.radians(angle_deg)])
|
| except Exception as e:
|
| logger.error(f"Error setting left antenna: {e}")
|
|
|
| def get_antenna_right(self) -> float:
|
| """Get right antenna angle in degrees."""
|
| if not self.is_available:
|
| return 0.0
|
| try:
|
| _, antennas = self.reachy.get_current_joint_positions()
|
| return math.degrees(antennas[0])
|
| except Exception as e:
|
| logger.error(f"Error getting right antenna: {e}")
|
| return 0.0
|
|
|
| def set_antenna_right(self, angle_deg: float) -> None:
|
| """Set right antenna angle in degrees."""
|
| if not self.is_available:
|
| return
|
| try:
|
| _, antennas = self.reachy.get_current_joint_positions()
|
| left = antennas[1]
|
| self.reachy.goto_target(antennas=[math.radians(angle_deg), left])
|
| except Exception as e:
|
| logger.error(f"Error setting right antenna: {e}")
|
|
|
|
|
|
|
| def get_look_at_x(self) -> float:
|
| """Get look at target X coordinate in world frame (meters)."""
|
|
|
|
|
| return getattr(self, '_look_at_x', 0.0)
|
|
|
| def set_look_at_x(self, x: float) -> None:
|
| """Set look at target X coordinate."""
|
| self._look_at_x = x
|
| self._update_look_at()
|
|
|
| def get_look_at_y(self) -> float:
|
| """Get look at target Y coordinate in world frame (meters)."""
|
| return getattr(self, '_look_at_y', 0.0)
|
|
|
| def set_look_at_y(self, y: float) -> None:
|
| """Set look at target Y coordinate."""
|
| self._look_at_y = y
|
| self._update_look_at()
|
|
|
| def get_look_at_z(self) -> float:
|
| """Get look at target Z coordinate in world frame (meters)."""
|
| return getattr(self, '_look_at_z', 0.0)
|
|
|
| def set_look_at_z(self, z: float) -> None:
|
| """Set look at target Z coordinate."""
|
| self._look_at_z = z
|
| self._update_look_at()
|
|
|
| def _update_look_at(self) -> None:
|
| """Update robot to look at the target coordinates."""
|
| if not self.is_available:
|
| return
|
| try:
|
| x = getattr(self, '_look_at_x', 0.0)
|
| y = getattr(self, '_look_at_y', 0.0)
|
| z = getattr(self, '_look_at_z', 0.0)
|
| self.reachy.look_at_world(x, y, z)
|
| logger.info(f"Looking at world coordinates: ({x}, {y}, {z})")
|
| except Exception as e:
|
| logger.error(f"Error updating look at: {e}")
|
|
|
|
|
|
|
| def get_doa_angle(self) -> float:
|
| """Get direction of arrival angle in degrees."""
|
| if not self.is_available:
|
| return 0.0
|
| try:
|
|
|
| doa_result = self.reachy.media.get_DoA()
|
| if doa_result is not None:
|
|
|
| return math.degrees(doa_result[0])
|
| return 0.0
|
| except Exception as e:
|
| logger.error(f"Error getting DOA angle: {e}")
|
| return 0.0
|
|
|
| def get_speech_detected(self) -> bool:
|
| """Check if speech is detected."""
|
| if not self.is_available:
|
| return False
|
| try:
|
|
|
| doa_result = self.reachy.media.get_DoA()
|
| if doa_result is not None:
|
| return doa_result[1]
|
| return False
|
| except Exception as e:
|
| logger.error(f"Error getting speech detection: {e}")
|
| return False
|
|
|
|
|
|
|
| def get_control_loop_frequency(self) -> float:
|
| """Get control loop frequency in Hz."""
|
| if not self.is_available:
|
| return 0.0
|
| try:
|
|
|
| status = self.reachy.client.get_status(wait=False)
|
| backend_status = status.get('backend_status')
|
| if backend_status and isinstance(backend_status, dict):
|
| control_loop_stats = backend_status.get('control_loop_stats', {})
|
| return control_loop_stats.get('mean_control_loop_frequency', 0.0)
|
| return 0.0
|
| except Exception as e:
|
| logger.error(f"Error getting control loop frequency: {e}")
|
| return 0.0
|
|
|
| def get_sdk_version(self) -> str:
|
| """Get SDK version."""
|
| if not self.is_available:
|
| return "N/A"
|
| try:
|
| status = self.reachy.client.get_status(wait=False)
|
| return status.get('version') or "unknown"
|
| except Exception as e:
|
| logger.error(f"Error getting SDK version: {e}")
|
| return "error"
|
|
|
| def get_robot_name(self) -> str:
|
| """Get robot name."""
|
| if not self.is_available:
|
| return "N/A"
|
| try:
|
| status = self.reachy.client.get_status(wait=False)
|
| return status.get('robot_name') or "unknown"
|
| except Exception as e:
|
| logger.error(f"Error getting robot name: {e}")
|
| return "error"
|
|
|
| def get_wireless_version(self) -> bool:
|
| """Check if this is a wireless version."""
|
| if not self.is_available:
|
| return False
|
| try:
|
| status = self.reachy.client.get_status(wait=False)
|
| return status.get('wireless_version', False)
|
| except Exception as e:
|
| logger.error(f"Error getting wireless version: {e}")
|
| return False
|
|
|
| def get_simulation_mode(self) -> bool:
|
| """Check if simulation mode is enabled."""
|
| if not self.is_available:
|
| return False
|
| try:
|
| status = self.reachy.client.get_status(wait=False)
|
| return status.get('simulation_enabled', False)
|
| except Exception as e:
|
| logger.error(f"Error getting simulation mode: {e}")
|
| return False
|
|
|
| def get_wlan_ip(self) -> str:
|
| """Get WLAN IP address."""
|
| if not self.is_available:
|
| return "N/A"
|
| try:
|
| status = self.reachy.client.get_status(wait=False)
|
| return status.get('wlan_ip') or "N/A"
|
| except Exception as e:
|
| logger.error(f"Error getting WLAN IP: {e}")
|
| return "error"
|
|
|
|
|
|
|
| def get_imu_accel_x(self) -> float:
|
| """Get IMU X-axis acceleration in m/s²."""
|
| if not self.is_available:
|
| return 0.0
|
| try:
|
| imu_data = self.reachy.imu
|
| if imu_data is not None and 'accelerometer' in imu_data:
|
| return float(imu_data['accelerometer'][0])
|
| return 0.0
|
| except Exception as e:
|
| logger.error(f"Error getting IMU accel X: {e}")
|
| return 0.0
|
|
|
| def get_imu_accel_y(self) -> float:
|
| """Get IMU Y-axis acceleration in m/s²."""
|
| if not self.is_available:
|
| return 0.0
|
| try:
|
| imu_data = self.reachy.imu
|
| if imu_data is not None and 'accelerometer' in imu_data:
|
| return float(imu_data['accelerometer'][1])
|
| return 0.0
|
| except Exception as e:
|
| logger.error(f"Error getting IMU accel Y: {e}")
|
| return 0.0
|
|
|
| def get_imu_accel_z(self) -> float:
|
| """Get IMU Z-axis acceleration in m/s²."""
|
| if not self.is_available:
|
| return 0.0
|
| try:
|
| imu_data = self.reachy.imu
|
| if imu_data is not None and 'accelerometer' in imu_data:
|
| return float(imu_data['accelerometer'][2])
|
| return 0.0
|
| except Exception as e:
|
| logger.error(f"Error getting IMU accel Z: {e}")
|
| return 0.0
|
|
|
| def get_imu_gyro_x(self) -> float:
|
| """Get IMU X-axis angular velocity in rad/s."""
|
| if not self.is_available:
|
| return 0.0
|
| try:
|
| imu_data = self.reachy.imu
|
| if imu_data is not None and 'gyroscope' in imu_data:
|
| return float(imu_data['gyroscope'][0])
|
| return 0.0
|
| except Exception as e:
|
| logger.error(f"Error getting IMU gyro X: {e}")
|
| return 0.0
|
|
|
| def get_imu_gyro_y(self) -> float:
|
| """Get IMU Y-axis angular velocity in rad/s."""
|
| if not self.is_available:
|
| return 0.0
|
| try:
|
| imu_data = self.reachy.imu
|
| if imu_data is not None and 'gyroscope' in imu_data:
|
| return float(imu_data['gyroscope'][1])
|
| return 0.0
|
| except Exception as e:
|
| logger.error(f"Error getting IMU gyro Y: {e}")
|
| return 0.0
|
|
|
| def get_imu_gyro_z(self) -> float:
|
| """Get IMU Z-axis angular velocity in rad/s."""
|
| if not self.is_available:
|
| return 0.0
|
| try:
|
| imu_data = self.reachy.imu
|
| if imu_data is not None and 'gyroscope' in imu_data:
|
| return float(imu_data['gyroscope'][2])
|
| return 0.0
|
| except Exception as e:
|
| logger.error(f"Error getting IMU gyro Z: {e}")
|
| return 0.0
|
|
|
| def get_imu_temperature(self) -> float:
|
| """Get IMU temperature in °C."""
|
| if not self.is_available:
|
| return 0.0
|
| try:
|
| imu_data = self.reachy.imu
|
| if imu_data is not None and 'temperature' in imu_data:
|
| return float(imu_data['temperature'])
|
| return 0.0
|
| except Exception as e:
|
| logger.error(f"Error getting IMU temperature: {e}")
|
| return 0.0
|
|
|
|
|
|
|
| def _get_respeaker(self):
|
| """Get ReSpeaker device from media manager."""
|
| if not self.is_available:
|
| logger.debug("ReSpeaker not available: robot not connected")
|
| return None
|
| try:
|
| if not self.reachy.media:
|
| logger.debug("ReSpeaker not available: media manager is None")
|
| return None
|
| if not self.reachy.media.audio:
|
| logger.debug("ReSpeaker not available: audio is None")
|
| return None
|
| respeaker = self.reachy.media.audio._respeaker
|
| if respeaker is None:
|
| logger.debug("ReSpeaker not available: _respeaker is None (USB device not found)")
|
| return respeaker
|
| except Exception as e:
|
| logger.debug(f"ReSpeaker not available: {e}")
|
| return None
|
|
|
| def get_led_brightness(self) -> float:
|
| """Get LED brightness (0-100)."""
|
| respeaker = self._get_respeaker()
|
| if respeaker is None:
|
| return getattr(self, '_led_brightness', 50.0)
|
| try:
|
| result = respeaker.read("LED_BRIGHTNESS")
|
| if result is not None:
|
|
|
| self._led_brightness = (result[1] / 255.0) * 100.0
|
| return self._led_brightness
|
| except Exception as e:
|
| logger.debug(f"Error getting LED brightness: {e}")
|
| return getattr(self, '_led_brightness', 50.0)
|
|
|
| def set_led_brightness(self, brightness: float) -> None:
|
| """Set LED brightness (0-100)."""
|
| brightness = max(0.0, min(100.0, brightness))
|
| self._led_brightness = brightness
|
| respeaker = self._get_respeaker()
|
| if respeaker is None:
|
| return
|
| try:
|
|
|
| value = int((brightness / 100.0) * 255)
|
| respeaker.write("LED_BRIGHTNESS", [value])
|
| logger.info(f"LED brightness set to {brightness}%")
|
| except Exception as e:
|
| logger.error(f"Error setting LED brightness: {e}")
|
|
|
| def get_led_effect(self) -> str:
|
| """Get current LED effect."""
|
| respeaker = self._get_respeaker()
|
| if respeaker is None:
|
| return getattr(self, '_led_effect', 'off')
|
| try:
|
| result = respeaker.read("LED_EFFECT")
|
| if result is not None:
|
| effect_map = {0: 'off', 1: 'solid', 2: 'breathing', 3: 'rainbow', 4: 'doa'}
|
| self._led_effect = effect_map.get(result[1], 'off')
|
| return self._led_effect
|
| except Exception as e:
|
| logger.debug(f"Error getting LED effect: {e}")
|
| return getattr(self, '_led_effect', 'off')
|
|
|
| def set_led_effect(self, effect: str) -> None:
|
| """Set LED effect."""
|
| self._led_effect = effect
|
| respeaker = self._get_respeaker()
|
| if respeaker is None:
|
| return
|
| try:
|
| effect_map = {'off': 0, 'solid': 1, 'breathing': 2, 'rainbow': 3, 'doa': 4}
|
| value = effect_map.get(effect, 0)
|
| respeaker.write("LED_EFFECT", [value])
|
| logger.info(f"LED effect set to {effect}")
|
| except Exception as e:
|
| logger.error(f"Error setting LED effect: {e}")
|
|
|
| def get_led_color_r(self) -> float:
|
| """Get LED red color component (0-255)."""
|
| respeaker = self._get_respeaker()
|
| if respeaker is None:
|
| return getattr(self, '_led_color_r', 0.0)
|
| try:
|
| result = respeaker.read("LED_COLOR")
|
| if result is not None:
|
|
|
| color = result[1] if len(result) > 1 else 0
|
| self._led_color_r = float((color >> 16) & 0xFF)
|
| return self._led_color_r
|
| except Exception as e:
|
| logger.debug(f"Error getting LED color R: {e}")
|
| return getattr(self, '_led_color_r', 0.0)
|
|
|
| def set_led_color_r(self, value: float) -> None:
|
| """Set LED red color component (0-255)."""
|
| self._led_color_r = max(0.0, min(255.0, value))
|
| self._update_led_color()
|
|
|
| def get_led_color_g(self) -> float:
|
| """Get LED green color component (0-255)."""
|
| respeaker = self._get_respeaker()
|
| if respeaker is None:
|
| return getattr(self, '_led_color_g', 0.0)
|
| try:
|
| result = respeaker.read("LED_COLOR")
|
| if result is not None:
|
| color = result[1] if len(result) > 1 else 0
|
| self._led_color_g = float((color >> 8) & 0xFF)
|
| return self._led_color_g
|
| except Exception as e:
|
| logger.debug(f"Error getting LED color G: {e}")
|
| return getattr(self, '_led_color_g', 0.0)
|
|
|
| def set_led_color_g(self, value: float) -> None:
|
| """Set LED green color component (0-255)."""
|
| self._led_color_g = max(0.0, min(255.0, value))
|
| self._update_led_color()
|
|
|
| def get_led_color_b(self) -> float:
|
| """Get LED blue color component (0-255)."""
|
| respeaker = self._get_respeaker()
|
| if respeaker is None:
|
| return getattr(self, '_led_color_b', 0.0)
|
| try:
|
| result = respeaker.read("LED_COLOR")
|
| if result is not None:
|
| color = result[1] if len(result) > 1 else 0
|
| self._led_color_b = float(color & 0xFF)
|
| return self._led_color_b
|
| except Exception as e:
|
| logger.debug(f"Error getting LED color B: {e}")
|
| return getattr(self, '_led_color_b', 0.0)
|
|
|
| def set_led_color_b(self, value: float) -> None:
|
| """Set LED blue color component (0-255)."""
|
| self._led_color_b = max(0.0, min(255.0, value))
|
| self._update_led_color()
|
|
|
| def _update_led_color(self) -> None:
|
| """Update LED color from R, G, B components."""
|
| respeaker = self._get_respeaker()
|
| if respeaker is None:
|
| return
|
| try:
|
| r = int(getattr(self, '_led_color_r', 0))
|
| g = int(getattr(self, '_led_color_g', 0))
|
| b = int(getattr(self, '_led_color_b', 0))
|
| color = (r << 16) | (g << 8) | b
|
| respeaker.write("LED_COLOR", [color])
|
| logger.info(f"LED color set to RGB({r}, {g}, {b})")
|
| except Exception as e:
|
| logger.error(f"Error setting LED color: {e}")
|
|
|
|
|
|
|
| def get_agc_enabled(self) -> bool:
|
| """Get AGC (Automatic Gain Control) enabled status."""
|
| respeaker = self._get_respeaker()
|
| if respeaker is None:
|
| return getattr(self, '_agc_enabled', False)
|
| try:
|
| result = respeaker.read("PP_AGCONOFF")
|
| if result is not None:
|
| self._agc_enabled = bool(result[1])
|
| return self._agc_enabled
|
| except Exception as e:
|
| logger.debug(f"Error getting AGC status: {e}")
|
| return getattr(self, '_agc_enabled', False)
|
|
|
| def set_agc_enabled(self, enabled: bool) -> None:
|
| """Set AGC (Automatic Gain Control) enabled status."""
|
| self._agc_enabled = enabled
|
| respeaker = self._get_respeaker()
|
| if respeaker is None:
|
| return
|
| try:
|
| respeaker.write("PP_AGCONOFF", [1 if enabled else 0])
|
| logger.info(f"AGC {'enabled' if enabled else 'disabled'}")
|
| except Exception as e:
|
| logger.error(f"Error setting AGC status: {e}")
|
|
|
| def get_agc_max_gain(self) -> float:
|
| """Get AGC maximum gain in dB."""
|
| respeaker = self._get_respeaker()
|
| if respeaker is None:
|
| return getattr(self, '_agc_max_gain', 15.0)
|
| try:
|
| result = respeaker.read("PP_AGCMAXGAIN")
|
| if result is not None:
|
| self._agc_max_gain = float(result[0])
|
| return self._agc_max_gain
|
| except Exception as e:
|
| logger.debug(f"Error getting AGC max gain: {e}")
|
| return getattr(self, '_agc_max_gain', 15.0)
|
|
|
| def set_agc_max_gain(self, gain: float) -> None:
|
| """Set AGC maximum gain in dB."""
|
| gain = max(0.0, min(30.0, gain))
|
| self._agc_max_gain = gain
|
| respeaker = self._get_respeaker()
|
| if respeaker is None:
|
| return
|
| try:
|
| respeaker.write("PP_AGCMAXGAIN", [gain])
|
| logger.info(f"AGC max gain set to {gain} dB")
|
| except Exception as e:
|
| logger.error(f"Error setting AGC max gain: {e}")
|
|
|
| def get_noise_suppression(self) -> float:
|
| """Get noise suppression level (0-100%)."""
|
| respeaker = self._get_respeaker()
|
| if respeaker is None:
|
| return getattr(self, '_noise_suppression', 50.0)
|
| try:
|
| result = respeaker.read("PP_MIN_NS")
|
| if result is not None:
|
|
|
|
|
| self._noise_suppression = max(0.0, min(100.0, (1.0 - result[0]) * 100.0))
|
| return self._noise_suppression
|
| except Exception as e:
|
| logger.debug(f"Error getting noise suppression: {e}")
|
| return getattr(self, '_noise_suppression', 50.0)
|
|
|
| def set_noise_suppression(self, level: float) -> None:
|
| """Set noise suppression level (0-100%)."""
|
| level = max(0.0, min(100.0, level))
|
| self._noise_suppression = level
|
| respeaker = self._get_respeaker()
|
| if respeaker is None:
|
| return
|
| try:
|
|
|
| value = 1.0 - (level / 100.0)
|
| respeaker.write("PP_MIN_NS", [value])
|
| logger.info(f"Noise suppression set to {level}%")
|
| except Exception as e:
|
| logger.error(f"Error setting noise suppression: {e}")
|
|
|
| def get_echo_cancellation_converged(self) -> bool:
|
| """Check if echo cancellation has converged."""
|
| respeaker = self._get_respeaker()
|
| if respeaker is None:
|
| return False
|
| try:
|
| result = respeaker.read("AEC_AECCONVERGED")
|
| if result is not None:
|
| return bool(result[1])
|
| except Exception as e:
|
| logger.debug(f"Error getting AEC converged status: {e}")
|
| return False
|
|
|
|
|
|
|
| def get_head_joints_json(self) -> str:
|
| """
|
| Get head joints as JSON string.
|
|
|
| Returns:
|
| JSON string: "[yaw_body, stewart_1, stewart_2, stewart_3, stewart_4, stewart_5, stewart_6]"
|
| Values in radians
|
| """
|
| if not self.is_available:
|
| return "[]"
|
| try:
|
| import json
|
| head_joints, _ = self.reachy.get_current_joint_positions()
|
| if head_joints and len(head_joints) >= 7:
|
|
|
| joints_list = [float(j) for j in head_joints[:7]]
|
| return json.dumps(joints_list)
|
| return "[]"
|
| except Exception as e:
|
| logger.error(f"Error getting head joints JSON: {e}")
|
| return "[]"
|
|
|
| def get_passive_joints_json(self) -> str:
|
| """
|
| Get passive joints as JSON string.
|
|
|
| Returns:
|
| JSON string: "[passive_1_x, passive_1_y, passive_1_z, ..., passive_7_z]"
|
| Values in radians (21 values total)
|
| """
|
| if not self.is_available:
|
| return "[]"
|
| try:
|
| import json
|
|
|
| status = self.reachy.client.get_status(wait=False)
|
| wlan_ip = status.get('wlan_ip', 'localhost')
|
|
|
| backend_url = f"http://{wlan_ip}:8000/api/state/full?with_passive_joints=true"
|
| response = requests.get(backend_url, timeout=0.5)
|
| if response.status_code == 200:
|
| data = response.json()
|
| passive_joints = data.get("passive_joints")
|
| if passive_joints and len(passive_joints) >= 21:
|
| return json.dumps(passive_joints)
|
| return "[]"
|
| except Exception as e:
|
| logger.error(f"Error getting passive joints JSON: {e}")
|
| return "[]"
|
|
|