Add cbx-audiobook.py and import_helper.py

- Add audiobook generation CLI tool from dev branch
- Create import_helper.py to resolve backend service imports for CLI scripts
- All dependencies verified and working correctly

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
Steve White 2025-06-26 15:04:55 -05:00
parent 3548485b4e
commit a983c31e54
2 changed files with 524 additions and 0 deletions

cbx-audiobook.py (executable file, 496 lines)

@@ -0,0 +1,496 @@
#!/usr/bin/env python
"""
Chatterbox Audiobook Generator

This script converts a text file into an audiobook using the Chatterbox TTS system.
It parses the text file into manageable chunks, generates audio for each chunk,
and assembles them into a complete audiobook.
"""

import argparse
import asyncio
import gc
import os
import re
import subprocess
import sys
import torch
from pathlib import Path
import uuid

# Import helper to fix Python path
import import_helper

# Import backend services
from backend.app.services.tts_service import TTSService
from backend.app.services.speaker_service import SpeakerManagementService
from backend.app.services.audio_manipulation_service import AudioManipulationService
from backend.app.config import DIALOG_GENERATED_DIR, TTS_TEMP_OUTPUT_DIR


class AudiobookGenerator:
    def __init__(self, speaker_id, output_base_name, device="mps",
                 exaggeration=0.5, cfg_weight=0.5, temperature=0.8,
                 pause_between_sentences=0.5, pause_between_paragraphs=1.0,
                 keep_model_loaded=False, cleanup_interval=10, use_subprocess=False):
        """
        Initialize the audiobook generator.

        Args:
            speaker_id: ID of the speaker to use
            output_base_name: Base name for output files
            device: Device to use for TTS (mps, cuda, cpu)
            exaggeration: Controls expressiveness (0.0-1.0)
            cfg_weight: Controls alignment with speaker characteristics (0.0-1.0)
            temperature: Controls randomness in generation (0.0-1.0)
            pause_between_sentences: Pause duration between sentences in seconds
            pause_between_paragraphs: Pause duration between paragraphs in seconds
            keep_model_loaded: If True, keeps model loaded across chunks (more efficient but uses more memory)
            cleanup_interval: How often to perform deep cleanup when keep_model_loaded=True
            use_subprocess: If True, uses separate processes for each chunk (slower but guarantees memory release)
        """
        self.speaker_id = speaker_id
        self.output_base_name = output_base_name
        self.device = device
        self.exaggeration = exaggeration
        self.cfg_weight = cfg_weight
        self.temperature = temperature
        self.pause_between_sentences = pause_between_sentences
        self.pause_between_paragraphs = pause_between_paragraphs
        self.keep_model_loaded = keep_model_loaded
        self.cleanup_interval = cleanup_interval
        self.use_subprocess = use_subprocess
        self.chunk_counter = 0

        # Initialize services
        self.tts_service = TTSService(device=device)
        self.speaker_service = SpeakerManagementService()
        self.audio_manipulator = AudioManipulationService()

        # Create output directories
        self.output_dir = DIALOG_GENERATED_DIR / output_base_name
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.temp_dir = TTS_TEMP_OUTPUT_DIR / output_base_name
        self.temp_dir.mkdir(parents=True, exist_ok=True)

        # Validate speaker
        self._validate_speaker()

    def _validate_speaker(self):
        """Validate that the specified speaker exists."""
        speaker_info = self.speaker_service.get_speaker_by_id(self.speaker_id)
        if not speaker_info:
            raise ValueError(f"Speaker ID '{self.speaker_id}' not found.")
        if not speaker_info.sample_path:
            raise ValueError(f"Speaker ID '{self.speaker_id}' has no sample path defined.")

        # Store speaker info for later use
        self.speaker_info = speaker_info

    def _cleanup_memory(self):
        """Force memory cleanup and garbage collection."""
        print("Performing memory cleanup...")

        # Force garbage collection multiple times for thorough cleanup
        for _ in range(3):
            gc.collect()

        # Clear device-specific caches
        if self.device == "cuda" and torch.cuda.is_available():
            torch.cuda.empty_cache()
            torch.cuda.synchronize()
            # Additional CUDA cleanup
            try:
                torch.cuda.reset_peak_memory_stats()
            except Exception:
                pass
        elif self.device == "mps" and torch.backends.mps.is_available():
            if hasattr(torch.mps, "empty_cache"):
                torch.mps.empty_cache()
            if hasattr(torch.mps, "synchronize"):
                torch.mps.synchronize()
            # Try to free MPS memory more aggressively; this forces MPS to
            # release memory back to the system on builds that support it
            try:
                if hasattr(torch.mps, "set_per_process_memory_fraction"):
                    current_allocated = torch.mps.current_allocated_memory() if hasattr(torch.mps, "current_allocated_memory") else 0
                    if current_allocated > 0:
                        torch.mps.empty_cache()
            except Exception:
                pass

        # Additional aggressive cleanup
        if hasattr(torch, '_C') and hasattr(torch._C, '_cuda_clearCublasWorkspaces'):
            try:
                torch._C._cuda_clearCublasWorkspaces()
            except Exception:
                pass

        print("Memory cleanup completed.")

    async def _generate_chunk_subprocess(self, chunk, segment_filename_base, speaker_sample_path):
        """
        Generate a single chunk using cbx-generate.py in a subprocess.
        This guarantees memory is released when the process exits.
        """
        output_file = self.temp_dir / f"{segment_filename_base}.wav"

        # Use cbx-generate.py for single chunk generation
        cmd = [
            sys.executable, "cbx-generate.py",
            "--sample", str(speaker_sample_path),
            "--output", str(output_file),
            "--text", chunk,
            "--device", self.device
        ]

        print(f"Running subprocess: {' '.join(cmd[:4])} ... (text truncated)")

        try:
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=300,  # 5 minute timeout per chunk
                cwd=Path(__file__).parent  # Run from project root
            )
            if result.returncode != 0:
                raise RuntimeError(f"Subprocess failed: {result.stderr}")
            if not output_file.exists():
                raise RuntimeError(f"Output file not created: {output_file}")
            print(f"Subprocess completed successfully: {output_file}")
            return output_file
        except subprocess.TimeoutExpired:
            raise RuntimeError("Subprocess timed out after 5 minutes")
        except Exception as e:
            raise RuntimeError(f"Subprocess error: {e}")

    def split_text_into_chunks(self, text, max_length=300):
        """
        Split text into chunks suitable for TTS processing.
        This uses the same logic as the DialogProcessorService._split_text method
        but adds additional paragraph handling.
        """
        # Split text into paragraphs first
        paragraphs = re.split(r'\n\s*\n', text)
        paragraphs = [p.strip() for p in paragraphs if p.strip()]

        all_chunks = []
        for paragraph in paragraphs:
            # Split paragraph into sentences
            sentences = re.split(r'(?<=[.!?\u2026])\s+|(?<=[.!?\u2026])(?=[\"\')\]\}\u201d\u2019])|(?<=[.!?\u2026])$', paragraph.strip())
            sentences = [s.strip() for s in sentences if s and s.strip()]

            chunks = []
            current_chunk = ""
            for sentence in sentences:
                if not sentence:
                    continue
                if not current_chunk:  # First sentence for this chunk
                    current_chunk = sentence
                elif len(current_chunk) + len(sentence) + 1 <= max_length:
                    current_chunk += " " + sentence
                else:
                    chunks.append(current_chunk)
                    current_chunk = sentence
            if current_chunk:  # Add the last chunk
                chunks.append(current_chunk)

            # Further split any chunks that are still too long
            paragraph_chunks = []
            for chunk in chunks:
                if len(chunk) > max_length:
                    # Simple split by length if a sentence itself is too long
                    for i in range(0, len(chunk), max_length):
                        paragraph_chunks.append(chunk[i:i+max_length])
                else:
                    paragraph_chunks.append(chunk)

            # Add paragraph marker
            if paragraph_chunks:
                all_chunks.append({"type": "paragraph", "chunks": paragraph_chunks})

        return all_chunks
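
    # Illustrative example (not part of the original logic): with max_length=20,
    # the text
    #     "One. Two three four. Five.\n\nSix."
    # yields
    #     [{"type": "paragraph", "chunks": ["One. Two three four.", "Five."]},
    #      {"type": "paragraph", "chunks": ["Six."]}]
    # because "Five." would push the first chunk past 20 characters.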

    async def generate_audiobook(self, text_file_path):
        """
        Generate an audiobook from a text file.

        Args:
            text_file_path: Path to the text file to convert

        Returns:
            Path to the generated audiobook file
        """
        # Read the text file
        text_path = Path(text_file_path)
        if not text_path.exists():
            raise FileNotFoundError(f"Text file not found: {text_file_path}")

        with open(text_path, 'r', encoding='utf-8') as f:
            text = f.read()

        print(f"Processing text file: {text_file_path}")
        print(f"Text length: {len(text)} characters")

        # Split text into chunks
        paragraphs = self.split_text_into_chunks(text)
        total_chunks = sum(len(p["chunks"]) for p in paragraphs)
        print(f"Split into {len(paragraphs)} paragraphs with {total_chunks} total chunks")

        # Generate audio for each chunk
        segment_results = []
        chunk_count = 0

        # Pre-load model if keeping it loaded
        if self.keep_model_loaded:
            print("Pre-loading TTS model for batch processing...")
            self.tts_service.load_model()

        try:
            for para_idx, paragraph in enumerate(paragraphs):
                print(f"Processing paragraph {para_idx+1}/{len(paragraphs)}")

                for chunk_idx, chunk in enumerate(paragraph["chunks"]):
                    chunk_count += 1
                    self.chunk_counter += 1
                    print(f" Generating audio for chunk {chunk_count}/{total_chunks}: {chunk[:50]}...")

                    # Generate unique filename for this chunk
                    segment_filename_base = f"{self.output_base_name}_p{para_idx}_c{chunk_idx}_{uuid.uuid4().hex[:8]}"

                    try:
                        # Get absolute speaker sample path
                        speaker_sample_path = Path(self.speaker_info.sample_path)
                        if not speaker_sample_path.is_absolute():
                            from backend.app.config import SPEAKER_DATA_BASE_DIR
                            speaker_sample_path = SPEAKER_DATA_BASE_DIR / speaker_sample_path

                        # Generate speech for this chunk
                        if self.use_subprocess:
                            # Use subprocess for guaranteed memory release
                            segment_output_path = await self._generate_chunk_subprocess(
                                chunk=chunk,
                                segment_filename_base=segment_filename_base,
                                speaker_sample_path=speaker_sample_path
                            )
                        else:
                            # Load model for this chunk (if not keeping loaded)
                            if not self.keep_model_loaded:
                                print("Loading TTS model...")
                                self.tts_service.load_model()

                            # Generate speech using the TTS service
                            segment_output_path = await self.tts_service.generate_speech(
                                text=chunk,
                                speaker_id=self.speaker_id,
                                speaker_sample_path=str(speaker_sample_path),
                                output_filename_base=segment_filename_base,
                                output_dir=self.temp_dir,
                                exaggeration=self.exaggeration,
                                cfg_weight=self.cfg_weight,
                                temperature=self.temperature
                            )

                        # Memory management strategy based on model lifecycle
                        if self.use_subprocess:
                            # No memory management needed - subprocess handles it
                            pass
                        elif self.keep_model_loaded:
                            # Light cleanup after each chunk
                            if self.chunk_counter % self.cleanup_interval == 0:
                                print(f"Performing periodic deep cleanup (chunk {self.chunk_counter})")
                                self._cleanup_memory()
                        else:
                            # Explicit memory cleanup after generation
                            self._cleanup_memory()

                            # Unload model after generation
                            print("Unloading TTS model...")
                            self.tts_service.unload_model()

                            # Additional memory cleanup after model unload
                            self._cleanup_memory()

                        # Add to segment results
                        segment_results.append({
                            "type": "speech",
                            "path": str(segment_output_path)
                        })

                        # Add pause between sentences
                        if chunk_idx < len(paragraph["chunks"]) - 1:
                            segment_results.append({
                                "type": "silence",
                                "duration": self.pause_between_sentences
                            })
                    except Exception as e:
                        print(f"Error generating speech for chunk: {e}")
                        # Ensure model is unloaded if there was an error and not using subprocess
                        if not self.use_subprocess:
                            if not self.keep_model_loaded and self.tts_service.model is not None:
                                print("Unloading TTS model after error...")
                                self.tts_service.unload_model()
                            # Force cleanup after error
                            self._cleanup_memory()
                        # Continue with next chunk

                # Add longer pause between paragraphs
                if para_idx < len(paragraphs) - 1:
                    segment_results.append({
                        "type": "silence",
                        "duration": self.pause_between_paragraphs
                    })
        finally:
            # Always unload model at the end if it was kept loaded
            if self.keep_model_loaded and self.tts_service.model is not None:
                print("Final cleanup: Unloading TTS model...")
                self.tts_service.unload_model()
                self._cleanup_memory()

        # Concatenate all segments
        print("Concatenating audio segments...")
        concatenated_filename = f"{self.output_base_name}_audiobook.wav"
        concatenated_path = self.output_dir / concatenated_filename
        self.audio_manipulator.concatenate_audio_segments(
            segment_results=segment_results,
            output_concatenated_path=concatenated_path
        )

        # Create ZIP archive with all files
        print("Creating ZIP archive...")
        zip_filename = f"{self.output_base_name}_audiobook.zip"
        zip_path = self.output_dir / zip_filename

        # Collect all speech segment files
        speech_segment_paths = [
            Path(s["path"]) for s in segment_results
            if s["type"] == "speech" and Path(s["path"]).exists()
        ]

        self.audio_manipulator.create_zip_archive(
            segment_file_paths=speech_segment_paths,
            concatenated_audio_path=concatenated_path,
            output_zip_path=zip_path
        )
print(f"Audiobook generation complete!")
print(f"Audiobook file: {concatenated_path}")
print(f"ZIP archive: {zip_path}")
# Ensure model is unloaded at the end (just in case)
if self.tts_service.model is not None:
print("Final check: Unloading TTS model...")
self.tts_service.unload_model()
return concatenated_path


async def main():
    parser = argparse.ArgumentParser(description="Generate an audiobook from a text file using Chatterbox TTS")

    # Create a mutually exclusive group for the main operation vs listing speakers
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("--list-speakers", action="store_true", help="List available speakers and exit")
    group.add_argument("text_file", nargs="?", help="Path to the text file to convert")

    # Other arguments
    parser.add_argument("--speaker", "-s", help="ID of the speaker to use")
    parser.add_argument("--output", "-o", help="Base name for output files (default: derived from text filename)")
    parser.add_argument("--device", default="mps", choices=["mps", "cuda", "cpu"], help="Device to use for TTS (default: mps)")
    parser.add_argument("--exaggeration", type=float, default=0.5, help="Controls expressiveness (0.0-1.0, default: 0.5)")
    parser.add_argument("--cfg-weight", type=float, default=0.5, help="Controls alignment with speaker (0.0-1.0, default: 0.5)")
    parser.add_argument("--temperature", type=float, default=0.8, help="Controls randomness (0.0-1.0, default: 0.8)")
    parser.add_argument("--sentence-pause", type=float, default=0.5, help="Pause between sentences in seconds (default: 0.5)")
    parser.add_argument("--paragraph-pause", type=float, default=1.0, help="Pause between paragraphs in seconds (default: 1.0)")
    parser.add_argument("--keep-model-loaded", action="store_true", help="Keep model loaded between chunks (faster but uses more memory)")
    parser.add_argument("--cleanup-interval", type=int, default=10, help="How often to perform deep cleanup when keeping model loaded (default: 10)")
    parser.add_argument("--force-cpu-on-oom", action="store_true", help="Automatically switch to CPU if MPS/CUDA runs out of memory")
    parser.add_argument("--max-chunk-length", type=int, default=300, help="Maximum chunk length for text splitting (default: 300)")
    parser.add_argument("--use-subprocess", action="store_true", help="Use separate processes for each chunk (guarantees memory release but slower)")

    args = parser.parse_args()

    # List speakers if requested
    if args.list_speakers:
        speaker_service = SpeakerManagementService()
        speakers = speaker_service.get_speakers()
        print("Available speakers:")
        for speaker in speakers:
            print(f" {speaker.id}: {speaker.name}")
        return 0

    # Validate required arguments for audiobook generation
    if not args.text_file:
        parser.error("text_file is required when not using --list-speakers")
    if not args.speaker:
        parser.error("--speaker/-s is required when not using --list-speakers")

    # Determine output base name if not provided
    if not args.output:
        text_path = Path(args.text_file)
        args.output = text_path.stem

    try:
        # Create audiobook generator
        generator = AudiobookGenerator(
            speaker_id=args.speaker,
            output_base_name=args.output,
            device=args.device,
            exaggeration=args.exaggeration,
            cfg_weight=args.cfg_weight,
            temperature=args.temperature,
            pause_between_sentences=args.sentence_pause,
            pause_between_paragraphs=args.paragraph_pause,
            keep_model_loaded=args.keep_model_loaded,
            cleanup_interval=args.cleanup_interval,
            use_subprocess=args.use_subprocess
        )

        # Generate audiobook with automatic fallback
        try:
            await generator.generate_audiobook(args.text_file)
        except (RuntimeError, torch.OutOfMemoryError) as e:
            if args.force_cpu_on_oom and "out of memory" in str(e).lower() and args.device != "cpu":
                print(f"\n⚠️ {args.device.upper()} out of memory: {e}")
                print("🔄 Automatically switching to CPU and retrying...")

                # Create new generator with CPU
                generator = AudiobookGenerator(
                    speaker_id=args.speaker,
                    output_base_name=args.output,
                    device="cpu",
                    exaggeration=args.exaggeration,
                    cfg_weight=args.cfg_weight,
                    temperature=args.temperature,
                    pause_between_sentences=args.sentence_pause,
                    pause_between_paragraphs=args.paragraph_pause,
                    keep_model_loaded=args.keep_model_loaded,
                    cleanup_interval=args.cleanup_interval,
                    use_subprocess=args.use_subprocess
                )
                await generator.generate_audiobook(args.text_file)
                print("✅ Successfully completed using CPU fallback!")
            else:
                raise
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        return 1

    return 0


if __name__ == "__main__":
    sys.exit(asyncio.run(main()))
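
Usage note: the script is meant to be run as a CLI, with all flags defined in main() above (for example --speaker, --device, and --list-speakers). Below is a minimal sketch of driving the generator from Python instead; the speaker ID "my_speaker" and input file "sample_book.txt" are placeholders, and importlib is used only because the script's filename contains a hyphen and cannot be imported directly:

import asyncio
import importlib.util

# Load cbx-audiobook.py as a module despite the hyphen in its filename
spec = importlib.util.spec_from_file_location("cbx_audiobook", "cbx-audiobook.py")
cbx = importlib.util.module_from_spec(spec)
spec.loader.exec_module(cbx)

generator = cbx.AudiobookGenerator(
    speaker_id="my_speaker",         # placeholder; run --list-speakers for real IDs
    output_base_name="sample_book",  # output lands under DIALOG_GENERATED_DIR/sample_book
    device="cpu",
)
asyncio.run(generator.generate_audiobook("sample_book.txt"))  # placeholder input file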

import_helper.py (new file, 28 lines)

@@ -0,0 +1,28 @@
#!/usr/bin/env python3
"""
Import helper module for CLI scripts that need to import backend services.
This ensures the Python path is set up correctly to import from the backend directory.
"""

import sys
from pathlib import Path

# Add the project root to the Python path
PROJECT_ROOT = Path(__file__).parent.resolve()
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))

# Add the backend directory to the Python path for app.* imports
BACKEND_ROOT = PROJECT_ROOT / "backend"
if str(BACKEND_ROOT) not in sys.path:
    sys.path.insert(0, str(BACKEND_ROOT))

# Verify that we can import from backend
try:
    from backend.app.config import PROJECT_ROOT as CONFIG_PROJECT_ROOT
    from app.services.tts_service import TTSService
    from app.services.speaker_service import SpeakerManagementService
except ImportError as e:
    print(f"Warning: Could not import backend services: {e}")
    print(f"Make sure you're running from the project root directory: {PROJECT_ROOT}")
    print(f"Backend directory: {BACKEND_ROOT}")