"""
Report synthesis module for the intelligent research system.

This module provides functionality to synthesize reports from document chunks
using LLMs with a map-reduce approach.
"""

import os
import re
import json
import asyncio
import logging
from typing import Dict, List, Any, Optional, Tuple, Union

import litellm
from litellm import completion

from config.config import get_config
from report.report_detail_levels import get_report_detail_level_manager, DetailLevel

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Patterns used to strip model "thinking" output. Compiled once at import time.
# A complete <think> ... </think> block, including newlines inside it.
_THINK_BLOCK_RE = re.compile(r'<think>.*?</think>', re.DOTALL)
# Any unpaired opening or closing tag left over after block removal.
_THINK_TAG_RE = re.compile(r'</?think>')
# Three or more consecutive newlines, collapsed to a single blank line.
_EXTRA_NEWLINES_RE = re.compile(r'\n{3,}')

# Phrases used by synthesize_report() to infer the query type from its text.
_FACTUAL_TERMS = ("what is", "who is", "when did", "where is", "how does")
_COMPARATIVE_TERMS = ("compare", "difference", "versus", "pros and cons")

# System prompts for the map (extraction) phase, selected by detail level.
# They live at module level so that source indentation never leaks into the
# prompt text sent to the model.
_EXTRACTION_PROMPT_STANDARD = """You are an expert research assistant. Extract the most relevant information from this document chunk that addresses the user's query.
Focus on factual information, key concepts, and important details.
Include any relevant statistics, definitions, or explanations that would be valuable for a report.
Format your response as a concise summary with bullet points for key facts."""

_EXTRACTION_PROMPT_DETAILED = """You are an expert research analyst with deep domain knowledge. Extract comprehensive information from this document chunk that addresses the user's query.

Focus on:
- Detailed factual information and evidence
- Underlying principles and mechanisms
- Causal relationships and correlations
- Contextual factors and historical development
- Different perspectives or interpretations
- Quantitative data and qualitative insights
- Nuances, edge cases, and exceptions

Prioritize depth of analysis over breadth. Extract information that provides deeper understanding rather than just basic facts.
Format your response with clear sections and bullet points for key insights."""

_EXTRACTION_PROMPT_COMPREHENSIVE = """You are a world-class research analyst with exceptional analytical abilities. Extract the most comprehensive and nuanced information from this document chunk.

Focus on:
- Multi-layered analysis of all relevant facts and evidence
- Complex causal networks and interaction effects
- Theoretical frameworks and their applications
- Historical evolution and future trajectories
- Methodological considerations and limitations
- Diverse perspectives and their epistemological foundations
- Statistical data, case studies, and expert opinions
- Contradictions, paradoxes, and unresolved questions

Extract information that provides the deepest possible understanding of the topic as it relates to the query.
Analyze the reliability and significance of the information.
Format your response with clearly organized sections and detailed bullet points."""


