danneauxs
Deploy complete ChatterboxTTS system with utils, tools, and wrapper modules
346d87a
raw
history blame
4.93 kB
# chunk_manager.py
import re
from pathlib import Path
from text_cleaner import smart_punctuate # Assuming you've extracted this already
import json
def save_chunks_to_json(chunks, output_path):
    """Write the chunk list to ``output_path`` as indented UTF-8 JSON.

    Non-ASCII characters are written verbatim (``ensure_ascii=False``).
    A one-line confirmation with the chunk count is printed afterwards.
    """
    with open(output_path, 'w', encoding='utf-8') as handle:
        json.dump(chunks, handle, indent=2, ensure_ascii=False)

    total = len(chunks)
    print(f"✅ Saved {total} chunks to: {output_path}")
def break_long_sentence(sentence, max_words):
    """Split an over-long sentence into pieces of at most ``max_words`` words.

    The sentence is consumed left to right. While more than ``max_words``
    words remain, the break patterns are tried in priority order
    (", and", ", but", any comma, semicolon, em dash, period before a
    closing double quote). For each pattern only its earliest occurrence is
    considered: if the text up to and including that occurrence fits in
    ``max_words`` it becomes the next piece. When no pattern yields a
    fitting piece, the first ``max_words`` words are cut off and a trailing
    comma is appended to that piece.

    Args:
        sentence: Text to split; surrounding whitespace is ignored.
        max_words: Maximum number of whitespace-separated words per piece.
            Must be at least 1.

    Returns:
        List of stripped text pieces in original order (empty for blank
        input).

    Raises:
        ValueError: If ``max_words`` is smaller than 1. Without this guard
            the forced split would consume zero words per iteration and the
            loop would never terminate.
    """
    if max_words < 1:
        raise ValueError(f"max_words must be >= 1, got {max_words!r}")

    break_patterns = (
        r'(,\s+and\s+)', r'(,\s+but\s+)', r'(,\s+)', r'(;\s*)', r'—', r'(\.\s*")',
    )
    chunks = []
    remaining_text = sentence.strip()

    while remaining_text:
        words = remaining_text.split()
        if len(words) <= max_words:
            chunks.append(remaining_text)
            break

        for pattern in break_patterns:
            # Only the first match can ever fit: any later match gives a
            # longer prefix, whose word count cannot be smaller.
            match = re.search(pattern, remaining_text)
            if match is None:
                continue
            break_pos = match.end()
            candidate = remaining_text[:break_pos].strip()
            if len(candidate.split()) <= max_words:
                chunks.append(candidate)
                remaining_text = remaining_text[break_pos:].strip()
                break
        else:
            # No natural break point fits: hard-cut at the word limit.
            chunks.append(" ".join(words[:max_words]) + ",")
            remaining_text = " ".join(words[max_words:])

    return chunks
def fix_short_sentences(chunk_text):
    """Join runs of very short sentences in a chunk with commas.

    A "short sentence" here is a capitalised word of 2-4 letters followed
    by a period and whitespace (e.g. ``"Yes. "``). When the chunk contains
    at least two of them, every ``". "`` in the chunk is replaced by
    ``", "``. A final period is added only when the result does not already
    end in sentence-terminating punctuation (``.``, ``!`` or ``?``,
    optionally followed by closing quotes or brackets), so a chunk ending in
    ``?`` is not turned into ``?.``.

    Args:
        chunk_text: Text of a single chunk.

    Returns:
        The rewritten chunk, or ``chunk_text`` unchanged when fewer than two
        short sentences are found.
    """
    short = re.findall(r'\b[A-Z][a-z]{1,3}\.\s+', chunk_text)
    if len(short) < 2:
        return chunk_text

    merged = chunk_text.replace(". ", ", ")
    # \Z (not $) so that a trailing newline still counts as "no terminator".
    if not re.search(r'[.!?]["\'”’)\]]*\Z', merged):
        merged += "."
    return merged
def detect_content_boundary(chunk_text):
    """Classify a chunk by the structural marker it carries, if any.

    Checks are made in this order and the first hit wins:
      * ``"chapter_start"``  - text begins (after optional whitespace) with
        "Chapter <digits>", case-insensitively;
      * ``"section_break"``  - text contains ``***``, ``---`` or ``###``;
      * ``"paragraph_end"``  - text ends with a newline.

    Returns ``None`` when none of the checks match.
    """
    label = None
    if re.match(r'^\s*(Chapter \d+|CHAPTER \d+)', chunk_text, re.IGNORECASE):
        label = "chapter_start"
    elif re.search(r'\*\*\*|---|###', chunk_text):
        label = "section_break"
    elif chunk_text.endswith(('\n\n', '\n')):
        label = "paragraph_end"
    return label
