696 lines
32 KiB
Python
696 lines
32 KiB
Python
"""
|
|
Report synthesis module for the intelligent research system.
|
|
|
|
This module provides functionality to synthesize reports from document chunks
|
|
using LLMs with a map-reduce approach.
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import asyncio
|
|
import logging
|
|
from typing import Dict, List, Any, Optional, Tuple, Union
|
|
|
|
import litellm
|
|
from litellm import completion
|
|
|
|
from config.config import get_config
|
|
from report.report_detail_levels import get_report_detail_level_manager, DetailLevel
|
|
from report.report_templates import QueryType, DetailLevel as TemplateDetailLevel, ReportTemplateManager, ReportTemplate
|
|
|
|
# Configure logging
# NOTE(review): basicConfig runs at import time, so importing this module
# configures the root logger (when it has no handlers yet) for the whole process.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
# Note: ReportTemplateManager and ReportTemplate are now imported from report_templates.py
|
|
|
|
class ReportSynthesizer:
    """
    Report synthesizer for the intelligent research system.

    Synthesizes reports from document chunks using LLMs with a map-reduce
    approach: the map phase extracts query-relevant information from each
    chunk, and the reduce phase merges the extractions into one Markdown
    report with a references section.
    """

    def __init__(self, model_name: Optional[str] = None):
        """
        Initialize the report synthesizer.

        Args:
            model_name: Name of the LLM model to use. If None, uses the default model
                        from configuration.
        """
        self.config = get_config()

        # Use specified model or default from config for report synthesis
        self.model_name = model_name or self.config.config_data.get('report_synthesis', {}).get('model', 'llama-3.3-70b-versatile')

        # Get model-specific configuration
        self.model_config = self.config.get_model_config(self.model_name)

        # Set up LiteLLM with the appropriate provider
        self._setup_provider()

        # Initialize template manager
        self.template_manager = ReportTemplateManager()
        self.template_manager.initialize_default_templates()

        # Flag to process <think>/<thinking> tags in model output
        self.process_thinking_tags = False

        # Progress tracking
        self.progress_callback = None
        self.total_chunks = 0
        self.processed_chunk_count = 0

    def set_progress_callback(self, callback):
        """
        Set a callback function to report progress.

        Args:
            callback: Function that takes (current_progress, total, current_report) as arguments
        """
        self.progress_callback = callback

    def _report_progress(self, current_report=None):
        """Report progress through the callback if set."""
        if self.progress_callback and self.total_chunks > 0:
            # Clamp to 1.0 so a stale total can never report more than 100%.
            progress = min(self.processed_chunk_count / self.total_chunks, 1.0)
            self.progress_callback(progress, self.total_chunks, current_report)

    def _setup_provider(self) -> None:
        """
        Set up the LLM provider based on the model configuration.

        Looks up the provider's API key and exports it through the
        environment variable set for that provider. A ValueError raised
        during setup is logged instead of propagated.
        """
        provider = self.model_config.get('provider', 'groq')
        provider_key = provider.lower()

        try:
            # Get API key for the provider
            api_key = self.config.get_api_key(provider)

            # Set environment variable for the provider
            if provider_key in ('google', 'gemini'):
                os.environ["GEMINI_API_KEY"] = api_key
            elif provider_key == 'vertex_ai':
                os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = api_key
            else:
                os.environ[f"{provider.upper()}_API_KEY"] = api_key

            logger.info(f"Report synthesizer initialized with model: {self.model_name} (provider: {provider})")
        except ValueError as e:
            logger.error(f"Error setting up LLM provider: {e}")

    def _get_batch_size(self) -> int:
        """Return how many chunks to process between rate-limit pauses."""
        # Gemini models are given larger batches (1M token windows).
        if "gemini" in self.model_name.lower():
            return 8
        return 3

    def _get_completion_params(self) -> Dict[str, Any]:
        """
        Get parameters for LLM completion based on model configuration.

        Returns:
            Dictionary of parameters for LiteLLM completion
        """
        params = {
            'temperature': self.model_config.get('temperature', 0.3),  # Lower temperature for factual reporting
            'max_tokens': self.model_config.get('max_tokens', 4000),  # Longer output for comprehensive reports
            'top_p': self.model_config.get('top_p', 0.9)
        }

        # Lowercase the provider so it is matched the same way as in
        # _setup_provider() and generate_completion(); otherwise "Groq"
        # would fall through to the generic branch and lose its prefix.
        provider = self.model_config.get('provider', 'groq').lower()

        if provider == 'groq':
            # For Groq provider
            params['model'] = f"groq/{self.model_name}"
        elif provider == 'openrouter':
            # For OpenRouter provider
            params['model'] = self.model_config.get('model_name', self.model_name)
            params['api_base'] = self.model_config.get('endpoint')

            # Set HTTP headers for OpenRouter if needed
            params['headers'] = {
                'HTTP-Referer': 'https://sim-search.app',  # Replace with your actual app URL
                'X-Title': 'Intelligent Research System'  # Replace with your actual app name
            }
        elif provider in ('google', 'gemini'):
            # Special handling for Google Gemini models
            # Format: gemini/model_name (e.g., gemini/gemini-2.0-flash)
            params['model'] = f"gemini/{self.model_config.get('model_name', self.model_name)}"

            # Add additional parameters for Gemini
            params['custom_llm_provider'] = 'gemini'
        elif provider == 'vertex_ai':
            # Special handling for Vertex AI Gemini models
            params['model'] = f"vertex_ai/{self.model_config.get('model_name', self.model_name)}"

            # Add Vertex AI specific parameters
            params['vertex_project'] = self.model_config.get('vertex_project', 'sim-search')
            params['vertex_location'] = self.model_config.get('vertex_location', 'us-central1')

            # Set custom provider
            params['custom_llm_provider'] = 'vertex_ai'
        else:
            # Standard provider (OpenAI, Anthropic, etc.)
            params['model'] = self.model_name

        return params

    async def generate_completion(self, messages: List[Dict[str, str]], stream: bool = False) -> Union[str, Any]:
        """
        Generate a completion using the configured LLM.

        Args:
            messages: List of message dictionaries with 'role' and 'content' keys
            stream: Whether to stream the response

        Returns:
            If stream is False, returns the completion text as a string
            If stream is True, returns the completion response object for streaming

            Errors raised by the LLM call are not propagated: they are logged
            and returned as a user-facing error message string.
        """
        # Get provider from model config
        provider = self.model_config.get('provider', 'groq').lower()

        # Special handling for Gemini models - they use 'user' and 'model' roles
        # NOTE(review): only the 'gemini' provider is remapped here, although
        # 'google' is treated as Gemini elsewhere - confirm this is intended.
        if provider == 'gemini':
            formatted_messages = []
            for msg in messages:
                role = msg['role']
                if role == 'system' and not formatted_messages:
                    # Map 'system' to 'user' for the first message
                    formatted_messages.append({'role': 'user', 'content': msg['content']})
                elif role == 'assistant':
                    # Map 'assistant' to 'model'
                    formatted_messages.append({'role': 'model', 'content': msg['content']})
                else:
                    # Everything else is passed through unchanged
                    formatted_messages.append(msg)
        else:
            formatted_messages = messages

        # Get completion parameters
        params = self._get_completion_params()

        try:
            if stream:
                # Kept synchronous so callers still receive the same
                # (synchronously iterable) streaming response object.
                return litellm.completion(
                    messages=formatted_messages,
                    stream=True,
                    **params
                )

            # Await the async client so the event loop is not blocked for the
            # duration of the HTTP round trip.
            response = await litellm.acompletion(
                messages=formatted_messages,
                **params
            )

            # Extract content from response
            content = response.choices[0].message.content

            # Process thinking tags if enabled
            if self.process_thinking_tags:
                content = self._process_thinking_tags(content)

            return content
        except Exception as e:
            error_msg = f"Error generating completion: {str(e)}"
            logger.error(error_msg)

            # Return error message in a user-friendly format
            return f"I encountered an error while processing your request: {str(e)}"

    def _process_thinking_tags(self, content: str) -> str:
        """
        Process and remove reasoning tags from model output.

        Reasoning models wrap their internal reasoning in <think>...</think>
        (DeepSeek-R1 style) or <thinking>...</thinking> blocks. This method
        removes those blocks, and any unmatched leftover tags, to produce a
        clean output.

        Args:
            content: The raw content from the model (None is treated as empty)

        Returns:
            Processed content with thinking tags removed
        """
        import re

        if not content:
            return ""

        # Remove complete <think>/<thinking> blocks, including multi-line ones
        clean_content = re.sub(r'<think(?:ing)?>.*?</think(?:ing)?>', '', content, flags=re.DOTALL)

        # Clean up any remaining unmatched tags
        clean_content = re.sub(r'</?think(?:ing)?>', '', clean_content)

        # Remove extra newlines that might have been created
        clean_content = re.sub(r'\n{3,}', '\n\n', clean_content)

        return clean_content.strip()

    async def map_document_chunks(self, chunks: List[Dict[str, Any]], query: str, detail_level: str = "standard") -> List[Dict[str, Any]]:
        """
        Map phase: Process individual document chunks to extract key information.

        Chunks are processed one at a time, in order; a 2 second pause is
        inserted between batches to avoid rate limiting. A chunk whose
        processing raises is kept, with an error message as its
        'extracted_info', so document order is preserved.

        Args:
            chunks: List of document chunks
            query: Original search query
            detail_level: Level of detail for the report (brief, standard, detailed, comprehensive)

        Returns:
            List of processed chunks (copies) with an added 'extracted_info' key
        """
        processed_chunks = []

        # Get the appropriate extraction prompt based on detail level
        extraction_prompt = self._get_extraction_prompt(detail_level)

        total_chunks = len(chunks)
        logger.info(f"Starting to process {total_chunks} document chunks")

        batch_size = self._get_batch_size()
        logger.info(f"Using batch size of {batch_size} for model {self.model_name}")

        batch_count = (total_chunks + batch_size - 1) // batch_size

        for i in range(0, total_chunks, batch_size):
            batch = chunks[i:i+batch_size]
            logger.info(f"Processing batch {i//batch_size + 1}/{batch_count} with {len(batch)} chunks")

            for chunk_index, chunk in enumerate(batch, start=i + 1):
                # A missing, None or empty title is presented as 'Untitled'
                chunk_title = chunk.get('title') or 'Untitled'
                logger.info(f"Processing chunk {chunk_index}/{total_chunks}: {chunk_title[:50] if chunk_title else 'Untitled'}...")

                # Create a prompt for extracting key information from the chunk
                messages = [
                    {"role": "system", "content": extraction_prompt},
                    {"role": "user", "content": f"""Query: {query}

