# chunk_manager.py
"""Split plain text into sentence-sized chunks and save them as JSON."""

import json
import re
from pathlib import Path

from text_cleaner import smart_punctuate  # Assuming you've extracted this already

# Break points tried, in priority order, when a sentence exceeds the word limit.
_BREAK_PATTERNS = tuple(
    re.compile(pattern)
    for pattern in (
        r'(,\s+and\s+)',
        r'(,\s+but\s+)',
        r'(,\s+)',
        r'(;\s*)',
        r'—',
        r'(\.\s*")',
    )
)

# A sentence terminator, any closing quotes/brackets, then whitespace.
_SENTENCE_END_RE = re.compile(r'([.!?][\"\')]*\s+)')


def save_chunks_to_json(chunks, output_path):
    """Save enriched chunk list to a JSON file.

    Args:
        chunks: JSON-serialisable list of chunks (e.g. the list returned by
            ``prechunk_text_file``).
        output_path: Destination file; it is opened in write mode, so any
            existing content is replaced.
    """
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(chunks, f, indent=2, ensure_ascii=False)
    print(f"✅ Saved {len(chunks)} chunks to: {output_path}")


def break_long_sentence(sentence: str, max_words: int) -> list:
    """Break a sentence into pieces of at most ``max_words`` words each.

    The text is consumed from the left. At every step the break patterns are
    tried in priority order (", and", ", but", comma, semicolon, em dash,
    period followed by a double quote) and the text is cut right after the
    first occurrence of the first pattern whose prefix fits the limit. When
    no pattern yields a fitting prefix, the first ``max_words`` words are cut
    off and a comma is appended to them.

    Args:
        sentence: Text to split; surrounding whitespace is ignored.
        max_words: Maximum number of whitespace-separated words per piece.

    Returns:
        The pieces in their original order. Empty input gives ``[]``.

    Raises:
        ValueError: If ``max_words`` is smaller than 1. Such a limit can never
            be satisfied and previously made this function loop forever.
    """
    if max_words < 1:
        raise ValueError(f"max_words must be at least 1, got {max_words!r}")

    chunks = []
    remaining_text = sentence.strip()

    while remaining_text:
        words = remaining_text.split()
        if len(words) <= max_words:
            chunks.append(remaining_text)
            break

        for pattern in _BREAK_PATTERNS:
            # Only the first occurrence can ever fit: a later occurrence
            # yields a longer prefix with at least as many words.
            match = pattern.search(remaining_text)
            if match is None:
                continue
            candidate = remaining_text[:match.end()].strip()
            if len(candidate.split()) <= max_words:
                chunks.append(candidate)
                remaining_text = remaining_text[match.end():].strip()
                break
        else:
            # No usable break point: force a cut at the word limit.
            chunks.append(" ".join(words[:max_words]) + ",")
            remaining_text = " ".join(words[max_words:])

    return chunks


def fix_short_sentences(chunk_text: str) -> str:
    """Turn a run of very short sentences into one comma-separated sentence.

    A "short sentence" match is a capitalised word of two to four letters
    followed by a period and whitespace. If the chunk has fewer than two such
    matches it is returned unchanged. Otherwise every ``". "`` in the chunk is
    replaced by ``", "`` and a final period is added when missing.

    Args:
        chunk_text: The chunk to inspect.

    Returns:
        The rewritten chunk, or ``chunk_text`` itself when nothing applies.
    """
    short = re.findall(r'\b[A-Z][a-z]{1,3}\.\s+', chunk_text)
    if len(short) < 2:
        return chunk_text

    merged = chunk_text.replace(". ", ", ")
    if not merged.endswith("."):
        merged += "."
    return merged


def detect_content_boundary(chunk_text: str):
    """Classify the structural boundary a chunk represents, if any.

    Checks are made in this order and the first hit wins.

    Args:
        chunk_text: The chunk to inspect (not stripped by this function).

    Returns:
        ``"chapter_start"`` if the text begins, after optional whitespace,
        with "Chapter <digits>" in any letter case; ``"section_break"`` if it
        contains ``***``, ``---`` or ``###``; ``"paragraph_end"`` if it ends
        with a newline; otherwise ``None``.
    """
    if re.match(r'^\s*(Chapter \d+|CHAPTER \d+)', chunk_text, re.IGNORECASE):
        return "chapter_start"
    if re.search(r'\*\*\*|---|###', chunk_text):
        return "section_break"
    # A text ending in '\n\n' also ends in '\n', so one check covers both.
    if chunk_text.endswith('\n'):
        return "paragraph_end"
    return None


