"""Standalone ingester — walks a directory, parses every supported file,
and writes a JSONL corpus. Does NOT depend on a running server.

Output JSONL format (one chunk per line):

    {"id": "...", "text": "...", "metadata": {...}}

Progress is printed live. Per-file errors are caught and logged — one bad
PDF won't kill the whole run.

Usage:

    python -m scripts.ingest_local /path/to/data \\
        --output corpus.jsonl
    python -m scripts.ingest_local /path/to/data --dry-run

After ingest:

    # Inspect:
    head -1 corpus.jsonl | python3 -m json.tool
    wc -l corpus.jsonl

    # Upload to running tau-rag (optional):
    curl -X POST http://localhost:8000/v1/data/upload -F "file=@corpus.jsonl"
"""
from __future__ import annotations

import argparse
import json
import os
import re
import sys
import time
import traceback
from pathlib import Path
from typing import Dict, List, Optional

# ============================================ extension → kind table

# Maps a lower-cased file suffix to the parser "kind" used by parse_file().
EXT_TO_KIND = {
    ".parquet": "parquet", ".pq": "parquet",
    ".csv": "csv", ".tsv": "csv",
    ".jsonl": "jsonl", ".ndjson": "jsonl", ".json": "jsonl",
    ".pkl": "pkl", ".pickle": "pkl",
    ".txt": "txt", ".text": "txt", ".log": "txt",
    ".md": "md", ".markdown": "md",
    ".yaml": "txt", ".yml": "txt",
    ".pdf": "pdf",
    ".docx": "docx",
    ".pptx": "pptx",
    ".xlsx": "xlsx",
    ".html": "html", ".htm": "html",
}

# ============================================ chunker

# Sentence boundary: whitespace after . ! ? or the Hebrew sof pasuq (׃),
# or a paragraph break (two or more newlines).
_SENTENCE_SPLIT_RE = re.compile(r'(?<=[.!?׃])\s+|\n{2,}')


def chunk_text(text: str, target_chars: int = 1500,
               overlap_chars: int = 200) -> List[str]:
    """Split *text* into chunks of at most *target_chars* characters.

    The (stripped) text is cut at sentence ends and paragraph breaks, and
    the pieces are packed greedily, joined by single spaces, into chunks no
    longer than *target_chars*.

    A single sentence longer than *target_chars* is hard-split into windows
    of *target_chars* characters, consecutive windows overlapping by
    *overlap_chars*; splitting stops as soon as a window reaches the end of
    the sentence. *overlap_chars* applies only to these hard splits — packed
    chunks do not overlap. An overlap that is negative or not smaller than
    *target_chars* is treated as 0, so no text is ever skipped or dropped.

    Args:
        text: Input text; ``None`` is treated as empty.
        target_chars: Maximum chunk length in characters (must be > 0).
        overlap_chars: Overlap between hard-split windows.

    Returns:
        ``[]`` for empty/whitespace-only input, ``[text]`` when the stripped
        text already fits, otherwise the list of chunks in order.

    Raises:
        ValueError: if *text* is non-empty and *target_chars* <= 0.
    """
    text = (text or "").strip()
    if not text:
        return []
    if target_chars <= 0:
        raise ValueError(
            f"target_chars must be positive, got {target_chars}")
    if len(text) <= target_chars:
        return [text]

    # An invalid overlap would give a zero/negative step (crash or empty
    # range) or a step larger than the window (skipped characters).
    if not 0 <= overlap_chars < target_chars:
        overlap_chars = 0
    step = target_chars - overlap_chars

    chunks: List[str] = []
    cur = ""
    for s in _SENTENCE_SPLIT_RE.split(text):
        s = s.strip()
        if not s:
            continue
        if len(cur) + len(s) + 1 <= target_chars:
            cur = (cur + " " + s).strip() if cur else s
            continue
        if cur:
            chunks.append(cur)
        if len(s) > target_chars:
            start = 0
            while True:
                chunks.append(s[start:start + target_chars])
                # Stop once this window reaches the end; a further window
                # would be wholly contained in this one.
                if start + target_chars >= len(s):
                    break
                start += step
            cur = ""
        else:
            cur = s
    if cur:
        chunks.append(cur)
    return chunks
