"""
Text Processing Module
Handles text chunking, abbreviations, and preprocessing for TTS
"""

import logging
import re
from pathlib import Path

# NOTE(review): YELLOW, RESET, MAX_CHUNK_WORDS and MIN_CHUNK_WORDS are not
# defined in this file; they are presumably provided by this star import.
from config import *

# Typographic quotes -> ASCII quotes (written as escapes so the mapping
# survives editors/tools that normalise quote characters).
_SMART_QUOTES = str.maketrans({
    "\u201c": '"',
    "\u201d": '"',
    "\u2018": "'",
    "\u2019": "'",
})

# A line counts as "already punctuated" when it ends with . ! or ?,
# optionally followed by closing quotes/brackets.
_LINE_TERMINATED_RE = re.compile(r'[.!?]["\'\)\]]*$')

# Sentence boundary used for chunking: terminator, optional closers, whitespace.
_SENTENCE_BOUNDARY_RE = re.compile(r'([.!?][\"\'\)]*\s+)')

# Terminator run + optional closers, only when followed by whitespace or the
# end of the text (so "3.50" or "U.S.A" are not split in the middle).
_TERMINATOR_RE = re.compile(r'([.!?]+)(["\'\)\]]*)(?=\s|$)')

# Break points for long sentences, in order of preference.
_BREAK_PATTERNS = tuple(re.compile(pattern) for pattern in (
    r'(,\s+and\s+)',     # ", and "
    r'(,\s+but\s+)',     # ", but "
    r'(,\s+yet\s+)',     # ", yet "
    r'(,\s+or\s+)',      # ", or "
    r'(,\s+so\s+)',      # ", so "
    r'(;\s*)',           # "; "
    r'(—\s*)',           # "— " (em dash)
    r'(\s+—\s+)',        # " — " (spaced em dash)
    r'(\.\s*")',         # '." ' (end quote)
    r'("\s*)',           # '" ' (start quote)
    r'(,\s+which\s+)',   # ", which "
    r'(,\s+when\s+)',    # ", when "
    r'(,\s+where\s+)',   # ", where "
    r'(,\s+while\s+)',   # ", while "
    r'(,\s+though\s+)',  # ", though "
    r'(,\s+)',           # ", " (any comma - last resort)
))

# Patterns that mark the start of a chapter.
_CHAPTER_PATTERNS = (
    r'^(Chapter \d+|CHAPTER \d+)',
    r'^(Ch\. \d+|CH\. \d+)',
    r'^\d+\.',      # Simple "1." numbering
    r'^[IVX]+\.',   # Roman numerals "I.", "II.", etc.
)


# ============================================================================
# ABBREVIATION REPLACEMENT SYSTEM
# ============================================================================

def load_abbreviations(file_path="abbreviations.txt"):
    """Load abbreviation replacements from an external file.

    Each non-empty, non-comment line must look like ``abbrev -> replacement``.
    Lines starting with ``#`` are comments. Malformed lines are reported on
    stdout and skipped.

    Args:
        file_path: Path of the abbreviations file.

    Returns:
        dict mapping abbreviation -> replacement. When the file does not
        exist a sample file is created and an empty dict is returned for
        this call. On a read error, whatever was parsed so far is returned.
    """
    replacements = {}
    abbrev_file = Path(file_path)

    if not abbrev_file.exists():
        print(f"⚠️ {YELLOW}Abbreviations file not found: {file_path}{RESET}")
        print("📝 Creating sample file...")
        create_sample_abbreviations_file(abbrev_file)
        return replacements

    try:
        # utf-8-sig reads plain UTF-8 too, but also swallows a BOM that would
        # otherwise end up glued to the first line of the file.
        with abbrev_file.open('r', encoding='utf-8-sig') as handle:
            for line_num, raw_line in enumerate(handle, 1):
                line = raw_line.strip()

                # Skip empty lines and comments
                if not line or line.startswith('#'):
                    continue

                # Parse "abbrev -> replacement" format
                abbrev, separator, replacement = line.partition(' -> ')
                if separator:
                    replacements[abbrev.strip()] = replacement.strip()
                else:
                    print(f"⚠️ Invalid format on line {line_num}: {line}")

        print(f"✅ Loaded {len(replacements)} abbreviation replacements from {file_path}")

    except (OSError, UnicodeError) as e:
        print(f"❌ Error loading abbreviations: {e}")

    return replacements


