| # chunk_manager.py |
|
|
| import re |
| from pathlib import Path |
| from text_cleaner import smart_punctuate # Assuming you've extracted this already |
|
|
| import json |
|
|
def save_chunks_to_json(chunks, output_path):
    """Write the chunk list to ``output_path`` as a UTF-8 encoded JSON file.

    The JSON is pretty-printed with a two-space indent and non-ASCII
    characters are written verbatim rather than escaped. After the file has
    been written, a confirmation line with the chunk count is printed.
    """
    dump_options = {"indent": 2, "ensure_ascii": False}
    with open(output_path, mode='w', encoding='utf-8') as sink:
        # Stream straight into the file handle rather than building the
        # whole document in memory first.
        json.dump(chunks, sink, **dump_options)
    print(f"✅ Saved {len(chunks)} chunks to: {output_path}")
|
|
|
|
def break_long_sentence(sentence, max_words):
    """Split an over-long sentence into pieces of at most ``max_words`` words.

    The sentence is consumed from the left. While more than ``max_words``
    words remain, the break patterns are tried in priority order
    (", and", ", but", plain comma, semicolon, em dash, period + quote).
    The text up to and including the first occurrence of the first pattern
    whose prefix fits within ``max_words`` becomes the next piece. If no
    pattern yields a fitting prefix, the first ``max_words`` words are cut
    off and a trailing comma is appended to that forced piece.

    Args:
        sentence: The text to split; surrounding whitespace is ignored.
        max_words: Maximum number of whitespace-separated words per piece.

    Returns:
        A list of stripped text pieces in original order. An empty or
        whitespace-only sentence yields an empty list.

    Raises:
        ValueError: If ``max_words`` is less than 1. Such a limit can never
            be satisfied, and the forced-split branch would otherwise loop
            forever without consuming any words.
    """
    if max_words < 1:
        raise ValueError(f"max_words must be at least 1, got {max_words!r}")

    break_patterns = (
        r'(,\s+and\s+)', r'(,\s+but\s+)', r'(,\s+)', r'(;\s*)', r'—', r'(\.\s*")',
    )
    chunks = []
    remaining_text = sentence.strip()

    while remaining_text:
        words = remaining_text.split()
        if len(words) <= max_words:
            chunks.append(remaining_text)
            break

        for pattern in break_patterns:
            # Only the earliest match of a pattern can fit: every later
            # match produces a longer prefix, so one search is enough.
            match = re.search(pattern, remaining_text)
            if match is None:
                continue
            break_pos = match.end()
            candidate = remaining_text[:break_pos].strip()
            if len(candidate.split()) <= max_words:
                chunks.append(candidate)
                remaining_text = remaining_text[break_pos:].strip()
                break
        else:
            # No natural break point fits: hard-cut at the word limit and
            # mark the artificial break with a comma.
            chunks.append(" ".join(words[:max_words]) + ",")
            remaining_text = " ".join(words[max_words:])

    return chunks
|
|
def fix_short_sentences(chunk_text):
    """Merge runs of very short sentences in a chunk into one comma-joined sentence.

    A "short sentence" is a capitalised word of two to four letters
    followed by a period and whitespace (e.g. ``"Yes. "``). When the chunk
    contains at least two of them, every ``". "`` in the chunk is replaced
    with ``", "``. The result is then guaranteed to end with sentence
    punctuation: a period is appended only if the text does not already end
    with ``.``, ``!`` or ``?`` (optionally followed by closing quotes or
    brackets).

    Trailing whitespace of the input is kept outside the rewrite, so the
    appended period never lands after a newline or space.

    Args:
        chunk_text: The chunk to inspect.

    Returns:
        The rewritten chunk, or ``chunk_text`` unchanged when fewer than two
        short sentences are present.
    """
    short = re.findall(r'\b[A-Z][a-z]{1,3}\.\s+', chunk_text)
    if len(short) < 2:
        return chunk_text

    # Work on the text without its trailing whitespace and re-attach it at
    # the end; otherwise a final ". " becomes ", " and the terminator check
    # below would see whitespace instead of the real last character.
    body = chunk_text.rstrip()
    trailing = chunk_text[len(body):]

    merged = body.replace(". ", ", ")
    # Existing terminal punctuation (including "?" / "!" and a closing
    # quote after it) must not receive an extra period.
    if not re.search(r'[.!?]["\'”’)\]]*$', merged):
        merged += "."
    return merged + trailing
|
|
def detect_content_boundary(chunk_text):
    """Classify the structural boundary, if any, that a chunk marks.

    The checks run in priority order and the first hit wins:

    * ``"chapter_start"``: the chunk begins (after optional whitespace)
      with "Chapter <number>", matched case-insensitively.
    * ``"section_break"``: the chunk contains ``***``, ``---`` or ``###``
      anywhere.
    * ``"paragraph_end"``: the chunk ends with a newline.

    Returns ``None`` when none of the above applies.
    """
    chapter_heading = re.match(
        r'^\s*(Chapter \d+|CHAPTER \d+)', chunk_text, re.IGNORECASE
    )
    if chapter_heading is not None:
        return "chapter_start"

    divider = re.search(r'\*\*\*|---|###', chunk_text)
    if divider is not None:
        return "section_break"

    return "paragraph_end" if chunk_text.endswith(('\n\n', '\n')) else None
