""" Report synthesis module for the intelligent research system. This module provides functionality to synthesize reports from document chunks using LLMs with a map-reduce approach. """ import os import json import asyncio import logging from typing import Dict, List, Any, Optional, Tuple, Union import litellm from litellm import completion from config.config import get_config # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) class ReportSynthesizer: """ Report synthesizer for the intelligent research system. This class provides methods to synthesize reports from document chunks using LLMs with a map-reduce approach. """ def __init__(self, model_name: Optional[str] = None): """ Initialize the report synthesizer. Args: model_name: Name of the LLM model to use. If None, uses the default model from configuration. """ self.config = get_config() # Use specified model or default from config for report synthesis self.model_name = model_name or self.config.config_data.get('report_synthesis', {}).get('model', 'llama-3.3-70b-versatile') # Get model-specific configuration self.model_config = self.config.get_model_config(self.model_name) # Set up LiteLLM with the appropriate provider self._setup_provider() def _setup_provider(self) -> None: """Set up the LLM provider based on the model configuration.""" provider = self.model_config.get('provider', 'groq') try: # Get API key for the provider api_key = self.config.get_api_key(provider) # Set environment variable for the provider os.environ[f"{provider.upper()}_API_KEY"] = api_key logger.info(f"Report synthesizer initialized with model: {self.model_name} (provider: {provider})") except ValueError as e: logger.error(f"Error setting up LLM provider: {e}") def _get_completion_params(self) -> Dict[str, Any]: """ Get parameters for LLM completion based on model configuration. Returns: Dictionary of parameters for LiteLLM completion """ params = { 'temperature': self.model_config.get('temperature', 0.3), # Lower temperature for factual reporting 'max_tokens': self.model_config.get('max_tokens', 4000), # Longer output for comprehensive reports 'top_p': self.model_config.get('top_p', 0.9) } # Handle different provider configurations provider = self.model_config.get('provider', 'groq') if provider == 'groq': # For Groq provider params['model'] = f"groq/{self.model_name}" elif provider == 'openrouter': # For OpenRouter provider params['model'] = self.model_config.get('model_name', self.model_name) params['api_base'] = self.model_config.get('endpoint') # Set HTTP headers for OpenRouter if needed params['headers'] = { 'HTTP-Referer': 'https://sim-search.app', # Replace with your actual app URL 'X-Title': 'Intelligent Research System' # Replace with your actual app name } else: # Standard provider (OpenAI, Anthropic, etc.) params['model'] = self.model_name return params async def generate_completion(self, messages: List[Dict[str, str]], stream: bool = False) -> Union[str, Any]: """ Generate a completion using the configured LLM. Args: messages: List of message dictionaries with 'role' and 'content' keys stream: Whether to stream the response Returns: If stream is False, returns the completion text as a string If stream is True, returns the completion response object for streaming """ try: params = self._get_completion_params() params['messages'] = messages params['stream'] = stream response = completion(**params) if stream: return response else: return response.choices[0].message.content except Exception as e: logger.error(f"Error generating completion: {e}") return f"Error: {str(e)}" async def map_document_chunks(self, chunks: List[Dict[str, Any]], query: str) -> List[Dict[str, Any]]: """ Map phase: Process individual document chunks to extract key information. Args: chunks: List of document chunks query: Original search query Returns: List of processed chunks with extracted information """ processed_chunks = [] for chunk in chunks: # Create a prompt for extracting key information from the chunk messages = [ {"role": "system", "content": """You are an expert research assistant. Extract the most relevant information from this document chunk that addresses the user's query. Focus on factual information, key concepts, and important details. Include any relevant statistics, definitions, or explanations that would be valuable for a comprehensive report. Format your response as a concise summary with bullet points for key facts."""}, {"role": "user", "content": f"""Query: {query} Document title: {chunk.get('title', 'Untitled')} Document URL: {chunk.get('url', 'Unknown')} Document chunk content: {chunk.get('content', '')} Extract the most relevant information from this document chunk that addresses the query."""} ] # Process the chunk with the LLM extracted_info = await self.generate_completion(messages) # Add the extracted information to the chunk processed_chunk = chunk.copy() processed_chunk['extracted_info'] = extracted_info processed_chunks.append(processed_chunk) return processed_chunks async def reduce_processed_chunks(self, processed_chunks: List[Dict[str, Any]], query: str, query_type: str = "exploratory") -> str: """ Reduce phase: Synthesize processed chunks into a coherent report. Args: processed_chunks: List of processed chunks with extracted information query: Original search query query_type: Type of query (factual, exploratory, comparative) Returns: Synthesized report as a string """ # Prepare the context with all extracted information context = "" for i, chunk in enumerate(processed_chunks): context += f"Document {i+1}: {chunk.get('title', 'Untitled')}\n" context += f"Source: {chunk.get('url', 'Unknown')}\n" context += f"Extracted information:\n{chunk.get('extracted_info', '')}\n\n" # Create a template based on query type if query_type == "factual": template = """Create a comprehensive factual report that directly answers the query. Focus on accuracy and clarity. Include: 1. A clear, direct answer to the query 2. Supporting evidence and facts from the sources 3. Any relevant context needed to understand the answer 4. Citations for all information (use numbered citations in square brackets [1], [2], etc.) 5. A references section at the end listing all sources""" elif query_type == "comparative": template = """Create a comprehensive comparative report that analyzes different perspectives on the query. Include: 1. An overview of the topic and why it's significant 2. A balanced presentation of different viewpoints or approaches 3. Analysis of similarities and differences 4. Evidence supporting each perspective 5. A synthesis of the information that highlights key insights 6. Citations for all information (use numbered citations in square brackets [1], [2], etc.) 7. A references section at the end listing all sources""" else: # exploratory (default) template = """Create a comprehensive exploratory report that investigates the query in depth. Include: 1. An introduction that frames the topic and its significance 2. Key concepts and definitions 3. Main findings and insights from the sources 4. Analysis of the information that highlights patterns and connections 5. Implications or applications of the findings 6. Citations for all information (use numbered citations in square brackets [1], [2], etc.) 7. A references section at the end listing all sources""" # Create the prompt for synthesizing the report messages = [ {"role": "system", "content": f"""You are an expert research assistant tasked with creating comprehensive, well-structured reports. {template} Format the report in Markdown with clear headings, subheadings, and bullet points where appropriate. Make the report readable, engaging, and informative while maintaining academic rigor."""}, {"role": "user", "content": f"""Query: {query} Information from sources: {context} Synthesize this information into a comprehensive report that addresses the query. Use your own words to create a coherent narrative, but ensure all information is based on the provided sources. Include citations and a references section."""} ] # Generate the report report = await self.generate_completion(messages) return report async def synthesize_report(self, chunks: List[Dict[str, Any]], query: str, query_type: str = "exploratory") -> str: """ Synthesize a report from document chunks using the map-reduce approach. Args: chunks: List of document chunks query: Original search query query_type: Type of query (factual, exploratory, comparative) Returns: Synthesized report as a string """ logger.info(f"Synthesizing report for query: {query}") logger.info(f"Using {len(chunks)} document chunks") # Determine query type if not specified if query_type == "exploratory": # Try to infer query type from the query text if any(term in query.lower() for term in ["what is", "who is", "when did", "where is", "how does"]): query_type = "factual" elif any(term in query.lower() for term in ["compare", "difference", "versus", "pros and cons"]): query_type = "comparative" logger.info(f"Query type determined as: {query_type}") # Map phase: Process individual document chunks logger.info("Starting map phase: Processing individual document chunks") processed_chunks = await self.map_document_chunks(chunks, query) logger.info(f"Map phase complete: Processed {len(processed_chunks)} chunks") # Reduce phase: Synthesize processed chunks into a coherent report logger.info("Starting reduce phase: Synthesizing processed chunks into a report") report = await self.reduce_processed_chunks(processed_chunks, query, query_type) logger.info("Reduce phase complete: Report generated") return report # Create a singleton instance for global use report_synthesizer = ReportSynthesizer() def get_report_synthesizer(model_name: Optional[str] = None) -> ReportSynthesizer: """ Get the global report synthesizer instance or create a new one with a specific model. Args: model_name: Optional model name to use instead of the default Returns: ReportSynthesizer instance """ global report_synthesizer if model_name and model_name != report_synthesizer.model_name: report_synthesizer = ReportSynthesizer(model_name) return report_synthesizer async def test_report_synthesizer(): """Test the report synthesizer with sample document chunks.""" # Sample document chunks chunks = [ { "title": "Introduction to Python", "url": "https://docs.python.org/3/tutorial/index.html", "content": "Python is an easy to learn, powerful programming language. It has efficient high-level data structures and a simple but effective approach to object-oriented programming. Python's elegant syntax and dynamic typing, together with its interpreted nature, make it an ideal language for scripting and rapid application development in many areas on most platforms." }, { "title": "Python Features", "url": "https://www.python.org/about/", "content": "Python is a programming language that lets you work quickly and integrate systems more effectively. Python is an interpreted, object-oriented, high-level programming language with dynamic semantics. Its high-level built in data structures, combined with dynamic typing and dynamic binding, make it very attractive for Rapid Application Development, as well as for use as a scripting or glue language to connect existing components together." } ] # Initialize the report synthesizer synthesizer = get_report_synthesizer() # Test query query = "What are the key features of Python programming language?" # Map phase processed_chunks = await synthesizer.map_document_chunks(chunks, query) # Print processed chunks print("Processed chunks:") for i, chunk in enumerate(processed_chunks): print(f"Chunk {i+1}: {chunk.get('title')}") print(f"Extracted information: {chunk.get('extracted_info')}") print() # Reduce phase report = await synthesizer.reduce_processed_chunks(processed_chunks, query) # Print report print("Generated Report:") print(report) if __name__ == "__main__": asyncio.run(test_report_synthesizer())