def _split_sentences(line: str) -> list:
    """Split one line into its stripped, non-empty sentences."""
    sentences = []
    start = 0
    for match in _SENTENCE_END_RE.finditer(line):
        sentence = line[start:match.end()].strip()
        if sentence:
            sentences.append(sentence)
        start = match.end()

    # Whatever follows the last terminator (or the whole line if none).
    tail = line[start:].strip()
    if tail:
        sentences.append(tail)
    return sentences


def _group_sentences(sentences, max_words: int, min_words: int) -> list:
    """Turn sentences into chunks: split long ones, merge short ones forward."""
    chunks = []
    pending_short = []

    def flush_pending():
        if pending_short:
            chunks.append(", ".join(pending_short).strip())
            pending_short.clear()

    for sentence in sentences:
        word_count = len(sentence.split())
        if word_count > max_words:
            # Emit waiting short sentences first so the output keeps the
            # order of the source text.
            flush_pending()
            for piece in break_long_sentence(sentence, max_words):
                chunks.append(piece.strip())
        elif word_count < min_words:
            pending_short.append(sentence)
        elif pending_short:
            # NOTE: merged pieces keep their own terminal punctuation and the
            # merged chunk is not re-checked against max_words.
            chunks.append(", ".join([*pending_short, sentence]).strip())
            pending_short.clear()
        else:
            chunks.append(sentence.strip())

    flush_pending()
    return chunks


def _chunk_paragraph(lines_in_para, max_words: int, min_words: int) -> list:
    """Chunk one paragraph and flag its last chunk as the paragraph end."""
    sentences = []
    for line in lines_in_para:
        sentences.extend(_split_sentences(line))

    grouped = _group_sentences(sentences, max_words, min_words)
    last_index = len(grouped) - 1
    return [
        (fix_short_sentences(chunk), index == last_index)
        for index, chunk in enumerate(grouped)
    ]


def sentence_chunk_text(text: str, max_words: int = 30, min_words: int = 4) -> list:
    """Split text into sentence-based chunks, paragraph by paragraph.

    Paragraphs are runs of non-blank lines. Each line is split into sentences
    on ``.``, ``!`` or ``?`` (plus closing quotes/brackets) followed by
    whitespace; sentences are not joined across lines. Sentences longer than
    ``max_words`` go through ``break_long_sentence``; sentences shorter than
    ``min_words`` are held back and joined with ``", "`` to the next
    normal-length sentence, or emitted together when a long sentence or the
    end of the paragraph is reached. Every chunk is finally passed through
    ``fix_short_sentences``.

    Args:
        text: The text to chunk.
        max_words: Word count above which a sentence is broken up.
        min_words: Word count below which a sentence is merged with others.

    Returns:
        A list of ``(chunk_text, is_paragraph_end)`` tuples in source order,
        where the flag is ``True`` only for the last chunk of a paragraph.

    Raises:
        ValueError: If a sentence has to be broken and ``max_words`` is
            smaller than 1.
    """
    final_chunks = []
    paragraph_buffer = []

    for line in text.splitlines():
        stripped = line.strip()
        if stripped:
            paragraph_buffer.append(stripped)
            continue
        # A blank line closes the current paragraph.
        final_chunks.extend(_chunk_paragraph(paragraph_buffer, max_words, min_words))
        paragraph_buffer = []

    # Flush any remaining paragraph
    if paragraph_buffer:
        final_chunks.extend(_chunk_paragraph(paragraph_buffer, max_words, min_words))

    return final_chunks


def prechunk_text_file(path, max_words: int = 30, min_words: int = 4) -> list:
    """Read a UTF-8 text file and return its chunks enriched with metadata.

    The file content is passed through ``smart_punctuate`` (imported from
    ``text_cleaner``; its behavior is not visible in this module) and then
    through ``sentence_chunk_text``.

    Args:
        path: Path of the text file to read.
        max_words: Forwarded to ``sentence_chunk_text``.
        min_words: Forwarded to ``sentence_chunk_text``.

    Returns:
        A list of dicts with the keys ``index`` (position in the list),
        ``text`` (stripped chunk), ``word_count``, ``boundary_type`` (result
        of ``detect_content_boundary`` or ``"none"``) and
        ``is_paragraph_end``.
    """
    raw = Path(path).read_text(encoding='utf-8')
    text = smart_punctuate(raw)
    chunks = sentence_chunk_text(text, max_words=max_words, min_words=min_words)

    enriched_chunks = []
    for i, (chunk_text, is_para_end) in enumerate(chunks):
        stripped = chunk_text.strip()
        enriched_chunks.append({
            "index": i,
            "text": stripped,
            "word_count": len(stripped.split()),
            "boundary_type": detect_content_boundary(chunk_text) or "none",
            "is_paragraph_end": is_para_end,
        })

    return enriched_chunks