def create_sample_abbreviations_file(file_path):
    """Create a sample abbreviations file with common replacements.

    Write failures are reported on stdout and otherwise ignored.

    Args:
        file_path: Destination path for the sample file.
    """
    sample_content = """# Abbreviation Replacements for TTS
# Format: abbreviation -> replacement
# Lines starting with # are comments

# Common titles and abbreviations
Dr. -> Doctor
Mr. -> Mister
Mrs. -> Missus
Ms. -> Miss
Prof. -> Professor
Rev. -> Reverend
Lt. -> Lieutenant
Capt. -> Captain
Gen. -> General
Col. -> Colonel
Jr. -> Junior
Sr. -> Senior

# Political and organizations
M.P. -> MP
U.S. -> US
U.K. -> UK
U.N. -> UN
F.B.I. -> FBI
C.I.A. -> CIA
N.A.S.A. -> NASA

# Common abbreviations
etc. -> et cetera
vs. -> versus
e.g. -> for example
i.e. -> that is
Inc. -> Incorporated
Corp. -> Corporation
Ltd. -> Limited
Co. -> Company

# Numbers and ordinals
1st -> first
2nd -> second
3rd -> third
4th -> fourth
5th -> fifth
10th -> tenth
20th -> twentieth
21st -> twenty-first
30th -> thirtieth
40th -> fortieth
50th -> fiftieth
60th -> sixtieth
70th -> seventieth
80th -> eightieth
90th -> ninetieth
100th -> one hundredth

# Time abbreviations
a.m. -> AM
p.m. -> PM
A.M. -> AM
P.M. -> PM
"""

    try:
        with open(file_path, 'w', encoding='utf-8') as handle:
            handle.write(sample_content)
        print(f"📝 Created sample abbreviations file: {file_path}")
        print("💡 Edit this file to add your own replacements!")
    except OSError as e:
        print(f"❌ Error creating sample file: {e}")


def _abbreviation_regex(abbrev):
    """Build a regex matching *abbrev* only as a standalone token."""
    first, last = abbrev[0], abbrev[-1]
    if first.isdigit():
        # "1st" must not fire inside "21st".
        prefix = r'(?<!\d)'
    elif first.isalpha():
        # "vs." must not fire inside "revs."; a preceding digit is allowed
        # so that "3:30p.m." is still handled.
        prefix = r'(?<![^\W\d_])'
    else:
        prefix = ''
    # A trailing letter/digit must not continue into a longer word.
    suffix = r'(?![^\W_])' if last.isalnum() else ''
    return re.compile(prefix + re.escape(abbrev) + suffix)


def preprocess_abbreviations(text, replacements):
    """Replace abbreviations with TTS-friendly versions.

    Abbreviations are matched as standalone tokens (see
    ``_abbreviation_regex``) and applied longest first, so a short entry can
    never rewrite part of a longer one. Entries of equal length keep the
    order of *replacements*.

    Args:
        text: Text to process.
        replacements: Mapping of abbreviation -> replacement.

    Returns:
        The text with every matching abbreviation replaced.
    """
    if not replacements:
        return text

    # Empty keys are skipped: replacing "" would insert the replacement
    # between every character of the text.
    ordered = sorted((abbrev for abbrev in replacements if abbrev),
                     key=len, reverse=True)

    applied = 0
    for abbrev in ordered:
        if abbrev not in text:  # cheap pre-filter before building a regex
            continue
        replacement = replacements[abbrev]
        # A callable replacement keeps backslashes in the value literal.
        text, hits = _abbreviation_regex(abbrev).subn(
            lambda _match: replacement, text)
        if hits:
            applied += 1

    if applied > 0:
        logging.info("📝 Applied %d abbreviation replacements", applied)

    return text


# ============================================================================
# TEXT PREPROCESSING AND CHUNKING
# ============================================================================