|
|
def sentence_chunk_text(text, max_words=30, min_words=4):
    """Split text into sentence-based chunks, paragraph by paragraph.

    Lines are stripped and grouped into paragraphs separated by blank
    lines. Within a paragraph each line is split into sentences at ``.``,
    ``!`` or ``?`` (optionally followed by closing quotes or a parenthesis)
    plus whitespace. Sentences are then turned into chunks:

    * a sentence with more than ``max_words`` words is passed to
      ``break_long_sentence`` and contributes one chunk per piece;
    * a sentence with fewer than ``min_words`` words is held back and
      joined with ``", "`` to the following short sentences and the next
      normal-length sentence (or emitted on its own when the paragraph
      ends or an over-long sentence follows);
    * any other sentence becomes a chunk by itself.

    Every chunk is passed through ``fix_short_sentences`` before being
    returned. Merged groups are not re-checked against ``max_words``.

    Args:
        text: The full input text.
        max_words: Word limit above which a sentence is broken up.
        min_words: Word count below which a sentence is merged with its
            neighbours.

    Returns:
        A list of ``(chunk_text, is_paragraph_end)`` tuples in document
        order; the flag is ``True`` for the last chunk of each paragraph.

    Raises:
        ValueError: If ``max_words`` is less than 1, since no sentence
            could ever be split to satisfy such a limit.
    """
    if max_words < 1:
        raise ValueError(f"max_words must be at least 1, got {max_words!r}")

    sentence_end_re = re.compile(r'([.!?][\"\')]*\s+)')
    final_chunks = []

    def split_sentences(line):
        """Return the non-empty, stripped sentences found in one line."""
        sentences = []
        start = 0
        for match in sentence_end_re.finditer(line):
            end = match.end()
            sentence = line[start:end].strip()
            if sentence:
                sentences.append(sentence)
            start = end
        # Whatever follows the last terminator is a final sentence too.
        tail = line[start:].strip()
        if tail:
            sentences.append(tail)
        return sentences

    def flush_paragraph(lines_in_para):
        """Chunk one paragraph and append its chunks to ``final_chunks``."""
        temp_chunks = []
        short_group = []

        def flush_short_group():
            if short_group:
                temp_chunks.append(", ".join(short_group).strip())
                short_group.clear()

        for line in lines_in_para:
            for sentence in split_sentences(line):
                wc = len(sentence.split())
                if wc > max_words:
                    # Emit pending short sentences first so they keep their
                    # position ahead of the long sentence in the output.
                    flush_short_group()
                    temp_chunks.extend(
                        piece.strip()
                        for piece in break_long_sentence(sentence, max_words)
                    )
                elif wc < min_words:
                    short_group.append(sentence)
                else:
                    # A normal sentence closes any pending short group by
                    # being joined onto its end.
                    short_group.append(sentence)
                    flush_short_group()

        flush_short_group()

        # Only the final chunk of the paragraph carries the end marker.
        last_index = len(temp_chunks) - 1
        for i, chunk in enumerate(temp_chunks):
            final_chunks.append((fix_short_sentences(chunk), i == last_index))

    paragraph_buffer = []
    for line in text.splitlines():
        stripped = line.strip()
        if stripped:
            paragraph_buffer.append(stripped)
        else:
            # A blank line terminates the current paragraph.
            flush_paragraph(paragraph_buffer)
            paragraph_buffer = []

    # Flush any remaining paragraph
    if paragraph_buffer:
        flush_paragraph(paragraph_buffer)

    return final_chunks
|
|
|
|
|
|
def prechunk_text_file(path, max_words=30, min_words=4):
    """Read a UTF-8 text file and return its chunks as metadata dictionaries.

    The file content is passed through ``smart_punctuate`` and then through
    ``sentence_chunk_text`` with the given word limits. Each resulting chunk
    becomes a dict with these keys:

    * ``index``: position of the chunk in the output list
    * ``text``: the chunk text with surrounding whitespace removed
    * ``word_count``: number of whitespace-separated words in ``text``
    * ``boundary_type``: result of ``detect_content_boundary`` on the
      chunk, or ``"none"`` when it reports nothing
    * ``is_paragraph_end``: the paragraph-end flag from the chunker
    """
    source_text = smart_punctuate(Path(path).read_text(encoding='utf-8'))
    tagged_chunks = sentence_chunk_text(
        source_text, max_words=max_words, min_words=min_words
    )

    def describe(position, body, ends_paragraph):
        # Boundary detection sees the chunk exactly as the chunker returned
        # it; the stored text and word count use the trimmed form.
        boundary = detect_content_boundary(body)
        trimmed = body.strip()
        return {
            "index": position,
            "text": trimmed,
            "word_count": len(trimmed.split()),
            "boundary_type": boundary or "none",
            "is_paragraph_end": ends_paragraph,
        }

    return [
        describe(position, body, ends_paragraph)
        for position, (body, ends_paragraph) in enumerate(tagged_chunks)
    ]
|
|