HearthNet-Nemotron / hearthnet /services /image /describe_service.py
GitHub Actions
fix: 0 test failures; FileService; real RagService; emergency probe; chat return
4aaae80
Raw
History Blame
4.06 kB
from __future__ import annotations
import base64
from typing import Any
from hearthnet.bus.capability import CapabilityDescriptor, RouteRequest
from hearthnet.services.image.backends.base import ImageDescribeBackend, ImageDescription
from hearthnet.services.image.backends.florence2 import Florence2Backend
class ImageDescribeService:
"""Service wrapping image-description backends.
Registers: img.describe@1.0
"""
name = "image.describe"
def __init__(self, backends: list[ImageDescribeBackend] | None = None, bus: Any = None) -> None:
if backends is not None:
self._backends: list[ImageDescribeBackend] = backends
else:
self._backends = [Florence2Backend()]
self._bus = bus
self._by_name: dict[str, ImageDescribeBackend] = {b.name: b for b in self._backends}
# ── Service registration ──────────────────────────────────────────────────
def capabilities(self) -> list[tuple]:
return [
(
CapabilityDescriptor(
name="img.describe",
max_concurrent=2,
idempotent=True,
timeout_seconds=60,
),
self.describe,
None,
),
]
def register(self, bus: Any) -> None:
self._bus = bus
for cap, handler, predicate in self.capabilities():
bus.register_local(cap, handler, predicate)
# ── Handler ───────────────────────────────────────────────────────────────
async def describe(self, req: RouteRequest) -> dict:
params: dict = req.body.get("input", {})
image_cid: str | None = params.get("image_cid")
image_b64: str | None = params.get("image_b64")
mode: str = params.get("mode", "caption")
backend_name: str | None = params.get("backend")
# Resolve image bytes
image_bytes: bytes | None = None
if image_b64:
try:
image_bytes = base64.b64decode(image_b64)
except Exception as exc:
return {"error": "bad_request", "message": f"invalid image_b64: {exc}"}
elif image_cid:
# Attempt to resolve from blob store if available
try:
# If bus has a blob store reference, use it; otherwise return error
if hasattr(self._bus, "blob_store"):
store: Any = self._bus.blob_store
image_bytes = store.get(image_cid)
except Exception:
pass
if image_bytes is None:
return {"error": "not_found", "message": f"blob {image_cid} not found"}
else:
return {"error": "bad_request", "message": "image_cid or image_b64 required"}
# Select backend
backend: ImageDescribeBackend | None = None
if backend_name:
backend = self._by_name.get(backend_name)
if backend is None:
return {"error": "bad_request", "message": f"unknown backend: {backend_name}"}
elif self._backends:
backend = self._backends[0]
else:
return {"error": "unavailable", "message": "no image backends configured"}
result: ImageDescription = await backend.describe(image_bytes, mode=mode)
return {
"output": {
"caption": result.caption,
"tags": result.tags,
"objects": result.objects,
"ocr_text": result.ocr_text,
"backend": result.backend,
"ms": result.ms,
},
"meta": {},
}
def health(self) -> dict:
return {
"service": self.name,
"backends": [b.health() for b in self._backends],
}