#!/usr/bin/env python3
"""
Direct Audio Generation from JSON Tool
This script allows for generating audiobook chunks directly from a pre-existing
`chunks_info.json` file. It is intended for debugging and testing purposes,
allowing a user to manually edit the TTS parameters in the JSON file and
hear the results without the VADER analysis step.
"""
import torch
from pathlib import Path
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
from datetime import timedelta
# Add project root to path to allow module imports
project_root = Path(__file__).parent
sys.path.append(str(project_root))
from config.config import *
from modules.tts_engine import load_optimized_model, process_one_chunk
from modules.file_manager import setup_book_directories, list_voice_samples, ensure_voice_sample_compatibility
from wrapper.chunk_loader import load_chunks
from chatterbox.tts import punc_norm
from modules.progress_tracker import log_chunk_progress, log_run
def _detect_device():
    """Return the torch device string to use: "cuda", else "mps", else "cpu"."""
    if torch.cuda.is_available():
        return "cuda"
    if torch.backends.mps.is_available():
        return "mps"
    return "cpu"


def _prompt_for_voice(voice_files):
    """Print a numbered menu of *voice_files* and return the chosen entry.

    Keeps prompting until the user types a number between 1 and
    ``len(voice_files)``.
    """
    print("\nAvailable voices:")
    for number, voice_file in enumerate(voice_files, 1):
        print(f" [{number}] {voice_file.stem}")

    while True:
        choice = input("Select voice number: ").strip()
        try:
            idx = int(choice) - 1
        except ValueError:
            print("Invalid selection.")
            continue
        # Explicit bounds check: a negative idx would otherwise silently
        # index from the end of the list.
        if 0 <= idx < len(voice_files):
            return voice_files[idx]
        print("Invalid selection.")


def main():
    """Main function to drive the generation process.

    Interactive flow:
      1. Ask for a book name and load its ``chunks_info.json``.
      2. Ask which voice sample to use.
      3. Load the TTS model and prepare the voice conditionals.
      4. Delete existing ``*.wav`` files in the audio chunk directory.
      5. Synthesize every chunk through a small thread pool.
      6. Print a timing summary, including how many chunks raised errors.

    Returns ``None``; returns early (after printing a message) when the book
    name is empty, the JSON file is missing, or no voice samples exist.
    """
    # BOLD/CYAN/GREEN/RESET and the DEFAULT_*/path constants are expected to
    # come from the star import of config.config.
    print(f"{BOLD}{CYAN}--- Direct Audio Generation from JSON Tool ---{RESET}")

    # 1. Get Book Name
    book_name = input("Enter the book name (e.g., 'london'): ").strip()
    if not book_name:
        print("❌ Book name cannot be empty.")
        return

    # 2. Locate and Load JSON
    book_audio_dir = AUDIOBOOK_ROOT / book_name
    json_path = book_audio_dir / "TTS" / "text_chunks" / "chunks_info.json"
    if not json_path.exists():
        print(f"❌ Error: JSON file not found at {json_path}")
        print("Please ensure you have run the 'Prepare text file' option for this book first.")
        return

    print(f"📖 Loading chunks from: {json_path}")
    all_chunks = load_chunks(str(json_path))
    print(f"✅ Found {len(all_chunks)} chunks.")

    # 3. Select Voice
    voice_files = list_voice_samples()
    if not voice_files:
        print(f"❌ No voice samples found in {VOICE_SAMPLES_DIR}")
        return
    voice_path = _prompt_for_voice(voice_files)

    # Ensure voice compatibility (the helper may return a different path).
    voice_path = ensure_voice_sample_compatibility(voice_path)

    # 4. Setup Environment
    device = _detect_device()
    print(f"\n🚀 Using device: {device}")
    print(f"🎤 Using voice: {Path(voice_path).name}")

    # 5. Load Model
    model = load_optimized_model(device)

    # 6. Prepare voice conditionals before any chunk is synthesized.
    print(f"🎤 Preparing voice conditionals with: {Path(voice_path).name}")
    model.prepare_conditionals(voice_path)

    # 7. Process Chunks
    output_root, tts_dir, text_chunks_dir, audio_chunks_dir = setup_book_directories(
        Path(TEXT_INPUT_ROOT) / book_name
    )

    # Clean existing audio chunks so stale files from an earlier run cannot
    # be mistaken for output of this one.
    print("🧹 Clearing old audio chunks...")
    for wav_file in audio_chunks_dir.glob("*.wav"):
        wav_file.unlink()

    start_time = time.time()
    total_chunks = len(all_chunks)
    log_path = output_root / "debug_generation.log"
    failed_chunks = 0

    print(f"\n🔄 Generating {total_chunks} chunks...")
    with ThreadPoolExecutor(max_workers=2) as executor:  # Test parallel processing
        futures = []
        for i, chunk_data in enumerate(all_chunks):
            # Only exaggeration is taken from the JSON; the other parameters
            # are deliberately forced to the configured defaults.
            chunk_tts_params = {
                "exaggeration": chunk_data.get("tts_params", {}).get(
                    "exaggeration", DEFAULT_EXAGGERATION
                ),
                "cfg_weight": DEFAULT_CFG_WEIGHT,
                "temperature": DEFAULT_TEMPERATURE,
            }
            # Arguments are positional and must stay in this exact order;
            # the meaning of the literal None slot is defined by
            # process_one_chunk (not visible here).
            future = executor.submit(
                process_one_chunk,
                i, chunk_data['text'], text_chunks_dir, audio_chunks_dir,
                voice_path, chunk_tts_params, start_time, total_chunks,
                punc_norm, book_name, log_run, log_path, device,
                model, None, chunk_data['is_paragraph_end'], all_chunks,
                chunk_data['boundary_type'],
            )
            futures.append(future)

        for future in as_completed(futures):
            try:
                result = future.result()
            except Exception as e:
                # Keep going so one bad chunk does not abort the whole run,
                # but remember it for the final summary.
                failed_chunks += 1
                print(f"\n❌ An error occurred while processing a chunk: {e}")
                continue
            if result:
                idx, _ = result
                log_chunk_progress(idx, total_chunks, start_time, 0)

    elapsed_time = time.time() - start_time
    if failed_chunks:
        print(f"\n❌ Generation finished with {failed_chunks} failed chunk(s).")
    else:
        print(f"\n{GREEN}✅ Generation Complete!{RESET}")
    print(f"⏱️ Total time: {timedelta(seconds=int(elapsed_time))}")
    print(f"📁 Audio chunks are in: {audio_chunks_dir}")
    print("You can now use Option 3 from the main menu to combine them.")
if __name__ == "__main__":
    # Run the interactive tool only when executed as a script, not on import.
    main()