def smart_punctuate(text):
    """Normalize punctuation, with abbreviation replacement.

    Steps, in order: apply abbreviation replacements (the abbreviations file
    is re-read on every call), convert typographic quotes to ASCII quotes,
    remove ``**bold**`` markers and runs of two or more underscores, then
    strip every line and append a period to each non-empty line that does
    not already end in ``.``, ``!`` or ``?`` (closing quotes/brackets after
    the terminator are allowed). Empty lines are kept as paragraph breaks.

    Args:
        text: Raw input text.

    Returns:
        The normalized text, lines joined with ``\\n``.
    """
    text = preprocess_abbreviations(text, load_abbreviations())

    # Clean up markup *before* deciding whether a line needs a period;
    # otherwise '**Done.**' would end up as 'Done..'.
    text = text.translate(_SMART_QUOTES)
    text = re.sub(r'\*\*([^*]+)\*\*', r'\1', text)  # Remove bold markdown
    text = re.sub(r'_{2,}', '', text)               # Remove underlines

    punctuated = []
    for line in text.splitlines():
        stripped = line.strip()
        if stripped and not _LINE_TERMINATED_RE.search(stripped):
            stripped += "."
        # An empty string keeps the blank line (paragraph break).
        punctuated.append(stripped)

    return "\n".join(punctuated)


def _split_terminated_sentences(text):
    """Split *text* into (body, terminator, closers) triples plus the tail.

    The tail is whatever follows the last terminator (possibly empty).
    Terminators with an empty body are skipped.
    """
    sentences = []
    cursor = 0
    for match in _TERMINATOR_RE.finditer(text):
        body = text[cursor:match.start()].strip()
        cursor = match.end()
        if body:
            sentences.append((body, match.group(1), match.group(2)))
    return sentences, text[cursor:]


def fix_short_sentence_artifacts(chunk_text):
    """Fix multiple short sentences that cause TTS errors.

    Examples:
        "Yes. No. Maybe." -> "Yes, No, Maybe."
        "Right."          -> "Right,"  (single-word chunk)
        "Yes. I think we should go now." -> "Yes, I think we should go now."

    Rules:
        * A chunk that is one word ending in "." gets the period replaced by
          a comma.
        * If at least two sentences have three words or fewer, all sentences
          are joined with commas and the result ends with a period.
        * Otherwise, if the first sentence is a single word ending in ".",
          only that period becomes a comma; later sentences keep their own
          punctuation.
        * Anything else is returned unchanged.

    Closing quotes/brackets and any text after the last terminator are
    preserved.

    Args:
        chunk_text: Text of one chunk.

    Returns:
        The adjusted chunk text.
    """
    stripped = chunk_text.strip()

    # Handle full chunk that is just one short sentence
    if len(stripped.split()) == 1 and stripped.endswith('.'):
        return stripped[:-1] + ','  # Replace period with comma

    sentences, tail = _split_terminated_sentences(stripped)
    if len(sentences) < 2:
        return chunk_text  # nothing to fix

    # Handle multiple short sentences
    short_count = sum(1 for body, _, _ in sentences if len(body.split()) <= 3)
    if short_count >= 2:
        *leading, (last_body, _, last_closers) = sentences
        pieces = [body + closers for body, _, closers in leading]
        pieces.append(f"{last_body}.{last_closers}")
        return ", ".join(pieces) + tail

    # Handle case where first sentence is a single word
    first_body, first_term, first_closers = sentences[0]
    if len(first_body.split()) == 1 and first_term == ".":
        rest = " ".join(body + term + closers
                        for body, term, closers in sentences[1:])
        return f"{first_body}{first_closers}, {rest}{tail}"

    return chunk_text


def _split_sentences(text):
    """Split *text* into (sentence, is_para_end) pairs.

    ``is_para_end`` is True when the whitespace that followed the sentence
    in *text* contains a newline. It is computed before stripping, because
    the stripped sentence can no longer show it.
    """
    raw_pieces = []
    start = 0
    for match in _SENTENCE_BOUNDARY_RE.finditer(text):
        raw_pieces.append(text[start:match.end()])
        start = match.end()
    if start < len(text):
        raw_pieces.append(text[start:])

    sentences = []
    for raw in raw_pieces:
        sentence = raw.strip()
        if not sentence:
            continue
        trailing_whitespace = raw[len(raw.rstrip()):]
        sentences.append((sentence, "\n" in trailing_whitespace))
    return sentences


def _strip_terminators(chunk_text):
    """Remove trailing . ! ? from a chunk so it can be comma-joined."""
    return re.sub(r'[.!?]+$', '', chunk_text.strip())


