legal-eye / tau_rag /chunking /legal_hebrew.py
Legal-i's picture
Initial deploy: legal-eye Hebrew legal RAG (17K corpus, verbatim-from-precedent)
3be54c6 verified
"""Legal-Hebrew chunker.
Splits on common Hebrew legal section boundaries: סעיף N, פרק N, תת-סעיף N,
(א)/(ב)/(ג). Keeps each numbered unit as its own chunk and attaches the
section label to metadata — so the retriever can later filter by section
and the generator can cite "סעיף 5".
Fallback: if no legal markers are found, falls back to sentence_chunker.
"""
from __future__ import annotations
import re
from typing import List, Tuple
from ..core.types import Chunk, Document
from .sentence import sentence_chunker
# Section-marker pattern. A marker must begin a line (multiline "^", with
# optional leading whitespace) and is one of:
#   * a unit keyword followed by an identifier, e.g. "סעיף 5", "סעיף 5א",
#     "פרק ב", "תת-סעיף 3", "סימן א", "תקנה 3", "פקודה 7", "תקנות 2";
#   * a parenthesised Hebrew/digit label: "(א)", "(1)";
#   * a dotted number followed by whitespace: "1.2 " or "1.2. ";
#   * a bracketed number: "[1]".
# NOTE: "חוק" is NOT among the keywords, so "חוק 12" is not treated as a marker.
# NOTE(review): the identifier class accepts any run of Hebrew letters, so a
# keyword followed by an ordinary Hebrew word at line start also matches —
# confirm this is intended.
# Whitespace after the marker is consumed, so m.end() lands on the body text.
_SECTION_RE = re.compile(
r"(?m)^\s*(?:"
r"(?:סעיף|פרק|תת-סעיף|סימן|תקנה|פקודה|תקנות)\s+"
r"[\u0590-\u05FF0-9\"'\.]+|" # unit keyword + identifier (Hebrew letters, digits, quotes, dots)
r"\([\u0590-\u05FF0-9]+\)|" # parenthesised label: (א) / (1)
r"\d+\.\d+\.?\s|" # dotted number: "1.2 " or "1.2. "
r"\[\d+\]" # bracketed number: [1]
r")\s*"
)
def _split_by_sections(text: str) -> List[Tuple[str, str, int]]:
    """Split *text* into labelled sections at every ``_SECTION_RE`` match.

    A section's body runs from the end of its marker to the start of the
    next marker, or to the end of *text* for the last marker. Text that
    precedes the first marker is not part of any section and is not returned.

    Args:
        text: The full document text to scan.

    Returns:
        ``(label, body, body_offset)`` tuples in document order, where
        ``label`` is the matched marker with surrounding whitespace removed,
        ``body`` is the stripped text following the marker, and
        ``body_offset`` is the index in *text* of the first character of
        ``body`` — i.e. ``text[body_offset:body_offset + len(body)] == body``.
        Markers whose body is blank are omitted. The list is empty when no
        marker matches or every body is blank.
    """
    markers = list(_SECTION_RE.finditer(text))
    # Every section ends where the next marker begins; the last one runs to
    # the end of the text.
    section_ends = [m.start() for m in markers[1:]] + [len(text)]

    sections: List[Tuple[str, str, int]] = []
    for marker, section_end in zip(markers, section_ends):
        raw_body = text[marker.end():section_end]
        body = raw_body.strip()
        if not body:
            continue
        # Report where the stripped body actually starts. Using the marker's
        # own start here would shift every span derived from this offset to
        # the left by the length of the marker, because callers add body
        # lengths / body-relative positions to it.
        leading_ws = len(raw_body) - len(raw_body.lstrip())
        sections.append(
            (marker.group().strip(), body, marker.end() + leading_ws)
        )
    return sections
