"""Ingest a directory tree into a running tau-rag instance. Walks the directory recursively, uploads each supported file (pdf/docx/txt/md/html/csv/jsonl/parquet/pkl) by calling POST /v1/data/ingest_dir on the local server. Usage: # Preview what would be ingested (no upload): python -m scripts.ingest_dir /path/to/data --dry-run # Real ingest: python -m scripts.ingest_dir /path/to/data # Limit file types or count: python -m scripts.ingest_dir /path/to/data \\ --extensions .pdf .docx .txt --max-files 1000 # Custom server / chunking: python -m scripts.ingest_dir /path/to/data \\ --server http://localhost:8000 \\ --chunk-size 2000 --chunk-overlap 250 """ from __future__ import annotations import argparse import json import sys from typing import List, Optional try: import requests except ImportError: print("ERROR: requests not installed. pip install requests", file=sys.stderr) sys.exit(2) def main(argv: Optional[List[str]] = None) -> int: p = argparse.ArgumentParser( description="Ingest a directory into tau-rag") p.add_argument("path", help="server-side directory to ingest") p.add_argument("--server", default="http://localhost:8000", help="tau-rag base URL (default: http://localhost:8000)") p.add_argument("--api-key", default="", help="API key (X-API-Key header)") p.add_argument("--no-recursive", action="store_true", help="don't descend into subdirs") p.add_argument("--chunk-size", type=int, default=1500) p.add_argument("--chunk-overlap", type=int, default=200) p.add_argument("--max-files", type=int, default=None) p.add_argument("--extensions", nargs="*", default=None, help="whitelist (e.g. .pdf .docx .txt)") p.add_argument("--dry-run", action="store_true", help="scan and report only; no ingest") p.add_argument("--save", default=None, help="save full per-file report to JSON file") args = p.parse_args(argv) payload = { "path": args.path, "recursive": not args.no_recursive, "chunk_size": args.chunk_size, "chunk_overlap": args.chunk_overlap, "max_files": args.max_files, "extensions": args.extensions, "dry_run": args.dry_run, } headers = {"Content-Type": "application/json"} if args.api_key: headers["X-API-Key"] = args.api_key url = args.server.rstrip("/") + "/v1/data/ingest_dir" print(f"→ POST {url}") print(f" path: {args.path}") print(f" recursive: {payload['recursive']}, " f"chunk_size: {args.chunk_size}, " f"chunk_overlap: {args.chunk_overlap}") if args.dry_run: print(" (dry-run — nothing will be ingested)") print() try: r = requests.post(url, headers=headers, json=payload, timeout=3600) except requests.exceptions.ConnectionError as e: print(f"✗ Cannot connect to {args.server} — is the server running?", file=sys.stderr) print(f" ({e})", file=sys.stderr) return 3 except requests.exceptions.Timeout: print(f"✗ Request timed out (the directory might be huge — try --max-files)", file=sys.stderr) return 4 if r.status_code != 200: print(f"✗ HTTP {r.status_code}: {r.text[:300]}", file=sys.stderr) return r.status_code data = r.json() if args.dry_run: print(f"📋 DRY RUN — would ingest {data['n_files_found']} files") if data.get("by_kind"): print("\nBy kind:") for k, v in sorted(data["by_kind"].items(), key=lambda x: -x[1]): print(f" {k:10s} {v:>6}") if data.get("files"): print(f"\nFirst {min(20, len(data['files']))} files:") for f in data["files"][:20]: print(f" {f}") if data.get("truncated"): print(f" ... and more (truncated at 200)") return 0 print(f"✓ Done — {data['n_files_ok']}/{data['n_files_found']} files OK" f" ({data['n_files_failed']} failed)") print(f" 📄 Total docs parsed: {data['n_total_docs']:,}") print(f" 📚 Total docs indexed in pipeline: {data['n_total_indexed']:,}") print(f" Pipeline attached: {data['pipeline_attached']}") failed = [r for r in data["per_file"] if not r["ok"]] if failed: print(f"\n⚠ Failed files (showing first 10):") for r in failed[:10]: print(f" ✗ {r['file']:60s} {r['error']}") by_kind: dict = {} for r in data["per_file"]: if r["ok"]: by_kind[r["kind"]] = by_kind.get(r["kind"], 0) + r["n_docs"] if by_kind: print("\nDocs ingested by kind:") for k, v in sorted(by_kind.items(), key=lambda x: -x[1]): print(f" {k:10s} {v:>10,}") if args.save: with open(args.save, "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=2) print(f"\n💾 Full report saved to {args.save}") return 0 if __name__ == "__main__": sys.exit(main())