File size: 5,235 Bytes
346d87a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#!/usr/bin/env python3
"""
Direct Audio Generation from JSON Tool

This script allows for generating audiobook chunks directly from a pre-existing
`chunks_info.json` file. It is intended for debugging and testing purposes,
allowing a user to manually edit the TTS parameters in the JSON file and
hear the results without the VADER analysis step.
"""

import torch
from pathlib import Path
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
from datetime import timedelta

# Add project root to path to allow module imports
project_root = Path(__file__).parent
sys.path.append(str(project_root))

from config.config import *
from modules.tts_engine import load_optimized_model, process_one_chunk
from modules.file_manager import setup_book_directories, list_voice_samples, ensure_voice_sample_compatibility
from wrapper.chunk_loader import load_chunks
from chatterbox.tts import punc_norm
from modules.progress_tracker import log_chunk_progress, log_run

def main():
    """Main function to drive the generation process."""
    print(f"{BOLD}{CYAN}--- Direct Audio Generation from JSON Tool ---\{RESET}")
    
    # 1. Get Book Name
    book_name = input("Enter the book name (e.g., 'london'): ").strip()
    if not book_name:
        print("❌ Book name cannot be empty.")
        return

    # 2. Locate and Load JSON
    book_audio_dir = AUDIOBOOK_ROOT / book_name
    json_path = book_audio_dir / "TTS" / "text_chunks" / "chunks_info.json"

    if not json_path.exists():
        print(f"❌ Error: JSON file not found at {json_path}")
        print("Please ensure you have run the 'Prepare text file' option for this book first.")
        return

    print(f"πŸ“– Loading chunks from: {json_path}")
    all_chunks = load_chunks(str(json_path))
    print(f"βœ… Found {len(all_chunks)} chunks.")

    # 3. Select Voice
    voice_files = list_voice_samples()
    if not voice_files:
        print(f"❌ No voice samples found in {VOICE_SAMPLES_DIR}")
        return

    print("\nAvailable voices:")
    for i, voice_file in enumerate(voice_files, 1):
        print(f" [{i}] {voice_file.stem}")
    
    while True:
        try:
            choice = input("Select voice number: ").strip()
            idx = int(choice) - 1
            if 0 <= idx < len(voice_files):
                voice_path = voice_files[idx]
                break
            print("Invalid selection.")
        except (ValueError, IndexError):
            print("Invalid selection.")
    
    # Ensure voice compatibility
    voice_path = ensure_voice_sample_compatibility(voice_path)

    # 4. Setup Environment
    if torch.cuda.is_available():
        device = "cuda"
    elif torch.backends.mps.is_available():
        device = "mps"
    else:
        device = "cpu"
    
    print(f"\nπŸš€ Using device: {device}")
    print(f"🎀 Using voice: {Path(voice_path).name}")

    # 5. Load Model
    model = load_optimized_model(device)
    
    # 6. Prepare voice conditionals (THIS WAS MISSING!)
    print(f"🎀 Preparing voice conditionals with: {Path(voice_path).name}")
    model.prepare_conditionals(voice_path)

    # 7. Process Chunks
    output_root, tts_dir, text_chunks_dir, audio_chunks_dir = setup_book_directories(Path(TEXT_INPUT_ROOT) / book_name)
    
    # Clean existing audio chunks
    print("🧹 Clearing old audio chunks...")
    for wav_file in audio_chunks_dir.glob("*.wav"):
        wav_file.unlink()

    start_time = time.time()
    total_chunks = len(all_chunks)
    log_path = output_root / "debug_generation.log"
    
    print(f"\nπŸ”„ Generating {total_chunks} chunks...")

    with ThreadPoolExecutor(max_workers=2) as executor: # Test parallel processing
        futures = []
        for i, chunk_data in enumerate(all_chunks):
            # Extract exaggeration from JSON, force others to default
            chunk_tts_params = {
                "exaggeration": chunk_data.get("tts_params", {}).get("exaggeration", DEFAULT_EXAGGERATION),
                "cfg_weight": DEFAULT_CFG_WEIGHT,
                "temperature": DEFAULT_TEMPERATURE
            }

            future = executor.submit(
                process_one_chunk,
                i, chunk_data['text'], text_chunks_dir, audio_chunks_dir,
                voice_path, chunk_tts_params, start_time, total_chunks,
                punc_norm, book_name, log_run, log_path, device,
                model, None, chunk_data['is_paragraph_end'], all_chunks, chunk_data['boundary_type']
            )
            futures.append(future)

        for future in as_completed(futures):
            try:
                result = future.result()
                if result:
                    idx, _ = result
                    log_chunk_progress(idx, total_chunks, start_time, 0)
            except Exception as e:
                print(f"\n❌ An error occurred while processing a chunk: {e}")

    elapsed_time = time.time() - start_time
    print(f"\n{GREEN}βœ… Generation Complete!{RESET}")
    print(f"⏱️ Total time: {timedelta(seconds=int(elapsed_time))}")
    print(f"πŸ”Š Audio chunks are in: {audio_chunks_dir}")
    print("You can now use Option 3 from the main menu to combine them.")

if __name__ == "__main__":
    main()