def _group_short_chunks(chunks, min_words):
    """Merge runs of chunks shorter than *min_words* into their neighbours."""
    grouped = []
    short_group = []

    for chunk_text, is_para_end in chunks:
        word_count = len(chunk_text.split())

        if word_count < min_words and not is_para_end:
            # Collect short chunks for grouping
            short_group.append(_strip_terminators(chunk_text))
            continue

        if not short_group:
            # Normal chunk
            grouped.append((chunk_text, is_para_end))
            continue

        if word_count < min_words:
            # Short chunk that closes a paragraph: it ends the group
            short_group.append(_strip_terminators(chunk_text))
            merged = ", ".join(short_group) + "."
        else:
            # Merge short group with current chunk
            merged = ", ".join(short_group) + ", " + chunk_text
        grouped.append((merged.strip(), is_para_end))
        short_group = []

    # Handle any remaining short group
    if short_group:
        grouped.append(((", ".join(short_group) + ".").strip(), False))

    return grouped


def sentence_chunk_text(text, max_words=MAX_CHUNK_WORDS, min_words=MIN_CHUNK_WORDS):
    """Chunk text by sentence, breaking long sentences and grouping short ones.

    The text is split at sentence terminators followed by whitespace.
    Sentences longer than *max_words* go through ``break_long_sentence``.
    Chunks shorter than *min_words* are merged with neighbouring chunks
    (merged chunks may therefore exceed *max_words*). Finally every chunk
    is passed through ``fix_short_sentence_artifacts``.

    Args:
        text: Text to chunk.
        max_words: Word count above which a sentence is broken up.
        min_words: Word count below which a chunk is grouped with others.

    Returns:
        List of ``(chunk_text, is_para_end)`` tuples. ``is_para_end`` is True
        when the chunk's last sentence was followed by a newline in *text*.

    Raises:
        ValueError: If a sentence must be broken and *max_words* is < 1.
    """
    processed_chunks = []
    for sentence, is_para_end in _split_sentences(text):
        if len(sentence.split()) <= max_words:
            # Sentence is fine as-is
            processed_chunks.append((sentence, is_para_end))
        else:
            # Sentence is too long - need to break it intelligently
            processed_chunks.extend(
                break_long_sentence(sentence, max_words, is_para_end))

    final_chunks = _group_short_chunks(processed_chunks, min_words)

    # Apply short sentence cleanup
    return [(fix_short_sentence_artifacts(chunk_text), is_para_end)
            for chunk_text, is_para_end in final_chunks]