Document title: {chunk_title}
Document URL: {chunk.get('url', 'Unknown')}

Document chunk content:
{chunk.get('content', '')}

Extract the most relevant information from this document chunk that addresses the query."""}
                ]

                # Copy so the caller's chunk dict is not modified
                processed_chunk = chunk.copy()
                try:
                    # Process the chunk with the LLM
                    processed_chunk['extracted_info'] = await self.generate_completion(messages)
                    logger.info(f"Completed chunk {chunk_index}/{total_chunks} ({chunk_index/total_chunks*100:.1f}% complete)")
                except Exception as e:
                    logger.error(f"Error processing chunk {chunk_index}/{total_chunks}: {str(e)}")
                    # Keep a placeholder for the failed chunk to maintain document order
                    processed_chunk['extracted_info'] = f"Error extracting information: {str(e)}"

                processed_chunks.append(processed_chunk)

                # Update progress, also for failed chunks
                self.processed_chunk_count += 1
                self._report_progress()

            # Add a small delay between batches to avoid rate limiting
            if i + batch_size < total_chunks:
                logger.info("Pausing briefly between batches...")
                await asyncio.sleep(2)

        logger.info(f"Completed processing all {total_chunks} chunks")
        return processed_chunks

    def _get_extraction_prompt(self, detail_level: str) -> str:
        """
        Get the appropriate extraction prompt based on detail level.

        Args:
            detail_level: Level of detail for the report (brief, standard, detailed, comprehensive).
                          Matching is case-insensitive; any unrecognised value
                          gets the comprehensive prompt.

        Returns:
            Extraction prompt as a string
        """
        level = detail_level.lower()

        if level in ("brief", "standard"):
            return """You are an expert research assistant. Extract the most relevant information from this document chunk that addresses the user's query.
