""" Progressive report synthesis module for the intelligent research system. This module provides functionality to synthesize reports from document chunks using LLMs with a progressive approach, where chunks are processed iteratively and the report is refined over time. """ import os import json import asyncio import logging import time from typing import Dict, List, Any, Optional, Tuple, Union, Set from dataclasses import dataclass, field import litellm from litellm import completion from config.config import get_config from report.report_detail_levels import get_report_detail_level_manager, DetailLevel from report.report_templates import QueryType, DetailLevel as TemplateDetailLevel, ReportTemplateManager, ReportTemplate from report.report_synthesis import ReportSynthesizer # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) @dataclass class ReportState: """Class to track the state of a progressive report.""" current_report: str = "" processed_chunks: Set[str] = field(default_factory=set) version: int = 0 last_update_time: float = field(default_factory=time.time) improvement_scores: List[float] = field(default_factory=list) is_complete: bool = False termination_reason: Optional[str] = None class ProgressiveReportSynthesizer(ReportSynthesizer): """ Progressive report synthesizer for the intelligent research system. This class extends the ReportSynthesizer to implement a progressive approach to report generation, where chunks are processed iteratively and the report is refined over time. """ def __init__(self, model_name: Optional[str] = None): """ Initialize the progressive report synthesizer. Args: model_name: Name of the LLM model to use. If None, uses the default model from configuration. """ super().__init__(model_name) # Initialize report state self.report_state = ReportState() # Configuration for progressive generation self.min_improvement_threshold = 0.2 # Minimum improvement score to continue self.max_consecutive_low_improvements = 3 # Max number of consecutive low improvements before stopping self.batch_size = 3 # Number of chunks to process in each iteration self.max_iterations = 20 # Maximum number of iterations self.consecutive_low_improvements = 0 # Counter for consecutive low improvements # Progress tracking self.total_chunks = 0 self.processed_chunk_count = 0 self.progress_callback = None def set_progress_callback(self, callback): """ Set a callback function to report progress. Args: callback: Function that takes (current_progress, total, current_report) as arguments """ self.progress_callback = callback def _report_progress(self): """Report progress through the callback if set.""" if self.progress_callback and self.total_chunks > 0: progress = min(self.processed_chunk_count / self.total_chunks, 1.0) self.progress_callback(progress, self.total_chunks, self.report_state.current_report) def prioritize_chunks(self, chunks: List[Dict[str, Any]], query: str) -> List[Dict[str, Any]]: """ Prioritize chunks based on relevance to the query and other factors. Args: chunks: List of document chunks query: Original search query Returns: List of chunks sorted by priority """ # Start with chunks already prioritized by the document processor # Further refine based on additional criteria if needed # Filter out chunks that have already been processed unprocessed_chunks = [ chunk for chunk in chunks if chunk.get('document_id') and str(chunk.get('document_id')) not in self.report_state.processed_chunks ] # If all chunks have been processed, return an empty list if not unprocessed_chunks: return [] # Sort by priority score (already set by document processor) prioritized_chunks = sorted( unprocessed_chunks, key=lambda x: x.get('priority_score', 0.0), reverse=True ) return prioritized_chunks async def extract_information_from_chunk(self, chunk: Dict[str, Any], query: str, detail_level: str = "comprehensive", query_type: str = "exploratory") -> str: """ Extract key information from a document chunk. Args: chunk: Document chunk query: Original search query detail_level: Level of detail for extraction query_type: Type of query (factual, exploratory, comparative) Returns: Extracted information as a string """ # Get the appropriate extraction prompt based on detail level and query type extraction_prompt = self._get_extraction_prompt(detail_level, query_type) # Create a prompt for extracting key information from the chunk messages = [ {"role": "system", "content": extraction_prompt}, {"role": "user", "content": f"""Query: {query} Document title: {chunk.get('title', 'Untitled')} Document URL: {chunk.get('url', 'Unknown')} Document chunk content: {chunk.get('content', '')} Extract the most relevant information from this document chunk that addresses the query."""} ] # Process the chunk with the LLM extracted_info = await self.generate_completion(messages) return extracted_info async def refine_report(self, current_report: str, new_information: List[Tuple[Dict[str, Any], str]], query: str, query_type: str, detail_level: str) -> Tuple[str, float]: """ Refine the current report with new information. Args: current_report: Current version of the report new_information: List of tuples containing (chunk, extracted_information) query: Original search query query_type: Type of query (factual, exploratory, comparative) detail_level: Level of detail for the report Returns: Tuple of (refined_report, improvement_score) """ # Prepare context with new information context = "" for chunk, extracted_info in new_information: title = chunk.get('title', 'Untitled') url = chunk.get('url', 'Unknown') context += f"Document: {title}\n" context += f"URL: {url}\n" context += f"Source URL: {url}\n" # Duplicate for emphasis context += f"Extracted information:\n{extracted_info}\n\n" # Get template for the report template = self._get_template_from_strings(query_type, detail_level) if not template: raise ValueError(f"No template found for {query_type} {detail_level}") # Create the prompt for refining the report messages = [ {"role": "system", "content": f"""You are an expert research assistant tasked with progressively refining a research report. You will be given: 1. The current version of the report 2. New information extracted from additional documents Your task is to refine and improve the report by incorporating the new information. Follow these guidelines: 1. Maintain the overall structure and format of the report 2. Add new relevant information where appropriate 3. Expand sections with new details, examples, or evidence 4. Improve analysis based on the new information 5. Add or update citations for new information 6. Ensure the report follows this template structure: {template.template} Format the report in Markdown with clear headings, subheadings, and bullet points where appropriate. Make the report readable, engaging, and informative while maintaining academic rigor. IMPORTANT FOR REFERENCES: - Use a consistent format: [1] Title of the Article/Page. URL - DO NOT use generic placeholders like "Document 1" for references - ALWAYS include the actual URL from the source documents - Each reference MUST include both the title and the URL - Make sure all references are complete and properly formatted - Number the references sequentially After refining the report, rate how much the new information improved the report on a scale of 0.0 to 1.0: - 0.0: No improvement (new information was redundant or irrelevant) - 0.5: Moderate improvement (new information added some value) - 1.0: Significant improvement (new information substantially enhanced the report) End your response with a single line containing only the improvement score in this format: IMPROVEMENT_SCORE: [score] """}, {"role": "user", "content": f"""Query: {query} Current report: {current_report} New information from additional sources: {context} Please refine the report by incorporating this new information while maintaining the overall structure and format."""} ] # Generate the refined report response = await self.generate_completion(messages) # Extract the improvement score improvement_score = 0.5 # Default moderate improvement score_line = response.strip().split('\n')[-1] if score_line.startswith('IMPROVEMENT_SCORE:'): try: improvement_score = float(score_line.split(':')[1].strip()) # Remove the score line from the report response = '\n'.join(response.strip().split('\n')[:-1]) except (ValueError, IndexError): logger.warning("Could not parse improvement score, using default value of 0.5") return response, improvement_score async def initialize_report(self, initial_chunks: List[Dict[str, Any]], query: str, query_type: str, detail_level: str) -> str: """ Initialize the report with the first batch of chunks. Args: initial_chunks: Initial batch of document chunks query: Original search query query_type: Type of query (factual, exploratory, comparative) detail_level: Level of detail for the report Returns: Initial report as a string """ logger.info(f"Initializing report with {len(initial_chunks)} chunks") # Process initial chunks using the standard map-reduce approach processed_chunks = await self.map_document_chunks(initial_chunks, query, detail_level, query_type) # Generate initial report initial_report = await self.reduce_processed_chunks(processed_chunks, query, query_type, detail_level) # Update report state self.report_state.current_report = initial_report self.report_state.version = 1 self.report_state.last_update_time = time.time() # Mark chunks as processed for chunk in initial_chunks: if chunk.get('document_id'): self.report_state.processed_chunks.add(str(chunk.get('document_id'))) self.processed_chunk_count += len(initial_chunks) self._report_progress() return initial_report def should_terminate(self, improvement_score: float) -> Tuple[bool, Optional[str]]: """ Determine if the progressive report generation should terminate. Args: improvement_score: Score indicating how much the report improved Returns: Tuple of (should_terminate, reason) """ # Check if all chunks have been processed if self.processed_chunk_count >= self.total_chunks: return True, "All chunks processed" # Check if maximum iterations reached if self.report_state.version >= self.max_iterations: return True, "Maximum iterations reached" # Check for diminishing returns if improvement_score < self.min_improvement_threshold: self.consecutive_low_improvements += 1 if self.consecutive_low_improvements >= self.max_consecutive_low_improvements: return True, "Diminishing returns (consecutive low improvements)" else: self.consecutive_low_improvements = 0 return False, None async def synthesize_report_progressively(self, chunks: List[Dict[str, Any]], query: str, query_type: str = "exploratory", detail_level: str = "comprehensive") -> str: """ Synthesize a report from document chunks using a progressive approach. Args: chunks: List of document chunks query: Original search query query_type: Type of query (factual, exploratory, comparative) detail_level: Level of detail for the report Returns: Synthesized report as a string """ if not chunks: logger.warning("No document chunks provided for report synthesis.") return "No information found for the given query." # Reset report state self.report_state = ReportState() self.consecutive_low_improvements = 0 self.total_chunks = len(chunks) self.processed_chunk_count = 0 # Verify that a template exists for the given query type and detail level template = self._get_template_from_strings(query_type, detail_level) if not template: logger.warning(f"No template found for {query_type} {detail_level}, falling back to standard template") # Fall back to standard detail level if the requested one doesn't exist detail_level = "standard" # Determine batch size based on the model if "gemini" in self.model_name.lower(): self.batch_size = 5 # Larger batch size for Gemini models with 1M token windows else: self.batch_size = 3 # Smaller batch size for other models logger.info(f"Using batch size of {self.batch_size} for model {self.model_name}") # Prioritize chunks prioritized_chunks = self.prioritize_chunks(chunks, query) # Initialize report with first batch of chunks initial_batch = prioritized_chunks[:self.batch_size] await self.initialize_report(initial_batch, query, query_type, detail_level) # Progressive refinement loop while True: # Check if we should terminate should_terminate, reason = self.should_terminate( self.report_state.improvement_scores[-1] if self.report_state.improvement_scores else 1.0 ) if should_terminate: logger.info(f"Terminating progressive report generation: {reason}") self.report_state.is_complete = True self.report_state.termination_reason = reason break # Get next batch of chunks prioritized_chunks = self.prioritize_chunks(chunks, query) next_batch = prioritized_chunks[:self.batch_size] if not next_batch: logger.info("No more chunks to process") self.report_state.is_complete = True self.report_state.termination_reason = "All chunks processed" break logger.info(f"Processing batch {self.report_state.version + 1} with {len(next_batch)} chunks") # Extract information from chunks new_information = [] for chunk in next_batch: extracted_info = await self.extract_information_from_chunk(chunk, query, detail_level, query_type) new_information.append((chunk, extracted_info)) # Mark chunk as processed if chunk.get('document_id'): self.report_state.processed_chunks.add(str(chunk.get('document_id'))) # Refine report with new information refined_report, improvement_score = await self.refine_report( self.report_state.current_report, new_information, query, query_type, detail_level ) # Update report state self.report_state.current_report = refined_report self.report_state.version += 1 self.report_state.last_update_time = time.time() self.report_state.improvement_scores.append(improvement_score) self.processed_chunk_count += len(next_batch) self._report_progress() logger.info(f"Completed iteration {self.report_state.version} with improvement score {improvement_score:.2f}") # Add a small delay between iterations to avoid rate limiting await asyncio.sleep(2) # Final report return self.report_state.current_report async def synthesize_report(self, chunks: List[Dict[str, Any]], query: str, query_type: str = "exploratory", detail_level: str = "standard") -> str: """ Synthesize a report from document chunks. This method overrides the parent method to use progressive synthesis for comprehensive detail level and standard map-reduce for other detail levels. Args: chunks: List of document chunks query: Original search query query_type: Type of query (factual, exploratory, comparative) detail_level: Level of detail for the report Returns: Synthesized report as a string """ # Use progressive synthesis for comprehensive detail level if detail_level.lower() == "comprehensive": logger.info(f"Using progressive synthesis for {detail_level} detail level") return await self.synthesize_report_progressively(chunks, query, query_type, detail_level) else: # Use standard map-reduce for other detail levels logger.info(f"Using standard map-reduce for {detail_level} detail level") return await super().synthesize_report(chunks, query, query_type, detail_level) # Create a singleton instance for global use progressive_report_synthesizer = ProgressiveReportSynthesizer() def get_progressive_report_synthesizer(model_name: Optional[str] = None) -> ProgressiveReportSynthesizer: """ Get the global progressive report synthesizer instance or create a new one with a specific model. Args: model_name: Optional model name to use instead of the default Returns: ProgressiveReportSynthesizer instance """ global progressive_report_synthesizer if model_name and model_name != progressive_report_synthesizer.model_name: progressive_report_synthesizer = ProgressiveReportSynthesizer(model_name) return progressive_report_synthesizer async def test_progressive_report_synthesizer(): """Test the progressive report synthesizer with sample document chunks.""" # Sample document chunks chunks = [ { "document_id": "1", "title": "Introduction to Python", "url": "https://docs.python.org/3/tutorial/index.html", "content": "Python is an easy to learn, powerful programming language. It has efficient high-level data structures and a simple but effective approach to object-oriented programming. Python's elegant syntax and dynamic typing, together with its interpreted nature, make it an ideal language for scripting and rapid application development in many areas on most platforms.", "priority_score": 0.9 }, { "document_id": "2", "title": "Python Features", "url": "https://www.python.org/about/", "content": "Python is a programming language that lets you work quickly and integrate systems more effectively. Python is an interpreted, object-oriented, high-level programming language with dynamic semantics. Its high-level built in data structures, combined with dynamic typing and dynamic binding, make it very attractive for Rapid Application Development, as well as for use as a scripting or glue language to connect existing components together.", "priority_score": 0.8 }, { "document_id": "3", "title": "Python Applications", "url": "https://www.python.org/about/apps/", "content": "Python is used in many application domains. Here's a sampling: Web and Internet Development, Scientific and Numeric Computing, Education, Desktop GUIs, Software Development, and Business Applications. Python is also used in Data Science, Machine Learning, and Artificial Intelligence applications.", "priority_score": 0.7 }, { "document_id": "4", "title": "Python History", "url": "https://en.wikipedia.org/wiki/Python_(programming_language)", "content": "Python was conceived in the late 1980s by Guido van Rossum at Centrum Wiskunde & Informatica (CWI) in the Netherlands as a successor to the ABC language, capable of exception handling and interfacing with the Amoeba operating system. Its implementation began in December 1989.", "priority_score": 0.6 } ] # Initialize the progressive report synthesizer synthesizer = get_progressive_report_synthesizer() # Test query query = "What are the key features and applications of Python programming language?" # Define a progress callback def progress_callback(progress, total, current_report): print(f"Progress: {progress:.2%} ({total} chunks)") # Set progress callback synthesizer.set_progress_callback(progress_callback) # Generate report progressively report = await synthesizer.synthesize_report_progressively(chunks, query, query_type="exploratory", detail_level="comprehensive") # Print report print("\nFinal Generated Report:") print(report) # Print report state print("\nReport State:") print(f"Versions: {synthesizer.report_state.version}") print(f"Processed Chunks: {len(synthesizer.report_state.processed_chunks)}") print(f"Improvement Scores: {synthesizer.report_state.improvement_scores}") print(f"Termination Reason: {synthesizer.report_state.termination_reason}") if __name__ == "__main__": asyncio.run(test_progressive_report_synthesizer())