def _find_break_position(text, max_words):
    """Return the index just past the preferred break point, or None.

    Patterns are tried in order of preference; for each pattern the first
    match that yields a chunk of acceptable size wins.
    """
    min_chunk_words = min(6, max_words // 3)
    for pattern in _BREAK_PATTERNS:
        for match in pattern.finditer(text):
            chunk_size = len(text[:match.end()].split())
            if min_chunk_words <= chunk_size <= max_words:
                return match.end()
    return None


def break_long_sentence(sentence, max_words, is_para_end=None):
    """Break a long sentence at natural pause points.

    Args:
        sentence: The sentence to break.
        max_words: Maximum number of words per resulting piece (>= 1).
        is_para_end: Whether the sentence closes a paragraph. When None it
            is derived from a trailing newline on *sentence*.

    Returns:
        List of ``(piece_text, is_para_end)`` tuples. Only the last piece
        can carry a True flag. An empty sentence yields an empty list.

    Raises:
        ValueError: If *max_words* is less than 1 (no progress is possible).
    """
    if max_words < 1:
        raise ValueError(f"max_words must be at least 1, got {max_words}")

    if is_para_end is None:
        is_para_end = sentence.endswith("\n")

    pieces = []
    remaining = sentence.strip()

    while remaining:
        words = remaining.split()
        if len(words) <= max_words:
            # Remaining text fits in one chunk
            pieces.append(remaining)
            break

        cut = _find_break_position(remaining, max_words)
        if cut is not None:
            # Found a good break point
            pieces.append(remaining[:cut].strip())
            remaining = remaining[cut:].strip()
            continue

        # No good break point found - force break at word limit
        forced = " ".join(words[:max_words])
        # Try to end at a reasonable point
        if not forced.endswith(('.', '!', '?', ',', ';')):
            forced += ","
        pieces.append(forced)
        remaining = " ".join(words[max_words:])

    # The paragraph ends with the sentence, i.e. with its last piece.
    last_index = len(pieces) - 1
    return [(piece, bool(is_para_end) and index == last_index)
            for index, piece in enumerate(pieces)]


# ============================================================================
# CONTENT BOUNDARY DETECTION
# ============================================================================

def detect_content_boundaries(chunk_text, chunk_index, all_chunks):
    """Detect chapter breaks and paragraph endings for appropriate silence.

    Checks are applied in this order, later ones overriding earlier ones:
    ``"chapter_start"`` (this chunk matches a chapter pattern),
    ``"chapter_end"`` (the next chunk matches a chapter pattern),
    ``"section_break"`` (three or more ``*``, ``#`` or em dashes in a row).
    ``"paragraph_end"`` is reported only when nothing else matched and the
    chunk ends with a newline.

    Args:
        chunk_text: Text of the current chunk.
        chunk_index: Index of the current chunk in *all_chunks*.
        all_chunks: All chunks; items may be strings or
            ``(text, is_para_end)`` tuples.

    Returns:
        One of the boundary names above, or None.
    """
    boundary_type = None

    # Chapter detection (flexible patterns)
    current = chunk_text.strip()
    if any(re.search(pattern, current, re.MULTILINE)
           for pattern in _CHAPTER_PATTERNS):
        boundary_type = "chapter_start"

    # Look ahead for chapter start (current chunk ends chapter)
    if chunk_index + 1 < len(all_chunks):
        next_chunk = all_chunks[chunk_index + 1]
        if isinstance(next_chunk, (tuple, list)):
            # Accept the (text, is_para_end) pairs from sentence_chunk_text.
            next_chunk = next_chunk[0]
        upcoming = next_chunk.strip()
        if any(re.search(pattern, upcoming) for pattern in _CHAPTER_PATTERNS):
            boundary_type = "chapter_end"

    # Section breaks (asterisks, multiple line breaks)
    if re.search(r'\*{3,}|\#{3,}|—{3,}', chunk_text):
        boundary_type = "section_break"

    # Paragraph ending (already detected in chunking)
    if boundary_type is None and chunk_text.endswith('\n'):
        boundary_type = "paragraph_end"

    return boundary_type


# ============================================================================
# UTILITY FUNCTIONS
# ============================================================================

def reload_abbreviations():
    """Reload abbreviations from file (useful for testing changes)."""
    return load_abbreviations()


def test_abbreviations(test_text="Dr. Smith met with the M.P. at 3:30 p.m. on the 21st."):
    """Print and return the result of abbreviation replacement on sample text.

    Args:
        test_text: Text to run through the replacements.

    Returns:
        The processed text.
    """
    abbreviation_replacements = load_abbreviations()

    print(f"Original: {test_text}")
    processed = preprocess_abbreviations(test_text, abbreviation_replacements)
    print(f"Processed: {processed}")

    return processed


def test_chunking(test_text=None, max_words=20, min_words=4):
    """Print and return the chunking result for sample or custom text.

    Args:
        test_text: Text to chunk; a built-in sample is used when None.
        max_words: Passed to ``sentence_chunk_text``.
        min_words: Passed to ``sentence_chunk_text``.

    Returns:
        The list of ``(chunk_text, is_para_end)`` tuples.
    """
    if test_text is None:
        test_text = (
            'Though perfectly worldly-wise, and able, as she expressed it, '
            'to take care of herself, there was yet something curiously '
            'ingenuous in her single-minded attitude towards life, and her '
            'whole-hearted determination to "make good." '
            'This glimpse of a world unknown to me was not without its '
            'charm, and I enjoyed seeing her vivid little face light up as '
            'she talked.'
        )

    chunks = sentence_chunk_text(test_text, max_words=max_words, min_words=min_words)

    print("Enhanced Chunking Results:")
    for number, (chunk, _is_para) in enumerate(chunks, 1):
        word_count = len(chunk.split())
        print(f"Chunk {number} ({word_count} words): {chunk}")
        if word_count > max_words:
            print(" ⚠️ WARNING: Still over limit!")
        print()

    return chunks