Focus on factual information, key concepts, and important details.
Include any relevant statistics, definitions, or explanations that would be valuable for a report.
Format your response as a concise summary with bullet points for key facts."""

        if level == "detailed":
            return """You are an expert research analyst with deep domain knowledge. Extract comprehensive information from this document chunk that addresses the user's query.
Focus on:
- Detailed factual information and evidence
- Underlying principles and mechanisms
- Causal relationships and correlations
- Contextual factors and historical development
- Different perspectives or interpretations
- Quantitative data and qualitative insights
- Nuances, edge cases, and exceptions

Prioritize depth of analysis over breadth. Extract information that provides deeper understanding rather than just basic facts.
Format your response with clear sections and bullet points for key insights."""

        # comprehensive
        return """You are a world-class research analyst with exceptional analytical abilities. Extract the most comprehensive and nuanced information from this document chunk.
Focus on:
- Multi-layered analysis of all relevant facts and evidence
- Complex causal networks and interaction effects
- Theoretical frameworks and their applications
- Historical evolution and future trajectories
- Methodological considerations and limitations
- Diverse perspectives and their epistemological foundations
- Statistical data, case studies, and expert opinions
- Contradictions, paradoxes, and unresolved questions

Extract information that provides the deepest possible understanding of the topic as it relates to the query.
Analyze the reliability and significance of the information.
Format your response with clearly organized sections and detailed bullet points."""

    def _get_template_from_strings(self, query_type_str: str, detail_level_str: str) -> Optional["ReportTemplate"]:
        """
        Helper method to get a template using string values for query_type and detail_level.

        Args:
            query_type_str: String value of query type (factual, exploratory, comparative)
            detail_level_str: String value of detail level (brief, standard, detailed, comprehensive)

        Returns:
            ReportTemplate object or None if not found (including when either
            string is not a valid enum value)
        """
        try:
            # Convert string values to enum objects
            query_type_enum = QueryType(query_type_str)
            detail_level_enum = TemplateDetailLevel(detail_level_str)

            # Get template using enum objects
            template = self.template_manager.get_template(query_type_enum, detail_level_enum)
            if template:
                logger.info(f"Found template for {query_type_str} {detail_level_str}")
            else:
                logger.warning(f"No template found for {query_type_str} {detail_level_str}")
            return template
        except (ValueError, KeyError) as e:
            logger.error(f"Error getting template for {query_type_str} {detail_level_str}: {str(e)}")
            return None

    async def reduce_processed_chunks(self, processed_chunks: List[Dict[str, Any]], query: str, query_type: str = "exploratory", detail_level: str = "standard") -> str:
        """
        Reduce phase: Synthesize processed chunks into a coherent report.

        If the generated report looks cut off at the end, the references
        section is regenerated with a separate LLM call and re-attached.

        Args:
            processed_chunks: List of processed chunks with extracted information
            query: Original search query
            query_type: Type of query (factual, exploratory, comparative)
            detail_level: Level of detail for the report (brief, standard, detailed, comprehensive)

        Returns:
            Synthesized report as a string

        Raises:
            ValueError: If no template exists for the query type / detail level.
        """
        # Prepare the context with all extracted information
        context_parts = []
        for i, chunk in enumerate(processed_chunks):
            title = chunk.get('title', 'Untitled')
            url = chunk.get('url', 'Unknown')

            context_parts.append(f"Document {i+1}:\n")
            context_parts.append(f"Title: {title}\n")
            context_parts.append(f"URL: {url}\n")
            context_parts.append(f"Source URL: {url}\n")  # Duplicate for emphasis
            context_parts.append(f"Extracted information:\n{chunk.get('extracted_info', '')}\n\n")
        context = "".join(context_parts)

        # Get template modifier based on detail level and query type using helper method
        template = self._get_template_from_strings(query_type, detail_level)

        if not template:
            raise ValueError(f"No template found for {query_type} {detail_level}")

        # Add specific instructions for references formatting
        reference_instructions = """
