# chatterbox-ui/backend/app/services/dialog_processor_service.py
#
# Viewer metadata from the original listing: 318 lines, 17 KiB, Python

from pathlib import Path
from typing import List, Dict, Any, Union
import re
from .tts_service import TTSService
from .speaker_service import SpeakerManagementService
from app import config
# Potentially models for dialog structure if we define them
# from ..models.dialog_models import DialogItem # Example
class DialogProcessorService:
    """Turn a list of dialog items (speech / silence) into audio segments.

    Speech items are split into TTS-sized chunks and synthesized through the
    injected TTS service, silence items are passed through as durations, and
    items flagged with ``use_existing_audio`` have their existing audio file
    copied instead of being regenerated.
    """

    # Whitespace that ends a sentence: it must directly follow a sentence
    # terminator (. ! ? or an ellipsis character), optionally followed by up to
    # two closing quotes/brackets.  Splitting on the whitespace (rather than
    # between the terminator and the closer) keeps `"Go!"` in one piece.
    # Look-behinds must be fixed-width in `re`, hence one alternative per width.
    _SENTENCE_BOUNDARY = re.compile(
        r"(?<=[.!?\u2026])\s+"
        r"|(?<=[.!?\u2026][\"')\]}\u201d\u2019])\s+"
        r"|(?<=[.!?\u2026][\"')\]}\u201d\u2019]{2})\s+"
    )

    # Web URL prefix that is mapped onto config.DIALOG_OUTPUT_DIR.
    _GENERATED_AUDIO_URL_PREFIX = "/generated_audio/"

    def __init__(self, tts_service: "TTSService", speaker_service: "SpeakerManagementService"):
        """Store the collaborating services and make sure the temp root exists.

        Args:
            tts_service: Service whose ``generate_speech`` coroutine produces
                one audio file per text chunk.
            speaker_service: Service used to look up speakers by id.
        """
        self.tts_service = tts_service
        self.speaker_service = speaker_service
        # Base directory for storing individual audio segments during processing
        self.temp_audio_dir = config.TTS_TEMP_OUTPUT_DIR
        self.temp_audio_dir.mkdir(parents=True, exist_ok=True)

    def _split_text(self, text: str, max_length: int = 300) -> List[str]:
        """Split *text* into chunks of at most *max_length* characters.

        Sentences are kept whole and packed greedily into chunks.  A single
        sentence longer than *max_length* is broken at the last space that
        fits; only a word that is itself longer than *max_length* is cut
        mid-word.

        Args:
            text: The text to split.
            max_length: Maximum number of characters per chunk (>= 1).

        Returns:
            The chunks in reading order; an empty list for blank input.

        Raises:
            ValueError: If *max_length* is smaller than 1.
        """
        if max_length < 1:
            raise ValueError(f"max_length must be at least 1, got {max_length}")

        sentences = [
            piece.strip()
            for piece in self._SENTENCE_BOUNDARY.split(text.strip())
            if piece.strip()
        ]

        # Greedily pack whole sentences; the +1 accounts for the joining space.
        chunks: List[str] = []
        current_chunk = ""
        for sentence in sentences:
            if not current_chunk:
                current_chunk = sentence
            elif len(current_chunk) + len(sentence) + 1 <= max_length:
                current_chunk = f"{current_chunk} {sentence}"
            else:
                chunks.append(current_chunk)
                current_chunk = sentence
        if current_chunk:
            chunks.append(current_chunk)

        # A single sentence may still exceed the limit on its own.
        final_chunks: List[str] = []
        for chunk in chunks:
            if len(chunk) > max_length:
                final_chunks.extend(self._wrap_long_chunk(chunk, max_length))
            else:
                final_chunks.append(chunk)
        return final_chunks

    @staticmethod
    def _wrap_long_chunk(chunk: str, max_length: int) -> List[str]:
        """Break one over-long chunk into pieces of at most *max_length* chars."""
        pieces: List[str] = []
        remaining = chunk
        while len(remaining) > max_length:
            # A space at index == max_length is still a valid break point,
            # because the piece before it is exactly max_length long.
            cut = remaining.rfind(" ", 0, max_length + 1)
            if cut <= 0:
                # No space to break on: fall back to a hard cut.
                cut = max_length
            pieces.append(remaining[:cut].rstrip())
            remaining = remaining[cut:].lstrip()
        if remaining:
            pieces.append(remaining)
        return pieces

    async def process_dialog(self, dialog_items: List[Dict[str, Any]], output_base_name: str) -> Dict[str, Any]:
        """Process dialog items (speech or silence) into audio segments.

        Args:
            dialog_items: A list of dictionaries, where each item has:
                - 'type': 'speech' or 'silence'
                - For 'speech': 'speaker_id': str, 'text': str, and optionally
                  'exaggeration', 'cfg_weight', 'temperature'
                - For 'silence': 'duration': float (in seconds)
                - Optionally 'use_existing_audio': bool and 'audio_url': str;
                  when both are truthy the referenced file is copied instead
                  of generating anything for the item.
            output_base_name: The base name for the output files; also the
                name of the per-dialog subdirectory of the temp root.

        Returns:
            A dictionary with:
                - "log": newline-joined processing log
                - "segment_files": one entry per produced segment, e.g.
                  {"type": "speech", "path": ..., "speaker_id": ..., "text_chunk": ...},
                  {"type": "silence", "duration": 0.5} or
                  {"type": "error", "message": ...}
                - "temp_dir": the per-dialog temporary directory (as str)
        """
        segment_results: List[Dict[str, Any]] = []
        processing_log: List[str] = []

        # Create a unique subdirectory for this dialog's temporary files
        dialog_temp_dir = self.temp_audio_dir / output_base_name
        dialog_temp_dir.mkdir(parents=True, exist_ok=True)
        processing_log.append(f"Created temporary directory for segments: {dialog_temp_dir}")

        # segment_idx only feeds file names; it advances once per reused item
        # and once per speech item that passed validation.
        segment_idx = 0
        for item_number, item in enumerate(dialog_items, start=1):
            item_type = item.get("type")
            processing_log.append(f"Processing item {item_number}: type='{item_type}'")

            # --- Universal: reuse of existing audio applies to any item type ---
            audio_url = item.get("audio_url")
            if item.get("use_existing_audio", False) and audio_url:
                dest_path = dialog_temp_dir / f"{output_base_name}_seg{segment_idx}_reused.wav"
                segment_results.append(
                    self._reuse_existing_audio(item_type, audio_url, dest_path, item_number, processing_log)
                )
                segment_idx += 1
                continue

            if item_type == "speech":
                outcome = self._locate_speaker_sample(item, item_number, processing_log)
                if isinstance(outcome, dict):
                    # Validation failed; outcome is the error entry.
                    segment_results.append(outcome)
                    continue
                segment_results.extend(
                    await self._synthesize_speech(
                        item,
                        outcome,
                        f"{output_base_name}_seg{segment_idx}",
                        dialog_temp_dir,
                        processing_log,
                    )
                )
                segment_idx += 1
            elif item_type == "silence":
                segment_results.append(self._silence_segment(item, item_number, processing_log))
            else:
                processing_log.append(f"Unknown item type '{item_type}' at item {item_number}. Skipping.")
                segment_results.append({"type": "error", "message": f"Unknown item type: {item_type}"})

        # Log the full segment_results list for debugging
        processing_log.append("[DEBUG] Final segment_results list:")
        for idx, seg in enumerate(segment_results):
            processing_log.append(f"  [{idx}] {seg}")

        return {
            "log": "\n".join(processing_log),
            "segment_files": segment_results,
            "temp_dir": str(dialog_temp_dir)  # For cleanup or zipping later
        }

    def _resolve_reuse_source(self, audio_url: str) -> Path:
        """Map an ``audio_url`` onto the filesystem path it refers to."""
        prefix = self._GENERATED_AUDIO_URL_PREFIX
        if audio_url.startswith(prefix):
            return config.DIALOG_OUTPUT_DIR / audio_url[len(prefix):]
        candidate = Path(audio_url)
        if candidate.is_absolute():
            return candidate
        # Assume relative to the generated audio root dir
        return config.DIALOG_OUTPUT_DIR / audio_url.lstrip("/\\")

    def _is_within_audio_roots(self, path: Path) -> bool:
        """Return True if *path* resolves inside a directory this service owns.

        ``audio_url`` comes from the request, so without this check an
        absolute path or ``..`` segments would let a caller copy any readable
        file into the dialog output.
        NOTE(review): the allowed roots are config.DIALOG_OUTPUT_DIR and the
        temp segment root; extend the tuple if reuse from elsewhere is intended.
        """
        try:
            resolved = path.resolve()
            roots = [Path(root).resolve() for root in (config.DIALOG_OUTPUT_DIR, self.temp_audio_dir)]
        except (OSError, RuntimeError, ValueError):
            # Unresolvable paths (symlink loops, embedded NULs, ...) are rejected.
            return False
        for root in roots:
            try:
                resolved.relative_to(root)
            except ValueError:
                continue
            return True
        return False

    def _reuse_existing_audio(
        self,
        item_type: Any,
        audio_url: str,
        dest_path: Path,
        item_number: int,
        processing_log: List[str],
    ) -> Dict[str, Any]:
        """Copy the audio referenced by *audio_url* to *dest_path*.

        Returns the segment entry for the item: ``{"type", "path"}`` on
        success, or an ``{"type": "error", "message"}`` entry on failure.
        """
        import shutil

        src_audio_path = self._resolve_reuse_source(audio_url)

        if not self._is_within_audio_roots(src_audio_path):
            error_message = (
                f"Audio file for reuse is outside the allowed audio directories: "
                f"{src_audio_path} for item {item_number}."
            )
            processing_log.append(error_message)
            return {"type": "error", "message": error_message}

        if not src_audio_path.is_file():
            error_message = f"Audio file for reuse not found at {src_audio_path} for item {item_number}."
            processing_log.append(error_message)
            return {"type": "error", "message": error_message}

        try:
            processing_log.append(
                f"[REUSE] Source audio file exists: {src_audio_path}, size={src_audio_path.stat().st_size} bytes"
            )
            shutil.copyfile(src_audio_path, dest_path)
            processing_log.append(
                f"[REUSE] Destination audio file created: {dest_path}, size={dest_path.stat().st_size} bytes"
            )
        except OSError as e:
            error_message = f"Failed to copy reused audio for item {item_number}: {e}"
            processing_log.append(error_message)
            return {"type": "error", "message": error_message}

        processing_log.append(
            f"Reused existing audio for item {item_number}: copied from {src_audio_path} to {dest_path}"
        )
        # Only include 'type' and 'path' so the concatenator always includes this segment
        return {"type": item_type, "path": str(dest_path)}

    def _locate_speaker_sample(
        self,
        item: Dict[str, Any],
        item_number: int,
        processing_log: List[str],
    ) -> Union[Path, Dict[str, Any]]:
        """Validate a speech item and find its speaker sample file.

        Returns the absolute sample path when the item is usable, otherwise
        the ``{"type": "error", ...}`` entry describing why it was skipped.
        """
        speaker_id = item.get("speaker_id")
        if not speaker_id or not item.get("text"):
            processing_log.append(f"Skipping speech item {item_number} due to missing speaker_id or text.")
            return {"type": "error", "message": "Missing speaker_id or text"}

        speaker_info = self.speaker_service.get_speaker_by_id(speaker_id)
        if not speaker_info:
            processing_log.append(f"Speaker ID '{speaker_id}' not found. Skipping item {item_number}.")
            return {"type": "error", "message": f"Speaker ID '{speaker_id}' not found"}

        if not speaker_info.sample_path:
            processing_log.append(
                f"Speaker ID '{speaker_id}' has no sample path defined. Skipping item {item_number}."
            )
            return {"type": "error", "message": f"Speaker ID '{speaker_id}' has no sample path defined"}

        # speaker_info.sample_path is relative to config.SPEAKER_DATA_BASE_DIR
        abs_speaker_sample_path = config.SPEAKER_DATA_BASE_DIR / speaker_info.sample_path
        if not abs_speaker_sample_path.is_file():
            processing_log.append(
                f"Speaker sample file not found or is not a file at '{abs_speaker_sample_path}' "
                f"for speaker ID '{speaker_id}'. Skipping item {item_number}."
            )
            return {
                "type": "error",
                "message": f"Speaker sample not a file or not found: {abs_speaker_sample_path}",
            }
        return abs_speaker_sample_path

    async def _synthesize_speech(
        self,
        item: Dict[str, Any],
        speaker_sample_path: Path,
        filename_prefix: str,
        output_dir: Path,
        processing_log: List[str],
    ) -> List[Dict[str, Any]]:
        """Generate one audio segment per text chunk of a validated speech item.

        A failure on one chunk is recorded as an error entry and does not stop
        the remaining chunks from being generated.
        """
        speaker_id = item.get("speaker_id")
        text_chunks = self._split_text(item.get("text"))
        processing_log.append(f"Split text for speaker '{speaker_id}' into {len(text_chunks)} chunk(s).")

        results: List[Dict[str, Any]] = []
        for chunk_idx, text_chunk in enumerate(text_chunks):
            segment_filename_base = f"{filename_prefix}_spk{speaker_id}_chunk{chunk_idx}"
            processing_log.append(
                f"Generating speech for chunk: '{text_chunk[:50]}...' using speaker '{speaker_id}'"
            )
            try:
                segment_output_path = await self.tts_service.generate_speech(
                    text=text_chunk,
                    speaker_id=speaker_id,  # For metadata, actual sample path is used by TTS
                    speaker_sample_path=str(speaker_sample_path),
                    output_filename_base=segment_filename_base,
                    output_dir=output_dir,  # Save to the dialog's temp dir
                    exaggeration=item.get('exaggeration', 0.5),
                    cfg_weight=item.get('cfg_weight', 0.5),
                    temperature=item.get('temperature', 0.8),
                )
            except Exception as e:
                # Best-effort per chunk: report the failure and keep going.
                error_message = f"Error generating speech for chunk '{text_chunk[:50]}...': {repr(e)}"
                processing_log.append(error_message)
                results.append({"type": "error", "message": error_message, "text_chunk": text_chunk})
            else:
                results.append({
                    "type": "speech",
                    "path": str(segment_output_path),
                    "speaker_id": speaker_id,
                    "text_chunk": text_chunk
                })
                processing_log.append(f"Successfully generated segment: {segment_output_path}")
        return results

    @staticmethod
    def _silence_segment(item: Dict[str, Any], item_number: int, processing_log: List[str]) -> Dict[str, Any]:
        """Build the segment entry for a silence item.

        The duration must convert via ``float()`` to a finite, non-negative
        number; anything else yields an error entry instead of raising.
        """
        duration = item.get("duration")
        try:
            seconds = float(duration)
        except (TypeError, ValueError):
            seconds = None
        # NaN fails every comparison, so this also rejects NaN and +/-inf.
        if seconds is None or not (0 <= seconds < float("inf")):
            processing_log.append(f"Skipping silence item {item_number} due to invalid duration.")
            return {"type": "error", "message": "Invalid duration for silence"}
        processing_log.append(f"Added silence of {duration}s.")
        return {"type": "silence", "duration": seconds}
