""" Report synthesis module for the intelligent research system. This module provides functionality to synthesize reports from document chunks using LLMs with a map-reduce approach. """ import os import json import asyncio import logging from typing import Dict, List, Any, Optional, Tuple, Union import litellm from litellm import completion from config.config import get_config from report.report_detail_levels import get_report_detail_level_manager, DetailLevel from report.report_templates import QueryType, DetailLevel as TemplateDetailLevel, ReportTemplateManager, ReportTemplate # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # Note: ReportTemplateManager and ReportTemplate are now imported from report_templates.py class ReportSynthesizer: """ Report synthesizer for the intelligent research system. This class provides methods to synthesize reports from document chunks using LLMs with a map-reduce approach. """ def __init__(self, model_name: Optional[str] = None): """ Initialize the report synthesizer. Args: model_name: Name of the LLM model to use. If None, uses the default model from configuration. """ self.config = get_config() # Use specified model or default from config for report synthesis self.model_name = model_name or self.config.config_data.get('report_synthesis', {}).get('model', 'llama-3.3-70b-versatile') # Get model-specific configuration self.model_config = self.config.get_model_config(self.model_name) # Set up LiteLLM with the appropriate provider self._setup_provider() # Initialize template manager self.template_manager = ReportTemplateManager() self.template_manager.initialize_default_templates() # Flag to process tags in model output self.process_thinking_tags = False # Progress tracking self.progress_callback = None self.total_chunks = 0 self.processed_chunk_count = 0 def set_progress_callback(self, callback): """ Set a callback function to report progress. Args: callback: Function that takes (current_progress, total, current_report) as arguments """ self.progress_callback = callback def _report_progress(self, current_report=None): """Report progress through the callback if set.""" if self.progress_callback and self.total_chunks > 0: progress = min(self.processed_chunk_count / self.total_chunks, 1.0) self.progress_callback(progress, self.total_chunks, current_report) def _setup_provider(self) -> None: """Set up the LLM provider based on the model configuration.""" provider = self.model_config.get('provider', 'groq') try: # Get API key for the provider api_key = self.config.get_api_key(provider) # Set environment variable for the provider if provider.lower() == 'google' or provider.lower() == 'gemini': os.environ["GEMINI_API_KEY"] = api_key elif provider.lower() == 'vertex_ai': os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = api_key else: os.environ[f"{provider.upper()}_API_KEY"] = api_key logger.info(f"Report synthesizer initialized with model: {self.model_name} (provider: {provider})") except ValueError as e: logger.error(f"Error setting up LLM provider: {e}") def _get_completion_params(self) -> Dict[str, Any]: """ Get parameters for LLM completion based on model configuration. Returns: Dictionary of parameters for LiteLLM completion """ params = { 'temperature': self.model_config.get('temperature', 0.3), # Lower temperature for factual reporting 'max_tokens': self.model_config.get('max_tokens', 4000), # Longer output for comprehensive reports 'top_p': self.model_config.get('top_p', 0.9) } # Handle different provider configurations provider = self.model_config.get('provider', 'groq') if provider == 'groq': # For Groq provider params['model'] = f"groq/{self.model_name}" elif provider == 'openrouter': # For OpenRouter provider params['model'] = self.model_config.get('model_name', self.model_name) params['api_base'] = self.model_config.get('endpoint') # Set HTTP headers for OpenRouter if needed params['headers'] = { 'HTTP-Referer': 'https://sim-search.app', # Replace with your actual app URL 'X-Title': 'Intelligent Research System' # Replace with your actual app name } elif provider == 'google' or provider == 'gemini': # Special handling for Google Gemini models # Format: gemini/model_name (e.g., gemini/gemini-2.0-flash) params['model'] = f"gemini/{self.model_config.get('model_name', self.model_name)}" # Add additional parameters for Gemini params['custom_llm_provider'] = 'gemini' elif provider == 'vertex_ai': # Special handling for Vertex AI Gemini models params['model'] = f"vertex_ai/{self.model_config.get('model_name', self.model_name)}" # Add Vertex AI specific parameters params['vertex_project'] = self.model_config.get('vertex_project', 'sim-search') params['vertex_location'] = self.model_config.get('vertex_location', 'us-central1') # Set custom provider params['custom_llm_provider'] = 'vertex_ai' else: # Standard provider (OpenAI, Anthropic, etc.) params['model'] = self.model_name return params async def generate_completion(self, messages: List[Dict[str, str]], stream: bool = False) -> Union[str, Any]: """ Generate a completion using the configured LLM. Args: messages: List of message dictionaries with 'role' and 'content' keys stream: Whether to stream the response Returns: If stream is False, returns the completion text as a string If stream is True, returns the completion response object for streaming """ # Get provider from model config provider = self.model_config.get('provider', 'groq').lower() # Special handling for Gemini models - they use 'user' and 'model' roles if provider == 'gemini': formatted_messages = [] for msg in messages: role = msg['role'] # Map 'system' to 'user' for the first message if role == 'system' and not formatted_messages: formatted_messages.append({ 'role': 'user', 'content': msg['content'] }) # Map 'assistant' to 'model' elif role == 'assistant': formatted_messages.append({ 'role': 'model', 'content': msg['content'] }) # Keep 'user' as is else: formatted_messages.append(msg) else: formatted_messages = messages # Get completion parameters params = self._get_completion_params() try: # Generate completion if stream: response = litellm.completion( messages=formatted_messages, stream=True, **params ) return response else: response = litellm.completion( messages=formatted_messages, **params ) # Extract content from response content = response.choices[0].message.content # Process thinking tags if enabled if self.process_thinking_tags: content = self._process_thinking_tags(content) return content except Exception as e: error_msg = f"Error generating completion: {str(e)}" logger.error(error_msg) # Return error message in a user-friendly format return f"I encountered an error while processing your request: {str(e)}" def _process_thinking_tags(self, content: str) -> str: """ Process and remove tags from model output. Some models like deepseek-r1-distill use tags for their internal reasoning. This method removes these tags and their content to produce a clean output. Args: content: The raw content from the model Returns: Processed content with thinking tags removed """ import re # Remove ... blocks clean_content = re.sub(r'.*?', '', content, flags=re.DOTALL) # Clean up any remaining tags clean_content = re.sub(r'', '', clean_content) # Remove extra newlines that might have been created clean_content = re.sub(r'\n{3,}', '\n\n', clean_content) return clean_content.strip() async def map_document_chunks(self, chunks: List[Dict[str, Any]], query: str, detail_level: str = "standard", query_type: str = "exploratory") -> List[Dict[str, Any]]: """ Map phase: Process individual document chunks to extract key information. Args: chunks: List of document chunks query: Original search query detail_level: Level of detail for the report (brief, standard, detailed, comprehensive) query_type: Type of query (factual, exploratory, comparative) Returns: List of processed chunks with extracted information """ processed_chunks = [] # Get the appropriate extraction prompt based on detail level and query type extraction_prompt = self._get_extraction_prompt(detail_level, query_type) total_chunks = len(chunks) logger.info(f"Starting to process {total_chunks} document chunks") # Determine batch size based on the model - Gemini can handle larger batches if "gemini" in self.model_name.lower(): batch_size = 8 # Larger batch size for Gemini models with 1M token windows else: batch_size = 3 # Smaller batch size for other models logger.info(f"Using batch size of {batch_size} for model {self.model_name}") for i in range(0, len(chunks), batch_size): batch = chunks[i:i+batch_size] logger.info(f"Processing batch {i//batch_size + 1}/{(len(chunks) + batch_size - 1)//batch_size} with {len(batch)} chunks") # Process this batch batch_results = [] for j, chunk in enumerate(batch): chunk_title = chunk.get('title', 'Untitled') chunk_index = i + j + 1 logger.info(f"Processing chunk {chunk_index}/{total_chunks}: {chunk_title[:50] if chunk_title else 'Untitled'}...") # Create a prompt for extracting key information from the chunk messages = [ {"role": "system", "content": extraction_prompt}, {"role": "user", "content": f"""Query: {query} Document title: {chunk.get('title', 'Untitled')} Document URL: {chunk.get('url', 'Unknown')} Document chunk content: {chunk.get('content', '')} Extract the most relevant information from this document chunk that addresses the query."""} ] try: # Process the chunk with the LLM extracted_info = await self.generate_completion(messages) # Add the extracted information to the chunk processed_chunk = chunk.copy() processed_chunk['extracted_info'] = extracted_info batch_results.append(processed_chunk) # Update progress self.processed_chunk_count += 1 self._report_progress() logger.info(f"Completed chunk {chunk_index}/{total_chunks} ({chunk_index/total_chunks*100:.1f}% complete)") except Exception as e: logger.error(f"Error processing chunk {chunk_index}/{total_chunks}: {str(e)}") # Add a placeholder for the failed chunk to maintain document order processed_chunk = chunk.copy() processed_chunk['extracted_info'] = f"Error extracting information: {str(e)}" batch_results.append(processed_chunk) # Update progress even for failed chunks self.processed_chunk_count += 1 self._report_progress() processed_chunks.extend(batch_results) # Add a small delay between batches to avoid rate limiting if i + batch_size < len(chunks): logger.info("Pausing briefly between batches...") await asyncio.sleep(2) logger.info(f"Completed processing all {total_chunks} chunks") return processed_chunks def _get_extraction_prompt(self, detail_level: str, query_type: str = "exploratory") -> str: """ Get the appropriate extraction prompt based on detail level and query type. Args: detail_level: Level of detail for the report (brief, standard, detailed, comprehensive) query_type: Type of query (factual, exploratory, comparative) Returns: Extraction prompt as a string """ # Base prompts by detail level if detail_level.lower() in ["brief", "standard"]: base_prompt = """You are an expert research assistant. Extract the most relevant information from this document chunk that addresses the user's query. Focus on factual information, key concepts, and important details. Include any relevant statistics, definitions, or explanations that would be valuable for a report. Format your response as a concise summary with bullet points for key facts.""" elif detail_level.lower() == "detailed": base_prompt = """You are an expert research analyst with deep domain knowledge. Extract comprehensive information from this document chunk that addresses the user's query. Focus on: - Detailed factual information and evidence - Underlying principles and mechanisms - Causal relationships and correlations - Contextual factors and historical development - Different perspectives or interpretations - Quantitative data and qualitative insights - Nuances, edge cases, and exceptions Prioritize depth of analysis over breadth. Extract information that provides deeper understanding rather than just basic facts. Format your response with clear sections and bullet points for key insights.""" else: # comprehensive base_prompt = """You are a world-class research analyst with exceptional analytical abilities. Extract the most comprehensive and nuanced information from this document chunk. Focus on: - Multi-layered analysis of all relevant facts and evidence - Complex causal networks and interaction effects - Theoretical frameworks and their applications - Historical evolution and future trajectories - Methodological considerations and limitations - Diverse perspectives and their epistemological foundations - Statistical data, case studies, and expert opinions - Contradictions, paradoxes, and unresolved questions Extract information that provides the deepest possible understanding of the topic as it relates to the query. Analyze the reliability and significance of the information. Format your response with clearly organized sections and detailed bullet points.""" # Add specific instructions for comparative queries # Handle the case where query_type is None if query_type is not None and query_type.lower() == "comparative": comparative_instructions = """ IMPORTANT: This is a COMPARATIVE query. The user is asking to compare two or more things. When extracting information, focus specifically on: 1. Characteristics, features, or attributes of EACH item being compared 2. Direct comparisons between the items mentioned in the query 3. Advantages and disadvantages of each item 4. Similarities and differences between the items 5. Contexts where one item might be preferred over others Make sure to clearly identify which information relates to which item being compared. Organize your extraction to facilitate easy comparison between the items. """ return base_prompt + comparative_instructions return base_prompt def _get_template_from_strings(self, query_type_str: Optional[str], detail_level_str: str) -> Optional[ReportTemplate]: """ Helper method to get a template using string values for query_type and detail_level. Args: query_type_str: String value of query type (factual, exploratory, comparative), or None detail_level_str: String value of detail level (brief, standard, detailed, comprehensive) Returns: ReportTemplate object or None if not found """ try: # Handle None query_type by defaulting to "exploratory" if query_type_str is None: query_type_str = "exploratory" logger.info(f"Query type is None, defaulting to {query_type_str}") # Convert string values to enum objects query_type_enum = QueryType(query_type_str) detail_level_enum = TemplateDetailLevel(detail_level_str) # Get template using enum objects template = self.template_manager.get_template(query_type_enum, detail_level_enum) if template: logger.info(f"Found template for {query_type_str} {detail_level_str}") else: logger.warning(f"No template found for {query_type_str} {detail_level_str}") return template except (ValueError, KeyError) as e: logger.error(f"Error getting template for {query_type_str} {detail_level_str}: {str(e)}") return None async def reduce_processed_chunks(self, processed_chunks: List[Dict[str, Any]], query: str, query_type: str = "exploratory", detail_level: str = "standard") -> str: """ Reduce phase: Synthesize processed chunks into a coherent report. Args: processed_chunks: List of processed chunks with extracted information query: Original search query query_type: Type of query (factual, exploratory, comparative) detail_level: Level of detail for the report (brief, standard, detailed, comprehensive) Returns: Synthesized report as a string """ # Prepare the context with all extracted information context = "" for i, chunk in enumerate(processed_chunks): title = chunk.get('title', 'Untitled') url = chunk.get('url', 'Unknown') context += f"Document {i+1}:\n" context += f"Title: {title}\n" context += f"URL: {url}\n" context += f"Source URL: {url}\n" # Duplicate for emphasis context += f"Extracted information:\n{chunk.get('extracted_info', '')}\n\n" # Get template modifier based on detail level and query type using helper method template = self._get_template_from_strings(query_type, detail_level) if not template: raise ValueError(f"No template found for {query_type} {detail_level}") # Add specific instructions for references formatting reference_instructions = """ When including references, use a consistent format: [1] Title of the Article/Page. URL IMPORTANT: 1. DO NOT use generic placeholders like "Document 1" for references 2. ALWAYS include the actual URL from the source documents 3. Each reference MUST include both the title and the URL 4. Make sure all references are complete and properly formatted 5. Number the references sequentially starting from 1 6. Include the URL for EACH reference - this is critical. """ # Special handling for Gemini models if "gemini" in self.model_name.lower(): reference_instructions += """ IMPORTANT: Due to token limitations, ensure the References section is completed properly. If you feel you might run out of tokens, start the References section earlier and make it more concise. Never leave the References section incomplete or cut off mid-reference. """ # Create the prompt for synthesizing the report messages = [ {"role": "system", "content": f"""You are an expert research assistant tasked with creating comprehensive, well-structured reports. {template.template} Format the report in Markdown with clear headings, subheadings, and bullet points where appropriate. Make the report readable, engaging, and informative while maintaining academic rigor. {reference_instructions}"""}, {"role": "user", "content": f"""Query: {query} Information from sources: {context} Synthesize this information into a report that addresses the query. Use your own words to create a coherent narrative, but ensure all information is based on the provided sources. Include citations and a references section."""} ] # Generate the report report = await self.generate_completion(messages) # Check if the report might be cut off at the end if report.strip().endswith('[') or report.strip().endswith(']') or report.strip().endswith('...'): logger.warning("Report appears to be cut off at the end. Attempting to fix references section.") # Try to fix the references section by generating it separately try: # Extract what we have so far without the incomplete references if "References" in report: report_without_refs = report.split("References")[0].strip() else: report_without_refs = report # Generate just the references section ref_messages = [ {"role": "system", "content": """You are an expert at formatting reference lists. Create a properly formatted References section for the following documents. IMPORTANT: 1. Use the actual title and URL from each document 2. DO NOT use generic placeholders like "Document 1" 3. Format each reference as: [1] Title of the Article/Page. URL 4. Each reference MUST include both the title and the URL 5. Make sure all references are complete and properly formatted 6. Number the references sequentially starting from 1"""}, {"role": "user", "content": f"""Here are the documents used in the report: {context} Create a complete, properly formatted References section in Markdown format. Remember to include the URL for EACH reference - this is critical."""} ] references = await self.generate_completion(ref_messages) # Combine the report with the fixed references report = f"{report_without_refs}\n\n## References\n\n{references}" except Exception as e: logger.error(f"Error fixing references section: {str(e)}") return report async def synthesize_report(self, chunks: List[Dict[str, Any]], query: str, query_type: str = "exploratory", detail_level: str = "standard") -> str: """ Synthesize a report from document chunks using the map-reduce approach. Args: chunks: List of document chunks query: Original search query query_type: Type of query (factual, exploratory, comparative) detail_level: Level of detail for the report (brief, standard, detailed, comprehensive) Returns: Synthesized report as a string """ if not chunks: logger.warning("No document chunks provided for report synthesis.") return "No information found for the given query." # Reset progress tracking self.total_chunks = len(chunks) self.processed_chunk_count = 0 # Verify that a template exists for the given query type and detail level template = self._get_template_from_strings(query_type, detail_level) if not template: logger.warning(f"No template found for {query_type} {detail_level}, falling back to standard template") # Fall back to standard detail level if the requested one doesn't exist detail_level = "standard" # Get detail level configuration detail_level_manager = get_report_detail_level_manager() config = detail_level_manager.get_detail_level_config(detail_level) token_budget = config.get("token_budget", 100000) # Determine query type based on the query text # Always try to infer the query type, regardless of what was passed in if any(term in query.lower() for term in ["what is", "who is", "when did", "where is", "how does"]): query_type = "factual" elif any(term in query.lower() for term in ["compare", "difference", "versus", "vs", "pros and cons"]): query_type = "comparative" else: # Default to exploratory if no specific pattern is detected query_type = "exploratory" logger.info(f"Query type determined as: {query_type}") # Estimate total tokens in chunks total_tokens = sum(len(chunk.get('content', '').split()) * 1.3 for chunk in chunks) # Rough estimate logger.info(f"Estimated total tokens in {len(chunks)} chunks: {total_tokens}") # If total tokens exceeds 80% of the token budget, reduce the number of chunks if total_tokens > token_budget * 0.8: max_chunks = int(len(chunks) * (token_budget * 0.8 / total_tokens)) max_chunks = max(3, max_chunks) # Ensure we have at least 3 chunks logger.warning(f"Token count ({total_tokens}) exceeds 80% of budget ({token_budget}). Reducing chunks from {len(chunks)} to {max_chunks}.") chunks = chunks[:max_chunks] # Recalculate estimated tokens total_tokens = sum(len(chunk.get('content', '').split()) * 1.3 for chunk in chunks) logger.info(f"Reduced to {len(chunks)} chunks with estimated {total_tokens} tokens") # Update total chunks for progress tracking self.total_chunks = len(chunks) logger.info(f"Starting map phase for {len(chunks)} document chunks with query type '{query_type}' and detail level '{detail_level}'") # Process chunks in batches to avoid hitting payload limits # Determine batch size based on the model - Gemini can handle larger batches if "gemini" in self.model_name.lower(): batch_size = 8 # Larger batch size for Gemini models with 1M token windows else: batch_size = 3 # Smaller batch size for other models logger.info(f"Using batch size of {batch_size} for model {self.model_name}") processed_chunks = [] for i in range(0, len(chunks), batch_size): batch = chunks[i:i+batch_size] logger.info(f"Processing batch {i//batch_size + 1}/{(len(chunks) + batch_size - 1)//batch_size} with {len(batch)} chunks") # Ensure all chunks have a title, even if it's 'Untitled' for chunk in batch: if chunk.get('title') is None: chunk['title'] = 'Untitled' # Process this batch batch_results = await self.map_document_chunks(batch, query, detail_level, query_type) processed_chunks.extend(batch_results) # Add a small delay between batches to avoid rate limiting if i + batch_size < len(chunks): logger.info("Pausing briefly between batches...") await asyncio.sleep(2) logger.info(f"Starting reduce phase to synthesize report from {len(processed_chunks)} processed chunks") # Update progress status for reduce phase if self.progress_callback: self.progress_callback(0.9, self.total_chunks, "Synthesizing final report...") # Reduce phase: Synthesize processed chunks into a coherent report report = await self.reduce_processed_chunks(processed_chunks, query, query_type, detail_level) # Process thinking tags if enabled if self.process_thinking_tags and "" in report: logger.info("Processing thinking tags in report") report = self._process_thinking_tags(report) # Final progress update if self.progress_callback: self.progress_callback(1.0, self.total_chunks, report) return report # Create a singleton instance for global use report_synthesizer = ReportSynthesizer() def get_report_synthesizer(model_name: Optional[str] = None) -> ReportSynthesizer: """ Get the global report synthesizer instance or create a new one with a specific model. Args: model_name: Optional model name to use instead of the default Returns: ReportSynthesizer instance """ global report_synthesizer if model_name and model_name != report_synthesizer.model_name: report_synthesizer = ReportSynthesizer(model_name) return report_synthesizer async def test_report_synthesizer(): """Test the report synthesizer with sample document chunks.""" # Sample document chunks chunks = [ { "title": "Introduction to Python", "url": "https://docs.python.org/3/tutorial/index.html", "content": "Python is an easy to learn, powerful programming language. It has efficient high-level data structures and a simple but effective approach to object-oriented programming. Python's elegant syntax and dynamic typing, together with its interpreted nature, make it an ideal language for scripting and rapid application development in many areas on most platforms." }, { "title": "Python Features", "url": "https://www.python.org/about/", "content": "Python is a programming language that lets you work quickly and integrate systems more effectively. Python is an interpreted, object-oriented, high-level programming language with dynamic semantics. Its high-level built in data structures, combined with dynamic typing and dynamic binding, make it very attractive for Rapid Application Development, as well as for use as a scripting or glue language to connect existing components together." } ] # Initialize the report synthesizer synthesizer = get_report_synthesizer() # Test query query = "What are the key features of Python programming language?" # Map phase processed_chunks = await synthesizer.map_document_chunks(chunks, query, detail_level="detailed") # Print processed chunks print("Processed chunks:") for i, chunk in enumerate(processed_chunks): print(f"Chunk {i+1}: {chunk.get('title')}") print(f"Extracted information: {chunk.get('extracted_info')}") print() # Reduce phase report = await synthesizer.reduce_processed_chunks(processed_chunks, query, detail_level="detailed") # Print report print("Generated Report:") print(report) if __name__ == "__main__": asyncio.run(test_report_synthesizer())