Spaces:
Running on Zero
Running on Zero
GitHub Actions
fix: 0 test failures; FileService; real RagService; emergency probe; chat return
4aaae80 | from __future__ import annotations | |
| from dataclasses import dataclass | |
| class Chunk: | |
| text: str | |
| metadata: dict # {doc_cid, doc_title, page, chunk_index, language} | |
| def chunk_text( | |
| text: str, | |
| *, | |
| chunk_size: int = 512, | |
| overlap: int = 64, | |
| metadata: dict | None = None, | |
| ) -> list[Chunk]: | |
| """Split text using sliding window measured in approximate tokens (chars/4). | |
| Respects paragraph boundaries (double newline) where possible, else word | |
| boundaries. | |
| """ | |
| meta = metadata or {} | |
| approx_tokens = len(text) // 4 | |
| if approx_tokens <= chunk_size: | |
| return [Chunk(text=text, metadata=meta)] | |
| # Split on paragraph boundaries first | |
| paragraphs = text.split("\n\n") | |
| chunks: list[Chunk] = [] | |
| current_parts: list[str] = [] | |
| current_tokens = 0 | |
| def flush(parts: list[str]) -> str: | |
| return "\n\n".join(parts).strip() | |
| for para in paragraphs: | |
| para_tokens = len(para) // 4 | |
| if current_tokens + para_tokens > chunk_size and current_parts: | |
| chunk_text_val = flush(current_parts) | |
| if chunk_text_val: | |
| chunks.append(Chunk(text=chunk_text_val, metadata=meta)) | |
| # Carry overlap: keep tail words from current | |
| overlap_chars = overlap * 4 | |
| tail = ( | |
| chunk_text_val[-overlap_chars:] | |
| if overlap_chars < len(chunk_text_val) | |
| else chunk_text_val | |
| ) | |
| # Find word boundary at start of tail | |
| space_idx = tail.find(" ") | |
| if space_idx != -1: | |
| tail = tail[space_idx + 1 :] | |
| current_parts = [tail] if tail else [] | |
| current_tokens = len(tail) // 4 | |
| if para_tokens > chunk_size: | |
| # Para itself too large — split at word boundaries | |
| words = para.split(" ") | |
| word_buf: list[str] = [] | |
| word_tokens = 0 | |
| for word in words: | |
| wt = (len(word) + 1) // 4 or 1 | |
| if word_tokens + wt > chunk_size and word_buf: | |
| chunk_text_val = " ".join(word_buf).strip() | |
| if chunk_text_val: | |
| chunks.append(Chunk(text=chunk_text_val, metadata=meta)) | |
| # overlap | |
| overlap_chars = overlap * 4 | |
| tail_words = " ".join(word_buf) | |
| tail = ( | |
| tail_words[-overlap_chars:] | |
| if overlap_chars < len(tail_words) | |
| else tail_words | |
| ) | |
| space_idx = tail.find(" ") | |
| if space_idx != -1: | |
| tail = tail[space_idx + 1 :] | |
| word_buf = tail.split(" ") if tail else [] | |
| word_tokens = len(tail) // 4 | |
| word_buf.append(word) | |
| word_tokens += wt | |
| remaining = " ".join(word_buf).strip() | |
| if remaining: | |
| current_parts.append(remaining) | |
| current_tokens += len(remaining) // 4 | |
| else: | |
| current_parts.append(para) | |
| current_tokens += para_tokens | |
| # Flush remainder | |
| if current_parts: | |
| chunk_text_val = flush(current_parts) | |
| if chunk_text_val: | |
| chunks.append(Chunk(text=chunk_text_val, metadata=meta)) | |
| return chunks if chunks else [Chunk(text=text, metadata=meta)] | |
| def chunk_pdf(pdf_bytes: bytes, *, doc_metadata: dict) -> list[Chunk]: | |
| """Extract text per page using pypdf, then chunk_text per page. | |
| Falls back to treating as plain text if pypdf not installed. | |
| """ | |
| try: | |
| import io | |
| import pypdf # type: ignore[import-untyped] | |
| reader = pypdf.PdfReader(io.BytesIO(pdf_bytes)) | |
| all_chunks: list[Chunk] = [] | |
| for page_num, page in enumerate(reader.pages): | |
| page_text = page.extract_text() or "" | |
| if not page_text.strip(): | |
| continue | |
| meta = {**doc_metadata, "page": page_num, "language": "unknown"} | |
| page_chunks = chunk_text(page_text, metadata=meta) | |
| all_chunks.extend(page_chunks) | |
| return all_chunks | |
| except ImportError: | |
| # Fallback: treat bytes as UTF-8 text | |
| text = pdf_bytes.decode("utf-8", errors="replace") | |
| return chunk_text(text, metadata=doc_metadata) | |