============================================ extractors def extract_pdf(path: str) -> str: try: from pypdf import PdfReader except ImportError: from PyPDF2 import PdfReader # noqa r = PdfReader(path) return "\n\n".join( (p.extract_text() or "") for p in r.pages) def extract_docx(path: str) -> str: from docx import Document return "\n\n".join( p.text for p in Document(path).paragraphs if p.text.strip()) def extract_pptx(path: str) -> str: from pptx import Presentation parts = [] for i, slide in enumerate(Presentation(path).slides, 1): sp = [] for sh in slide.shapes: if hasattr(sh, "text") and sh.text: sp.append(sh.text) if sp: parts.append(f"## Slide {i}\n" + "\n".join(sp)) return "\n\n".join(parts) def extract_xlsx(path: str) -> str: from openpyxl import load_workbook wb = load_workbook(path, data_only=True, read_only=True) parts = [] for ws in wb.worksheets: sp = [f"## Sheet: {ws.title}"] for row in ws.iter_rows(values_only=True): cells = [str(c) for c in row if c is not None and str(c).strip()] if cells: sp.append(" | ".join(cells)) if len(sp) > 1: parts.append("\n".join(sp)) return "\n\n".join(parts) def extract_html(path: str) -> str: from html.parser import HTMLParser text = Path(path).read_text(encoding="utf-8", errors="replace") text = re.sub(r'<(script|style)[^>]*>.*?', ' ', text, flags=re.DOTALL | re.IGNORECASE) class S(HTMLParser): def __init__(self): super().__init__() self.parts = [] def handle_data(self, data): if data.strip(): self.parts.append(data) p = S() try: p.feed(text) except Exception: pass return re.sub(r'\s+', ' ', " ".join(p.parts)).strip() # ============================================ parse a single file def parse_file(path: Path, root: Path, chunk_size: int, overlap: int): """Parse one file → list of {id, text, metadata}.""" rel = str(path.relative_to(root)) ext = path.suffix.lower() kind = EXT_TO_KIND.get(ext, "txt") docs = [] # Tabular formats — each row is a doc if kind == "parquet": try: import pyarrow.parquet as pq pf = 
pq.ParquetFile(str(path)) # Detect text/id columns once schema_cols = set(pf.schema.names) text_col = next( (c for c in ("text", "chunk", "content") if c in schema_cols), None) id_col = "id" if "id" in schema_cols else None if not text_col: return [] # no text column → useless for RAG cols = [c for c in (text_col, id_col) if c] i = 0 # Stream in batches of 1000 rows to avoid loading # the whole table into memory. for batch in pf.iter_batches( batch_size=1000, columns=cols): bd = batch.to_pydict() texts = bd.get(text_col, []) ids = bd.get(id_col, []) if id_col else [] for j, t in enumerate(texts): if not t: i += 1 continue doc_id = (str(ids[j]) if id_col and j < len(ids) else f"{rel}-{i}") docs.append({ "id": doc_id, "text": str(t), "metadata": {"source": rel, "kind": "parquet"}, }) i += 1 except ImportError: import pandas as pd for chunk_df in pd.read_parquet( str(path), engine="pyarrow", chunksize=None).itertuples(): # fallback pass # pandas fallback rare; skip return docs if kind == "csv": import csv delim = "\t" if ext == ".tsv" else "," with open(path, encoding="utf-8") as f: for i, row in enumerate(csv.DictReader( f, delimiter=delim)): t = (row.get("text") or row.get("chunk") or row.get("content") or "") if not t: continue docs.append({ "id": str(row.get("id", f"{rel}-{i}")), "text": t, "metadata": {"source": rel, "kind": "csv"}, }) return docs if kind == "jsonl": for i, line in enumerate(path.read_text( encoding="utf-8", errors="replace").splitlines()): line = line.strip() if not line: continue try: row = json.loads(line) except Exception: continue if not isinstance(row, dict): continue t = (row.get("text") or row.get("chunk") or row.get("content") or "") if not t: continue docs.append({ "id": str(row.get("id", f"{rel}-{i}")), "text": str(t), "metadata": {"source": rel, "kind": "jsonl"}, }) return docs if kind == "pkl": import pickle with open(path, "rb") as f: data = pickle.load(f) rows = (data if isinstance(data, list) else [data] if isinstance(data, dict) 
else []) for i, row in enumerate(rows): if isinstance(row, str): docs.append({ "id": f"{rel}-{i}", "text": row, "metadata": {"source": rel, "kind": "pkl"}}) elif isinstance(row, dict): t = (row.get("text") or row.get("chunk") or row.get("content") or "") if t: docs.append({ "id": str(row.get("id", f"{rel}-{i}")), "text": str(t), "metadata": {"source": rel, "kind": "pkl"}}) return docs # Free-text formats — one text → many chunks if kind in ("txt", "md"): text = path.read_text(encoding="utf-8", errors="replace") elif kind == "pdf": text = extract_pdf(str(path)) elif kind == "docx": text = extract_docx(str(path)) elif kind == "pptx": text = extract_pptx(str(path)) elif kind == "xlsx": text = extract_xlsx(str(path)) elif kind == "html": text = extract_html(str(path)) else: return [] chunks = chunk_text(text, chunk_size, overlap) for i, ch in enumerate(chunks): docs.append({ "id": f"{rel}-{i}", "text": ch, "metadata": {"source": rel, "kind": kind, "chunk": i, "n_chunks": len(chunks)}, }) return docs # ============================================ main walker def main(argv: Optional[List[str]] = None) -> int: p = argparse.ArgumentParser( description="Standalone corpus ingester — no server needed") p.add_argument("path", help="root directory to scan") p.add_argument("--output", "-o", default="corpus.jsonl", help="output JSONL file (default: corpus.jsonl)") p.add_argument("--chunk-size", type=int, default=1500) p.add_argument("--overlap", type=int, default=200) p.add_argument("--max-files", type=int, default=None) p.add_argument("--extensions", nargs="*", default=None, help="whitelist (e.g. 
.pdf .docx .txt)") p.add_argument("--dry-run", action="store_true", help="scan only, don't parse") p.add_argument("--errors", default=None, help="write errored files to this path") p.add_argument("--max-file-size-mb", type=float, default=200, help="skip files bigger than this (default: 200MB)") p.add_argument("--max-chunks-per-file", type=int, default=10000, help="cap chunks per file (default: 10000)") p.add_argument("--resume", action="store_true", help="if output exists, skip files already in it") args = p.parse_args(argv) root = Path(args.path).expanduser().resolve() if not root.exists(): print(f"✗ Path does not exist: {root}", file=sys.stderr) return 2 if not root.is_dir(): print(f"✗ Not a directory: {root}", file=sys.stderr) return 2 allowed = set(EXT_TO_KIND.keys()) if args.extensions: allowed = {e.lower() if e.startswith(".") else "." + e.lower() for e in args.extensions} print(f"→ Scanning {root}...") files: List[Path] = [] skip_count = 0 try: for p_ in root.rglob("*"): try: if not p_.is_file(): continue except (PermissionError, OSError): skip_count += 1 continue # Skip dotfiles / dotdirs try: rel_parts = p_.parts[len(root.parts):] except Exception: continue if any(pt.startswith(".") for pt in rel_parts): continue if p_.suffix.lower() not in allowed: continue files.append(p_) if args.max_files and len(files) >= args.max_files: break except (PermissionError, OSError) as e: print(f"⚠ Stopped scanning: {e}", file=sys.stderr) files.sort() by_kind: Dict[str, int] = {} for f in files: k = EXT_TO_KIND.get(f.suffix.lower(), "?") by_kind[k] = by_kind.get(k, 0) + 1 print(f"\n📋 Found {len(files):,} files " f"({skip_count} unreadable, skipped)") for k, v in sorted(by_kind.items(), key=lambda x: -x[1]): print(f" {k:10s} {v:>6,}") print() if args.dry_run: print("(dry-run — exiting without parsing)") return 0 # Open output JSONL (append in resume mode) out_path = Path(args.output).resolve() out_path.parent.mkdir(parents=True, exist_ok=True) max_bytes = 
int(args.max_file_size_mb * 1024 * 1024) cap_chunks = args.max_chunks_per_file # Resume support: read existing JSONL and collect processed sources seen_sources: set = set() if args.resume and out_path.exists(): print(f"→ Resume mode: scanning {out_path} for " f"already-processed sources...") with open(out_path, encoding="utf-8") as f: for line in f: try: d = json.loads(line) src = d.get("metadata", {}).get("source") if src: seen_sources.add(src) except Exception: continue print(f" {len(seen_sources):,} sources already done — " f"will skip them.") open_mode = "a" if args.resume and out_path.exists() else "w" print(f"→ Writing to {out_path} (mode: {open_mode})") print(f" max file size: {args.max_file_size_mb} MB | " f"max chunks/file: {cap_chunks:,}") err_lines: List[str] = [] n_total_docs = 0 n_files_ok = 0 n_files_failed = 0 n_skipped_too_big = 0 n_skipped_resume = 0 n_capped = 0 t0 = time.time() last_print = t0 with open(out_path, open_mode, encoding="utf-8") as out_f: for i, fp in enumerate(files, 1): rel = str(fp.relative_to(root)) # Skip already done in resume if rel in seen_sources: n_skipped_resume += 1 continue # Skip oversized files try: size = fp.stat().st_size except Exception: size = 0 if size > max_bytes: n_skipped_too_big += 1 err_lines.append( f"{rel}\tSKIPPED: {size/1024/1024:.1f}MB > " f"{args.max_file_size_mb}MB") continue try: docs = parse_file( fp, root, args.chunk_size, args.overlap) if len(docs) > cap_chunks: docs = docs[:cap_chunks] n_capped += 1 for d in docs: out_f.write(json.dumps( d, ensure_ascii=False) + "\n") # Flush every 100 files to avoid losing progress if i % 100 == 0: out_f.flush() n_total_docs += len(docs) n_files_ok += 1 except Exception as e: n_files_failed += 1 err_lines.append( f"{rel}\t{type(e).__name__}: {e}") now = time.time() if now - last_print >= 1.0 or i == len(files): elapsed = now - t0 rate = i / elapsed if elapsed > 0 else 0 eta = ((len(files) - i) / rate if rate > 0 else 0) pct = i / len(files) * 100 # Compact line 
— fixed width, no overflow line = (f" [{pct:5.1f}%] {i:>5,}/{len(files):,} " f"| chunks {n_total_docs:>9,} " f"| {rate:5.1f}/s " f"| ETA {eta:5.0f}s " f"| fail {n_files_failed:>3} " f"| skip {n_skipped_too_big + n_skipped_resume:>3}") # Pad to fixed width to overwrite previous line print(f"\r{line:<100}", end="", flush=True) last_print = now print() elapsed = time.time() - t0 print() print(f"✓ Done in {elapsed:.0f}s") print(f" Files OK: {n_files_ok:,}") print(f" Files failed: {n_files_failed}") print(f" Files too big: {n_skipped_too_big}") print(f" Files resumed: {n_skipped_resume}") print(f" Files capped: {n_capped}") print(f" Total chunks: {n_total_docs:,}") print(f" Output: {out_path} " f"({out_path.stat().st_size / 1024 / 1024:.1f} MB)") if args.errors and err_lines: Path(args.errors).write_text( "\n".join(err_lines), encoding="utf-8") print(f" Errors saved to {args.errors}") elif err_lines: print(f"\n⚠ First 5 errors:") for line in err_lines[:5]: print(f" {line}") print() print(f"📤 Next step — upload to tau-rag:") print(f" curl -X POST http://localhost:8000/v1/data/upload " f"\\\n -F \"file=@{out_path}\"") return 0 if n_files_failed == 0 else 1 if __name__ == "__main__": sys.exit(main())