"""Audio player using Reachy Mini's media system."""
import logging
import threading
import time
from collections.abc import Callable
from pathlib import Path
from typing import List, Optional, Union
import numpy as np
import scipy.signal
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
class AudioPlayer:
    """Audio player using Reachy Mini's media system.

    Items are played one after another, each on its own daemon thread.
    When a Reachy Mini instance is set, playback goes through
    ``reachy_mini.media``; otherwise (or if that raises) the file is played
    with ``sounddevice``.

    Every worker thread holds the stop event that was current when it was
    started.  ``play()`` installs a fresh event after a stop, so a worker
    left over from an earlier ``play()`` can neither miss its stop request
    nor touch the playlist/callback of the newer playback.
    """

    def __init__(self, reachy_mini=None) -> None:
        """Create an idle player.

        Args:
            reachy_mini: Optional Reachy Mini instance; only its ``media``
                attribute is used.  ``None`` selects the sounddevice path.
        """
        self.reachy_mini = reachy_mini
        self.is_playing = False
        # Files/URLs still waiting to be played, in order.
        self._playlist: List[str] = []
        # Invoked once when playback ends; guarded by the lock below.
        self._done_callback: Optional[Callable[[], None]] = None
        self._done_callback_lock = threading.Lock()
        # Volume factors in the range 0.0-1.0.
        self._duck_volume: float = 0.5
        self._unduck_volume: float = 1.0
        self._current_volume: float = 1.0
        # Stop request for the current playback session.
        self._stop_flag = threading.Event()

    def set_reachy_mini(self, reachy_mini) -> None:
        """Set the Reachy Mini instance."""
        self.reachy_mini = reachy_mini

    def play(
        self,
        url: Union[str, List[str]],
        done_callback: Optional[Callable[[], None]] = None,
        stop_first: bool = True,
    ) -> None:
        """Start playing one file/URL or a list of them.

        Args:
            url: A single path/URL, or a list played in order.
            done_callback: Called once when playback finishes or is stopped.
                With an empty list it is called before this method returns.
            stop_first: Stop the current playback before starting.
        """
        if stop_first:
            self.stop()
        # A set event stays with the workers that were stopped; the new
        # session gets its own so those workers are recognisably stale.
        if self._stop_flag.is_set():
            self._stop_flag = threading.Event()
        self._playlist = [url] if isinstance(url, str) else list(url)
        with self._done_callback_lock:
            self._done_callback = done_callback
        self._play_next()

    def _play_next(self) -> None:
        """Start the next playlist item, or finish if there is none."""
        if not self._playlist or self._stop_flag.is_set():
            self._on_playback_finished()
            return
        next_url = self._playlist.pop(0)
        _LOGGER.debug("Playing %s", next_url)
        self.is_playing = True
        # Start playback in a thread, bound to the current session's event.
        worker = threading.Thread(
            target=self._play_file,
            args=(next_url, self._stop_flag),
            daemon=True,
        )
        worker.start()

    def _play_file(
        self,
        file_path: str,
        stop_flag: Optional[threading.Event] = None,
    ) -> None:
        """Play an audio file, then continue with the playlist.

        Args:
            file_path: Local path, or an http(s) URL that is downloaded to a
                temporary file first (removed again when playback ends).
            stop_flag: Stop event of the session this call belongs to;
                defaults to the player's current event.
        """
        if stop_flag is None:
            stop_flag = self._stop_flag
        temp_path: Optional[Path] = None
        try:
            # Handle URLs - download first
            if file_path.startswith(("http://", "https://")):
                import tempfile
                import urllib.request

                # Close the handle before downloading: an open temporary
                # file cannot be reopened by name on every platform.
                with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp:
                    temp_path = Path(tmp.name)
                urllib.request.urlretrieve(file_path, str(temp_path))
                file_path = str(temp_path)
            if stop_flag.is_set():
                return
            # Use Reachy Mini's media system if available
            if self.reachy_mini is not None:
                try:
                    # NOTE(review): play_sound is presumably non-blocking,
                    # hence the duration wait below - confirm.
                    self.reachy_mini.media.play_sound(file_path)
                    # Estimate playback duration and wait
                    import soundfile as sf

                    data, samplerate = sf.read(file_path)
                    duration = len(data) / samplerate
                    # Returns early (True) as soon as a stop is requested.
                    stopped = stop_flag.wait(timeout=duration)
                    # stop() already cleared the buffer; a superseded worker
                    # must not wipe audio queued by the newer playback.
                    if stopped and stop_flag is self._stop_flag:
                        self.reachy_mini.media.clear_output_buffer()
                except Exception as e:
                    _LOGGER.warning("Reachy Mini audio failed, falling back to sounddevice: %s", e)
                    self._play_file_fallback(file_path, stop_flag)
            else:
                self._play_file_fallback(file_path, stop_flag)
        except Exception as e:
            _LOGGER.error("Error playing audio: %s", e)
        finally:
            if temp_path is not None:
                try:
                    temp_path.unlink(missing_ok=True)
                except OSError:
                    _LOGGER.debug(
                        "Could not remove temporary file %s", temp_path, exc_info=True
                    )
            # Only the worker of the current session may advance the
            # playlist or report completion.
            if stop_flag is self._stop_flag:
                self.is_playing = False
                # Play next in playlist or finish
                if self._playlist and not stop_flag.is_set():
                    self._play_next()
                else:
                    self._on_playback_finished()

    def _play_file_fallback(
        self,
        file_path: str,
        stop_flag: Optional[threading.Event] = None,
    ) -> None:
        """Fallback to sounddevice for audio playback.

        Args:
            file_path: Local audio file readable by ``soundfile``.
            stop_flag: Stop event to honour; defaults to the player's
                current event.
        """
        import sounddevice as sd
        import soundfile as sf

        if stop_flag is None:
            stop_flag = self._stop_flag
        data, samplerate = sf.read(file_path)
        # Apply volume
        data = data * self._current_volume
        if stop_flag.is_set():
            return
        sd.play(data, samplerate)
        # Wait on the event rather than only in sd.wait() so that stop()
        # can interrupt playback.
        if stop_flag.wait(timeout=len(data) / samplerate):
            sd.stop()
        else:
            sd.wait()

    def _on_playback_finished(self) -> None:
        """Mark the player idle and run the done callback at most once."""
        self.is_playing = False
        with self._done_callback_lock:
            todo_callback = self._done_callback
            self._done_callback = None
        if todo_callback:
            # Run outside the lock so the callback may call play() again.
            try:
                todo_callback()
            except Exception:
                _LOGGER.exception("Unexpected error running done callback")

    def pause(self) -> None:
        """Mark the player as not playing.

        NOTE: this only updates ``is_playing``; audio that is already
        playing is not interrupted.
        """
        self.is_playing = False

    def resume(self) -> None:
        """Start the next queued item, if any remain in the playlist."""
        if self._playlist:
            self._play_next()

    def stop(self) -> None:
        """Request a stop, clear Reachy Mini's output buffer and the playlist."""
        self._stop_flag.set()
        if self.reachy_mini is not None:
            try:
                self.reachy_mini.media.clear_output_buffer()
            except Exception:
                # Best effort: stopping must not fail because of the device.
                _LOGGER.debug(
                    "Failed to clear Reachy Mini output buffer", exc_info=True
                )
        self._playlist.clear()
        self.is_playing = False

    def duck(self) -> None:
        """Switch the current volume to the ducked level."""
        self._current_volume = self._duck_volume

    def unduck(self) -> None:
        """Restore the current volume to the normal level."""
        self._current_volume = self._unduck_volume

    def set_volume(self, volume: int) -> None:
        """Set the volume from a percentage.

        Args:
            volume: Percentage; values outside 0-100 are clamped.  The
                ducked level is set to half of the resulting volume.
        """
        volume = max(0, min(100, volume))
        self._unduck_volume = volume / 100.0
        self._duck_volume = self._unduck_volume / 2
        self._current_volume = self._unduck_volume
|