File size: 17,923 Bytes
7e5b4d7 67b64d0 7e5b4d7 67b64d0 7e5b4d7 67b64d0 7e5b4d7 67b64d0 7e5b4d7 67b64d0 7e5b4d7 67b64d0 7e5b4d7 67b64d0 7e5b4d7 67b64d0 7e5b4d7 67b64d0 7e5b4d7 67b64d0 7e5b4d7 67b64d0 7e5b4d7 67b64d0 7e5b4d7 67b64d0 7e5b4d7 67b64d0 7e5b4d7 67b64d0 7e5b4d7 67b64d0 7e5b4d7 67b64d0 7e5b4d7 67b64d0 7e5b4d7 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 | """
ChatterboxTTS File Management & Media Processing Module
======================================================
OVERVIEW:
This module handles all file system operations, media format conversions, and
metadata management for ChatterboxTTS. It manages the complex directory structure
for audiobook production and handles conversion to final distribution formats.
MAIN COMPONENTS:
1. DIRECTORY MANAGEMENT: Creates and maintains audiobook processing directories
2. AUDIO DISCOVERY: Locates and validates audio files across directory structures
3. M4B CONVERSION: Converts WAV chunks to M4B audiobook format using FFmpeg
4. METADATA HANDLING: Adds cover art, chapters, and book information to audiobooks
5. FILE VALIDATION: Ensures audio file compatibility and format requirements
6. VOICE SAMPLE MANAGEMENT: Handles voice sample discovery and validation
KEY OPERATIONS:
- Directory structure setup for new audiobooks
- Audio chunk discovery and organization
- WAV to M4B conversion with chapter markers
- Cover art integration and metadata embedding
- File compatibility checking (24kHz requirement for voice samples)
- Final audiobook packaging and organization
DIRECTORY STRUCTURE MANAGED:
```
Audiobook/[book_name]/
βββ TTS/
β βββ text_chunks/ # Individual text chunk files
β βββ audio_chunks/ # Generated WAV audio chunks
βββ [book_name].m4b # Final audiobook file
βββ processing.log # Processing logs
βββ metadata files # Cover art, chapter info
```
TECHNICAL FEATURES:
- FFmpeg integration for media processing
- Automatic cover art detection and integration
- Chapter marker generation from chunk structure
- Metadata preservation across format conversions
- File system safety with validation and error handling
- Cross-platform file operations (Windows/Linux/Mac)
PERFORMANCE CONSIDERATIONS:
Handles large audio files efficiently with streaming processing
and manages disk space through temporary file cleanup.
"""
import subprocess
import soundfile as sf
import os
import re
import time
import logging
from pathlib import Path
from config.config import *
# ============================================================================
# VOICE SAMPLE MANAGEMENT
# ============================================================================
def list_voice_samples():
"""List available voice samples"""
return sorted(VOICE_SAMPLES_DIR.glob("*.wav"), key=lambda x: x.stem.lower())
def ensure_voice_sample_compatibility(input_path, output_dir=None):
"""Ensure voice sample is compatible with TTS (24kHz mono)"""
input_path = str(input_path)
ext = os.path.splitext(input_path)[1].lower()
basename = os.path.splitext(os.path.basename(input_path))[0]
output_dir = output_dir or os.path.dirname(input_path)
output_path = os.path.join(output_dir, basename + "_ttsready.wav")
try:
info = sf.info(input_path)
if (ext == '.wav' and info.samplerate == 24000 and info.channels == 1):
return input_path
except Exception:
pass
cmd = [
"ffmpeg", "-y",
"-i", input_path,
"-ar", "24000",
"-ac", "1",
output_path
]
subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
return output_path
# ============================================================================
# FFMPEG OPERATIONS
# ============================================================================
def run_ffmpeg(cmd):
"""Run FFmpeg command with error handling"""
try:
subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
except subprocess.CalledProcessError as e:
logging.info(f"FFmpeg command failed: {' '.join(cmd)}")
logging.info(f"Error: {e}")
subprocess.run(cmd)
raise
# ============================================================================
# M4B CONVERSION WITH NORMALIZATION
# ============================================================================
def convert_to_m4b_with_peak_normalization(wav_path, temp_m4b_path, target_db=-3.0, custom_speed=None):
"""Convert WAV to M4B with peak normalization"""
print("π Converting to m4b with peak normalization...")
# Build audio filter chain
speed_to_use = custom_speed if custom_speed is not None else ATEMPO_SPEED
audio_filters = [f"loudnorm=I=-16:TP={target_db}:LRA=11"]
if speed_to_use != 1.0:
audio_filters.append(f"atempo={speed_to_use}")
print(f"π Converting to m4b with peak normalization and speed {speed_to_use}x...")
cmd = [
"ffmpeg", "-y",
"-i", str(wav_path),
"-af", ",".join(audio_filters),
"-c:a", "aac",
str(temp_m4b_path)
]
start_time = time.time()
process = subprocess.Popen(cmd, stderr=subprocess.PIPE, text=True)
audio_secs = 0.0
for line in process.stderr:
match = re.search(r"time=(\d{2}):(\d{2}):(\d{2})\.(\d{2})", line)
if match:
h, m, s, ms = map(int, match.groups())
audio_secs = h * 3600 + m * 60 + s + ms / 100
elapsed = time.time() - start_time
factor = audio_secs / elapsed if elapsed > 0 else 0.0
print(f"πΌ FFmpeg (normalizing): {match.group(0)} | {factor:.2f}x realtime", end='\r')
process.wait()
print("\nβ
Conversion with normalization complete.")
def convert_to_m4b_with_loudness_normalization(wav_path, temp_m4b_path, custom_speed=None):
"""Convert WAV to M4B with two-pass loudness normalization"""
import json
print("π Converting to m4b with loudness normalization...")
# Step 1: Analyze audio loudness
print("π Analyzing audio loudness...")
analyze_cmd = [
"ffmpeg", "-y",
"-i", str(wav_path),
"-af", "loudnorm=I=-16:TP=-1.5:LRA=11:print_format=json",
"-f", "null", "-"
]
result = subprocess.run(analyze_cmd, capture_output=True, text=True)
# Extract loudness measurements from stderr
loudness_data = None
for line in result.stderr.split('\n'):
if line.strip().startswith('{'):
try:
loudness_data = json.loads(line.strip())
break
except:
continue
if not loudness_data:
print("β οΈ Could not analyze loudness, falling back to single-pass...")
return convert_to_m4b_with_peak_normalization(wav_path, temp_m4b_path)
# Step 2: Apply normalization with measured values
print("π§ Applying normalization...")
# Build audio filter chain
speed_to_use = custom_speed if custom_speed is not None else ATEMPO_SPEED
audio_filters = [f"loudnorm=I=-16:TP=-1.5:LRA=11:measured_I={loudness_data['input_i']}:measured_LRA={loudness_data['input_lra']}:measured_TP={loudness_data['input_tp']}:measured_thresh={loudness_data['input_thresh']}:offset={loudness_data['target_offset']}:linear=true:print_format=summary"]
if speed_to_use != 1.0:
audio_filters.append(f"atempo={speed_to_use}")
cmd = [
"ffmpeg", "-y",
"-i", str(wav_path),
"-af", ",".join(audio_filters),
"-c:a", "aac",
str(temp_m4b_path)
]
start_time = time.time()
process = subprocess.Popen(cmd, stderr=subprocess.PIPE, text=True)
audio_secs = 0.0
for line in process.stderr:
match = re.search(r"time=(\d{2}):(\d{2}):(\d{2})\.(\d{2})", line)
if match:
h, m, s, ms = map(int, match.groups())
audio_secs = h * 3600 + m * 60 + s + ms / 100
elapsed = time.time() - start_time
factor = audio_secs / elapsed if elapsed > 0 else 0.0
print(f"πΌ FFmpeg (normalizing): {match.group(0)} | {factor:.2f}x realtime", end='\r')
process.wait()
print("\nβ
Two-pass normalization complete.")
def convert_to_m4b_with_simple_normalization(wav_path, temp_m4b_path, target_db=-6.0, custom_speed=None):
"""Convert WAV to M4B with simple peak normalization"""
print("π Converting to m4b with simple normalization...")
# Build audio filter chain
speed_to_use = custom_speed if custom_speed is not None else ATEMPO_SPEED
audio_filters = [f"volume={target_db}dB"]
if speed_to_use != 1.0:
audio_filters.append(f"atempo={speed_to_use}")
cmd = [
"ffmpeg", "-y",
"-i", str(wav_path),
"-af", ",".join(audio_filters),
"-c:a", "aac",
str(temp_m4b_path)
]
start_time = time.time()
process = subprocess.Popen(cmd, stderr=subprocess.PIPE, text=True)
audio_secs = 0.0
for line in process.stderr:
match = re.search(r"time=(\d{2}):(\d{2}):(\d{2})\.(\d{2})", line)
if match:
h, m, s, ms = map(int, match.groups())
audio_secs = h * 3600 + m * 60 + s + ms / 100
elapsed = time.time() - start_time
factor = audio_secs / elapsed if elapsed > 0 else 0.0
print(f"πΌ FFmpeg (normalizing): {match.group(0)} | {factor:.2f}x realtime", end='\r')
process.wait()
print("\nβ
Simple normalization complete.")
def convert_to_m4b(wav_path, temp_m4b_path, custom_speed=None):
"""Convert WAV to M4B with configurable normalization and optional custom speed"""
# Determine speed to use (custom speed overrides config)
speed_to_use = custom_speed if custom_speed is not None else ATEMPO_SPEED
if not ENABLE_NORMALIZATION or NORMALIZATION_TYPE == "none":
# Original function without normalization
print(f"π Converting to m4b with speed {speed_to_use}x...")
# Build audio filter for atempo if needed
audio_filter = []
if speed_to_use != 1.0:
audio_filter = ["-filter:a", f"atempo={speed_to_use}"]
cmd = [
"ffmpeg", "-y",
"-i", str(wav_path)
] + audio_filter + [
"-c:a", "aac",
str(temp_m4b_path)
]
elif NORMALIZATION_TYPE == "loudness":
# EBU R128 loudness normalization (recommended for audiobooks)
return convert_to_m4b_with_loudness_normalization(wav_path, temp_m4b_path, custom_speed)
elif NORMALIZATION_TYPE == "peak":
# Peak normalization
return convert_to_m4b_with_peak_normalization(wav_path, temp_m4b_path, TARGET_PEAK_DB, custom_speed)
elif NORMALIZATION_TYPE == "simple":
# Simple volume adjustment
return convert_to_m4b_with_simple_normalization(wav_path, temp_m4b_path, TARGET_PEAK_DB, custom_speed)
else:
# Fallback to no normalization
# Build audio filter for atempo if needed
audio_filter = []
if speed_to_use != 1.0:
audio_filter = ["-filter:a", f"atempo={speed_to_use}"]
cmd = [
"ffmpeg", "-y",
"-i", str(wav_path)
] + audio_filter + [
"-c:a", "aac",
str(temp_m4b_path)
]
# Run the conversion (if not handled by specialized functions above)
start_time = time.time()
process = subprocess.Popen(cmd, stderr=subprocess.PIPE, text=True)
audio_secs = 0.0
for line in process.stderr:
match = re.search(r"time=(\d{2}):(\d{2}):(\d{2})\.(\d{2})", line)
if match:
h, m, s, ms = map(int, match.groups())
audio_secs = h * 3600 + m * 60 + s + ms / 100
elapsed = time.time() - start_time
factor = audio_secs / elapsed if elapsed > 0 else 0.0
print(f"πΌ FFmpeg: {match.group(0)} | {factor:.2f}x realtime", end='\r')
process.wait()
print("\nβ
Conversion complete.")
def add_metadata_to_m4b(temp_m4b_path, final_m4b_path, cover_path=None, nfo_path=None):
"""Add metadata and cover to M4B"""
cmd = ["ffmpeg", "-y", "-i", str(temp_m4b_path)]
if cover_path and cover_path.exists():
cmd.extend(["-i", str(cover_path), "-map", "0", "-map", "1", "-c", "copy", "-disposition:v:0", "attached_pic"])
else:
cmd.extend(["-map", "0", "-c", "copy"])
if nfo_path and nfo_path.exists():
with open(nfo_path, 'r', encoding='utf-8') as f:
for line in f:
if ':' in line:
key, val = line.strip().split(':', 1)
cmd.extend(["-metadata", f"{key.strip()}={val.strip()}"])
cmd.append(str(final_m4b_path))
run_ffmpeg(cmd)
temp_m4b_path.unlink(missing_ok=True)
# ============================================================================
# FILE UTILITIES
# ============================================================================
def chunk_sort_key(f):
"""Extracts the chunk number for natural sorting"""
m = re.match(r"chunk_(\d+)\.wav", f.name)
return int(m.group(1)) if m else 0
def create_concat_file(chunk_paths, output_path):
"""Create FFmpeg concat file for audio chunks"""
with open(output_path, 'w') as f:
for p in chunk_paths:
# Use absolute path to ensure FFmpeg can find the files
f.write(f"file '{str(p.resolve())}'\n")
logging.info(f"concat.txt written with {len(chunk_paths)} chunks.")
return output_path
def cleanup_temp_files(directory, patterns):
"""Clean up temporary files matching patterns"""
files_cleaned = 0
for pattern in patterns:
for temp_file in directory.glob(pattern):
temp_file.unlink(missing_ok=True)
files_cleaned += 1
return files_cleaned
# ============================================================================
# DIRECTORY MANAGEMENT
# ============================================================================
def setup_book_directories(book_dir):
"""Set up directory structure for book processing"""
basename = book_dir.name
output_root = AUDIOBOOK_ROOT / basename
tts_dir = output_root / "TTS"
text_chunks_dir = tts_dir / "text_chunks"
audio_chunks_dir = tts_dir / "audio_chunks"
# Create directories
for d in [output_root, tts_dir, text_chunks_dir, audio_chunks_dir]:
d.mkdir(parents=True, exist_ok=True)
return output_root, tts_dir, text_chunks_dir, audio_chunks_dir
def find_book_files(book_dir):
"""Find text files, cover, and metadata for a book"""
text_files = sorted(book_dir.glob("*.txt"))
nfo_file = book_dir / "book.nfo"
cover_jpg = book_dir / "cover.jpg"
cover_png = book_dir / "cover.png"
cover_file = cover_jpg if cover_jpg.exists() else cover_png if cover_png.exists() else None
return {
'text': text_files[0] if text_files else None,
'cover': cover_file,
'nfo': nfo_file if nfo_file.exists() else None
}
# ============================================================================
# AUDIO FILE OPERATIONS
# ============================================================================
def combine_audio_chunks(chunk_paths, output_path):
"""Combine audio chunks into single file using FFmpeg"""
concat_list_path = output_path.parent / "concat.txt"
create_concat_file(chunk_paths, concat_list_path)
run_ffmpeg([
"ffmpeg", "-y", "-f", "concat", "-safe", "0",
"-i", str(concat_list_path.resolve()),
"-c", "copy", str(output_path.resolve())
])
return output_path
def get_audio_files_in_directory(directory, pattern="chunk_*.wav"):
"""Get sorted list of audio files matching pattern"""
chunk_paths = sorted([f for f in directory.glob(pattern)
if re.fullmatch(r'chunk_\d{3,}\.wav', f.name)],
key=chunk_sort_key)
return chunk_paths
# ============================================================================
# VALIDATION AND VERIFICATION
# ============================================================================
def verify_audio_file(wav_path):
"""Verify audio file is valid and readable"""
try:
info = sf.info(str(wav_path))
return info.frames > 0 and info.samplerate > 0
except Exception as e:
logging.error(f"Invalid audio file {wav_path}: {e}")
return False
def verify_chunk_completeness(audio_chunks_dir, expected_count):
"""Verify all expected chunks exist and are valid"""
missing_chunks = []
invalid_chunks = []
for i in range(1, expected_count + 1):
chunk_path = audio_chunks_dir / f"chunk_{i:05}.wav"
if not chunk_path.exists():
missing_chunks.append(i)
elif not verify_audio_file(chunk_path):
invalid_chunks.append(i)
return missing_chunks, invalid_chunks
# ============================================================================
# EXPORT AND IMPORT FUNCTIONS
# ============================================================================
def export_processing_log(output_dir, processing_info):
"""Export comprehensive processing log"""
log_path = output_dir / "processing_complete.log"
with open(log_path, 'w', encoding='utf-8') as f:
f.write("GenTTS Processing Complete\n")
f.write("=" * 50 + "\n\n")
for key, value in processing_info.items():
f.write(f"{key}: {value}\n")
return log_path
def save_chunk_info(text_chunks_dir, chunks_info):
"""Save chunk information for debugging/resume"""
info_path = text_chunks_dir / "chunks_info.json"
import json
with open(info_path, 'w', encoding='utf-8') as f:
json.dump(chunks_info, f, indent=2, ensure_ascii=False)
return info_path
def load_chunk_info(text_chunks_dir):
"""Load chunk information if available"""
info_path = text_chunks_dir / "chunks_info.json"
if not info_path.exists():
return None
import json
try:
with open(info_path, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
logging.warning(f"Could not load chunk info: {e}")
return None
|