GitHub Actions
fix: 0 test failures; FileService; real RagService; emergency probe; chat return
4aaae80
Raw
History Blame
3.25 kB
from __future__ import annotations
from hearthnet.blobs.chunker import BlobError, BlobManifest, verify_chunk
from hearthnet.blobs.store import BlobStore
class TransferManager:
"""Coordinates parallel chunk fetch from multiple peer sources.
MVP: fetch from one source at a time (parallel is Phase 2 optimization).
"""
def __init__(self, store: BlobStore, http_client=None) -> None:
self.store = store
self._http_client = http_client
async def fetch(self, cid: str, sources: list[str]) -> BlobManifest:
"""Fetch a blob from one of the sources (tries in order).
sources: list of base URLs like 'https://host:7080'
"""
last_exc: Exception | None = None
for base_url in sources:
try:
return await self._fetch_from(cid, base_url)
except Exception as exc:
last_exc = exc
raise BlobError(
"not_found",
f"Could not fetch {cid} from any source. Last error: {last_exc}",
)
async def _fetch_from(self, cid: str, base_url: str) -> BlobManifest:
"""Fetch blob manifest + all chunks from base_url/file/chunks/<chunk_cid>."""
import json
client = self._http_client
if client is None:
try:
import httpx # type: ignore[import]
client = httpx.AsyncClient()
except ImportError as exc:
raise BlobError(
"io_error", "httpx is required for TransferManager network fetch"
) from exc
manifest_url = f"{base_url.rstrip('/')}/file/manifest/{cid}"
try:
resp = await client.get(manifest_url, timeout=30)
resp.raise_for_status()
raw = resp.json()
except Exception as exc:
raise BlobError(
"io_error", f"Failed to fetch manifest from {manifest_url}: {exc}"
) from exc
from hearthnet.blobs.store import BlobStore as _BS
manifest = _BS._manifest_from_dict(raw)
for chunk_ref in manifest.chunks:
if self.store._blob_path(chunk_ref.cid).exists():
continue # already have it
chunk_url = f"{base_url.rstrip('/')}/file/chunks/{chunk_ref.cid}"
try:
resp = await client.get(chunk_url, timeout=60)
resp.raise_for_status()
chunk_data = resp.content
except Exception as exc:
raise BlobError(
"io_error", f"Failed to fetch chunk {chunk_ref.cid}: {exc}"
) from exc
verify_chunk(chunk_data, chunk_ref.cid)
path = self.store._blob_path(chunk_ref.cid)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_bytes(chunk_data)
# Write manifest if not already stored
if not self.store.has(manifest.cid):
mpath = self.store._manifest_path(manifest.cid)
mpath.parent.mkdir(parents=True, exist_ok=True)
mpath.write_text(
json.dumps(_BS._manifest_to_dict(manifest), sort_keys=True, indent=2),
encoding="utf-8",
)
return manifest