GitHub Actions
fix: 0 test failures; FileService; real RagService; emergency probe; chat return
4aaae80
Raw
History Blame
4.41 kB
from __future__ import annotations
from dataclasses import dataclass
@dataclass(frozen=True)
class Chunk:
    """A piece of document text together with the metadata describing its origin.

    Instances are frozen, so ``text`` and ``metadata`` cannot be rebound after
    construction; the metadata dict itself can still be mutated in place.
    """

    # The chunk's text content.
    text: str
    # Free-form provenance mapping; keys noted for this file are
    # doc_cid, doc_title, page, chunk_index and language.
    metadata: dict
def _overlap_tail(chunk: str, overlap_chars: int) -> str:
    """Return the end of *chunk* that should be repeated in the next chunk."""
    if overlap_chars <= 0:
        # Explicit guard: chunk[-0:] is the WHOLE string, not an empty one.
        return ""
    tail = chunk[-overlap_chars:] if overlap_chars < len(chunk) else chunk
    # Drop everything up to the first space so the tail starts on a word
    # boundary; a tail without any space is kept as-is.
    _, found, rest = tail.partition(" ")
    return rest if found else tail


def chunk_text(
    text: str,
    *,
    chunk_size: int = 512,
    overlap: int = 64,
    metadata: dict | None = None,
) -> list[Chunk]:
    """Split text into overlapping chunks measured in approximate tokens.

    One token is approximated as four characters. Text that already fits in
    ``chunk_size`` is returned unchanged as a single chunk. Otherwise
    paragraphs (separated by a blank line) are packed greedily into chunks,
    and a paragraph that is itself larger than ``chunk_size`` is split on
    spaces. Every chunk after the first begins with the trailing
    ``overlap`` tokens of the text emitted before it, trimmed to a word
    boundary.

    A chunk can still exceed ``chunk_size`` when a single word does, or by up
    to the carried overlap at a paragraph boundary.

    Args:
        text: The text to split.
        chunk_size: Target chunk size in approximate tokens.
        overlap: Approximate tokens repeated between consecutive chunks;
            values <= 0 disable overlap. Should be smaller than
            ``chunk_size``.
        metadata: Mapping attached to every chunk. Each chunk receives its
            own shallow copy, so annotating one chunk does not affect the
            others or the caller's dict.

    Returns:
        The chunks in document order; never empty.
    """
    meta = metadata or {}

    def make_chunk(body: str) -> Chunk:
        # Copy so chunks never share one mutable metadata dict.
        return Chunk(text=body, metadata=dict(meta))

    if len(text) // 4 <= chunk_size:
        return [make_chunk(text)]

    overlap_chars = overlap * 4
    chunks: list[Chunk] = []
    parts: list[str] = []  # paragraph-level pieces of the chunk being built
    parts_chars = 0  # == len("\n\n".join(parts))

    for para in text.split("\n\n"):
        projected = parts_chars + (2 if parts else 0) + len(para)
        if parts and projected // 4 > chunk_size:
            body = "\n\n".join(parts).strip()
            if body:
                chunks.append(make_chunk(body))
            # Seed the next chunk with the overlap from the one just closed.
            tail = _overlap_tail(body, overlap_chars)
            parts = [tail] if tail else []
            parts_chars = len(tail)

        if len(para) // 4 <= chunk_size:
            if parts:
                parts_chars += 2  # the "\n\n" joiner
            parts.append(para)
            parts_chars += len(para)
            continue

        # Paragraph alone is too large: split it at spaces. The flush above
        # always fires for such a paragraph, so `parts` holds at most the
        # carried overlap, which must lead the first word chunk to keep the
        # output in document order.
        lead = "\n\n".join(parts).strip()
        lead_chars = len(lead) + 2 if lead else 0
        words: list[str] = []
        words_chars = 0  # == sum(len(w) + 1 for w in words)
        for word in para.split(" "):
            if words and (lead_chars + words_chars + len(word)) // 4 > chunk_size:
                joined = " ".join(words)
                body = joined.strip()
                if body:
                    chunks.append(
                        make_chunk(f"{lead}\n\n{body}" if lead else body)
                    )
                    lead, lead_chars = "", 0
                tail = _overlap_tail(joined, overlap_chars)
                words = tail.split(" ") if tail else []
                words_chars = len(tail) + 1 if tail else 0
            words.append(word)
            words_chars += len(word) + 1

        # Whatever is left becomes the start of the next paragraph-level chunk.
        remaining = " ".join(words).strip()
        parts = [piece for piece in (lead, remaining) if piece]
        parts_chars = len("\n\n".join(parts))

    # Flush remainder
    if parts:
        body = "\n\n".join(parts).strip()
        if body:
            chunks.append(make_chunk(body))

    # Long whitespace-only input yields no non-empty chunk; return it whole.
    return chunks or [make_chunk(text)]
def chunk_pdf(pdf_bytes: bytes, *, doc_metadata: dict) -> list[Chunk]:
    """Extract text per page using pypdf, then chunk_text per page.

    Pages whose extracted text is empty or whitespace-only are skipped. Each
    page's chunks carry ``doc_metadata`` plus ``page`` (the zero-based page
    index) and ``language``; a ``language`` already present in
    ``doc_metadata`` is kept, otherwise it is set to ``"unknown"``.

    Falls back to treating the bytes as UTF-8 plain text if pypdf is not
    installed; in that case ``doc_metadata`` is passed through unchanged.

    Args:
        pdf_bytes: Raw content of the PDF file.
        doc_metadata: Document-level metadata applied to every chunk.

    Returns:
        Chunks for all non-empty pages, in page order.
    """
    # Only the import is guarded, so an ImportError raised while parsing is
    # not mistaken for "pypdf missing" and turned into garbage text chunks.
    try:
        import pypdf  # type: ignore[import-untyped]
    except ImportError:
        # Fallback: treat bytes as UTF-8 text
        text = pdf_bytes.decode("utf-8", errors="replace")
        return chunk_text(text, metadata=doc_metadata)

    import io

    reader = pypdf.PdfReader(io.BytesIO(pdf_bytes))
    all_chunks: list[Chunk] = []
    for page_num, page in enumerate(reader.pages):
        page_text = page.extract_text() or ""
        if not page_text.strip():
            continue
        meta = {
            **doc_metadata,
            "page": page_num,
            # Default only: do not clobber a language supplied by the caller.
            "language": doc_metadata.get("language") or "unknown",
        }
        all_chunks.extend(chunk_text(page_text, metadata=meta))
    return all_chunks