493 lines
23 KiB
Python
493 lines
23 KiB
Python
"""
|
|
Report synthesis module for the intelligent research system.
|
|
|
|
This module provides functionality to synthesize reports from document chunks
|
|
using LLMs with a map-reduce approach.
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import asyncio
|
|
import logging
|
|
from typing import Dict, List, Any, Optional, Tuple, Union
|
|
|
|
import litellm
|
|
from litellm import completion
|
|
|
|
from config.config import get_config
|
|
from report.report_detail_levels import get_report_detail_level_manager, DetailLevel
|
|
|
|
# Logging setup: INFO and above, each record prefixed with timestamp,
# logger name and level. The module logger is named after this module.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
|
|
|
|
class ReportSynthesizer:
    """
    Report synthesizer for the intelligent research system.

    Synthesizes reports from document chunks using LLMs with a map-reduce
    approach:

    * ``map_document_chunks`` asks the LLM to extract query-relevant
      information from every chunk (map phase),
    * ``reduce_processed_chunks`` merges the extractions into one Markdown
      report (reduce phase),
    * ``synthesize_report`` trims the input to the token budget and drives
      both phases.
    """

    # Map-phase batch sizes: Gemini models get larger batches, everything
    # else a smaller one.
    GEMINI_BATCH_SIZE = 8
    DEFAULT_BATCH_SIZE = 3
    # Seconds to wait between two batches to avoid provider rate limiting.
    BATCH_PAUSE_SECONDS = 2
    # Rough words -> tokens conversion factor used for budget estimates.
    TOKENS_PER_WORD = 1.3

    _BRIEF_EXTRACTION_PROMPT = (
        "You are an expert research assistant. Extract the most relevant information from this document chunk that addresses the user's query.\n"
        "Focus on factual information, key concepts, and important details.\n"
        "Include any relevant statistics, definitions, or explanations that would be valuable for a report.\n"
        "Format your response as a concise summary with bullet points for key facts."
    )

    _DETAILED_EXTRACTION_PROMPT = (
        "You are an expert research analyst with deep domain knowledge. Extract comprehensive information from this document chunk that addresses the user's query.\n"
        "Focus on:\n"
        "- Detailed factual information and evidence\n"
        "- Underlying principles and mechanisms\n"
        "- Causal relationships and correlations\n"
        "- Contextual factors and historical development\n"
        "- Different perspectives or interpretations\n"
        "- Quantitative data and qualitative insights\n"
        "- Nuances, edge cases, and exceptions\n"
        "\n"
        "Prioritize depth of analysis over breadth. Extract information that provides deeper understanding rather than just basic facts.\n"
        "Format your response with clear sections and bullet points for key insights."
    )

    _COMPREHENSIVE_EXTRACTION_PROMPT = (
        "You are a world-class research analyst with exceptional analytical abilities. Extract the most comprehensive and nuanced information from this document chunk.\n"
        "Focus on:\n"
        "- Multi-layered analysis of all relevant facts and evidence\n"
        "- Complex causal networks and interaction effects\n"
        "- Theoretical frameworks and their applications\n"
        "- Historical evolution and future trajectories\n"
        "- Methodological considerations and limitations\n"
        "- Diverse perspectives and their epistemological foundations\n"
        "- Statistical data, case studies, and expert opinions\n"
        "- Contradictions, paradoxes, and unresolved questions\n"
        "\n"
        "Extract information that provides the deepest possible understanding of the topic as it relates to the query.\n"
        "Analyze the reliability and significance of the information.\n"
        "Format your response with clearly organized sections and detailed bullet points."
    )

    def __init__(self, model_name: Optional[str] = None):
        """
        Initialize the report synthesizer.

        Args:
            model_name: Name of the LLM model to use. If None, uses the
                ``report_synthesis.model`` entry from configuration and falls
                back to ``llama-3.3-70b-versatile``.
        """
        self.config = get_config()

        # Use specified model or default from config for report synthesis
        configured = self.config.config_data.get('report_synthesis', {})
        self.model_name = model_name or configured.get('model', 'llama-3.3-70b-versatile')

        # Get model-specific configuration
        self.model_config = self.config.get_model_config(self.model_name)

        # Export the provider API key so LiteLLM can pick it up
        self._setup_provider()

        # Flag to strip <thinking>/<think> blocks from model output
        self.process_thinking_tags = False

    def _setup_provider(self) -> None:
        """
        Export the provider's API key as the environment variable LiteLLM reads.

        Google uses ``GEMINI_API_KEY``; every other provider uses
        ``<PROVIDER>_API_KEY``. Problems are logged instead of raised so that
        constructing the synthesizer never fails because of a missing key.
        """
        provider = self.model_config.get('provider') or 'groq'

        try:
            # Presumably raises ValueError for an unknown provider / missing
            # key (the original handler caught exactly that) - verify against
            # config.get_api_key.
            api_key = self.config.get_api_key(provider)

            # os.environ only accepts strings; a None/empty key would raise a
            # TypeError (or export a useless value), so bail out explicitly.
            if not api_key:
                logger.error(f"Error setting up LLM provider: no API key configured for provider '{provider}'")
                return

            if provider.lower() == 'google':
                os.environ["GEMINI_API_KEY"] = api_key
            else:
                os.environ[f"{provider.upper()}_API_KEY"] = api_key

            logger.info(f"Report synthesizer initialized with model: {self.model_name} (provider: {provider})")
        except ValueError as e:
            logger.error(f"Error setting up LLM provider: {e}")

    def _get_completion_params(self) -> Dict[str, Any]:
        """
        Get parameters for LLM completion based on model configuration.

        Returns:
            Dictionary of keyword arguments for LiteLLM ``completion``:
            sampling settings plus the provider-specific ``model`` (and, for
            OpenRouter/Google, ``api_base``; for OpenRouter also ``headers``).
        """
        params: Dict[str, Any] = {
            'temperature': self.model_config.get('temperature', 0.3),  # Lower temperature for factual reporting
            'max_tokens': self.model_config.get('max_tokens', 4000),  # Longer output for comprehensive reports
            'top_p': self.model_config.get('top_p', 0.9),
        }

        # Normalize case so this agrees with _setup_provider, which already
        # treats the provider name case-insensitively.
        provider = (self.model_config.get('provider') or 'groq').lower()

        if provider == 'groq':
            params['model'] = f"groq/{self.model_name}"
        elif provider == 'openrouter':
            params['model'] = self.model_config.get('model_name', self.model_name)
            params['api_base'] = self.model_config.get('endpoint')

            # Set HTTP headers for OpenRouter if needed
            params['headers'] = {
                'HTTP-Referer': 'https://sim-search.app',  # Replace with your actual app URL
                'X-Title': 'Intelligent Research System',  # Replace with your actual app name
            }
        elif provider == 'google':
            # Special handling for Google Gemini models
            params['model'] = f"gemini/{self.model_config.get('model_name', self.model_name)}"
            params['api_base'] = self.model_config.get('endpoint', 'https://generativelanguage.googleapis.com/v1')
        else:
            # Standard provider (OpenAI, Anthropic, etc.)
            params['model'] = self.model_name

        return params

    def _get_batch_size(self) -> int:
        """Return the map-phase batch size for the configured model."""
        if "gemini" in self.model_name.lower():
            return self.GEMINI_BATCH_SIZE
        return self.DEFAULT_BATCH_SIZE

    def _estimate_tokens(self, chunks: List[Dict[str, Any]]) -> float:
        """Roughly estimate the token count of all chunk contents (words * 1.3)."""
        return sum(
            len((chunk.get('content') or '').split()) * self.TOKENS_PER_WORD
            for chunk in chunks
        )

    async def generate_completion(self, messages: List[Dict[str, str]], stream: bool = False) -> Union[str, Any]:
        """
        Generate a completion using the configured LLM.

        Args:
            messages: List of message dictionaries with 'role' and 'content' keys
            stream: Whether to stream the response

        Returns:
            If stream is False, the completion text as a string.
            If stream is True, the completion response object for streaming.
            On any failure the error is logged and the string
            ``"Error: <message>"`` is returned instead of raising.
        """
        # Bound before the try block so the error handler can always log it,
        # even when building the parameters is what failed.
        params: Dict[str, Any] = {}
        try:
            params = self._get_completion_params()
            params['messages'] = messages
            params['stream'] = stream

            logger.info(f"Generating completion with model: {params.get('model')}")
            logger.info(f"Provider: {self.model_config.get('provider')}")

            if stream:
                # Keep returning the synchronous stream object callers expect.
                return completion(**params)

            # completion() is a blocking network call; run it in the default
            # executor so the event loop stays responsive while waiting.
            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(None, lambda: completion(**params))

            # Providers may return None content; honour the str return type.
            content = response.choices[0].message.content or ""

            if self.process_thinking_tags:
                content = self._process_thinking_tags(content)

            return content
        except Exception as e:
            logger.error(f"Error generating completion: {e}")
            # Leave the (potentially very large) prompt out of the log.
            loggable = {key: value for key, value in params.items() if key != 'messages'}
            logger.error(f"Model params: {loggable}")

            # More detailed error for debugging
            for key, value in getattr(e, '__dict__', {}).items():
                logger.error(f"Error detail - {key}: {value}")

            return f"Error: {str(e)}"

    def _process_thinking_tags(self, content: str) -> str:
        """
        Remove reasoning blocks from model output.

        Some reasoning models wrap their internal reasoning in
        ``<thinking>...</thinking>`` or ``<think>...</think>``. Both forms
        are removed together with their content, stray opening/closing tags
        are dropped, and runs of three or more newlines are collapsed to two.

        Args:
            content: The raw content from the model

        Returns:
            Processed content with reasoning blocks removed, stripped of
            leading/trailing whitespace. Empty or None input yields "".
        """
        import re

        if not content:
            return ""

        # Remove complete blocks; the backreference makes sure an opening tag
        # is only paired with its own closing tag.
        clean_content = re.sub(r'<(think(?:ing)?)>.*?</\1>', '', content, flags=re.DOTALL)

        # Clean up any remaining unpaired tags
        clean_content = re.sub(r'</?think(?:ing)?>', '', clean_content)

        # Remove extra newlines that might have been created
        clean_content = re.sub(r'\n{3,}', '\n\n', clean_content)

        return clean_content.strip()

    async def map_document_chunks(self, chunks: List[Dict[str, Any]], query: str, detail_level: str = "standard") -> List[Dict[str, Any]]:
        """
        Map phase: Process individual document chunks to extract key information.

        Chunks are processed one after another, grouped into batches with a
        short pause between batches to avoid rate limiting. Input dicts are
        not modified; each result is a shallow copy with an added
        ``extracted_info`` key. Failed chunks keep their position and carry
        an error message as ``extracted_info``.

        Args:
            chunks: List of document chunks
            query: Original search query
            detail_level: Level of detail for the report (brief, standard, detailed, comprehensive)

        Returns:
            List of processed chunks with extracted information, in input order
        """
        processed_chunks: List[Dict[str, Any]] = []

        # Get the appropriate extraction prompt based on detail level
        extraction_prompt = self._get_extraction_prompt(detail_level)

        total_chunks = len(chunks)
        logger.info(f"Starting to process {total_chunks} document chunks")

        batch_size = self._get_batch_size()
        total_batches = (total_chunks + batch_size - 1) // batch_size
        logger.info(f"Using batch size of {batch_size} for model {self.model_name}")

        for batch_start in range(0, total_chunks, batch_size):
            batch = chunks[batch_start:batch_start + batch_size]
            logger.info(f"Processing batch {batch_start // batch_size + 1}/{total_batches} with {len(batch)} chunks")

            for offset, chunk in enumerate(batch):
                # 1-based position of this chunk in the whole input, so the
                # progress messages advance per chunk rather than per batch.
                position = batch_start + offset + 1
                chunk_title = str(chunk.get('title', 'Untitled'))
                logger.info(f"Processing chunk {position}/{total_chunks}: {chunk_title[:50]}...")

                # Create a prompt for extracting key information from the chunk
                messages = [
                    {"role": "system", "content": extraction_prompt},
                    {"role": "user", "content": (
                        f"Query: {query}\n"
                        "\n"
                        f"Document title: {chunk.get('title', 'Untitled')}\n"
                        f"Document URL: {chunk.get('url', 'Unknown')}\n"
                        "\n"
                        "Document chunk content:\n"
                        f"{chunk.get('content', '')}\n"
                        "\n"
                        "Extract the most relevant information from this document chunk that addresses the query."
                    )},
                ]

                processed_chunk = chunk.copy()
                try:
                    processed_chunk['extracted_info'] = await self.generate_completion(messages)
                    logger.info(f"Completed chunk {position}/{total_chunks} ({position / total_chunks * 100:.1f}% complete)")
                except Exception as e:
                    # generate_completion reports failures as "Error: ..."
                    # strings; this is a safety net that keeps document order
                    # if something unexpected still escapes.
                    logger.error(f"Error processing chunk {position}/{total_chunks}: {str(e)}")
                    processed_chunk['extracted_info'] = f"Error extracting information: {str(e)}"
                processed_chunks.append(processed_chunk)

            # Add a small delay between batches to avoid rate limiting
            if batch_start + batch_size < total_chunks:
                logger.info("Pausing briefly between batches...")
                await asyncio.sleep(self.BATCH_PAUSE_SECONDS)

        logger.info(f"Completed processing all {total_chunks} chunks")
        return processed_chunks

    def _get_extraction_prompt(self, detail_level: str) -> str:
        """
        Get the appropriate extraction prompt based on detail level.

        Args:
            detail_level: Level of detail for the report (brief, standard,
                detailed, comprehensive); matched case-insensitively. Any
                other value is treated as comprehensive.

        Returns:
            Extraction prompt as a string
        """
        level = detail_level.lower()
        if level in ("brief", "standard"):
            return self._BRIEF_EXTRACTION_PROMPT
        if level == "detailed":
            return self._DETAILED_EXTRACTION_PROMPT
        return self._COMPREHENSIVE_EXTRACTION_PROMPT

    async def reduce_processed_chunks(self, processed_chunks: List[Dict[str, Any]], query: str, query_type: str = "exploratory", detail_level: str = "standard") -> str:
        """
        Reduce phase: Synthesize processed chunks into a coherent report.

        Args:
            processed_chunks: List of processed chunks with extracted information
            query: Original search query
            query_type: Type of query (factual, exploratory, comparative)
            detail_level: Level of detail for the report (brief, standard, detailed, comprehensive)

        Returns:
            Synthesized report as a string (or an "Error: ..." string when the
            LLM call fails, see ``generate_completion``)
        """
        # Prepare the context with all extracted information
        context = "".join(
            f"Document {number}: {chunk.get('title', 'Untitled')}\n"
            f"Source: {chunk.get('url', 'Unknown')}\n"
            f"Extracted information:\n{chunk.get('extracted_info', '')}\n\n"
            for number, chunk in enumerate(processed_chunks, start=1)
        )

        # Get template modifier based on detail level and query type
        detail_level_manager = get_report_detail_level_manager()
        template = detail_level_manager.get_template_modifier(detail_level, query_type)

        # Create the prompt for synthesizing the report
        messages = [
            {"role": "system", "content": (
                "You are an expert research assistant tasked with creating comprehensive, well-structured reports.\n"
                f"{template}\n"
                "\n"
                "Format the report in Markdown with clear headings, subheadings, and bullet points where appropriate.\n"
                "Make the report readable, engaging, and informative while maintaining academic rigor."
            )},
            {"role": "user", "content": (
                f"Query: {query}\n"
                "\n"
                "Information from sources:\n"
                f"{context}\n"
                "\n"
                "Synthesize this information into a report that addresses the query. Use your own words to create a coherent narrative, but ensure all information is based on the provided sources. Include citations and a references section."
            )},
        ]

        # Generate the report
        return await self.generate_completion(messages)

    async def synthesize_report(self, chunks: List[Dict[str, Any]], query: str, query_type: str = "exploratory", detail_level: str = "standard") -> str:
        """
        Synthesize a report from document chunks using the map-reduce approach.

        Args:
            chunks: List of document chunks
            query: Original search query
            query_type: Type of query (factual, exploratory, comparative).
                When left at "exploratory" the type is inferred from the
                query wording.
            detail_level: Level of detail for the report (brief, standard, detailed, comprehensive)

        Returns:
            Synthesized report as a string
        """
        if not chunks:
            logger.warning("No document chunks provided for report synthesis.")
            return "No information found for the given query."

        # Get detail level configuration
        detail_level_manager = get_report_detail_level_manager()
        config = detail_level_manager.get_detail_level_config(detail_level)
        token_budget = config.get("token_budget", 100000)

        # Determine query type if not specified
        if query_type == "exploratory":
            lowered_query = query.lower()
            if any(term in lowered_query for term in ["what is", "who is", "when did", "where is", "how does"]):
                query_type = "factual"
            elif any(term in lowered_query for term in ["compare", "difference", "versus", "pros and cons"]):
                query_type = "comparative"

        logger.info(f"Query type determined as: {query_type}")

        # Estimate total tokens in chunks
        total_tokens = self._estimate_tokens(chunks)
        logger.info(f"Estimated total tokens in {len(chunks)} chunks: {total_tokens}")

        # If total tokens exceeds 80% of the token budget, reduce the number of chunks
        usable_budget = token_budget * 0.8
        if total_tokens > usable_budget:
            max_chunks = int(len(chunks) * (usable_budget / total_tokens))
            max_chunks = max(3, max_chunks)  # Ensure we have at least 3 chunks
            logger.warning(f"Token count ({total_tokens}) exceeds 80% of budget ({token_budget}). Reducing chunks from {len(chunks)} to {max_chunks}.")
            chunks = chunks[:max_chunks]
            # Recalculate estimated tokens
            total_tokens = self._estimate_tokens(chunks)
            logger.info(f"Reduced to {len(chunks)} chunks with estimated {total_tokens} tokens")

        logger.info(f"Starting map phase for {len(chunks)} document chunks with query type '{query_type}' and detail level '{detail_level}'")

        # map_document_chunks already batches the work and pauses between
        # batches, so no second batching loop is needed here.
        processed_chunks = await self.map_document_chunks(chunks, query, detail_level)

        logger.info(f"Starting reduce phase to synthesize report from {len(processed_chunks)} processed chunks")

        # Reduce phase: Synthesize processed chunks into a coherent report
        report = await self.reduce_processed_chunks(processed_chunks, query, query_type, detail_level)

        # Final clean-up pass in case reasoning tags survived in the report
        if self.process_thinking_tags and report and "<think" in report:
            logger.info("Processing thinking tags in report")
            report = self._process_thinking_tags(report)

        return report
|
|
|
|
|
|
# Shared module-level instance, constructed eagerly when the module is imported.
report_synthesizer = ReportSynthesizer()


def get_report_synthesizer(model_name: Optional[str] = None) -> ReportSynthesizer:
    """
    Return the shared report synthesizer, rebuilding it for a different model.

    Note that requesting a model other than the current one replaces the
    shared instance, so later callers that pass no model name receive the
    replacement as well.

    Args:
        model_name: Optional model name to use instead of the default

    Returns:
        ReportSynthesizer instance
    """
    global report_synthesizer

    needs_new_instance = bool(model_name) and model_name != report_synthesizer.model_name
    if needs_new_instance:
        report_synthesizer = ReportSynthesizer(model_name)

    return report_synthesizer
|
|
|
|
async def test_report_synthesizer():
    """Manual smoke test: run the map and reduce phases on two sample chunks and print the results."""
    # Test query
    sample_query = "What are the key features of Python programming language?"

    # Sample document chunks
    intro_chunk = {
        "title": "Introduction to Python",
        "url": "https://docs.python.org/3/tutorial/index.html",
        "content": "Python is an easy to learn, powerful programming language. It has efficient high-level data structures and a simple but effective approach to object-oriented programming. Python's elegant syntax and dynamic typing, together with its interpreted nature, make it an ideal language for scripting and rapid application development in many areas on most platforms.",
    }
    features_chunk = {
        "title": "Python Features",
        "url": "https://www.python.org/about/",
        "content": "Python is a programming language that lets you work quickly and integrate systems more effectively. Python is an interpreted, object-oriented, high-level programming language with dynamic semantics. Its high-level built in data structures, combined with dynamic typing and dynamic binding, make it very attractive for Rapid Application Development, as well as for use as a scripting or glue language to connect existing components together.",
    }
    sample_chunks = [intro_chunk, features_chunk]

    # Shared synthesizer with the default model
    synthesizer = get_report_synthesizer()

    # Map phase
    mapped = await synthesizer.map_document_chunks(sample_chunks, sample_query, detail_level="detailed")

    print("Processed chunks:")
    for number, item in enumerate(mapped, start=1):
        print(f"Chunk {number}: {item.get('title')}")
        print(f"Extracted information: {item.get('extracted_info')}")
        print()

    # Reduce phase
    final_report = await synthesizer.reduce_processed_chunks(mapped, sample_query, detail_level="detailed")

    print("Generated Report:")
    print(final_report)


if __name__ == "__main__":
    asyncio.run(test_report_synthesizer())
|