ira/report/report_synthesis.py

756 lines
34 KiB
Python

"""
Report synthesis module for the intelligent research system.
This module provides functionality to synthesize reports from document chunks
using LLMs with a map-reduce approach.
"""
import os
import json
import asyncio
import logging
from typing import Dict, List, Any, Optional, Tuple, Union
import litellm
from litellm import completion
from config.config import get_config
from report.report_detail_levels import get_report_detail_level_manager, DetailLevel
from report.report_templates import QueryType, DetailLevel as TemplateDetailLevel, ReportTemplateManager, ReportTemplate
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Note: ReportTemplateManager and ReportTemplate are now imported from report_templates.py
class ReportSynthesizer:
"""
Report synthesizer for the intelligent research system.
This class provides methods to synthesize reports from document chunks
using LLMs with a map-reduce approach.
"""
def __init__(self, model_name: Optional[str] = None):
"""
Initialize the report synthesizer.
Args:
model_name: Name of the LLM model to use. If None, uses the default model
from configuration.
"""
self.config = get_config()
# Use specified model or default from config for report synthesis
self.model_name = model_name or self.config.config_data.get('report_synthesis', {}).get('model', 'llama-3.3-70b-versatile')
# Get model-specific configuration
self.model_config = self.config.get_model_config(self.model_name)
# Set up LiteLLM with the appropriate provider
self._setup_provider()
# Initialize template manager
self.template_manager = ReportTemplateManager()
self.template_manager.initialize_default_templates()
# Flag to process <thinking> tags in model output
self.process_thinking_tags = False
# Progress tracking
self.progress_callback = None
self.total_chunks = 0
self.processed_chunk_count = 0
self.current_chunk_title = ""
self.current_stage = "preparation" # Can be: preparation, processing, finalizing
def set_progress_callback(self, callback):
"""
Set a callback function to report progress.
Args:
callback: Function that takes (current_progress, total, current_report) as arguments
"""
self.progress_callback = callback
def _report_progress(self, current_report=None):
"""Report progress through the callback if set."""
if self.progress_callback:
# Calculate progress as a fraction between 0 and 1
if self.total_chunks > 0:
progress = min(self.processed_chunk_count / self.total_chunks, 1.0)
else:
progress = 0.0
# Store current report text for progressive reports
if current_report:
self.current_report_text = current_report
# Call the progress callback with detailed information
self.progress_callback(
progress,
self.total_chunks,
current_report or getattr(self, 'current_report_text', None)
)
def _setup_provider(self) -> None:
"""Set up the LLM provider based on the model configuration."""
provider = self.model_config.get('provider', 'groq')
try:
# Get API key for the provider
api_key = self.config.get_api_key(provider)
# Set environment variable for the provider
if provider.lower() == 'google' or provider.lower() == 'gemini':
os.environ["GEMINI_API_KEY"] = api_key
elif provider.lower() == 'vertex_ai':
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = api_key
else:
os.environ[f"{provider.upper()}_API_KEY"] = api_key
logger.info(f"Report synthesizer initialized with model: {self.model_name} (provider: {provider})")
except ValueError as e:
logger.error(f"Error setting up LLM provider: {e}")
def _get_completion_params(self) -> Dict[str, Any]:
"""
Get parameters for LLM completion based on model configuration.
Returns:
Dictionary of parameters for LiteLLM completion
"""
params = {
'temperature': self.model_config.get('temperature', 0.3), # Lower temperature for factual reporting
'max_tokens': self.model_config.get('max_tokens', 4000), # Longer output for comprehensive reports
'top_p': self.model_config.get('top_p', 0.9)
}
# Handle different provider configurations
provider = self.model_config.get('provider', 'groq')
if provider == 'groq':
# For Groq provider
params['model'] = f"groq/{self.model_name}"
elif provider == 'openrouter':
# For OpenRouter provider
params['model'] = self.model_config.get('model_name', self.model_name)
# Get the endpoint from the model config and ensure it has the correct format
endpoint = self.model_config.get('endpoint', 'https://openrouter.ai/api')
# Ensure the endpoint ends with /v1 for OpenRouter API v1
if not endpoint.endswith('/v1'):
if endpoint.endswith('/'):
endpoint = f"{endpoint}v1"
else:
endpoint = f"{endpoint}/v1"
params['api_base'] = endpoint
# Set custom provider for OpenRouter
params['custom_llm_provider'] = 'openrouter'
# Set HTTP headers for OpenRouter if needed
params['headers'] = {
'HTTP-Referer': 'https://sim-search.app', # Replace with your actual app URL
'X-Title': 'Intelligent Research System' # Replace with your actual app name
}
elif provider == 'google' or provider == 'gemini':
# Special handling for Google Gemini models
# Format: gemini/model_name (e.g., gemini/gemini-2.0-flash)
params['model'] = f"gemini/{self.model_config.get('model_name', self.model_name)}"
# Add additional parameters for Gemini
params['custom_llm_provider'] = 'gemini'
elif provider == 'vertex_ai':
# Special handling for Vertex AI Gemini models
params['model'] = f"vertex_ai/{self.model_config.get('model_name', self.model_name)}"
# Add Vertex AI specific parameters
params['vertex_project'] = self.model_config.get('vertex_project', 'sim-search')
params['vertex_location'] = self.model_config.get('vertex_location', 'us-central1')
# Set custom provider
params['custom_llm_provider'] = 'vertex_ai'
elif provider == 'mistral' or 'mistralai' in self.model_name.lower():
# Special handling for Mistral models
# Format: mistral/model_name (e.g., mistral/mistral-medium)
model_name = self.model_config.get('model_name', self.model_name)
params['model'] = f"mistral/{model_name}"
# Add Mistral-specific parameters
params['custom_llm_provider'] = 'mistral'
else:
# Standard provider (OpenAI, Anthropic, etc.)
params['model'] = self.model_name
return params
async def generate_completion(self, messages: List[Dict[str, str]], stream: bool = False) -> Union[str, Any]:
"""
Generate a completion using the configured LLM.
Args:
messages: List of message dictionaries with 'role' and 'content' keys
stream: Whether to stream the response
Returns:
If stream is False, returns the completion text as a string
If stream is True, returns the completion response object for streaming
"""
# Get provider from model config
provider = self.model_config.get('provider', 'groq').lower()
# Special handling for Gemini models - they use 'user' and 'model' roles
if provider == 'gemini':
formatted_messages = []
for msg in messages:
role = msg['role']
# Map 'system' to 'user' for the first message
if role == 'system' and not formatted_messages:
formatted_messages.append({
'role': 'user',
'content': msg['content']
})
# Map 'assistant' to 'model'
elif role == 'assistant':
formatted_messages.append({
'role': 'model',
'content': msg['content']
})
# Keep 'user' as is
else:
formatted_messages.append(msg)
else:
formatted_messages = messages
# Get completion parameters
params = self._get_completion_params()
try:
# Generate completion
if stream:
response = litellm.completion(
messages=formatted_messages,
stream=True,
**params
)
return response
else:
response = litellm.completion(
messages=formatted_messages,
**params
)
# Extract content from response
content = response.choices[0].message.content
# Process thinking tags if enabled
if self.process_thinking_tags:
content = self._process_thinking_tags(content)
return content
except Exception as e:
error_msg = f"Error generating completion: {str(e)}"
logger.error(error_msg)
# Return error message in a user-friendly format
return f"I encountered an error while processing your request: {str(e)}"
def _process_thinking_tags(self, content: str) -> str:
"""
Process and remove <thinking> tags from model output.
Some models like deepseek-r1-distill use <thinking> tags for their internal reasoning.
This method removes these tags and their content to produce a clean output.
Args:
content: The raw content from the model
Returns:
Processed content with thinking tags removed
"""
import re
# Remove <thinking>...</thinking> blocks
clean_content = re.sub(r'<thinking>.*?</thinking>', '', content, flags=re.DOTALL)
# Clean up any remaining tags
clean_content = re.sub(r'</?thinking>', '', clean_content)
# Remove extra newlines that might have been created
clean_content = re.sub(r'\n{3,}', '\n\n', clean_content)
return clean_content.strip()
async def map_document_chunks(self, chunks: List[Dict[str, Any]], query: str, detail_level: str = "standard", query_type: str = "exploratory") -> List[Dict[str, Any]]:
"""
Map phase: Process individual document chunks to extract key information.
Args:
chunks: List of document chunks
query: Original search query
detail_level: Level of detail for the report (brief, standard, detailed, comprehensive)
query_type: Type of query (factual, exploratory, comparative)
Returns:
List of processed chunks with extracted information
"""
processed_chunks = []
# Get the appropriate extraction prompt based on detail level and query type
extraction_prompt = self._get_extraction_prompt(detail_level, query_type)
total_chunks = len(chunks)
logger.info(f"Starting to process {total_chunks} document chunks")
# Update progress tracking state
self.total_chunks = total_chunks
self.processed_chunk_count = 0
self.current_stage = "processing"
self._report_progress()
# Ensure all chunks have a title, even if it's 'Untitled'
for chunk in chunks:
if chunk.get('title') is None or chunk.get('title') == '':
chunk['title'] = 'Untitled'
# Process each chunk individually to provide detailed progress updates
for i, chunk in enumerate(chunks):
chunk_title = chunk.get('title', 'Untitled')
chunk_index = i + 1
# Update current chunk title for progress reporting
self.current_chunk_title = chunk_title[:50] if chunk_title else 'Untitled'
logger.info(f"Processing chunk {chunk_index}/{total_chunks}: {self.current_chunk_title}...")
# Create a prompt for extracting key information from the chunk
messages = [
{"role": "system", "content": extraction_prompt},
{"role": "user", "content": f"""Query: {query}
Document title: {chunk.get('title', 'Untitled')}
Document URL: {chunk.get('url', 'Unknown')}
Document chunk content:
{chunk.get('content', '')}
Extract the most relevant information from this document chunk that addresses the query."""}
]
try:
# Process the chunk with the LLM
extracted_info = await self.generate_completion(messages)
# Add the extracted information to the chunk
processed_chunk = chunk.copy()
processed_chunk['extracted_info'] = extracted_info
processed_chunks.append(processed_chunk)
# Update progress
self.processed_chunk_count += 1
self._report_progress()
logger.info(f"Completed chunk {chunk_index}/{total_chunks} ({chunk_index/total_chunks*100:.1f}% complete)")
except Exception as e:
logger.error(f"Error processing chunk {chunk_index}/{total_chunks}: {str(e)}")
# Add a placeholder for the failed chunk to maintain document order
processed_chunk = chunk.copy()
processed_chunk['extracted_info'] = f"Error extracting information: {str(e)}"
processed_chunks.append(processed_chunk)
# Update progress even for failed chunks
self.processed_chunk_count += 1
self._report_progress()
# Add a small delay between chunks to avoid rate limiting
if i < len(chunks) - 1:
await asyncio.sleep(0.5)
logger.info(f"Completed processing all {total_chunks} chunks")
return processed_chunks
def _get_extraction_prompt(self, detail_level: str, query_type: str = "exploratory") -> str:
"""
Get the appropriate extraction prompt based on detail level and query type.
Args:
detail_level: Level of detail for the report (brief, standard, detailed, comprehensive)
query_type: Type of query (factual, exploratory, comparative)
Returns:
Extraction prompt as a string
"""
# Base prompts by detail level
if detail_level.lower() in ["brief", "standard"]:
base_prompt = """You are an expert research assistant. Extract the most relevant information from this document chunk that addresses the user's query.
Focus on factual information, key concepts, and important details.
Include any relevant statistics, definitions, or explanations that would be valuable for a report.
Format your response as a concise summary with bullet points for key facts."""
elif detail_level.lower() == "detailed":
base_prompt = """You are an expert research analyst with deep domain knowledge. Extract comprehensive information from this document chunk that addresses the user's query.
Focus on:
- Detailed factual information and evidence
- Underlying principles and mechanisms
- Causal relationships and correlations
- Contextual factors and historical development
- Different perspectives or interpretations
- Quantitative data and qualitative insights
- Nuances, edge cases, and exceptions
Prioritize depth of analysis over breadth. Extract information that provides deeper understanding rather than just basic facts.
Format your response with clear sections and bullet points for key insights."""
else: # comprehensive
base_prompt = """You are a world-class research analyst with exceptional analytical abilities. Extract the most comprehensive and nuanced information from this document chunk.
Focus on:
- Multi-layered analysis of all relevant facts and evidence
- Complex causal networks and interaction effects
- Theoretical frameworks and their applications
- Historical evolution and future trajectories
- Methodological considerations and limitations
- Diverse perspectives and their epistemological foundations
- Statistical data, case studies, and expert opinions
- Contradictions, paradoxes, and unresolved questions
Extract information that provides the deepest possible understanding of the topic as it relates to the query.
Analyze the reliability and significance of the information.
Format your response with clearly organized sections and detailed bullet points."""
# Add specific instructions for comparative queries
# Handle the case where query_type is None
if query_type is not None and query_type.lower() == "comparative":
comparative_instructions = """
IMPORTANT: This is a COMPARATIVE query. The user is asking to compare two or more things.
When extracting information, focus specifically on:
1. Characteristics, features, or attributes of EACH item being compared
2. Direct comparisons between the items mentioned in the query
3. Advantages and disadvantages of each item
4. Similarities and differences between the items
5. Contexts where one item might be preferred over others
Make sure to clearly identify which information relates to which item being compared.
Organize your extraction to facilitate easy comparison between the items.
"""
return base_prompt + comparative_instructions
return base_prompt
def _get_template_from_strings(self, query_type_str: Optional[str], detail_level_str: str) -> Optional[ReportTemplate]:
"""
Helper method to get a template using string values for query_type and detail_level.
Args:
query_type_str: String value of query type (factual, exploratory, comparative), or None
detail_level_str: String value of detail level (brief, standard, detailed, comprehensive)
Returns:
ReportTemplate object or None if not found
"""
try:
# Handle None query_type by defaulting to "exploratory"
if query_type_str is None:
query_type_str = "exploratory"
logger.info(f"Query type is None, defaulting to {query_type_str}")
# Convert string values to enum objects
query_type_enum = QueryType(query_type_str)
detail_level_enum = TemplateDetailLevel(detail_level_str)
# Get template using enum objects
template = self.template_manager.get_template(query_type_enum, detail_level_enum)
if template:
logger.info(f"Found template for {query_type_str} {detail_level_str}")
else:
logger.warning(f"No template found for {query_type_str} {detail_level_str}")
return template
except (ValueError, KeyError) as e:
logger.error(f"Error getting template for {query_type_str} {detail_level_str}: {str(e)}")
return None
async def reduce_processed_chunks(self, processed_chunks: List[Dict[str, Any]], query: str, query_type: str = "exploratory", detail_level: str = "standard") -> str:
"""
Reduce phase: Synthesize processed chunks into a coherent report.
Args:
processed_chunks: List of processed chunks with extracted information
query: Original search query
query_type: Type of query (factual, exploratory, comparative)
detail_level: Level of detail for the report (brief, standard, detailed, comprehensive)
Returns:
Synthesized report as a string
"""
# Prepare the context with all extracted information
context = ""
for i, chunk in enumerate(processed_chunks):
title = chunk.get('title', 'Untitled')
url = chunk.get('url', 'Unknown')
context += f"Document {i+1}:\n"
context += f"Title: {title}\n"
context += f"URL: {url}\n"
context += f"Source URL: {url}\n" # Duplicate for emphasis
context += f"Extracted information:\n{chunk.get('extracted_info', '')}\n\n"
# Get template modifier based on detail level and query type using helper method
template = self._get_template_from_strings(query_type, detail_level)
if not template:
raise ValueError(f"No template found for {query_type} {detail_level}")
# Add specific instructions for references formatting
reference_instructions = """
When including references, use a consistent format:
[1] Title of the Article/Page. URL
IMPORTANT:
1. DO NOT use generic placeholders like "Document 1" for references
2. ALWAYS include the actual URL from the source documents
3. Each reference MUST include both the title and the URL
4. Make sure all references are complete and properly formatted
5. Number the references sequentially starting from 1
6. Include the URL for EACH reference - this is critical.
"""
# Special handling for Gemini models
if "gemini" in self.model_name.lower():
reference_instructions += """
IMPORTANT: Due to token limitations, ensure the References section is completed properly.
If you feel you might run out of tokens, start the References section earlier and make it more concise.
Never leave the References section incomplete or cut off mid-reference.
"""
# Create the prompt for synthesizing the report
messages = [
{"role": "system", "content": f"""You are an expert research assistant tasked with creating comprehensive, well-structured reports.
{template.template}
Format the report in Markdown with clear headings, subheadings, and bullet points where appropriate.
Make the report readable, engaging, and informative while maintaining academic rigor.
{reference_instructions}"""},
{"role": "user", "content": f"""Query: {query}
Information from sources:
{context}
Synthesize this information into a report that addresses the query. Use your own words to create a coherent narrative, but ensure all information is based on the provided sources. Include citations and a references section."""}
]
# Generate the report
report = await self.generate_completion(messages)
# Check if the report might be cut off at the end
if report.strip().endswith('[') or report.strip().endswith(']') or report.strip().endswith('...'):
logger.warning("Report appears to be cut off at the end. Attempting to fix references section.")
# Try to fix the references section by generating it separately
try:
# Extract what we have so far without the incomplete references
if "References" in report:
report_without_refs = report.split("References")[0].strip()
else:
report_without_refs = report
# Generate just the references section
ref_messages = [
{"role": "system", "content": """You are an expert at formatting reference lists. Create a properly formatted References section for the following documents.
IMPORTANT:
1. Use the actual title and URL from each document
2. DO NOT use generic placeholders like "Document 1"
3. Format each reference as: [1] Title of the Article/Page. URL
4. Each reference MUST include both the title and the URL
5. Make sure all references are complete and properly formatted
6. Number the references sequentially starting from 1"""},
{"role": "user", "content": f"""Here are the documents used in the report:
{context}
Create a complete, properly formatted References section in Markdown format.
Remember to include the URL for EACH reference - this is critical."""}
]
references = await self.generate_completion(ref_messages)
# Combine the report with the fixed references
report = f"{report_without_refs}\n\n## References\n\n{references}"
except Exception as e:
logger.error(f"Error fixing references section: {str(e)}")
return report
async def synthesize_report(self, chunks: List[Dict[str, Any]], query: str, query_type: str = "exploratory", detail_level: str = "standard") -> str:
"""
Synthesize a report from document chunks using the map-reduce approach.
Args:
chunks: List of document chunks
query: Original search query
query_type: Type of query (factual, exploratory, comparative)
detail_level: Level of detail for the report (brief, standard, detailed, comprehensive)
Returns:
Synthesized report as a string
"""
if not chunks:
logger.warning("No document chunks provided for report synthesis.")
return "No information found for the given query."
# Reset progress tracking
self.total_chunks = len(chunks)
self.processed_chunk_count = 0
self.current_chunk_title = ""
self.current_stage = "preparation"
# Report initial progress
self._report_progress()
# Verify that a template exists for the given query type and detail level
template = self._get_template_from_strings(query_type, detail_level)
if not template:
logger.warning(f"No template found for {query_type} {detail_level}, falling back to standard template")
# Fall back to standard detail level if the requested one doesn't exist
detail_level = "standard"
# Get detail level configuration
detail_level_manager = get_report_detail_level_manager()
config = detail_level_manager.get_detail_level_config(detail_level)
token_budget = config.get("token_budget", 100000)
# Determine query type based on the query text
# Always try to infer the query type, regardless of what was passed in
if any(term in query.lower() for term in ["what is", "who is", "when did", "where is", "how does"]):
query_type = "factual"
elif any(term in query.lower() for term in ["compare", "difference", "versus", "vs", "pros and cons"]):
query_type = "comparative"
else:
# Default to exploratory if no specific pattern is detected
query_type = "exploratory"
logger.info(f"Query type determined as: {query_type}")
# Estimate total tokens in chunks
total_tokens = sum(len(chunk.get('content', '').split()) * 1.3 for chunk in chunks) # Rough estimate
logger.info(f"Estimated total tokens in {len(chunks)} chunks: {total_tokens}")
# If total tokens exceeds 80% of the token budget, reduce the number of chunks
if total_tokens > token_budget * 0.8:
max_chunks = int(len(chunks) * (token_budget * 0.8 / total_tokens))
max_chunks = max(3, max_chunks) # Ensure we have at least 3 chunks
logger.warning(f"Token count ({total_tokens}) exceeds 80% of budget ({token_budget}). Reducing chunks from {len(chunks)} to {max_chunks}.")
chunks = chunks[:max_chunks]
# Recalculate estimated tokens
total_tokens = sum(len(chunk.get('content', '').split()) * 1.3 for chunk in chunks)
logger.info(f"Reduced to {len(chunks)} chunks with estimated {total_tokens} tokens")
# Update total chunks for progress tracking
self.total_chunks = len(chunks)
logger.info(f"Starting map phase for {len(chunks)} document chunks with query type '{query_type}' and detail level '{detail_level}'")
# Set stage to processing for progress tracking
self.current_stage = "processing"
self._report_progress()
# Map phase: Process each document chunk to extract key information
logger.info("Starting map phase: Processing document chunks...")
processed_chunks = await self.map_document_chunks(chunks, query, detail_level, query_type)
# Update stage to finalizing
self.current_stage = "finalizing"
self._report_progress()
# Reduce phase: Synthesize the processed chunks into a coherent report
logger.info("Starting reduce phase: Synthesizing report...")
# Report progress before starting the reduce phase
if self.progress_callback:
self.progress_callback(0.9, self.total_chunks, "Synthesizing final report...")
# Synthesize the report
report = await self.reduce_processed_chunks(processed_chunks, query, query_type, detail_level)
# Set progress to 100% complete
self.processed_chunk_count = self.total_chunks
self._report_progress(report)
# Process thinking tags if enabled
if self.process_thinking_tags and "<thinking>" in report:
logger.info("Processing thinking tags in report")
report = self._process_thinking_tags(report)
# Final progress update
if self.progress_callback:
self.progress_callback(1.0, self.total_chunks, report)
return report
# Create a singleton instance for global use
report_synthesizer = ReportSynthesizer()
def get_report_synthesizer(model_name: Optional[str] = None) -> ReportSynthesizer:
"""
Get the global report synthesizer instance or create a new one with a specific model.
Args:
model_name: Optional model name to use instead of the default
Returns:
ReportSynthesizer instance
"""
global report_synthesizer
if model_name and model_name != report_synthesizer.model_name:
report_synthesizer = ReportSynthesizer(model_name)
return report_synthesizer
async def test_report_synthesizer():
"""Test the report synthesizer with sample document chunks."""
# Sample document chunks
chunks = [
{
"title": "Introduction to Python",
"url": "https://docs.python.org/3/tutorial/index.html",
"content": "Python is an easy to learn, powerful programming language. It has efficient high-level data structures and a simple but effective approach to object-oriented programming. Python's elegant syntax and dynamic typing, together with its interpreted nature, make it an ideal language for scripting and rapid application development in many areas on most platforms."
},
{
"title": "Python Features",
"url": "https://www.python.org/about/",
"content": "Python is a programming language that lets you work quickly and integrate systems more effectively. Python is an interpreted, object-oriented, high-level programming language with dynamic semantics. Its high-level built in data structures, combined with dynamic typing and dynamic binding, make it very attractive for Rapid Application Development, as well as for use as a scripting or glue language to connect existing components together."
}
]
# Initialize the report synthesizer
synthesizer = get_report_synthesizer()
# Test query
query = "What are the key features of Python programming language?"
# Map phase
processed_chunks = await synthesizer.map_document_chunks(chunks, query, detail_level="detailed")
# Print processed chunks
print("Processed chunks:")
for i, chunk in enumerate(processed_chunks):
print(f"Chunk {i+1}: {chunk.get('title')}")
print(f"Extracted information: {chunk.get('extracted_info')}")
print()
# Reduce phase
report = await synthesizer.reduce_processed_chunks(processed_chunks, query, detail_level="detailed")
# Print report
print("Generated Report:")
print(report)
if __name__ == "__main__":
asyncio.run(test_report_synthesizer())