File size: 4,931 Bytes
# chunk_manager.py
import re
from pathlib import Path
from text_cleaner import smart_punctuate # Assuming you've extracted this already
import json
def save_chunks_to_json(chunks, output_path):
    """Write the chunk list to ``output_path`` as indented UTF-8 JSON.

    Non-ASCII characters are written as-is (no ``\\uXXXX`` escaping). After
    the file has been written, a one-line confirmation with the number of
    chunks and the destination is printed to stdout.

    Args:
        chunks: JSON-serializable sequence of chunk records.
        output_path: Destination file; it is created or overwritten.
    """
    with open(output_path, mode='w', encoding='utf-8') as handle:
        json.dump(chunks, handle, indent=2, ensure_ascii=False)

    total = len(chunks)
    print(f"✅ Saved {total} chunks to: {output_path}")
def break_long_sentence(sentence, max_words):
    """Split an over-long sentence into pieces of at most ``max_words`` words.

    The text is consumed left to right. For each piece the break patterns are
    tried in priority order (", and ", ", but ", any comma, semicolon, em
    dash, period followed by a double quote). The first pattern whose
    earliest match leaves a prefix of at most ``max_words`` words wins, and
    the piece ends right after that match. When no pattern yields a fitting
    prefix, the piece is cut after exactly ``max_words`` words and a trailing
    comma is appended to it.

    Args:
        sentence: Text to split; surrounding whitespace is ignored.
        max_words: Maximum number of whitespace-separated words per piece.

    Returns:
        List of stripped pieces in their original order; ``[]`` when
        ``sentence`` is empty or whitespace only.

    Raises:
        ValueError: If there is text to split and ``max_words`` is below 1.
            The forced cut would then never consume any text, so the loop
            could not terminate.
    """
    break_patterns = (
        r'(,\s+and\s+)', r'(,\s+but\s+)', r'(,\s+)', r'(;\s*)', r'—', r'(\.\s*")',
    )
    pieces = []
    remaining_text = sentence.strip()
    if not remaining_text:
        return pieces
    if max_words < 1:
        raise ValueError(f"max_words must be at least 1, got {max_words!r}")

    # Invariant: remaining_text is always stripped, so every cut below removes
    # at least one non-whitespace character and the loop makes progress.
    while remaining_text:
        words = remaining_text.split()
        if len(words) <= max_words:
            pieces.append(remaining_text)
            break

        for pattern in break_patterns:
            # Only the earliest match needs checking: the word count of a
            # prefix never shrinks as the cut moves right, so if the first
            # match does not fit, no later match of this pattern can.
            match = re.search(pattern, remaining_text)
            if match is None:
                continue
            break_pos = match.end()
            candidate = remaining_text[:break_pos].strip()
            if len(candidate.split()) <= max_words:
                pieces.append(candidate)
                remaining_text = remaining_text[break_pos:].strip()
                break
        else:
            # No natural break fits: force a cut at the word limit and mark
            # the artificial boundary with a comma.
            pieces.append(" ".join(words[:max_words]) + ",")
            remaining_text = " ".join(words[max_words:])
    return pieces
def fix_short_sentences(chunk_text):
    """Fuse runs of very short sentences in a chunk into one sentence.

    A "short sentence" here is a capitalized word of two to four letters
    followed by a period and whitespace (e.g. ``"Yes. "``). When the chunk
    contains at least two of them, every ``". "`` in the chunk is replaced
    with ``", "``. The result is then guaranteed to end with terminal
    punctuation: a period is appended unless the text already ends in ``.``,
    ``!`` or ``?``, optionally followed by closing quotes or brackets.

    Args:
        chunk_text: Chunk text to inspect.

    Returns:
        The rewritten text, or ``chunk_text`` unchanged when fewer than two
        short sentences are found.
    """
    short = re.findall(r'\b[A-Z][a-z]{1,3}\.\s+', chunk_text)
    if len(short) < 2:
        return chunk_text

    merged = chunk_text.replace(". ", ", ")

    # Look past trailing whitespace and closing quotes/brackets so that text
    # such as 'Why?' or 'Now."' is not turned into 'Why?.' or 'Now.".'.
    core = merged.rstrip().rstrip('"\')]\u201d\u2019')
    if not core.endswith(('.', '!', '?')):
        merged += "."
    return merged
def detect_content_boundary(chunk_text):
    """Classify the structural boundary, if any, that a chunk represents.

    Checks are applied in order and the first hit wins:

    1. ``"chapter_start"`` - the text begins (after optional whitespace) with
       "Chapter <digits>", matched case-insensitively.
    2. ``"section_break"`` - the text contains ``***``, ``---`` or ``###``
       anywhere.
    3. ``"paragraph_end"`` - the text ends with a newline.

    Returns:
        One of the labels above, or ``None`` when nothing matches.
    """
    starts_chapter = re.match(
        r'^\s*(Chapter \d+|CHAPTER \d+)', chunk_text, re.IGNORECASE
    )
    if starts_chapter is not None:
        return "chapter_start"

    if re.search(r'\*\*\*|---|###', chunk_text) is not None:
        return "section_break"

    return "paragraph_end" if chunk_text.endswith(('\n\n', '\n')) else None