def _chunks_from_span(
    doc: Document,
    text: str,
    base_offset: int,
    metadata: dict,
    chunk_size: int,
    first_index: int,
) -> List[Chunk]:
    """Turn one span of *doc* into chunks numbered from *first_index*.

    A span no longer than *chunk_size* becomes a single chunk. A longer span
    is handed to ``sentence_chunker`` via a temporary ``Document`` and each
    resulting piece becomes its own chunk, with the piece's start/end shifted
    by *base_offset*. Every chunk receives its own copy of *metadata*.
    """
    if len(text) <= chunk_size:
        pieces = [(text, 0, len(text))]
    else:
        # Reuse sentence_chunker on a temporary Document holding only this span.
        span_doc = Document(id=doc.id, text=text, metadata={**metadata})
        pieces = [
            (piece.text, piece.start, piece.end)
            for piece in sentence_chunker(span_doc, chunk_size=chunk_size)
        ]
    return [
        Chunk(
            doc_id=doc.id,
            chunk_id=f"{doc.id}::{first_index + i}",
            text=piece_text,
            start=base_offset + rel_start,
            end=base_offset + rel_end,
            metadata={**metadata},
        )
        for i, (piece_text, rel_start, rel_end) in enumerate(pieces)
    ]


def legal_hebrew_chunker(
    doc: Document,
    chunk_size: int = 800,
    min_section_chars: int = 8,
) -> List[Chunk]:
    """Chunk *doc* along legal section markers, else fall back to sentences.

    Behaviour:
      * If ``_split_by_sections`` finds no section, the whole document is
        delegated to ``sentence_chunker``.
      * Text that precedes the first section marker (title, parties,
        introduction, ...) is emitted first as ordinary chunks. These carry
        ``doc.metadata`` only — no ``"section"`` key — because they belong to
        no section.
      * Each section body becomes one chunk tagged with
        ``metadata["section"] = <marker label>``; a body longer than
        *chunk_size* is split further with ``sentence_chunker``.
      * A section body shorter than *min_section_chars* is not emitted on its
        own: ``"<label> <body>"`` is appended (on a new line) to the previous
        section chunk and that chunk's ``end`` is extended. The first section
        is never merged, so it always keeps its own label.

    Args:
        doc: Document to chunk; ``doc.id``, ``doc.text`` and ``doc.metadata``
            are read.
        chunk_size: Maximum body length (in characters) kept as one chunk
            before sentence-level splitting is applied.
        min_section_chars: Bodies shorter than this are merged into the
            preceding section chunk.

    Returns:
        Chunks in document order with ids ``"<doc.id>::<n>"``, ``n`` counting
        from 0.
    """
    text = doc.text
    sections = _split_by_sections(text)
    if not sections:
        return sentence_chunker(doc, chunk_size=chunk_size)

    chunks: List[Chunk] = []

    # Keep whatever precedes the first marker. Without this, a single marker
    # deep inside a document would cause everything before it to be dropped.
    first_marker = _SECTION_RE.search(text)
    lead = text[:first_marker.start()] if first_marker else ""
    preamble = lead.strip()
    if preamble:
        preamble_start = len(lead) - len(lead.lstrip())
        chunks.extend(
            _chunks_from_span(
                doc, preamble, preamble_start, {**doc.metadata}, chunk_size, 0
            )
        )
    preamble_count = len(chunks)

    for label, body, offset in sections:
        is_long = len(body) > chunk_size
        has_section_chunk = len(chunks) > preamble_count
        if (
            not is_long
            and len(body) < min_section_chars
            and has_section_chunk
        ):
            # Tiny section — fold it into the previous section chunk rather
            # than emitting a near-empty chunk of its own.
            prev = chunks[-1]
            prev.text = (prev.text + "\n" + label + " " + body).strip()
            prev.end = offset + len(body)
            continue

        chunks.extend(
            _chunks_from_span(
                doc,
                body,
                offset,
                {**doc.metadata, "section": label},
                chunk_size,
                len(chunks),  # ids stay sequential across all emitted chunks
            )
        )

    # Sections existed but produced nothing (e.g. the sentence splitter
    # returned no pieces): fall back to plain sentence chunking.
    return chunks or sentence_chunker(doc, chunk_size=chunk_size)