"""TAU-RAG CLI — subcommands. Usage: python3 -m tau_rag.api.cli query "מה חובות המעביד?" --preset hebrew_legal python3 -m tau_rag.api.cli ingest --source path/to/corpus.jsonl --out snapshot/ python3 -m tau_rag.api.cli bench --cases cases.jsonl --preset no_llm python3 -m tau_rag.api.cli eval --corpus path/ --synthesize python3 -m tau_rag.api.cli dashboard --bench-json run.json --out dash.html python3 -m tau_rag.api.cli trace --output traces.jsonl python3 -m tau_rag.api.cli info """ from __future__ import annotations import argparse import json import sys from pathlib import Path from typing import Any, Dict, List from ..core.config import Config from ..core.types import Document, Query # ----------------------------------------------------------------- helpers def _load_config(args) -> Config: if args.config: return Config.from_json(args.config) return { "mock": Config.mock, "default": Config.default, "hebrew_legal": Config.hebrew_legal, "no_llm": Config.no_llm, }[args.preset]() _DEMO_CORPUS = [ Document(id="labor-5", text=( "המעביד חייב לשלם לעובד תוספת 25% בשעתיים הראשונות " "ותוספת של 50% בכל שעה נוספת לאחר מכן, למעט עבודה בשבת." ), metadata={"source": "חוק שעות עבודה ומנוחה, סעיף 5"}), Document(id="labor-10", text=( "עובד רשאי לסרב לעבוד בשבת אם אין הסכמה מפורשת, " "פרט למקרים חריגים." ), metadata={"source": "סעיף 10"}), Document(id="labor-7", text=( "אסור למעביד לפטר עובד בשל סירובו לעבוד שעות נוספות." ), metadata={"source": "חוק הגנת העובד, סעיף 7"}), Document(id="labor-12", text=( "חופשת מחלה תינתן לעובד בתנאי שהמציא אישור רפואי." ), metadata={"source": "חוק דמי מחלה, סעיף 12"}), Document(id="family-1", text=( "לבני זוג הזכות להתגרש על פי דין, כפוף לאישור בית הדין הרבני." ), metadata={"source": "חוק שיפוט בתי דין רבניים"}), Document(id="family-2", text=( "משמורת הילדים תינתן להורה המשמש להם יציבות, אלא אם כן " "החלטה אחרת של בית המשפט." ), metadata={"source": "חוק הכשרות המשפטית והאפוטרופסות"}), ] def _load_corpus(path: Path) -> List[Document]: if not path.exists(): raise FileNotFoundError( f"Corpus path does not exist: {path}\n" f" Tip: use --demo for a built-in mini-corpus, " f"or --text 'id::text' for inline docs." ) if path.is_file() and path.suffix == ".jsonl": docs = [] for line in path.read_text(encoding="utf-8").splitlines(): if not line.strip(): continue row = json.loads(line) docs.append(Document(id=str(row["id"]), text=row["text"], metadata=row.get("metadata", {}))) return docs if path.is_dir(): docs = [] for f in sorted(path.glob("*.txt")): docs.append(Document(id=f.stem, text=f.read_text(encoding="utf-8"), metadata={"path": str(f)})) for f in sorted(path.glob("*.pdf")): from ..loaders import PDFLoader docs.extend(PDFLoader(f).load()) if not docs: raise FileNotFoundError( f"No .txt / .pdf / .jsonl found under: {path}\n" f" Tip: use --demo or --text 'id::content'." ) return docs raise ValueError( f"Unsupported corpus path: {path}\n" f" Expected a .jsonl file, a directory of .txt/.pdf, " f"--demo, or --text inline docs." ) def _parse_inline_text(entries: List[str]) -> List[Document]: """Parse ['id::text', 'id2::text2'] style inline documents.""" docs: List[Document] = [] for i, entry in enumerate(entries): if "::" in entry: doc_id, text = entry.split("::", 1) else: doc_id, text = f"doc-{i}", entry docs.append(Document(id=doc_id.strip(), text=text.strip(), metadata={})) return docs # ----------------------------------------------------------------- query def cmd_query(args) -> int: from ..pipeline import Pipeline cfg = _load_config(args) pipe = Pipeline.from_config(cfg) # Assemble the corpus from whichever inputs are provided docs: List[Document] = [] if args.demo: docs.extend(_DEMO_CORPUS) if args.text: docs.extend(_parse_inline_text(args.text)) if args.corpus: docs.extend(_load_corpus(args.corpus)) if docs: n = pipe.add_documents(docs, chunker=args.chunker) print(f"[corpus] {len(docs)} docs → {n} chunks", file=sys.stderr) else: print( "[warn] no corpus provided; the pipeline has no knowledge " "base to answer from. Use one of:\n" " --demo # built-in Hebrew legal mini-corpus\n" " --text 'doc-id::some content' # inline docs (repeatable)\n" " --corpus path/to/corpus.jsonl # load from file or folder", file=sys.stderr, ) resp = pipe.run(Query( text=args.q, lang=args.lang, k=args.k, rerank_k=args.rerank_k, )) out = resp.to_dict() if args.pretty: print(json.dumps(out, ensure_ascii=False, indent=2)) else: print(json.dumps(out, ensure_ascii=False)) return 0 # ----------------------------------------------------------------- ingest def cmd_ingest(args) -> int: from ..pipeline import Pipeline from ..persist import save_pipeline cfg = _load_config(args) pipe = Pipeline.from_config(cfg) docs = _load_corpus(args.source) n = pipe.add_documents(docs, chunker=args.chunker) stats = save_pipeline(pipe, args.out) print(json.dumps({ "source": str(args.source), "docs": len(docs), "chunks": n, "out": str(args.out), "per_retriever": stats, }, indent=2)) return 0 # ----------------------------------------------------------------- bench def cmd_bench(args) -> int: from ..bench import Benchmark, Case from ..pipeline import Pipeline from ..persist import load_pipeline cfg = _load_config(args) if args.snapshot: pipe = load_pipeline(cfg, args.snapshot) else: pipe = Pipeline.from_config(cfg) if args.corpus: pipe.add_documents(_load_corpus(args.corpus), chunker=args.chunker) cases: List[Case] = [] for line in args.cases.read_text(encoding="utf-8").splitlines(): if not line.strip(): continue row = json.loads(line) cases.append(Case( query=row["query"], relevant_doc_ids=set(row.get("relevant_doc_ids", [])), lang=row.get("lang", "he"), expected_signature_hash=row.get("expected_signature_hash"), notes=row.get("notes", ""), )) result = Benchmark(pipe).run(cases, k=args.k) print(json.dumps(result.to_dict(), indent=2, ensure_ascii=False)) if args.save: Path(args.save).write_text( json.dumps({**result.to_dict(), "raw": result.raw}, indent=2, ensure_ascii=False), encoding="utf-8", ) print(f"[saved] {args.save}", file=sys.stderr) return 0 # ----------------------------------------------------------------- eval (synth) def cmd_eval(args) -> int: from ..bench import Benchmark, generate_cases_from_docs from ..pipeline import Pipeline cfg = _load_config(args) pipe = Pipeline.from_config(cfg) docs: List[Document] = [] if args.demo: docs.extend(_DEMO_CORPUS) if args.text: docs.extend(_parse_inline_text(args.text)) if args.corpus: docs.extend(_load_corpus(args.corpus)) if not docs: raise FileNotFoundError( "eval needs a corpus — use --demo, --text, or --corpus." ) pipe.add_documents(docs, chunker=args.chunker) cases = generate_cases_from_docs(docs, per_doc=args.per_doc, seed=args.seed) if not cases: print("[warn] no synthetic cases could be generated " "(no deontic cues detected in corpus).", file=sys.stderr) result = Benchmark(pipe).run(cases, k=args.k) print(json.dumps(result.to_dict(), indent=2, ensure_ascii=False)) return 0 # ----------------------------------------------------------------- dashboard def cmd_dashboard(args) -> int: from ..bench.dashboard import render_dashboard data = json.loads(args.bench_json.read_text(encoding="utf-8")) # simulate a BenchmarkResult-like object class _R: def to_dict(self_inner): return data raw = data.get("raw", []) out = render_dashboard(_R(), srib_report=None, out_path=args.out) print(f"[dashboard] {out}", file=sys.stderr) return 0 # ----------------------------------------------------------------- trace def cmd_trace(args) -> int: from ..observability import get_tracer tracer = get_tracer() out = tracer.export_json(args.output) print(f"[traces] {len(tracer.spans)} spans → {out}", file=sys.stderr) return 0 # ----------------------------------------------------------------- ab def cmd_ab(args) -> int: from ..bench import ab_compare, render_ab_html, Case from ..pipeline import Pipeline from ..core.config import Config def _load_cfg(p): return Config.from_json(p) if p else None def _preset(name): return { "mock": Config.mock, "default": Config.default, "hebrew_legal": Config.hebrew_legal, "no_llm": Config.no_llm, "hebrew_dense": Config.hebrew_dense, }[name]() cfg_a = _load_cfg(args.config_a) or _preset(args.preset_a) cfg_b = _load_cfg(args.config_b) or _preset(args.preset_b) pipe_a = Pipeline.from_config(cfg_a) pipe_b = Pipeline.from_config(cfg_b) # Load corpus (same for both) — reuse query-mode inputs docs: List[Document] = [] if args.demo: docs.extend(_DEMO_CORPUS) if args.text: docs.extend(_parse_inline_text(args.text)) if args.corpus: docs.extend(_load_corpus(args.corpus)) if docs: pipe_a.add_documents(docs, chunker=args.chunker) pipe_b.add_documents(docs, chunker=args.chunker) # Cases cases: List[Case] = [] for line in args.cases.read_text(encoding="utf-8").splitlines(): if not line.strip(): continue row = json.loads(line) cases.append(Case( query=row["query"], relevant_doc_ids=set(row.get("relevant_doc_ids", [])), lang=row.get("lang", "he"), )) res = ab_compare(pipe_a, pipe_b, cases, k=args.k, label_a=args.label_a, label_b=args.label_b) print(json.dumps(res.to_dict(), indent=2, ensure_ascii=False)) if args.html: out = render_ab_html(res, args.html) print(f"[html] {out}", file=sys.stderr) return 0 # ----------------------------------------------------------------- info def cmd_info(args) -> int: from .. import __version__ from ..retrieve import __all__ as retrievers from ..generate import __all__ as generators from ..verify import __all__ as verifiers from ..fuse import __all__ as fusers print(json.dumps({ "version": __version__, "retrievers": retrievers, "generators": generators, "verifiers": verifiers, "fusers": fusers, "presets": ["mock", "default", "hebrew_legal", "no_llm"], }, indent=2)) return 0 def cmd_run_preset(args) -> int: """Execute a saved query preset (v1.66) from the CLI in-process — no HTTP server required. Supports --list, --corpus, --snapshot, --json.""" from ..presets import get_preset_store store = get_preset_store() # ---- --list mode if args.list_all: rows = store.list_all() if args.as_json: print(json.dumps(rows, ensure_ascii=False, indent=2)) else: if not rows: print("no presets saved.") print("hint: PUT /v1/queries/{name} or edit " "$TAU_RAG_QUERY_PRESETS_PATH") return 0 print(f"{len(rows)} preset(s):") for p in rows: notes = f" · {p.get('notes','')}" if p.get("notes") else "" print(f" {p['name']:<24} k={p.get('k','-')} " f"strategy={p.get('strategy','-')}{notes}") print(f" → {p['query']}") return 0 # ---- run mode requires a name if not args.name: print("[error] preset name required (or use --list)", file=sys.stderr) return 2 preset = store.get(args.name) if preset is None: print(f"[error] preset not found: {args.name}", file=sys.stderr) return 3 # Build pipeline from ..pipeline import Pipeline from ..core.types import Document, Query, Strategy pipe = Pipeline.from_config(_load_config(args)) # Optional corpus / snapshot if args.corpus: if not args.corpus.exists(): print(f"[error] corpus not found: {args.corpus}", file=sys.stderr) return 2 with open(args.corpus, encoding="utf-8") as f: docs = [] for line in f: line = line.strip() if not line or line.startswith("#"): continue row = json.loads(line) docs.append(Document( id=row["id"], text=row["text"], metadata=row.get("metadata") or {}, )) pipe.add_documents(docs) if args.snapshot: if not args.snapshot.exists(): print(f"[error] snapshot not found: {args.snapshot}", file=sys.stderr) return 2 pipe.load_snapshot(args.snapshot, replace=False) # Run the preset try: strategy = Strategy(preset.strategy) except ValueError: strategy = Strategy("hybrid") q = Query( text=preset.query, lang=preset.lang, filters={}, strategy=strategy, k=preset.k, rerank_k=preset.rerank_k, ) resp = pipe.run(q) try: omega = float(resp.signals.omega) if resp.signals else None except Exception: omega = None verif = getattr(resp, "verification", None) summary = { "preset": preset.name, "query": preset.query, "answer": resp.answer or "", "sources": list(resp.sources or []), "omega": omega, "passed": bool(getattr(verif, "passed", False)) if verif else None, "timing_ms": dict(resp.timing_ms or {}), } if args.as_json: print(json.dumps(summary, ensure_ascii=False, indent=2)) else: print(f'preset : {preset.name}') print(f'query : {preset.query}') print(f'answer : {summary["answer"]}') print(f'sources: {summary["sources"]}') if omega is not None: print(f'Ω : {omega:.3f}') print(f'passed : {summary["passed"]}') total = summary["timing_ms"].get("total_ms") if total is not None: print(f'total : {total:.2f}ms') return 0 def cmd_snapshot_diff(args) -> int: """Compare two snapshots on disk. Human-readable output by default, JSON with ``--json`` (exit code reflects whether any change was found: 0 = identical, 1 = changes).""" from pathlib import Path as _P from ..snapshot import diff_snapshots for label, p in (("a", args.a), ("b", args.b)): if not _P(p).exists(): print(f"[error] snapshot not found ({label}): {p}", file=sys.stderr) return 2 result = diff_snapshots(args.a, args.b, include_details=args.details) if args.as_json: print(json.dumps(result, ensure_ascii=False, indent=2)) else: ha, hb = result["a"], result["b"] print(f"a = {ha['path']} (n={ha['n_docs']} fp={ha['fingerprint']})") print(f"b = {hb['path']} (n={hb['n_docs']} fp={hb['fingerprint']})") print(f"same_fingerprint : {result['same_fingerprint']}") print(f"added ({len(result['added']):>4}): " f"{result['added'][:10]}{' …' if len(result['added'])>10 else ''}") print(f"removed ({len(result['removed']):>4}): " f"{result['removed'][:10]}{' …' if len(result['removed'])>10 else ''}") mods = result["modified"] ids = [m["id"] if isinstance(m, dict) else m for m in mods] print(f"modified ({len(ids):>4}): " f"{ids[:10]}{' …' if len(ids)>10 else ''}") print(f"unchanged: {result['unchanged_count']}") if result["warnings"]: print(f"[warn] {len(result['warnings'])} parse warning(s)") n_changed = (len(result["added"]) + len(result["removed"]) + len(result["modified"])) return 0 if n_changed == 0 else 1 def cmd_doctor(args) -> int: """Sanity-check the tau-rag environment. Reports: * python version + required deps * env var coherence * persistence path existence + writeability * pipeline initialization (basic readiness) * a ping of /v1/version if the server is reachable (skipped by default) Exit codes: 0 — all good (or only info) 1 — warnings found (or errors under --strict) 2 — hard errors (missing deps, pipeline broken) """ import os as _os import platform as _plat import sys as _sys from pathlib import Path as _P findings: list = [] def _add(level: str, check: str, msg: str, **extra): findings.append({"level": level, "check": check, "msg": msg, **extra}) # ---- python py = _sys.version.split()[0] if tuple(int(x) for x in py.split(".")[:2]) < (3, 10): _add("error", "python", f"Python {py} < 3.10 minimum") else: _add("ok", "python", f"Python {py} OK", version=py) # ---- platform _add("info", "platform", _plat.platform()) # ---- deps for mod in ("fastapi", "pydantic"): try: __import__(mod) _add("ok", f"dep.{mod}", f"{mod} available") except Exception: _add("error", f"dep.{mod}", f"{mod} not installed") for mod in ("uvicorn",): try: __import__(mod) _add("ok", f"dep.{mod}", f"{mod} available (for serve)") except Exception: _add("warn", f"dep.{mod}", f"{mod} not installed — `serve` will not work") # ---- env var coherence if _os.environ.get("TAU_RAG_REQUIRE_AUTH") == "1" \ and not _os.environ.get("TAU_RAG_ADMIN_KEY"): _add("warn", "env.auth", "TAU_RAG_REQUIRE_AUTH=1 but TAU_RAG_ADMIN_KEY is unset — " "bootstrap_admin will run on first /v1/admin/keys call") if _os.environ.get("TAU_RAG_SNAPSHOT_INTERVAL") \ and not _os.environ.get("TAU_RAG_SNAPSHOT_PATH"): _add("warn", "env.snapshot", "TAU_RAG_SNAPSHOT_INTERVAL set without TAU_RAG_SNAPSHOT_PATH — " "auto-snapshot disabled") # ---- persistence paths — existence + write access for name, env in ( ("snapshot", "TAU_RAG_SNAPSHOT_PATH"), ("synonyms", "TAU_RAG_SYNONYMS_PATH"), ("presets", "TAU_RAG_QUERY_PRESETS_PATH"), ("keys", "TAU_RAG_KEYS_PATH"), ("obs_log", "TAU_RAG_LOG_PATH"), ): path = _os.environ.get(env) if not path: _add("info", f"path.{name}", f"{env} not set (optional)") continue p = _P(path) if p.exists(): _add("ok", f"path.{name}", f"{name} exists: {p}", size=p.stat().st_size if p.is_file() else None) else: # Check if parent is writeable (so first save would succeed) parent = p.parent if not parent.exists(): _add("warn", f"path.{name}", f"{name} missing (and parent doesn't exist either): {p}") elif not _os.access(str(parent), _os.W_OK): _add("error", f"path.{name}", f"{name} missing, parent not writeable: {parent}") else: _add("info", f"path.{name}", f"{name} not yet created (will be on first save): {p}") # ---- pipeline readiness try: from ..pipeline import Pipeline from ..core.config import Config from .metrics import check_readiness preset = _os.environ.get("TAU_RAG_PRESET", "mock") cfg_factory = { "mock": Config.mock, "default": Config.default, "no_llm": Config.no_llm, "hebrew_legal": Config.hebrew_legal, "hebrew_dense": Config.hebrew_dense, }.get(preset, Config.mock) pipe = Pipeline.from_config(cfg_factory()) ok, detail = check_readiness(pipe) if ok: _add("ok", "pipeline", f"pipeline initializes (preset={preset})", retriever_members=detail.get("retriever_members")) else: _add("error", "pipeline", f"pipeline not ready (preset={preset})", detail=detail) except Exception as e: _add("error", "pipeline", f"could not construct pipeline: {type(e).__name__}: {e}") # ---- summary counts = {"ok": 0, "info": 0, "warn": 0, "error": 0} for f in findings: counts[f["level"]] = counts.get(f["level"], 0) + 1 if args.as_json: print(json.dumps({"findings": findings, "counts": counts}, ensure_ascii=False, indent=2)) else: LEVEL_ICON = {"ok": "✓", "info": "·", "warn": "!", "error": "✗"} print("=== tau-rag doctor ===") for f in findings: icon = LEVEL_ICON.get(f["level"], "?") print(f" [{icon}] {f['check']:22} {f['msg']}") print() print(f"summary: " f"{counts['ok']} ok · {counts['info']} info · " f"{counts['warn']} warn · {counts['error']} error") if counts["error"] > 0: return 2 if args.strict and counts["warn"] > 0: return 1 return 0 def cmd_backup(args) -> int: """Bundle {snapshot, synonyms, presets, keys} → tar.gz. Each file is stored at a well-known name inside the archive so ``restore`` knows where to put it back. The *original* abs path is recorded in a manifest.json sidecar so ``restore`` without --dest-dir can write back to the exact original locations. """ import os as _os import tarfile from pathlib import Path as _P _PATHS = [ ("snapshot", args.snapshot or _os.environ.get("TAU_RAG_SNAPSHOT_PATH")), ("synonyms", args.synonyms or _os.environ.get("TAU_RAG_SYNONYMS_PATH")), ("presets", args.presets or _os.environ.get("TAU_RAG_QUERY_PRESETS_PATH")), ("keys", args.keys or _os.environ.get("TAU_RAG_KEYS_PATH")), ] manifest = {"created_at": int(__import__("time").time()), "files": {}} included: list = [] out = args.out out.parent.mkdir(parents=True, exist_ok=True) with tarfile.open(out, "w:gz") as tar: for name, src in _PATHS: if not src: continue src_p = _P(src) if not src_p.exists(): print(f"[warn] {name} path not found, skipping: {src_p}", file=sys.stderr) continue # Archive member name: {name}/ arcname = f"{name}/{src_p.name}" tar.add(str(src_p), arcname=arcname) manifest["files"][name] = { "arcname": arcname, "src_path": str(src_p.resolve()), "size": src_p.stat().st_size, } included.append((name, str(src_p))) # Write the manifest import tempfile as _tempfile with _tempfile.NamedTemporaryFile("w", delete=False, suffix=".json", encoding="utf-8") as tf: tf.write(json.dumps(manifest, ensure_ascii=False, indent=2)) manifest_path = tf.name tar.add(manifest_path, arcname="manifest.json") try: _os.unlink(manifest_path) except Exception: pass print(f"wrote archive: {out} ({out.stat().st_size} bytes)") print(f"included : {len(included)} file(s)") for name, src in included: print(f" - {name:10} ← {src}") return 0 def cmd_restore(args) -> int: """Extract a backup archive and write its files back to the original paths recorded in manifest.json (or to --dest-dir if provided).""" import tarfile from pathlib import Path as _P arc = args.archive if not arc.exists(): print(f"[error] archive not found: {arc}", file=sys.stderr) return 2 with tarfile.open(arc, "r:gz") as tar: names = tar.getnames() if "manifest.json" not in names: print(f"[error] archive missing manifest.json; cannot restore", file=sys.stderr) return 2 if args.dry_run: print(f"archive: {arc}") print(f"contents ({len(names)} members):") for n in names: m = tar.getmember(n) print(f" {n:40} {m.size:>10}B") return 0 # Read manifest man_file = tar.extractfile("manifest.json") manifest = json.loads(man_file.read().decode("utf-8")) written: list = [] for name, meta in manifest.get("files", {}).items(): arcname = meta["arcname"] src_path = meta["src_path"] if args.dest_dir: dest_p = args.dest_dir / _P(arcname).name else: dest_p = _P(src_path) dest_p.parent.mkdir(parents=True, exist_ok=True) # Extract the single member to dest_p member = tar.getmember(arcname) reader = tar.extractfile(member) with open(dest_p, "wb") as fout: fout.write(reader.read()) written.append((name, str(dest_p))) print(f"restored {len(written)} file(s) from {arc}:") for name, dst in written: print(f" - {name:10} → {dst}") return 0 def cmd_serve(args) -> int: """Translate CLI flags → env vars → uvicorn.run(). Env vars are set BEFORE importing ``tau_rag.api.fastapi_app`` because that module reads them at import time (preset, snapshot path, interval, CORS origins, HSTS, etc.). """ import os as _os env_updates: Dict[str, str] = {} env_updates["TAU_RAG_PRESET"] = args.preset if args.snapshot_path: env_updates["TAU_RAG_SNAPSHOT_PATH"] = str(args.snapshot_path) if args.snapshot_interval: env_updates["TAU_RAG_SNAPSHOT_INTERVAL"] = str(float(args.snapshot_interval)) if args.synonyms_path: env_updates["TAU_RAG_SYNONYMS_PATH"] = str(args.synonyms_path) if args.require_auth: env_updates["TAU_RAG_REQUIRE_AUTH"] = "1" if args.admin_key: env_updates["TAU_RAG_ADMIN_KEY"] = args.admin_key if args.cors_origins: env_updates["TAU_RAG_CORS_ORIGINS"] = args.cors_origins if args.hsts: env_updates["TAU_RAG_HSTS"] = "1" if args.warmup: env_updates["TAU_RAG_WARMUP"] = "1" if args.log_stdout: env_updates["TAU_RAG_LOG_STDOUT"] = "1" uvicorn_args = { "app": "tau_rag.api.fastapi_app:app", "host": args.host, "port": args.port, "reload": args.reload, "log_level": args.log_level, } if args.dry_run: print("[serve dry-run] env updates:") for k in sorted(env_updates): print(f" {k}={env_updates[k]}") print() print("[serve dry-run] uvicorn args:") for k in sorted(uvicorn_args): print(f" {k}={uvicorn_args[k]}") return 0 # Actually start for k, v in env_updates.items(): _os.environ[k] = v try: import uvicorn except Exception: print("[error] uvicorn not installed. `pip install uvicorn[standard]`", file=sys.stderr) return 2 uvicorn.run(**uvicorn_args) return 0 def cmd_bulk_ingest(args) -> int: """Ingest a JSONL/CSV corpus in-process (no HTTP) via the same logic as ``POST /v1/documents/bulk`` — but running directly against a fresh Pipeline so no server is required.""" from ..api.fastapi_app import _parse_jsonl, _parse_csv from ..api.errors import Limits from ..core.types import Document from ..pipeline import Pipeline if not args.corpus.exists(): print(f"[error] corpus not found: {args.corpus}", file=sys.stderr) return 2 fmt = args.format if not fmt: fmt = "csv" if args.corpus.suffix.lower() == ".csv" else "jsonl" raw = args.corpus.read_text(encoding="utf-8") try: iterator = _parse_csv(raw) if fmt == "csv" else _parse_jsonl(raw) except Exception as e: print(f"[error] parse failed: {e}", file=sys.stderr) return 2 pipe = Pipeline.from_config(_load_config(args)) accepted: list = [] errors: list = [] rows_total = 0 for row_num, obj in iterator: rows_total += 1 if "__error__" in obj: errors.append({"row": row_num, "error": obj["__error__"]}) continue text = obj.get("text") if not isinstance(text, str) or not text: errors.append({"row": row_num, "error": "missing or empty 'text'"}) continue if len(text) > Limits.max_doc_text_len: errors.append({"row": row_num, "error": f"text exceeds {Limits.max_doc_text_len} chars"}) continue accepted.append(Document( id=obj.get("id") or f"row-{row_num}", text=text, metadata=obj.get("metadata") or {}, )) chunks = pipe.add_documents(accepted) if accepted else 0 result = { "format": fmt, "rows_total": rows_total, "accepted": len(accepted), "errors": errors, "added_chunks": chunks, } print(f"format : {fmt}") print(f"rows_total : {rows_total}") print(f"accepted : {len(accepted)}") print(f"errors : {len(errors)}") print(f"added_chunks : {chunks}") if errors[:3]: print("first 3 errors:") for e in errors[:3]: print(f" - row {e['row']}: {e['error']}") if args.snapshot_after: s = pipe.save_snapshot(args.snapshot_after) print(f"snapshot : {s}") if args.json: args.json.parent.mkdir(parents=True, exist_ok=True) args.json.write_text( json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8") print(f"wrote JSON : {args.json}") return 0 if not errors else 1 def cmd_eval_gold(args) -> int: """Run the v1.39 evaluation harness against a JSONL gold set.""" from ..eval import ( GoldCase, load_cases_jsonl, run_eval, render_html, render_diff_html, EvalReport, ) from ..pipeline import Pipeline cases = load_cases_jsonl(args.cases) if not cases: print(f"[error] no cases loaded from {args.cases}", file=sys.stderr) return 2 cfg = _load_config(args) pipe = Pipeline.from_config(cfg) # Optional corpus ingest if args.corpus: from ..core.types import Document with open(args.corpus, encoding="utf-8") as f: docs = [] for line in f: line = line.strip() if not line or line.startswith("#"): continue row = json.loads(line) docs.append(Document(id=row["id"], text=row["text"], metadata=row.get("metadata") or {})) pipe.add_documents(docs) report = run_eval(pipe, cases, k=args.k) # Parse gates: "recall@5=0.7" → {"recall@5": 0.7} gate: dict = {} for spec in args.fail_below: try: name, floor = spec.split("=", 1) gate[name.strip()] = float(floor) except Exception: print(f"[warn] ignoring bad --fail-below {spec!r}", file=sys.stderr) failures = report.fail_below(gate) # v1.87 — upward-bad gates (latency-style): "p95_latency_ms=200" ceiling_gate: dict = {} for spec in getattr(args, "fail_above", []) or []: try: name, ceil = spec.split("=", 1) ceiling_gate[name.strip()] = float(ceil) except Exception: print(f"[warn] ignoring bad --fail-above {spec!r}", file=sys.stderr) failures = failures + report.fail_above(ceiling_gate) # Console summary print(f"cases : {report.n_cases}") print("aggregate :", json.dumps( {k: round(v, 3) for k, v in report.aggregate.items()})) print("latency_ms :", json.dumps( {k: round(v, 1) for k, v in report.latency_ms.items()})) # Combined gate block (v1.87: includes fail_above results) if gate or ceiling_gate: if failures: print("GATE FAILED :") for f in failures: print(f" - {f}") else: print("GATE PASSED") # Persist artifacts if args.json: args.json.parent.mkdir(parents=True, exist_ok=True) args.json.write_text( json.dumps(report.to_dict(), ensure_ascii=False, indent=2), encoding="utf-8", ) print(f"wrote JSON : {args.json}") if args.html: args.html.parent.mkdir(parents=True, exist_ok=True) args.html.write_text( render_html(report, title=f"eval · {args.cases.name}", gate=gate), encoding="utf-8", ) print(f"wrote HTML : {args.html}") # --- diff against baseline regression_failures: list = [] if args.diff: if not args.diff.exists(): print(f"[error] baseline JSON not found: {args.diff}", file=sys.stderr) return 2 baseline = EvalReport.from_dict( json.loads(args.diff.read_text(encoding="utf-8"))) # v1.87 — direction-aware regression: latency going UP is bad, # and we accept a separate latency tolerance (ms-scale jitter # is very different from 0.01-scale recall jitter). upward_bad = list(getattr(args, "upward_bad", []) or []) lat_tol = getattr(args, "latency_tolerance", None) regressions = report.regressions( baseline, tolerance=float(args.diff_tolerance), upward_bad=upward_bad, latency_tolerance=(float(lat_tol) if lat_tol is not None else None), ) if regressions: print("REGRESSIONS (vs baseline):") for r in regressions: print(f" - {r}") regression_failures = regressions else: print(f"no regressions vs baseline ({args.diff.name})") if args.diff_html: args.diff_html.parent.mkdir(parents=True, exist_ok=True) args.diff_html.write_text( render_diff_html(report, baseline, title=f"eval diff · {args.cases.name}"), encoding="utf-8", ) print(f"wrote DIFF : {args.diff_html}") if failures or regression_failures: return 3 return 0 # ----------------------------------------------------------------- entry def main() -> int: ap = argparse.ArgumentParser(prog="tau-rag") # Global ap.add_argument("--config", type=Path) ap.add_argument("--preset", default="no_llm", choices=["mock", "default", "hebrew_legal", "no_llm"]) ap.add_argument("--chunker", default="fixed", choices=["fixed", "sentence", "legal_hebrew"]) sp = ap.add_subparsers(dest="cmd", required=False) # query p = sp.add_parser("query", help="Run one query") p.add_argument("q", nargs="?", help="Query text") p.add_argument("--corpus", type=Path, default=None, help="Path to a .jsonl file or a folder of .txt/.pdf") p.add_argument("--text", action="append", default=[], help="Inline doc: 'id::content' — repeatable") p.add_argument("--demo", action="store_true", help="Use the built-in Hebrew legal mini-corpus") p.add_argument("--k", type=int, default=10) p.add_argument("--rerank-k", type=int, default=5) p.add_argument("--lang", default="he") p.add_argument("--pretty", action="store_true") p.set_defaults(func=cmd_query) # ingest p = sp.add_parser("ingest", help="Index a corpus and persist snapshot") p.add_argument("--source", type=Path, required=True) p.add_argument("--out", type=Path, required=True) p.set_defaults(func=cmd_ingest) # bench p = sp.add_parser("bench", help="Run a case file against a pipeline") p.add_argument("--cases", type=Path, required=True) p.add_argument("--corpus", type=Path) p.add_argument("--snapshot", type=Path, help="Re-use a persisted index instead of --corpus") p.add_argument("--k", type=int, default=10) p.add_argument("--save", type=Path, help="Save result to JSON") p.set_defaults(func=cmd_bench) # eval p = sp.add_parser("eval", help="Synthesize cases + run benchmark") p.add_argument("--corpus", type=Path, default=None) p.add_argument("--text", action="append", default=[], help="Inline doc: 'id::content' — repeatable") p.add_argument("--demo", action="store_true", help="Use the built-in Hebrew legal mini-corpus") p.add_argument("--per-doc", type=int, default=1) p.add_argument("--seed", type=int, default=0) p.add_argument("--k", type=int, default=10) p.set_defaults(func=cmd_eval) # dashboard p = sp.add_parser("dashboard", help="Render HTML from a bench JSON") p.add_argument("--bench-json", type=Path, required=True) p.add_argument("--out", type=Path, default="dashboard.html") p.set_defaults(func=cmd_dashboard) # trace p = sp.add_parser("trace", help="Dump accumulated tracer spans") p.add_argument("--output", type=Path, default="traces.jsonl") p.set_defaults(func=cmd_trace) # info p = sp.add_parser("info", help="Print version and available components") p.set_defaults(func=cmd_info) # run-preset — execute a saved preset from the CLI (no HTTP needed) p = sp.add_parser("run-preset", help="Execute a saved query preset in-process") p.add_argument("name", nargs="?", help="Preset name (omit with --list)") p.add_argument("--list", dest="list_all", action="store_true", help="List all saved presets and exit") p.add_argument("--corpus", type=Path, default=None, help="Optional JSONL corpus to ingest before running") p.add_argument("--snapshot", type=Path, default=None, help="Optional snapshot to load before running " "(alternative to --corpus)") p.add_argument("--json", dest="as_json", action="store_true", help="Emit machine-readable JSON") p.set_defaults(func=cmd_run_preset) # doctor — environment + deps + paths sanity check p = sp.add_parser("doctor", help="Diagnose environment + deps + paths health") p.add_argument("--json", dest="as_json", action="store_true", help="Emit machine-readable JSON; default is human text") p.add_argument("--strict", action="store_true", help="Exit nonzero on warnings, not only errors") p.set_defaults(func=cmd_doctor) # backup — bundle persistence files into a single tar.gz p = sp.add_parser("backup", help="Bundle persistence files into a tar.gz archive") p.add_argument("--out", type=Path, required=True, help="Output .tar.gz path") p.add_argument("--snapshot", type=Path, default=None, help="Snapshot JSONL path (default: $TAU_RAG_SNAPSHOT_PATH)") p.add_argument("--synonyms", type=Path, default=None, help="Synonyms JSONL path (default: $TAU_RAG_SYNONYMS_PATH)") p.add_argument("--presets", type=Path, default=None, help="Query presets JSONL path (default: $TAU_RAG_QUERY_PRESETS_PATH)") p.add_argument("--keys", type=Path, default=None, help="API keys JSON path (default: $TAU_RAG_KEYS_PATH)") p.set_defaults(func=cmd_backup) # restore — inverse of backup p = sp.add_parser("restore", help="Restore persistence files from a tar.gz archive") p.add_argument("--in", dest="archive", type=Path, required=True, help="Archive .tar.gz to restore from") p.add_argument("--dest-dir", type=Path, default=None, help="Override destination directory (default: restore to " "same paths they were backed up from).") p.add_argument("--dry-run", action="store_true", help="List the archive contents without extracting.") p.set_defaults(func=cmd_restore) # serve — one-shot server launcher (wraps uvicorn + env) p = sp.add_parser("serve", help="Run the FastAPI server with a single command") p.add_argument("--host", default="127.0.0.1", help="Bind host (default 127.0.0.1; use 0.0.0.0 for LAN)") p.add_argument("--port", type=int, default=8000) p.add_argument("--preset", default="no_llm", choices=["mock", "default", "hebrew_legal", "no_llm", "hebrew_dense"]) p.add_argument("--snapshot-path", type=Path, default=None, help="Path for auto-restore on startup + auto-save on shutdown.") p.add_argument("--snapshot-interval", type=float, default=None, help="Seconds between periodic auto-snapshots (crash-proof).") p.add_argument("--synonyms-path", type=Path, default=None, help="Path for synonym autoload/autosave.") p.add_argument("--require-auth", action="store_true", help="Require X-API-Key for protected endpoints.") p.add_argument("--admin-key", default=None, help="Pre-seeded admin API key (otherwise bootstrap_admin runs).") p.add_argument("--cors-origins", default=None, help="Comma-separated CORS allow-origins.") p.add_argument("--hsts", action="store_true", help="Enable Strict-Transport-Security header.") p.add_argument("--warmup", action="store_true", help="Pre-load embedders/tokenizers on startup (v1.56).") p.add_argument("--log-stdout", action="store_true", help="Emit every request+audit as a JSON line on stdout " "(for Datadog/Loki/ELK log aggregators).") p.add_argument("--reload", action="store_true", help="uvicorn --reload (dev only).") p.add_argument("--log-level", default="info", choices=["critical", "error", "warning", "info", "debug"]) p.add_argument("--dry-run", action="store_true", help="Print the effective env + uvicorn args; do not bind.") p.set_defaults(func=cmd_serve) # bulk-ingest — pipe a JSONL / CSV corpus into the pipeline p = sp.add_parser("bulk-ingest", help="Ingest a JSONL or CSV corpus with per-row errors") p.add_argument("--corpus", type=Path, required=True, help="Path to JSONL or CSV. Format inferred from extension.") p.add_argument("--format", choices=["jsonl", "csv"], default=None, help="Override format inference.") p.add_argument("--snapshot-after", type=Path, default=None, help="Write a snapshot to this path after ingest.") p.add_argument("--json", type=Path, default=None, help="Write the ingest result JSON to this path.") p.set_defaults(func=cmd_bulk_ingest) # eval-gold — run v1.39 harness against a gold JSONL + render HTML p = sp.add_parser("eval-gold", help="Run evaluation harness on a JSONL gold set") p.add_argument("--cases", type=Path, required=True, help="Path to gold cases JSONL (see eval/gold_hebrew_legal.jsonl)") p.add_argument("--corpus", type=Path, default=None, help="Optional JSONL corpus to index first (one {id,text} per line)") p.add_argument("--k", type=int, default=5) p.add_argument("--html", type=Path, default=None, help="Where to write the HTML report (optional)") p.add_argument("--json", type=Path, default=None, help="Where to write the JSON report (optional)") p.add_argument("--fail-below", action="append", default=[], metavar="METRIC=FLOOR", help='Regression gate, e.g. "recall@5=0.7". Repeatable.') p.add_argument("--fail-above", action="append", default=[], metavar="METRIC=CEILING", help='Upward-bad gate for latency-style metrics ' '(v1.87), e.g. "p95_latency_ms=200". ' 'Names ending in "_latency_ms" or starting with ' '"latency_ms." resolve into latency_ms block. ' 'Repeatable.') p.add_argument("--upward-bad", action="append", default=[], metavar="METRIC", help='Mark a metric as upward-bad for --diff ' 'regression detection (v1.87). Latency metrics ' '(name ends "_latency_ms" / starts ' '"latency_ms.") are treated as upward-bad ' 'automatically — only add this for non-standard ' 'metrics. Repeatable.') p.add_argument("--latency-tolerance", type=float, default=None, metavar="MS", help='Tolerance in ms for latency-style regressions ' '(v1.87). Overrides --diff-tolerance for metrics ' 'in the latency family.') p.add_argument("--diff", type=Path, default=None, help="Baseline JSON (from a previous --json run) to diff " "against. Fails non-zero if any metric regresses.") p.add_argument("--diff-html", type=Path, default=None, help="Where to write the diff HTML report (requires --diff).") p.add_argument("--diff-tolerance", type=float, default=0.01, help="Ignore regressions within this absolute delta " "(default 0.01 to absorb minor run-to-run jitter).") p.set_defaults(func=cmd_eval_gold) # ab p = sp.add_parser("ab", help="A/B compare two pipeline configs on cases") p.add_argument("--cases", type=Path, required=True) p.add_argument("--config-a", type=Path, default=None) p.add_argument("--config-b", type=Path, default=None) p.add_argument("--preset-a", default="no_llm", choices=["mock", "default", "hebrew_legal", "no_llm", "hebrew_dense"]) p.add_argument("--preset-b", default="hebrew_dense", choices=["mock", "default", "hebrew_legal", "no_llm", "hebrew_dense"]) p.add_argument("--label-a", default="A") p.add_argument("--label-b", default="B") p.add_argument("--corpus", type=Path, default=None) p.add_argument("--text", action="append", default=[]) p.add_argument("--demo", action="store_true") p.add_argument("--k", type=int, default=5) p.add_argument("--html", type=Path, default=None) p.set_defaults(func=cmd_ab) # snapshot-diff (v1.77) p = sp.add_parser("snapshot-diff", help="Diff two snapshots — added/removed/modified doc IDs") p.add_argument("a", help="Snapshot A (the \"before\")") p.add_argument("b", help="Snapshot B (the \"after\")") p.add_argument("--json", dest="as_json", action="store_true", help="Emit machine-readable JSON") p.add_argument("--details", action="store_true", help="Include old/new lengths + hashes for modified docs") p.set_defaults(func=cmd_snapshot_diff) # ---- backward-compat: `python -m tau_rag.api.cli "query"` without subcmd args, extra = ap.parse_known_args() if args.cmd is None: if extra: # fallback: treat first positional as query for legacy callers import shlex new = ["query", " ".join(extra)] args = ap.parse_args(new + (["--preset", args.preset] if args.preset else [])) else: ap.print_help() return 0 try: return args.func(args) except (FileNotFoundError, ValueError) as e: print(f"[error] {e}", file=sys.stderr) return 2 if __name__ == "__main__": raise SystemExit(main())