# A sentence ends at ., ! or ?, optionally followed by closing quotes or a
# closing parenthesis, and then whitespace.
_SENTENCE_END_RE = re.compile(r'([.!?][\"\')]*\s+)')


def _split_line_into_sentences(line):
    """Split one line into stripped sentences, keeping end punctuation."""
    sentences = []
    start = 0
    for match in _SENTENCE_END_RE.finditer(line):
        end = match.end()
        piece = line[start:end].strip()
        if piece:
            sentences.append(piece)
        start = end
    # Whatever follows the last terminator (or the whole line when there is
    # none) is a sentence of its own.
    tail = line[start:].strip()
    if tail:
        sentences.append(tail)
    return sentences


def _chunk_paragraph(lines_in_para, max_words, min_words):
    """Turn one paragraph's lines into ``(chunk_text, is_paragraph_end)`` tuples."""
    chunks = []
    pending_short = []

    def flush_pending():
        # Emit the buffered short sentences (if any) as one comma-joined chunk.
        if pending_short:
            chunks.append(", ".join(pending_short).strip())
            pending_short.clear()

    for line in lines_in_para:
        for sentence in _split_line_into_sentences(line):
            word_count = len(sentence.split())
            if word_count > max_words:
                # Emit buffered short sentences first; otherwise they would
                # be output after this sentence and the text order would be
                # scrambled.
                flush_pending()
                chunks.extend(
                    part.strip()
                    for part in break_long_sentence(sentence, max_words)
                )
            elif word_count < min_words:
                pending_short.append(sentence)
            else:
                # A normal-length sentence absorbs any short ones before it.
                pending_short.append(sentence)
                flush_pending()
    flush_pending()

    last = len(chunks) - 1
    return [
        (fix_short_sentences(chunk), position == last)
        for position, chunk in enumerate(chunks)
    ]


def sentence_chunk_text(text, max_words=30, min_words=4):
    """Split text into sentence-based chunks, paragraph by paragraph.

    Paragraphs are runs of non-blank lines separated by blank lines.
    Sentences are detected within each line, so a sentence never spans a line
    break. Within a paragraph:

    * a sentence longer than ``max_words`` words is split with
      ``break_long_sentence``;
    * sentences shorter than ``min_words`` words are buffered and joined with
      ``", "`` to the next normal-length sentence (or to each other when a
      long sentence or the paragraph end comes first);
    * every resulting chunk is passed through ``fix_short_sentences``.

    Chunks are always emitted in the order their text appears in the input.
    Merged chunks are not re-checked against ``max_words`` and may exceed it.

    Args:
        text: Full input text.
        max_words: Word count above which a sentence is split.
        min_words: Word count below which a sentence is merged with neighbors.

    Returns:
        List of ``(chunk_text, is_paragraph_end)`` tuples, where the flag is
        ``True`` only for the last chunk of each paragraph.
    """
    final_chunks = []
    paragraph_buffer = []
    for line in text.splitlines():
        stripped = line.strip()
        if stripped:
            paragraph_buffer.append(stripped)
            continue
        # Blank line: the current paragraph (possibly empty) is complete.
        final_chunks.extend(_chunk_paragraph(paragraph_buffer, max_words, min_words))
        paragraph_buffer = []

    # Flush a final paragraph that is not followed by a blank line.
    if paragraph_buffer:
        final_chunks.extend(_chunk_paragraph(paragraph_buffer, max_words, min_words))
    return final_chunks
def prechunk_text_file(path, max_words=30, min_words=4):
    """Read a UTF-8 text file and return its chunks as metadata dicts.

    The file content is passed through ``smart_punctuate`` (imported from
    ``text_cleaner``; its behavior is not visible in this module) and then
    chunked with ``sentence_chunk_text``.

    Args:
        path: Location of the text file to read.
        max_words: Forwarded to ``sentence_chunk_text``.
        min_words: Forwarded to ``sentence_chunk_text``.

    Returns:
        List of dicts, one per chunk, with keys ``index`` (position in the
        list), ``text`` (stripped chunk text), ``word_count``,
        ``boundary_type`` (label from ``detect_content_boundary`` or
        ``"none"``) and ``is_paragraph_end``.
    """
    source_text = smart_punctuate(Path(path).read_text(encoding='utf-8'))
    pairs = sentence_chunk_text(source_text, max_words=max_words, min_words=min_words)

    records = []
    for position, pair in enumerate(pairs):
        body, ends_paragraph = pair
        # The boundary check sees the chunk exactly as the chunker produced it.
        label = detect_content_boundary(body)
        trimmed = body.strip()
        records.append({
            "index": position,
            "text": trimmed,
            "word_count": len(trimmed.split()),
            "boundary_type": label if label else "none",
            "is_paragraph_end": ends_paragraph,
        })
    return records
|