# ira/report/progressive_report_synthesis.py

"""
Progressive report synthesis module for the intelligent research system.
This module provides functionality to synthesize reports from document chunks
using LLMs with a progressive approach, where chunks are processed iteratively
and the report is refined over time.
"""
import os
import json
import asyncio
import logging
import time
from typing import Dict, List, Any, Optional, Tuple, Union, Set
from dataclasses import dataclass, field
import litellm
from litellm import completion
from config.config import get_config
from report.report_detail_levels import get_report_detail_level_manager, DetailLevel
from report.report_templates import QueryType, DetailLevel as TemplateDetailLevel, ReportTemplateManager, ReportTemplate
from report.report_synthesis import ReportSynthesizer
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
@dataclass
class ReportState:
    """Class to track the state of a progressive report."""
    # Latest synthesized report text (Markdown), refined on each iteration.
    current_report: str = ""
    # String document_ids of chunks already incorporated into the report.
    processed_chunks: Set[str] = field(default_factory=set)
    # Number of synthesis iterations applied so far (1 after initialization).
    version: int = 0
    # Unix timestamp of the most recent report update.
    last_update_time: float = field(default_factory=time.time)
    # Improvement score (0.0-1.0) recorded after each refinement pass.
    improvement_scores: List[float] = field(default_factory=list)
    # True once progressive generation has terminated.
    is_complete: bool = False
    # Human-readable reason generation stopped, set when is_complete is True.
    termination_reason: Optional[str] = None