When including references, use a consistent format:

[1] Title of the Article/Page. URL

IMPORTANT:
1. DO NOT use generic placeholders like "Document 1" for references
2. ALWAYS include the actual URL from the source documents
3. Each reference MUST include both the title and the URL
4. Make sure all references are complete and properly formatted
5. Number the references sequentially starting from 1
6. Include the URL for EACH reference - this is critical.
"""

        # Special handling for Gemini models
        if "gemini" in self.model_name.lower():
            reference_instructions += """
IMPORTANT: Due to token limitations, ensure the References section is completed properly.
If you feel you might run out of tokens, start the References section earlier and make it more concise.
Never leave the References section incomplete or cut off mid-reference.
"""

        # Create the prompt for synthesizing the report
        messages = [
            {"role": "system", "content": f"""You are an expert research assistant tasked with creating comprehensive, well-structured reports.
{template.template}

Format the report in Markdown with clear headings, subheadings, and bullet points where appropriate.
Make the report readable, engaging, and informative while maintaining academic rigor.

{reference_instructions}"""},
            {"role": "user", "content": f"""Query: {query}

Information from sources:
{context}

Synthesize this information into a report that addresses the query. Use your own words to create a coherent narrative, but ensure all information is based on the provided sources. Include citations and a references section."""}
        ]

        # Generate the report
        report = await self.generate_completion(messages)

        if report is None:
            # The model returned no text content; avoid crashing on .strip()
            logger.warning("LLM returned no content for the report.")
            report = ""

        # Check if the report might be cut off at the end
        if report.strip().endswith(('[', ']', '...')):
            logger.warning("Report appears to be cut off at the end. Attempting to fix references section.")

            # Try to fix the references section by generating it separately
            try:
                # Extract what we have so far without the incomplete references
                if "References" in report:
                    report_without_refs = report.split("References")[0].strip()
                else:
                    report_without_refs = report

                # Generate just the references section
                ref_messages = [
                    {"role": "system", "content": """You are an expert at formatting reference lists. Create a properly formatted References section for the following documents.

