GitHub Actions
fix: 0 test failures; FileService; real RagService; emergency probe; chat return
4aaae80
Raw
History Blame
4.13 kB
"""SttService β€” registers stt.transcribe@1.0 on the bus."""
from __future__ import annotations
import base64
from typing import Any
from hearthnet.constants import STT_MAX_AUDIO_SECONDS
class SttService:
name = "stt"
version = "1.0"
def __init__(
self,
backends: list[Any] | None = None,
bus: Any = None,
) -> None:
if backends is not None:
self._backends = backends
else:
self._backends = self._discover_backends()
if bus is not None:
self.register(bus)
# ── Backend discovery ─────────────────────────────────────────────────────
def _discover_backends(self) -> list[Any]:
backends: list[Any] = []
try:
from hearthnet.services.speech.backends.whisper_local import WhisperBackend
b = WhisperBackend()
if b.health().get("status") == "ok":
backends.append(b)
except Exception:
pass
return backends
def _select_backend(self) -> Any | None:
for backend in self._backends:
if backend.health().get("status") == "ok":
return backend
return None
# ── Capability registration ───────────────────────────────────────────────
def register(self, bus: Any) -> None:
from hearthnet.bus.capability import CapabilityDescriptor
desc = CapabilityDescriptor(
name="stt.transcribe",
version=(1, 0),
stability="stable",
params={"backends": [b.name for b in self._backends]},
max_concurrent=2,
trust_required="member",
timeout_seconds=STT_MAX_AUDIO_SECONDS + 30,
idempotent=True,
)
bus.register_capability(desc, self._handle_transcribe, self.params_compatible)
def params_compatible(self, offered: dict, requested: dict) -> bool:
req_backend = requested.get("backend")
if not req_backend:
return True
return req_backend in offered.get("backends", [])
# ── Handler ───────────────────────────────────────────────────────────────
async def _handle_transcribe(self, req: Any) -> dict:
body = req.body if hasattr(req, "body") else req
inp = body.get("input", body)
audio_b64: str | None = inp.get("audio_b64")
language: str | None = inp.get("language")
translate_to_en: bool = bool(inp.get("translate_to_en", False))
if not audio_b64:
return {"error": "bad_request", "reason": "audio_b64 is required"}
try:
audio_bytes = base64.b64decode(audio_b64)
except Exception:
return {"error": "bad_request", "reason": "audio_b64 is not valid base64"}
backend = self._select_backend()
if backend is None:
return {
"error": "backend_unavailable",
"reason": "No healthy STT backend available",
}
try:
result = await backend.transcribe(
audio_bytes, language=language, translate_to_en=translate_to_en
)
except Exception as exc:
return {"error": "internal_error", "reason": str(exc)}
segments_out = [
{
"start_seconds": s.start_seconds,
"end_seconds": s.end_seconds,
"text": s.text,
"language": s.language,
"confidence": s.confidence,
}
for s in result.segments
]
return {
"segments": segments_out,
"full_text": result.full_text,
"detected_language": result.detected_language,
"backend": result.backend,
"ms": result.ms,
}