if __name__ == "__main__":
    import asyncio
    import pprint
    import traceback

    async def main_test():
        """Manual smoke test: run a mixed valid/invalid dialog through the processor."""
        # Wire up the services exactly as the application would.
        tts = TTSService(device="mps")  # or your preferred device
        speakers = SpeakerManagementService()
        processor = DialogProcessorService(tts, speakers)

        # The speech items below need the dummy speaker sample on disk.  It is
        # produced by TTSService's own test routine, so try to run that first.
        # This is a bit of a hack; ideally test assets are managed independently.
        try:
            print("Ensuring dummy speaker sample is created by running TTSService's main_test logic...")
            from .tts_service import main_test as tts_main_test
            await tts_main_test()  # This will create the dummy_speaker_test.wav
            print("TTSService main_test completed, dummy sample should exist.")
        except ImportError:
            print("Could not import tts_service.main_test directly. Ensure dummy_speaker_test.wav exists.")
        except Exception as exc:
            print(f"Error running tts_service.main_test for dummy sample creation: {exc}")
            print("Proceeding, but 'test_speaker_for_dialog_proc' might fail if sample is missing.")

        def speech_item(speaker_id, text):
            """Build a speech dialog item."""
            return {"type": "speech", "speaker_id": speaker_id, "text": text}

        def silence_item(duration):
            """Build a silence dialog item."""
            return {"type": "silence", "duration": duration}

        known_speaker = "test_speaker_for_dialog_proc"  # Defined in speakers.yaml
        long_text = (
            "This is a much longer piece of text that should definitely be split into multiple, smaller chunks by the dialog processor. "
            "It contains several sentences. "
            "Let's see how it handles this. "
            "The maximum length is set to 300 characters, but it tries to respect sentence boundaries. "
            "This sentence itself is quite long and might even be split mid-sentence if it exceeds the hard limit after sentence splitting. "
            "We will observe the output carefully to ensure it works as expected, creating multiple audio files for this single text block if necessary."
        )

        # Two valid speech items, one valid silence, then one of each failure mode.
        sample_dialog_items = [
            speech_item(known_speaker, "Hello world! This is the first speech segment."),
            silence_item(0.75),
            speech_item(known_speaker, long_text),
            speech_item("non_existent_speaker_id", "This should fail because the speaker does not exist."),
            {"type": "invalid_type", "text": "This item has an invalid type."},
            speech_item(known_speaker, None),  # Test missing text
            speech_item(None, "This is a test with a missing speaker ID."),  # Test missing speaker_id
            silence_item(-0.5),  # Invalid duration
        ]

        run_name = "dialog_processor_test_run"
        try:
            print("\nLoading TTS model for DialogProcessorService test...")
            # generate_speech would load the model lazily, but an explicit
            # load/unload pair is good practice for a test block.
            tts.load_model()

            print(f"\nProcessing dialog items with base name: {run_name}...")
            outcome = await processor.process_dialog(sample_dialog_items, run_name)

            print("\n--- Processing Log ---")
            print(outcome.get("log"))
            print("\n--- Segment Files / Results ---")
            pprint.pprint(outcome.get("segment_files"))
            print(f"\nTemporary directory used: {outcome.get('temp_dir')}")
            print("\nPlease check the temporary directory for generated audio segments.")
        except Exception:
            print("\nAn error occurred during the DialogProcessorService test:")
            traceback.print_exc()
        finally:
            print("\nUnloading TTS model...")
            tts.unload_model()
            print("DialogProcessorService test finished.")

    asyncio.run(main_test())