class ProgressiveReportSynthesizer(ReportSynthesizer):
    """
    Progressive report synthesizer for the intelligent research system.

    This class extends the ReportSynthesizer to implement a progressive approach
    to report generation, where chunks are processed iteratively and the report
    is refined over time.
    """

    def __init__(self, model_name: Optional[str] = None):
        """
        Initialize the progressive report synthesizer.

        Args:
            model_name: Name of the LLM model to use. If None, uses the default model
                from configuration.
        """
        super().__init__(model_name)

        # Initialize report state
        self.report_state = ReportState()

        # Configuration for progressive generation
        self.min_improvement_threshold = 0.2  # Minimum improvement score to continue
        self.max_consecutive_low_improvements = 3  # Max number of consecutive low improvements before stopping
        self.batch_size = 3  # Number of chunks to process in each iteration
        self.max_iterations = 20  # Maximum number of iterations
        self.consecutive_low_improvements = 0  # Counter for consecutive low improvements

        # Progress tracking
        self.total_chunks = 0
        self.processed_chunk_count = 0
        self.progress_callback = None

    def set_progress_callback(self, callback):
        """
        Set a callback function to report progress.

        Args:
            callback: Function that takes (current_progress, total, current_report) as arguments
        """
        self.progress_callback = callback

    def _report_progress(self):
        """Report progress through the callback if set."""
        if self.progress_callback and self.total_chunks > 0:
            # Cap at 1.0 in case counts drift past the initial total.
            progress = min(self.processed_chunk_count / self.total_chunks, 1.0)
            self.progress_callback(progress, self.total_chunks, self.report_state.current_report)

    def prioritize_chunks(self, chunks: List[Dict[str, Any]], query: str) -> List[Dict[str, Any]]:
        """
        Prioritize chunks based on relevance to the query and other factors.

        Args:
            chunks: List of document chunks
            query: Original search query (currently unused; priority_score is
                assumed to already reflect query relevance — set upstream by
                the document processor)

        Returns:
            List of unprocessed chunks sorted by priority (highest first)
        """
        # Filter out chunks that have already been processed. Chunks without a
        # truthy document_id cannot be tracked in the processed set, so they
        # are excluded as well.
        unprocessed_chunks = [
            chunk for chunk in chunks
            if chunk.get('document_id') and str(chunk.get('document_id')) not in self.report_state.processed_chunks
        ]

        # If all chunks have been processed, return an empty list
        if not unprocessed_chunks:
            return []

        # Sort by priority score (already set by the document processor)
        prioritized_chunks = sorted(
            unprocessed_chunks,
            key=lambda x: x.get('priority_score', 0.0),
            reverse=True
        )
        return prioritized_chunks

    async def extract_information_from_chunk(self, chunk: Dict[str, Any], query: str, detail_level: str = "comprehensive") -> str:
        """
        Extract key information from a document chunk.

        Args:
            chunk: Document chunk
            query: Original search query
            detail_level: Level of detail for extraction

        Returns:
            Extracted information as a string
        """
        # Get the appropriate extraction prompt based on detail level
        extraction_prompt = self._get_extraction_prompt(detail_level)

        # Create a prompt for extracting key information from the chunk
        messages = [
            {"role": "system", "content": extraction_prompt},
            {"role": "user", "content": f"""Query: {query}
Document title: {chunk.get('title', 'Untitled')}
Document URL: {chunk.get('url', 'Unknown')}
Document chunk content:
{chunk.get('content', '')}
Extract the most relevant information from this document chunk that addresses the query."""}
        ]

        # Process the chunk with the LLM
        extracted_info = await self.generate_completion(messages)
        return extracted_info

    async def refine_report(self, current_report: str, new_information: List[Tuple[Dict[str, Any], str]], query: str, query_type: str, detail_level: str) -> Tuple[str, float]:
        """
        Refine the current report with new information.

        Args:
            current_report: Current version of the report
            new_information: List of tuples containing (chunk, extracted_information)
            query: Original search query
            query_type: Type of query (factual, exploratory, comparative)
            detail_level: Level of detail for the report

        Returns:
            Tuple of (refined_report, improvement_score), where the score is
            clamped to [0.0, 1.0] and defaults to 0.5 if it cannot be parsed

        Raises:
            ValueError: If no template exists for the query type / detail level
        """
        # Prepare context with new information
        context = ""
        for chunk, extracted_info in new_information:
            title = chunk.get('title', 'Untitled')
            url = chunk.get('url', 'Unknown')
            context += f"Document: {title}\n"
            context += f"URL: {url}\n"
            context += f"Source URL: {url}\n"  # Duplicate for emphasis
            context += f"Extracted information:\n{extracted_info}\n\n"

        # Get template for the report
        template = self._get_template_from_strings(query_type, detail_level)
        if not template:
            raise ValueError(f"No template found for {query_type} {detail_level}")

        # Create the prompt for refining the report
        messages = [
            {"role": "system", "content": f"""You are an expert research assistant tasked with progressively refining a research report.
You will be given:
1. The current version of the report
2. New information extracted from additional documents
Your task is to refine and improve the report by incorporating the new information. Follow these guidelines:
1. Maintain the overall structure and format of the report
2. Add new relevant information where appropriate
3. Expand sections with new details, examples, or evidence
4. Improve analysis based on the new information
5. Add or update citations for new information
6. Ensure the report follows this template structure:
{template.template}
Format the report in Markdown with clear headings, subheadings, and bullet points where appropriate.
Make the report readable, engaging, and informative while maintaining academic rigor.
IMPORTANT FOR REFERENCES:
- Use a consistent format: [1] Title of the Article/Page. URL
- DO NOT use generic placeholders like "Document 1" for references
- ALWAYS include the actual URL from the source documents
- Each reference MUST include both the title and the URL
- Make sure all references are complete and properly formatted
- Number the references sequentially
After refining the report, rate how much the new information improved the report on a scale of 0.0 to 1.0:
- 0.0: No improvement (new information was redundant or irrelevant)
- 0.5: Moderate improvement (new information added some value)
- 1.0: Significant improvement (new information substantially enhanced the report)
End your response with a single line containing only the improvement score in this format:
IMPROVEMENT_SCORE: [score]
"""},
            {"role": "user", "content": f"""Query: {query}
Current report:
{current_report}
New information from additional sources:
{context}
Please refine the report by incorporating this new information while maintaining the overall structure and format."""}
        ]

        # Generate the refined report
        response = await self.generate_completion(messages)

        # Extract the improvement score. The model may append trailing blank
        # lines, so scan backwards for the first non-empty line instead of
        # assuming the marker is the literal last line.
        improvement_score = 0.5  # Default moderate improvement
        lines = response.strip().split('\n')
        for index in range(len(lines) - 1, -1, -1):
            line = lines[index].strip()
            if not line:
                continue
            if line.startswith('IMPROVEMENT_SCORE:'):
                try:
                    parsed_score = float(line.split(':', 1)[1].strip())
                    # Clamp to the documented 0.0-1.0 range so an out-of-range
                    # value cannot distort the termination logic.
                    improvement_score = max(0.0, min(1.0, parsed_score))
                    # Remove the score line from the report
                    response = '\n'.join(lines[:index]).rstrip()
                except (ValueError, IndexError):
                    logger.warning("Could not parse improvement score, using default value of 0.5")
            # Only the last non-empty line is considered, matching the prompt.
            break

        return response, improvement_score

    async def initialize_report(self, initial_chunks: List[Dict[str, Any]], query: str, query_type: str, detail_level: str) -> str:
        """
        Initialize the report with the first batch of chunks.

        Args:
            initial_chunks: Initial batch of document chunks
            query: Original search query
            query_type: Type of query (factual, exploratory, comparative)
            detail_level: Level of detail for the report

        Returns:
            Initial report as a string
        """
        logger.info(f"Initializing report with {len(initial_chunks)} chunks")

        # Process initial chunks using the standard map-reduce approach
        processed_chunks = await self.map_document_chunks(initial_chunks, query, detail_level)

        # Generate initial report
        initial_report = await self.reduce_processed_chunks(processed_chunks, query, query_type, detail_level)

        # Update report state
        self.report_state.current_report = initial_report
        self.report_state.version = 1
        self.report_state.last_update_time = time.time()

        # Mark chunks as processed
        for chunk in initial_chunks:
            if chunk.get('document_id'):
                self.report_state.processed_chunks.add(str(chunk.get('document_id')))

        self.processed_chunk_count += len(initial_chunks)
        self._report_progress()

        return initial_report

    def should_terminate(self, improvement_score: float) -> Tuple[bool, Optional[str]]:
        """
        Determine if the progressive report generation should terminate.

        Args:
            improvement_score: Score indicating how much the report improved

        Returns:
            Tuple of (should_terminate, reason); reason is None when not
            terminating
        """
        # Check if all chunks have been processed
        if self.processed_chunk_count >= self.total_chunks:
            return True, "All chunks processed"

        # Check if maximum iterations reached
        if self.report_state.version >= self.max_iterations:
            return True, "Maximum iterations reached"

        # Check for diminishing returns: several low-improvement iterations in
        # a row indicate the remaining chunks add little value.
        if improvement_score < self.min_improvement_threshold:
            self.consecutive_low_improvements += 1
            if self.consecutive_low_improvements >= self.max_consecutive_low_improvements:
                return True, "Diminishing returns (consecutive low improvements)"
        else:
            self.consecutive_low_improvements = 0

        return False, None

    async def synthesize_report_progressively(self, chunks: List[Dict[str, Any]], query: str, query_type: str = "exploratory", detail_level: str = "comprehensive") -> str:
        """
        Synthesize a report from document chunks using a progressive approach.

        Args:
            chunks: List of document chunks
            query: Original search query
            query_type: Type of query (factual, exploratory, comparative)
            detail_level: Level of detail for the report

        Returns:
            Synthesized report as a string
        """
        if not chunks:
            logger.warning("No document chunks provided for report synthesis.")
            return "No information found for the given query."

        # Reset report state
        self.report_state = ReportState()
        self.consecutive_low_improvements = 0
        self.total_chunks = len(chunks)
        self.processed_chunk_count = 0

        # Verify that a template exists for the given query type and detail level
        template = self._get_template_from_strings(query_type, detail_level)
        if not template:
            logger.warning(f"No template found for {query_type} {detail_level}, falling back to standard template")
            # Fall back to standard detail level if the requested one doesn't exist
            detail_level = "standard"

        # Determine batch size based on the model
        if "gemini" in self.model_name.lower():
            self.batch_size = 5  # Larger batch size for Gemini models with 1M token windows
        else:
            self.batch_size = 3  # Smaller batch size for other models

        logger.info(f"Using batch size of {self.batch_size} for model {self.model_name}")

        # Prioritize chunks
        prioritized_chunks = self.prioritize_chunks(chunks, query)

        # Initialize report with first batch of chunks
        initial_batch = prioritized_chunks[:self.batch_size]
        if not initial_batch:
            # Every chunk was filtered out (no document_id), so nothing can be
            # tracked or processed; bail out with the same message as empty input.
            logger.warning("No processable chunks (with document IDs) available for report synthesis.")
            return "No information found for the given query."
        await self.initialize_report(initial_batch, query, query_type, detail_level)

        # Progressive refinement loop
        while True:
            # Check if we should terminate; before the first refinement there
            # is no score yet, so assume full improvement (1.0).
            should_terminate, reason = self.should_terminate(
                self.report_state.improvement_scores[-1] if self.report_state.improvement_scores else 1.0
            )
            if should_terminate:
                logger.info(f"Terminating progressive report generation: {reason}")
                self.report_state.is_complete = True
                self.report_state.termination_reason = reason
                break

            # Get next batch of chunks
            prioritized_chunks = self.prioritize_chunks(chunks, query)
            next_batch = prioritized_chunks[:self.batch_size]
            if not next_batch:
                logger.info("No more chunks to process")
                self.report_state.is_complete = True
                self.report_state.termination_reason = "All chunks processed"
                break

            logger.info(f"Processing batch {self.report_state.version + 1} with {len(next_batch)} chunks")

            # Extract information from chunks
            new_information = []
            for chunk in next_batch:
                extracted_info = await self.extract_information_from_chunk(chunk, query, detail_level)
                new_information.append((chunk, extracted_info))

                # Mark chunk as processed
                if chunk.get('document_id'):
                    self.report_state.processed_chunks.add(str(chunk.get('document_id')))

            # Refine report with new information
            refined_report, improvement_score = await self.refine_report(
                self.report_state.current_report,
                new_information,
                query,
                query_type,
                detail_level
            )

            # Update report state
            self.report_state.current_report = refined_report
            self.report_state.version += 1
            self.report_state.last_update_time = time.time()
            self.report_state.improvement_scores.append(improvement_score)

            self.processed_chunk_count += len(next_batch)
            self._report_progress()

            logger.info(f"Completed iteration {self.report_state.version} with improvement score {improvement_score:.2f}")

            # Add a small delay between iterations to avoid rate limiting
            await asyncio.sleep(2)

        # Final report
        return self.report_state.current_report

    async def synthesize_report(self, chunks: List[Dict[str, Any]], query: str, query_type: str = "exploratory", detail_level: str = "standard") -> str:
        """
        Synthesize a report from document chunks.

        This method overrides the parent method to use progressive synthesis for
        comprehensive detail level and standard map-reduce for other detail levels.

        Args:
            chunks: List of document chunks
            query: Original search query
            query_type: Type of query (factual, exploratory, comparative)
            detail_level: Level of detail for the report

        Returns:
            Synthesized report as a string
        """
        # Use progressive synthesis for comprehensive detail level
        if detail_level.lower() == "comprehensive":
            logger.info(f"Using progressive synthesis for {detail_level} detail level")
            return await self.synthesize_report_progressively(chunks, query, query_type, detail_level)
        else:
            # Use standard map-reduce for other detail levels
            logger.info(f"Using standard map-reduce for {detail_level} detail level")
            return await super().synthesize_report(chunks, query, query_type, detail_level)