class ReportSynthesizer:
    """
    Report synthesizer for the intelligent research system.

    This class provides methods to synthesize reports from document chunks
    using LLMs with a map-reduce approach: each chunk is first summarized on
    its own (map), then all summaries are merged into one report (reduce).
    """

    def __init__(self, model_name: Optional[str] = None):
        """
        Initialize the report synthesizer.

        Args:
            model_name: Name of the LLM model to use. If None, uses the
                'report_synthesis' -> 'model' entry from configuration,
                falling back to 'llama-3.3-70b-versatile'.
        """
        self.config = get_config()

        # Use specified model or default from config for report synthesis
        self.model_name = model_name or self.config.config_data.get('report_synthesis', {}).get('model', 'llama-3.3-70b-versatile')

        # Get model-specific configuration
        self.model_config = self.config.get_model_config(self.model_name)

        # Set up LiteLLM with the appropriate provider
        self._setup_provider()

        # Flag to strip <think> tags from model output (off by default)
        self.process_thinking_tags = False

    def _setup_provider(self) -> None:
        """
        Set up the LLM provider based on the model configuration.

        Looks up the API key for the configured provider (default 'groq') and
        exports it as the ``<PROVIDER>_API_KEY`` environment variable. A
        ValueError from the key lookup is logged and swallowed, so
        construction still succeeds without a key.
        """
        provider = self.model_config.get('provider', 'groq')

        try:
            # Get API key for the provider
            api_key = self.config.get_api_key(provider)

            # Set environment variable for the provider
            os.environ[f"{provider.upper()}_API_KEY"] = api_key

            logger.info("Report synthesizer initialized with model: %s (provider: %s)", self.model_name, provider)
        except ValueError as e:
            logger.error("Error setting up LLM provider: %s", e)

    def _get_completion_params(self) -> Dict[str, Any]:
        """
        Get parameters for LLM completion based on model configuration.

        Returns:
            Dictionary of keyword arguments for LiteLLM ``completion``:
            'temperature', 'max_tokens', 'top_p' and 'model', plus
            'api_base' and 'headers' when the provider is 'openrouter'.
        """
        params = {
            'temperature': self.model_config.get('temperature', 0.3),  # Lower temperature for factual reporting
            'max_tokens': self.model_config.get('max_tokens', 4000),  # Longer output for comprehensive reports
            'top_p': self.model_config.get('top_p', 0.9)
        }

        # Handle different provider configurations
        provider = self.model_config.get('provider', 'groq')

        if provider == 'groq':
            # For Groq provider the model name carries a "groq/" prefix
            params['model'] = f"groq/{self.model_name}"
        elif provider == 'openrouter':
            # For OpenRouter provider
            params['model'] = self.model_config.get('model_name', self.model_name)
            params['api_base'] = self.model_config.get('endpoint')

            # Set HTTP headers for OpenRouter if needed
            params['headers'] = {
                'HTTP-Referer': 'https://sim-search.app',  # Replace with your actual app URL
                'X-Title': 'Intelligent Research System'  # Replace with your actual app name
            }
        else:
            # Standard provider (OpenAI, Anthropic, etc.)
            params['model'] = self.model_name

        return params

    async def generate_completion(self, messages: List[Dict[str, str]], stream: bool = False) -> Union[str, Any]:
        """
        Generate a completion using the configured LLM.

        Args:
            messages: List of message dictionaries with 'role' and 'content' keys
            stream: Whether to stream the response

        Returns:
            If stream is False, returns the completion text as a string.
            If stream is True, returns the completion response object for streaming.
            If any exception occurs, it is logged and the string
            "Error: <message>" is returned instead of raising.
        """
        try:
            params = self._get_completion_params()
            params['messages'] = messages
            params['stream'] = stream

            # NOTE(review): `completion` is called without await, so this
            # coroutine presumably blocks the event loop for the duration of
            # the request - confirm whether an async variant should be used.
            response = completion(**params)

            if stream:
                return response

            content = response.choices[0].message.content

            # Strip thinking tags if enabled. Content may be None, in which
            # case it is returned untouched rather than failing inside re.sub.
            if self.process_thinking_tags and content is not None:
                content = self._process_thinking_tags(content)

            return content
        except Exception as e:
            logger.error("Error generating completion: %s", e)
            return f"Error: {str(e)}"

    def _process_thinking_tags(self, content: str) -> str:
        """
        Process and remove <think> tags from model output.

        Some models like deepseek-r1-distill use <think> tags for their
        internal reasoning. This method removes these tags and their content
        to produce a clean output.

        Args:
            content: The raw content from the model

        Returns:
            Content with ``<think>...</think>`` blocks removed, stray tags
            dropped, runs of three or more newlines collapsed to two, and
            leading/trailing whitespace stripped.
        """
        # Remove complete <think>...</think> blocks (non-greedy, across lines)
        clean_content = _THINK_BLOCK_RE.sub('', content)

        # Clean up any remaining unpaired <think> or </think> tags
        clean_content = _THINK_TAG_RE.sub('', clean_content)

        # Remove extra newlines that might have been created
        clean_content = _EXTRA_NEWLINES_RE.sub('\n\n', clean_content)

        return clean_content.strip()

    async def map_document_chunks(self, chunks: List[Dict[str, Any]], query: str, detail_level: str = "standard") -> List[Dict[str, Any]]:
        """
        Map phase: Process individual document chunks to extract key information.

        Chunks are processed one at a time, in order. Input dictionaries are
        not mutated; each result is a shallow copy with an added
        'extracted_info' key.

        Args:
            chunks: List of document chunks; 'title', 'url' and 'content'
                keys are read, with defaults when missing
            query: Original search query
            detail_level: Level of detail for the report (brief, standard, detailed, comprehensive)

        Returns:
            List of processed chunks with extracted information
        """
        processed_chunks = []

        # Get the appropriate extraction prompt based on detail level
        extraction_prompt = self._get_extraction_prompt(detail_level)

        for chunk in chunks:
            # Create a prompt for extracting key information from the chunk
            user_content = (
                f"Query: {query}\n\n"
                f"Document title: {chunk.get('title', 'Untitled')}\n"
                f"Document URL: {chunk.get('url', 'Unknown')}\n\n"
                f"Document chunk content:\n{chunk.get('content', '')}\n\n"
                "Extract the most relevant information from this document chunk that addresses the query."
            )
            messages = [
                {"role": "system", "content": extraction_prompt},
                {"role": "user", "content": user_content},
            ]

            # Process the chunk with the LLM
            extracted_info = await self.generate_completion(messages)

            # Add the extracted information to a copy of the chunk
            processed_chunk = chunk.copy()
            processed_chunk['extracted_info'] = extracted_info
            processed_chunks.append(processed_chunk)

        return processed_chunks

    def _get_extraction_prompt(self, detail_level: str) -> str:
        """
        Get the appropriate extraction prompt based on detail level.

        Args:
            detail_level: Level of detail for the report (brief, standard,
                detailed, comprehensive); compared case-insensitively

        Returns:
            Extraction prompt as a string. 'brief' and 'standard' share one
            prompt; any value other than those and 'detailed' gets the
            comprehensive prompt.
        """
        level = detail_level.lower()

        if level in ("brief", "standard"):
            return _EXTRACTION_PROMPT_STANDARD
        if level == "detailed":
            return _EXTRACTION_PROMPT_DETAILED
        # comprehensive (also the fallback for unrecognized levels)
        return _EXTRACTION_PROMPT_COMPREHENSIVE

    async def reduce_processed_chunks(self, processed_chunks: List[Dict[str, Any]], query: str, query_type: str = "exploratory", detail_level: str = "standard") -> str:
        """
        Reduce phase: Synthesize processed chunks into a coherent report.

        Args:
            processed_chunks: List of processed chunks with extracted information
            query: Original search query
            query_type: Type of query (factual, exploratory, comparative)
            detail_level: Level of detail for the report (brief, standard, detailed, comprehensive)

        Returns:
            Synthesized report as a string (whatever generate_completion
            returns, including its "Error: ..." string on failure)
        """
        # Prepare the context with all extracted information, numbering
        # documents from 1 in the order given
        context = "".join(
            f"Document {i}: {chunk.get('title', 'Untitled')}\n"
            f"Source: {chunk.get('url', 'Unknown')}\n"
            f"Extracted information:\n{chunk.get('extracted_info', '')}\n\n"
            for i, chunk in enumerate(processed_chunks, start=1)
        )

        # Get template modifier based on detail level and query type
        detail_level_manager = get_report_detail_level_manager()
        template = detail_level_manager.get_template_modifier(detail_level, query_type)

        # Create the prompt for synthesizing the report
        system_content = (
            "You are an expert research assistant tasked with creating comprehensive, well-structured reports.\n\n"
            f"{template}\n\n"
            "Format the report in Markdown with clear headings, subheadings, and bullet points where appropriate.\n"
            "Make the report readable, engaging, and informative while maintaining academic rigor."
        )
        user_content = (
            f"Query: {query}\n\n"
            f"Information from sources:\n{context}\n\n"
            "Synthesize this information into a report that addresses the query.\n"
            "Use your own words to create a coherent narrative, but ensure all information is based on the provided sources.\n"
            "Include citations and a references section."
        )
        messages = [
            {"role": "system", "content": system_content},
            {"role": "user", "content": user_content},
        ]

        # Generate the report
        report = await self.generate_completion(messages)

        return report

    async def synthesize_report(self, chunks: List[Dict[str, Any]], query: str, query_type: str = "exploratory", detail_level: str = "standard") -> str:
        """
        Synthesize a report from document chunks using the map-reduce approach.

        Args:
            chunks: List of document chunks
            query: Original search query
            query_type: Type of query (factual, exploratory, comparative).
                When left at "exploratory", the type is inferred from
                phrases in the query text; factual phrases are checked
                before comparative ones.
            detail_level: Level of detail for the report (brief, standard, detailed, comprehensive)

        Returns:
            Synthesized report as a string
        """
        logger.info("Synthesizing report for query: %s", query)
        logger.info("Using %s document chunks", len(chunks))
        logger.info("Detail level: %s", detail_level)

        # Determine query type if not specified
        if query_type == "exploratory":
            # Try to infer query type from the query text
            lowered_query = query.lower()
            if any(term in lowered_query for term in _FACTUAL_TERMS):
                query_type = "factual"
            elif any(term in lowered_query for term in _COMPARATIVE_TERMS):
                query_type = "comparative"

        logger.info("Query type determined as: %s", query_type)

        # Map phase: Process individual document chunks
        logger.info("Starting map phase: Processing individual document chunks")
        processed_chunks = await self.map_document_chunks(chunks, query, detail_level)
        logger.info("Map phase complete: Processed %s chunks", len(processed_chunks))

        # Reduce phase: Synthesize processed chunks into a coherent report
        logger.info("Starting reduce phase: Synthesizing processed chunks into a report")
        report = await self.reduce_processed_chunks(processed_chunks, query, query_type, detail_level)
        logger.info("Reduce phase complete: Report generated")

        return report


