"""Filesystem helpers: logging, atomic JSON writes, run state, and metrics.""" from __future__ import annotations import json import os import time from pathlib import Path from .constants import * def log(message: str) -> None: print(message, flush=True) def ensure_dirs() -> None: DATA_DIR.mkdir(parents=True, exist_ok=True) CHECKPOINT_DIR.mkdir(parents=True, exist_ok=True) SESSIONS_DIR.mkdir(parents=True, exist_ok=True) EXPORT_DIR.mkdir(parents=True, exist_ok=True) RUN_DIR.mkdir(parents=True, exist_ok=True) def write_json_atomic(path: Path, data: dict) -> None: path.parent.mkdir(parents=True, exist_ok=True) tmp = path.with_suffix(".tmp") tmp.write_text(json.dumps(data, indent=2), encoding="utf-8") tmp.replace(path) def write_run_state(**updates) -> None: ensure_dirs() state = {} if STATE_PATH.exists(): try: state = json.loads(STATE_PATH.read_text(encoding="utf-8")) except Exception: state = {} state.update(updates) state["updated_at"] = time.time() write_json_atomic(STATE_PATH, state) def write_metric(kind: str, **values) -> None: ensure_dirs() row = {"time": time.time(), "kind": kind, **values} with METRICS_PATH.open("a", encoding="utf-8") as fh: fh.write(json.dumps(row, ensure_ascii=False) + "\n") def read_json_file(path: Path, fallback): try: return json.loads(path.read_text(encoding="utf-8")) except Exception: return fallback def read_jsonl_tail(path: Path, limit: int = 4000) -> list[dict]: if not path.exists(): return [] try: with path.open("rb") as fh: fh.seek(0, os.SEEK_END) size = fh.tell() fh.seek(max(0, size - 4 * 1024 * 1024)) lines = fh.read().decode("utf-8", errors="replace").splitlines() except Exception: return [] out = [] for line in lines[-limit:]: try: out.append(json.loads(line)) except Exception: pass return out def read_text_tail(path: Path, max_chars: int = 24000) -> str: if not path.exists(): return "" try: text = path.read_text(encoding="utf-8", errors="replace") except Exception: return "" return text[-max_chars:] def next_training_session_dir() -> Path: ensure_dirs() idx = 1 while True: session_dir = SESSIONS_DIR / f"training_{idx}" if not session_dir.exists(): (session_dir / "checkpoints").mkdir(parents=True, exist_ok=True) return session_dir checkpoint_dir = session_dir / "checkpoints" if checkpoint_dir.exists() and not any(checkpoint_dir.glob("*.pt")): return session_dir idx += 1 def session_dir_from_checkpoint(path: Path) -> Path | None: parts = path.resolve().parts if "training_sessions" not in parts: return None idx = parts.index("training_sessions") if idx + 1 >= len(parts): return None return Path(*parts[: idx + 2]) def all_checkpoint_files() -> list[Path]: ensure_dirs() paths = list(CHECKPOINT_DIR.glob("*.pt")) paths.extend(SESSIONS_DIR.glob("training_*/checkpoints/*.pt")) return sorted({p.resolve() for p in paths if p.exists()}, key=lambda p: p.stat().st_mtime, reverse=True)