# Create a singleton instance for global use
progressive_report_synthesizer = ProgressiveReportSynthesizer()


def get_progressive_report_synthesizer(model_name: Optional[str] = None) -> ProgressiveReportSynthesizer:
    """
    Get the global progressive report synthesizer instance or create a new one with a specific model.

    Args:
        model_name: Optional model name to use instead of the default

    Returns:
        ProgressiveReportSynthesizer instance
    """
    global progressive_report_synthesizer

    # Reuse the cached singleton unless a different model is explicitly requested.
    if not model_name or model_name == progressive_report_synthesizer.model_name:
        return progressive_report_synthesizer

    progressive_report_synthesizer = ProgressiveReportSynthesizer(model_name)
    return progressive_report_synthesizer
async def test_progressive_report_synthesizer():
    """Test the progressive report synthesizer with sample document chunks."""
    # Sample document chunks
    sample_chunks = [
        {
            "document_id": "1",
            "title": "Introduction to Python",
            "url": "https://docs.python.org/3/tutorial/index.html",
            "content": "Python is an easy to learn, powerful programming language. It has efficient high-level data structures and a simple but effective approach to object-oriented programming. Python's elegant syntax and dynamic typing, together with its interpreted nature, make it an ideal language for scripting and rapid application development in many areas on most platforms.",
            "priority_score": 0.9,
        },
        {
            "document_id": "2",
            "title": "Python Features",
            "url": "https://www.python.org/about/",
            "content": "Python is a programming language that lets you work quickly and integrate systems more effectively. Python is an interpreted, object-oriented, high-level programming language with dynamic semantics. Its high-level built in data structures, combined with dynamic typing and dynamic binding, make it very attractive for Rapid Application Development, as well as for use as a scripting or glue language to connect existing components together.",
            "priority_score": 0.8,
        },
        {
            "document_id": "3",
            "title": "Python Applications",
            "url": "https://www.python.org/about/apps/",
            "content": "Python is used in many application domains. Here's a sampling: Web and Internet Development, Scientific and Numeric Computing, Education, Desktop GUIs, Software Development, and Business Applications. Python is also used in Data Science, Machine Learning, and Artificial Intelligence applications.",
            "priority_score": 0.7,
        },
        {
            "document_id": "4",
            "title": "Python History",
            "url": "https://en.wikipedia.org/wiki/Python_(programming_language)",
            "content": "Python was conceived in the late 1980s by Guido van Rossum at Centrum Wiskunde & Informatica (CWI) in the Netherlands as a successor to the ABC language, capable of exception handling and interfacing with the Amoeba operating system. Its implementation began in December 1989.",
            "priority_score": 0.6,
        },
    ]

    # Initialize the progressive report synthesizer
    report_synthesizer = get_progressive_report_synthesizer()

    # Test query
    query = "What are the key features and applications of Python programming language?"

    def show_progress(progress, total, current_report):
        """Print a one-line progress update for each iteration."""
        print(f"Progress: {progress:.2%} ({total} chunks)")

    # Set progress callback
    report_synthesizer.set_progress_callback(show_progress)

    # Generate report progressively
    report = await report_synthesizer.synthesize_report_progressively(
        sample_chunks,
        query,
        query_type="exploratory",
        detail_level="comprehensive",
    )

    # Print report
    print("\nFinal Generated Report:")
    print(report)

    # Print report state
    print("\nReport State:")
    print(f"Versions: {report_synthesizer.report_state.version}")
    print(f"Processed Chunks: {len(report_synthesizer.report_state.processed_chunks)}")
    print(f"Improvement Scores: {report_synthesizer.report_state.improvement_scores}")
    print(f"Termination Reason: {report_synthesizer.report_state.termination_reason}")


if __name__ == "__main__":
    asyncio.run(test_progressive_report_synthesizer())