Desmond-Dong's picture
"refactor-use-reachy-mini-native-audio"
1562b35
raw
history blame
5.57 kB
"""Audio player using Reachy Mini's media system."""
import logging
import threading
import time
from collections.abc import Callable
from pathlib import Path
from typing import List, Optional, Union
import numpy as np
import scipy.signal
_LOGGER = logging.getLogger(__name__)


class AudioPlayer:
    """Audio player using Reachy Mini's media system.

    Each track is played on its own daemon thread. When a Reachy Mini
    instance is set, playback goes through ``reachy_mini.media.play_sound``;
    otherwise (or if that raises) it falls back to ``sounddevice``.

    Every playback session owns a stop event. ``stop()`` sets the current
    event and ``play()`` installs a fresh one afterwards, so a worker thread
    left over from a stopped session can never observe the flag being
    cleared again and interfere with the playlist or done-callback of the
    session that replaced it.

    NOTE: the duck/unduck volume is only applied on the sounddevice
    fallback path; the Reachy Mini path plays the file as-is.
    """

    def __init__(self, reachy_mini=None) -> None:
        """Create a player, optionally bound to a Reachy Mini instance."""
        self.reachy_mini = reachy_mini
        self.is_playing = False
        self._playlist: List[str] = []
        self._done_callback: Optional[Callable[[], None]] = None
        self._done_callback_lock = threading.Lock()
        self._duck_volume: float = 0.5
        self._unduck_volume: float = 1.0
        self._current_volume: float = 1.0
        # Stop event of the *current* session; replaced (not cleared) by
        # play() once it has been set.
        self._stop_flag = threading.Event()

    def set_reachy_mini(self, reachy_mini) -> None:
        """Set the Reachy Mini instance."""
        self.reachy_mini = reachy_mini

    def play(
        self,
        url: Union[str, List[str]],
        done_callback: Optional[Callable[[], None]] = None,
        stop_first: bool = True,
    ) -> None:
        """Start playing one file/URL or a list of them.

        Args:
            url: A single path/URL, or a list played in order.
            done_callback: Called once when the playlist is exhausted or
                playback is stopped.
            stop_first: Stop whatever is currently playing before starting.
        """
        if stop_first:
            self.stop()
        if self._stop_flag.is_set():
            # Do not clear the old event: a worker from the stopped session
            # may not have noticed it yet. Give the new session its own
            # event and leave the old one set forever.
            self._stop_flag = threading.Event()

        self._playlist = [url] if isinstance(url, str) else list(url)
        with self._done_callback_lock:
            self._done_callback = done_callback
        self._play_next()

    def _play_next(self) -> None:
        """Pop the next playlist entry and play it on a daemon thread."""
        stop_flag = self._stop_flag
        if not self._playlist or stop_flag.is_set():
            self._on_playback_finished()
            return

        next_url = self._playlist.pop(0)
        _LOGGER.debug("Playing %s", next_url)
        self.is_playing = True

        # The worker keeps a reference to this session's stop event.
        worker = threading.Thread(
            target=self._play_file,
            args=(next_url, stop_flag),
            daemon=True,
        )
        worker.start()

    def _play_file(
        self,
        file_path: str,
        stop_flag: Optional[threading.Event] = None,
    ) -> None:
        """Play an audio file, then advance the playlist.

        Args:
            file_path: Local path, or an http(s) URL that is downloaded to
                a temporary file first.
            stop_flag: Stop event of the session this call belongs to;
                defaults to the player's current one.
        """
        if stop_flag is None:
            stop_flag = self._stop_flag
        downloaded: Optional[str] = None
        try:
            # Handle URLs - download first
            if file_path.startswith(("http://", "https://")):
                downloaded = self._download_to_temp(file_path)
                file_path = downloaded

            if stop_flag.is_set():
                return

            if self.reachy_mini is None:
                self._play_file_fallback(file_path, stop_flag)
            else:
                try:
                    self._play_with_reachy(file_path, stop_flag)
                except Exception as e:
                    _LOGGER.warning("Reachy Mini audio failed, falling back to sounddevice: %s", e)
                    self._play_file_fallback(file_path, stop_flag)
        except Exception as e:
            _LOGGER.error("Error playing audio: %s", e)
        finally:
            if downloaded is not None:
                self._remove_temp_file(downloaded)
            # A worker whose session has been superseded must not touch the
            # new session's playlist, state or callback.
            if stop_flag is self._stop_flag:
                self.is_playing = False
                if self._playlist and not stop_flag.is_set():
                    self._play_next()
                else:
                    self._on_playback_finished()

    @staticmethod
    def _download_to_temp(url: str) -> str:
        """Download *url* to a temporary ``.wav`` file and return its path."""
        import tempfile
        import urllib.request

        # Close the handle before downloading so the path can be reopened
        # for writing on every platform.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp:
            temp_path = tmp.name
        try:
            urllib.request.urlretrieve(url, temp_path)
        except Exception:
            AudioPlayer._remove_temp_file(temp_path)
            raise
        return temp_path

    @staticmethod
    def _remove_temp_file(temp_path: str) -> None:
        """Delete a downloaded temporary file, logging (not raising) on failure."""
        try:
            Path(temp_path).unlink(missing_ok=True)
        except OSError as e:
            _LOGGER.debug("Could not remove temporary file %s: %s", temp_path, e)

    def _play_with_reachy(self, file_path: str, stop_flag: threading.Event) -> None:
        """Play through Reachy Mini and wait for the estimated duration."""
        self.reachy_mini.media.play_sound(file_path)

        # play_sound is not waited on here, so estimate the duration from
        # the file itself.
        import soundfile as sf

        data, samplerate = sf.read(file_path)
        duration = len(data) / samplerate

        # Event.wait returns True as soon as a stop is requested, or False
        # once the estimated duration has elapsed.
        if stop_flag.wait(timeout=duration):
            self.reachy_mini.media.clear_output_buffer()

    def _play_file_fallback(
        self,
        file_path: str,
        stop_flag: Optional[threading.Event] = None,
    ) -> None:
        """Fallback to sounddevice for audio playback.

        Args:
            file_path: Local path of the file to play.
            stop_flag: Stop event of the owning session; defaults to the
                player's current one.
        """
        import sounddevice as sd
        import soundfile as sf

        if stop_flag is None:
            stop_flag = self._stop_flag

        data, samplerate = sf.read(file_path)
        # Apply volume
        data = data * self._current_volume

        if stop_flag.is_set():
            return

        sd.play(data, samplerate)
        # Wait on the stop event rather than blocking in sd.wait(), so that
        # stop() can interrupt fallback playback too.
        if stop_flag.wait(timeout=len(data) / samplerate):
            sd.stop()
        else:
            sd.wait()

    def _on_playback_finished(self) -> None:
        """Called when playback is finished; runs the done callback once."""
        self.is_playing = False

        # Take the callback under the lock, but invoke it outside the lock
        # so the callback itself may call play().
        todo_callback: Optional[Callable[[], None]] = None
        with self._done_callback_lock:
            if self._done_callback:
                todo_callback = self._done_callback
                self._done_callback = None

        if todo_callback:
            try:
                todo_callback()
            except Exception:
                _LOGGER.exception("Unexpected error running done callback")

    def pause(self) -> None:
        """Mark the player as not playing.

        NOTE: this only clears ``is_playing``; it does not interrupt audio
        that is already being played.
        """
        self.is_playing = False

    def resume(self) -> None:
        """Continue with the next playlist entry, if any remain."""
        if self._playlist:
            self._play_next()

    def stop(self) -> None:
        """Stop playback and drop the remaining playlist."""
        self._stop_flag.set()
        if self.reachy_mini is not None:
            try:
                self.reachy_mini.media.clear_output_buffer()
            except Exception:
                # Best effort: stopping must not fail because the robot's
                # media backend is unavailable.
                _LOGGER.debug("Could not clear Reachy Mini output buffer", exc_info=True)
        self._playlist.clear()
        self.is_playing = False

    def duck(self) -> None:
        """Switch to the reduced (ducked) volume."""
        self._current_volume = self._duck_volume

    def unduck(self) -> None:
        """Restore the normal (unducked) volume."""
        self._current_volume = self._unduck_volume

    def set_volume(self, volume: int) -> None:
        """Set the volume as a percentage; values are clamped to 0-100.

        The ducked volume is set to half of the new volume.
        """
        volume = max(0, min(100, volume))
        self._unduck_volume = volume / 100.0
        self._duck_volume = self._unduck_volume / 2
        self._current_volume = self._unduck_volume