File size: 4,064 Bytes
4cd8837
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
from __future__ import annotations

import base64
import logging
from typing import Any

from hearthnet.bus.capability import CapabilityDescriptor, RouteRequest
from hearthnet.services.image.backends.base import ImageDescribeBackend, ImageDescription
from hearthnet.services.image.backends.florence2 import Florence2Backend


class ImageDescribeService:
    """Service wrapping image-description backends.

    Registers: img.describe@1.0
    (NOTE(review): the "@1.0" version is not set explicitly in
    ``capabilities`` -- presumably it is the ``CapabilityDescriptor``
    default; confirm.)

    Request problems detected by the handler are reported as
    ``{"error": <code>, "message": <text>}`` dicts rather than exceptions.
    """

    name = "image.describe"

    # Shared logger; used to surface blob-store failures that would otherwise
    # be indistinguishable from a genuinely missing blob.
    _log = logging.getLogger(__name__)

    def __init__(self, backends: list[ImageDescribeBackend] | None = None, bus: Any = None) -> None:
        """Create the service.

        Args:
            backends: Backends to serve requests with, in priority order (the
                first one is the default). ``None`` falls back to a single
                ``Florence2Backend``; an explicit empty list is kept as-is.
            bus: Optional bus reference; only its ``blob_store`` attribute (if
                present) is used, to resolve ``image_cid`` requests.
        """
        if backends is not None:
            self._backends: list[ImageDescribeBackend] = backends
        else:
            self._backends = [Florence2Backend()]
        self._bus = bus
        # Lookup table for explicit ``"backend"`` selection. If two backends
        # share a name, the later one wins.
        self._by_name: dict[str, ImageDescribeBackend] = {b.name: b for b in self._backends}

    # ── Service registration ──────────────────────────────────────────────────

    def capabilities(self) -> list[tuple]:
        """Return ``(descriptor, handler, predicate)`` triples for this service.

        A single capability, ``img.describe``, is exposed; it is handled by
        :meth:`describe` and has no predicate (``None``).
        """
        return [
            (
                CapabilityDescriptor(
                    name="img.describe",
                    max_concurrent=2,
                    idempotent=True,
                    timeout_seconds=60,
                ),
                self.describe,
                None,
            ),
        ]

    def register(self, bus: Any) -> None:
        """Remember *bus* and register every capability on it via ``register_local``."""
        self._bus = bus
        for cap, handler, predicate in self.capabilities():
            bus.register_local(cap, handler, predicate)

    # ── Handler ───────────────────────────────────────────────────────────────

    async def describe(self, req: RouteRequest) -> dict:
        """Describe the image referenced by *req*.

        Reads ``req.body["input"]``, which may contain:

        * ``image_b64``: base64-encoded image bytes (takes precedence), or
        * ``image_cid``: identifier looked up in the bus's ``blob_store``;
        * ``mode``: passed through to the backend (default ``"caption"``);
        * ``backend``: name of a specific backend (default: the first one).

        Returns:
            On success ``{"output": {...}, "meta": {}}`` where ``output`` holds
            the ``caption``, ``tags``, ``objects``, ``ocr_text``, ``backend``
            and ``ms`` fields of the backend's result. On failure an error
            dict with code ``bad_request``, ``not_found`` or ``unavailable``.

        Exceptions raised by the backend itself are not caught here.
        """
        # ``"input": null`` (or any other falsy value) is treated like a
        # missing key instead of crashing on ``None.get``.
        params: dict = req.body.get("input") or {}

        image_cid: str | None = params.get("image_cid")
        image_b64: str | None = params.get("image_b64")
        mode: str = params.get("mode", "caption")
        backend_name: str | None = params.get("backend")

        # Resolve image bytes
        image_bytes: bytes | None = None
        if image_b64:
            try:
                image_bytes = base64.b64decode(image_b64)
            except (ValueError, TypeError) as exc:
                # binascii.Error is a ValueError subclass; TypeError covers
                # non-string payloads.
                return self._error("bad_request", f"invalid image_b64: {exc}")
        elif image_cid:
            image_bytes = self._fetch_blob(image_cid)
            if image_bytes is None:
                return self._error("not_found", f"blob {image_cid} not found")
        else:
            return self._error("bad_request", "image_cid or image_b64 required")

        # Select backend
        if backend_name:
            backend = self._by_name.get(backend_name)
            if backend is None:
                return self._error("bad_request", f"unknown backend: {backend_name}")
        elif self._backends:
            backend = self._backends[0]
        else:
            return self._error("unavailable", "no image backends configured")

        result: ImageDescription = await backend.describe(image_bytes, mode=mode)
        return {
            "output": {
                "caption": result.caption,
                "tags": result.tags,
                "objects": result.objects,
                "ocr_text": result.ocr_text,
                "backend": result.backend,
                "ms": result.ms,
            },
            "meta": {},
        }

    def health(self) -> dict:
        """Return the service name together with each backend's ``health()`` report."""
        return {
            "service": self.name,
            "backends": [b.health() for b in self._backends],
        }

    # ── Internals ─────────────────────────────────────────────────────────────

    def _fetch_blob(self, image_cid: str) -> bytes | None:
        """Best-effort lookup of *image_cid* in the bus's blob store.

        Returns ``None`` when there is no bus, no ``blob_store`` attribute, the
        store returns ``None``, or the lookup raises. Failures are logged
        rather than silently discarded so that a broken store can be told
        apart from a missing blob.
        """
        try:
            store: Any = getattr(self._bus, "blob_store", None)
            if store is None:
                return None
            # NOTE(review): ``get`` is called synchronously; confirm the blob
            # store API is not a coroutine function.
            return store.get(image_cid)
        except Exception:
            # The store is an opaque dependency, so any failure is treated as
            # "blob unavailable" -- but it is recorded.
            self._log.warning("blob store lookup failed for %s", image_cid, exc_info=True)
            return None

    @staticmethod
    def _error(code: str, message: str) -> dict:
        """Build the error payload returned to the caller."""
        return {"error": code, "message": message}