def sentence_chunk_text(text, max_words=30, min_words=4):
    """Split text into sentence-based chunks, tagged with paragraph ends.

    Paragraphs are runs of non-blank lines separated by blank lines. Each
    line is split into sentences on ``.``, ``!`` or ``?`` (optionally
    followed by closing quotes/parentheses) plus whitespace; sentences are
    not joined across lines. Sentences are then grouped:

      * longer than ``max_words``  -> split via ``break_long_sentence``;
      * shorter than ``min_words`` -> held back and comma-joined with the
        following sentences until a normal-length sentence (or the end of
        the paragraph, or an over-long sentence) closes the group;
      * otherwise                  -> emitted as its own chunk.

    Pending short sentences are flushed *before* an over-long sentence is
    split, so chunks always come out in the original text order.

    Args:
        text: Full input text.
        max_words: Word count above which a sentence is broken up.
        min_words: Word count below which a sentence is merged with
            its neighbours.

    Returns:
        List of ``(chunk_text, is_paragraph_end)`` tuples, where the flag is
        ``True`` for the last chunk of each paragraph. Every chunk has been
        passed through ``fix_short_sentences``.
    """
    sentence_end_re = re.compile(r'([.!?][\"\')]*\s+)')
    final_chunks = []

    def split_sentences(line):
        # Cut after each sentence terminator; keep any unterminated tail.
        sentences = []
        start = 0
        for match in sentence_end_re.finditer(line):
            end = match.end()
            sentence = line[start:end].strip()
            if sentence:
                sentences.append(sentence)
            start = end
        if start < len(line):
            tail = line[start:].strip()
            if tail:
                sentences.append(tail)
        return sentences

    def flush_paragraph(lines_in_para):
        raw_sentences = []
        for line in lines_in_para:
            raw_sentences.extend(split_sentences(line))

        # Group sentences into chunks.
        temp_chunks = []
        short_group = []
        for sentence in raw_sentences:
            wc = len(sentence.split())
            if wc > max_words:
                # Emit held-back short sentences first; otherwise they
                # would end up after the long sentence, out of order.
                if short_group:
                    temp_chunks.append(", ".join(short_group).strip())
                    short_group = []
                temp_chunks.extend(
                    piece.strip()
                    for piece in break_long_sentence(sentence, max_words)
                )
            elif wc < min_words:
                short_group.append(sentence)
            else:
                # A normal sentence closes any pending short group.
                temp_chunks.append(", ".join(short_group + [sentence]).strip())
                short_group = []
        if short_group:
            temp_chunks.append(", ".join(short_group).strip())

        # Only the last chunk of the paragraph is flagged as paragraph end.
        last_index = len(temp_chunks) - 1
        for i, chunk in enumerate(temp_chunks):
            final_chunks.append((fix_short_sentences(chunk), i == last_index))

    paragraph_buffer = []
    for line in text.splitlines():
        stripped = line.strip()
        if stripped:
            paragraph_buffer.append(stripped)
        else:
            # Blank line: close the current paragraph (no-op when empty).
            flush_paragraph(paragraph_buffer)
            paragraph_buffer = []

    # Flush any remaining paragraph
    if paragraph_buffer:
        flush_paragraph(paragraph_buffer)
    return final_chunks
def prechunk_text_file(path, max_words=30, min_words=4):
    """Read a UTF-8 text file and return its chunks as metadata dicts.

    The file content is passed through ``smart_punctuate`` and then
    ``sentence_chunk_text``. Each resulting chunk becomes a dict with:

      * ``index``            - position of the chunk in the output;
      * ``text``             - chunk text with outer whitespace removed;
      * ``word_count``       - whitespace-separated word count of ``text``;
      * ``boundary_type``    - result of ``detect_content_boundary`` on the
        chunk, or ``"none"`` when it reports nothing;
      * ``is_paragraph_end`` - flag supplied by ``sentence_chunk_text``.
    """
    source_text = smart_punctuate(Path(path).read_text(encoding='utf-8'))
    pairs = sentence_chunk_text(
        source_text, max_words=max_words, min_words=min_words
    )

    records = []
    for position, (body, ends_paragraph) in enumerate(pairs):
        kind = detect_content_boundary(body)
        trimmed = body.strip()
        records.append({
            "index": position,
            "text": trimmed,
            "word_count": len(trimmed.split()),
            "boundary_type": kind or "none",
            "is_paragraph_end": ends_paragraph,
        })
    return records