# Create a singleton instance for global use
report_synthesizer = ReportSynthesizer()


def get_report_synthesizer(model_name: Optional[str] = None) -> ReportSynthesizer:
    """
    Get the global report synthesizer instance or create a new one with a specific model.

    When a model name is given and differs from the current instance's model,
    the module-level singleton is replaced by a new instance for that model.

    Args:
        model_name: Optional model name to use instead of the default

    Returns:
        ReportSynthesizer instance
    """
    global report_synthesizer

    if model_name and model_name != report_synthesizer.model_name:
        report_synthesizer = ReportSynthesizer(model_name)

    return report_synthesizer


async def test_report_synthesizer():
    """Test the report synthesizer with sample document chunks."""
    # Sample document chunks
    chunks = [
        {
            "title": "Introduction to Python",
            "url": "https://docs.python.org/3/tutorial/index.html",
            "content": "Python is an easy to learn, powerful programming language. It has efficient high-level data structures and a simple but effective approach to object-oriented programming. Python's elegant syntax and dynamic typing, together with its interpreted nature, make it an ideal language for scripting and rapid application development in many areas on most platforms."
        },
        {
            "title": "Python Features",
            "url": "https://www.python.org/about/",
            "content": "Python is a programming language that lets you work quickly and integrate systems more effectively. Python is an interpreted, object-oriented, high-level programming language with dynamic semantics. Its high-level built in data structures, combined with dynamic typing and dynamic binding, make it very attractive for Rapid Application Development, as well as for use as a scripting or glue language to connect existing components together."
        }
    ]

    # Initialize the report synthesizer
    synthesizer = get_report_synthesizer()

    # Test query
    query = "What are the key features of Python programming language?"

    # Map phase
    processed_chunks = await synthesizer.map_document_chunks(chunks, query, detail_level="detailed")

    # Print processed chunks
    print("Processed chunks:")
    for i, chunk in enumerate(processed_chunks):
        print(f"Chunk {i+1}: {chunk.get('title')}")
        print(f"Extracted information: {chunk.get('extracted_info')}")
        print()

    # Reduce phase
    report = await synthesizer.reduce_processed_chunks(processed_chunks, query, detail_level="detailed")

    # Print report
    print("Generated Report:")
    print(report)


if __name__ == "__main__":
    asyncio.run(test_report_synthesizer())