IMPORTANT:
1. Use the actual title and URL from each document
2. DO NOT use generic placeholders like "Document 1"
3. Format each reference as: [1] Title of the Article/Page. URL
4. Each reference MUST include both the title and the URL
5. Make sure all references are complete and properly formatted
6. Number the references sequentially starting from 1"""},
                    {"role": "user", "content": f"""Here are the documents used in the report:

{context}

Create a complete, properly formatted References section in Markdown format.
Remember to include the URL for EACH reference - this is critical."""}
                ]

                references = await self.generate_completion(ref_messages)

                # Combine the report with the fixed references
                report = f"{report_without_refs}\n\n## References\n\n{references}"

            except Exception as e:
                logger.error(f"Error fixing references section: {str(e)}")

        return report

    def _estimate_tokens(self, chunks: List[Dict[str, Any]]) -> float:
        """Roughly estimate the token count of the chunks (1.3 tokens per word)."""
        return sum(len((chunk.get('content') or '').split()) * 1.3 for chunk in chunks)

    async def synthesize_report(self, chunks: List[Dict[str, Any]], query: str, query_type: str = "exploratory", detail_level: str = "standard") -> str:
        """
        Synthesize a report from document chunks using the map-reduce approach.

        Args:
            chunks: List of document chunks. Chunks whose 'title' is None are
                    updated in place to 'Untitled'.
            query: Original search query
            query_type: Type of query (factual, exploratory, comparative).
                        The default "exploratory" is refined to factual or
                        comparative when the query text suggests it.
            detail_level: Level of detail for the report (brief, standard, detailed, comprehensive)

        Returns:
            Synthesized report as a string
        """
        if not chunks:
            logger.warning("No document chunks provided for report synthesis.")
            return "No information found for the given query."

        # Reset progress tracking
        self.total_chunks = len(chunks)
        self.processed_chunk_count = 0

        # Determine query type if not specified. This runs BEFORE the template
        # check so that the template is verified for the type actually used
        # in the reduce phase.
        if query_type == "exploratory":
            # Try to infer query type from the query text
            lowered_query = query.lower()
            if any(term in lowered_query for term in ["what is", "who is", "when did", "where is", "how does"]):
                query_type = "factual"
            elif any(term in lowered_query for term in ["compare", "difference", "versus", "pros and cons"]):
                query_type = "comparative"

        logger.info(f"Query type determined as: {query_type}")

        # Verify that a template exists for the given query type and detail level
        template = self._get_template_from_strings(query_type, detail_level)
        if not template:
            logger.warning(f"No template found for {query_type} {detail_level}, falling back to standard template")
            # Fall back to standard detail level if the requested one doesn't exist
            detail_level = "standard"

        # Get detail level configuration
        detail_level_manager = get_report_detail_level_manager()
        config = detail_level_manager.get_detail_level_config(detail_level)
        token_budget = config.get("token_budget", 100000)

        # Estimate total tokens in chunks
        total_tokens = self._estimate_tokens(chunks)  # Rough estimate
        logger.info(f"Estimated total tokens in {len(chunks)} chunks: {total_tokens}")

        # If total tokens exceeds 80% of the token budget, reduce the number of chunks
        if total_tokens > token_budget * 0.8:
            max_chunks = int(len(chunks) * (token_budget * 0.8 / total_tokens))
            max_chunks = max(3, max_chunks)  # Ensure we have at least 3 chunks
            logger.warning(f"Token count ({total_tokens}) exceeds 80% of budget ({token_budget}). Reducing chunks from {len(chunks)} to {max_chunks}.")
            chunks = chunks[:max_chunks]
            # Recalculate estimated tokens
            total_tokens = self._estimate_tokens(chunks)
            logger.info(f"Reduced to {len(chunks)} chunks with estimated {total_tokens} tokens")

        # Update total chunks for progress tracking
        self.total_chunks = len(chunks)

        logger.info(f"Starting map phase for {len(chunks)} document chunks with query type '{query_type}' and detail level '{detail_level}'")

        # Ensure all chunks have a title, even if it's 'Untitled'
        for chunk in chunks:
            if chunk.get('title') is None:
                chunk['title'] = 'Untitled'

        # map_document_chunks() already batches and pauses between batches,
        # so it is called once with every chunk rather than once per batch.
        processed_chunks = await self.map_document_chunks(chunks, query, detail_level)

        logger.info(f"Starting reduce phase to synthesize report from {len(processed_chunks)} processed chunks")

        # Update progress status for reduce phase
        if self.progress_callback:
            self.progress_callback(0.9, self.total_chunks, "Synthesizing final report...")

        # Reduce phase: Synthesize processed chunks into a coherent report
        report = await self.reduce_processed_chunks(processed_chunks, query, query_type, detail_level)

        # Process thinking tags if enabled (covers the regenerated references too)
        if self.process_thinking_tags and ("<think>" in report or "<thinking>" in report):
            logger.info("Processing thinking tags in report")
            report = self._process_thinking_tags(report)

        # Final progress update
        if self.progress_callback:
            self.progress_callback(1.0, self.total_chunks, report)

        return report
|
|
|
|
|
|
# Create a singleton instance for global use
# Instantiated at import time, so importing this module runs
# ReportSynthesizer.__init__ with the default model.
report_synthesizer = ReportSynthesizer()
|
|
|
|
def get_report_synthesizer(model_name: Optional[str] = None) -> ReportSynthesizer:
    """
    Return the shared report synthesizer, replacing it when a different model is requested.

    Args:
        model_name: Optional model name to use instead of the default. When it
                    is given and differs from the shared instance's model, a
                    new instance is created and becomes the shared one.

    Returns:
        ReportSynthesizer instance
    """
    global report_synthesizer

    # Reuse the current instance unless a different model is explicitly requested.
    wants_other_model = bool(model_name) and report_synthesizer.model_name != model_name
    if not wants_other_model:
        return report_synthesizer

    report_synthesizer = ReportSynthesizer(model_name)
    return report_synthesizer
|
|
|
|
async def test_report_synthesizer():
    """Run the map and reduce phases on two sample chunks and print the results."""
    # Test query
    query = "What are the key features of Python programming language?"

    # Sample document chunks
    intro_chunk = {
        "title": "Introduction to Python",
        "url": "https://docs.python.org/3/tutorial/index.html",
        "content": "Python is an easy to learn, powerful programming language. It has efficient high-level data structures and a simple but effective approach to object-oriented programming. Python's elegant syntax and dynamic typing, together with its interpreted nature, make it an ideal language for scripting and rapid application development in many areas on most platforms."
    }
    features_chunk = {
        "title": "Python Features",
        "url": "https://www.python.org/about/",
        "content": "Python is a programming language that lets you work quickly and integrate systems more effectively. Python is an interpreted, object-oriented, high-level programming language with dynamic semantics. Its high-level built in data structures, combined with dynamic typing and dynamic binding, make it very attractive for Rapid Application Development, as well as for use as a scripting or glue language to connect existing components together."
    }
    chunks = [intro_chunk, features_chunk]

    # Use the shared synthesizer instance
    synthesizer = get_report_synthesizer()

    # Map phase
    processed_chunks = await synthesizer.map_document_chunks(chunks, query, detail_level="detailed")

    # Print processed chunks
    print("Processed chunks:")
    for number, processed in enumerate(processed_chunks, start=1):
        print(f"Chunk {number}: {processed.get('title')}")
        print(f"Extracted information: {processed.get('extracted_info')}")
        print()

    # Reduce phase
    report = await synthesizer.reduce_processed_chunks(processed_chunks, query, detail_level="detailed")

    # Print report
    print("Generated Report:")
    print(report)
|
|
|
|
# Run the manual smoke test only when this module is executed as a script.
if __name__ == "__main__":
    asyncio.run(test_report_synthesizer())
|