"""
Progressive report synthesis module for the intelligent research system.

This module provides functionality to synthesize reports from document chunks
using LLMs with a progressive approach, where chunks are processed iteratively
and the report is refined over time.
"""
|
|
|
|
import os
|
|
import json
|
|
import asyncio
|
|
import logging
|
|
import time
|
|
from typing import Dict, List, Any, Optional, Tuple, Union, Set
|
|
from dataclasses import dataclass, field
|
|
|
|
import litellm
|
|
from litellm import completion
|
|
|
|
from config.config import get_config
|
|
from report.report_detail_levels import get_report_detail_level_manager, DetailLevel
|
|
from report.report_templates import QueryType, DetailLevel as TemplateDetailLevel, ReportTemplateManager, ReportTemplate
|
|
from report.report_synthesis import ReportSynthesizer
|
|
|
|
# Configure logging
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class ReportState:
    """Class to track the state of a progressive report."""

    # Latest synthesized report text (Markdown).
    current_report: str = ""
    # document_id values (as strings) of chunks already folded into the report.
    processed_chunks: Set[str] = field(default_factory=set)
    # Number of synthesis iterations applied so far (0 = not yet initialized).
    version: int = 0
    # Wall-clock time (epoch seconds) of the most recent report update.
    last_update_time: float = field(default_factory=time.time)
    # Per-iteration improvement scores reported by the refinement LLM call.
    improvement_scores: List[float] = field(default_factory=list)
    # True once progressive generation has terminated.
    is_complete: bool = False
    # Human-readable reason generation stopped; None while still running.
    termination_reason: Optional[str] = None
|
|
|
|
|
|
class ProgressiveReportSynthesizer(ReportSynthesizer):
    """
    Progressive report synthesizer for the intelligent research system.

    This class extends the ReportSynthesizer to implement a progressive approach
    to report generation, where chunks are processed iteratively and the report
    is refined over time.
    """

    def __init__(self, model_name: Optional[str] = None):
        """
        Initialize the progressive report synthesizer.

        Args:
            model_name: Name of the LLM model to use. If None, uses the default model
                        from configuration.
        """
        super().__init__(model_name)

        # Initialize report state (reset again at the start of each
        # synthesize_report_progressively() run).
        self.report_state = ReportState()

        # Configuration for progressive generation
        self.min_improvement_threshold = 0.2  # Minimum improvement score to continue
        self.max_consecutive_low_improvements = 3  # Max number of consecutive low improvements before stopping
        self.batch_size = 3  # Number of chunks to process in each iteration
        self.max_iterations = 20  # Maximum number of iterations
        self.consecutive_low_improvements = 0  # Counter for consecutive low improvements

        # Progress tracking
        self.total_chunks = 0  # Total chunks supplied to the current run
        self.processed_chunk_count = 0  # Chunks consumed so far in the current run
        self.progress_callback = None  # Optional; set via set_progress_callback()
|
|
|
|
    def set_progress_callback(self, callback):
        """
        Set a callback function to report progress.

        Args:
            callback: Function that takes (current_progress, total, current_report) as
                arguments. current_progress is a fraction in [0.0, 1.0], total is the
                total chunk count, and current_report is the latest report text.
        """
        self.progress_callback = callback
|
|
|
|
def _report_progress(self):
|
|
"""Report progress through the callback if set."""
|
|
if self.progress_callback and self.total_chunks > 0:
|
|
progress = min(self.processed_chunk_count / self.total_chunks, 1.0)
|
|
self.progress_callback(progress, self.total_chunks, self.report_state.current_report)
|
|
|
|
def prioritize_chunks(self, chunks: List[Dict[str, Any]], query: str) -> List[Dict[str, Any]]:
|
|
"""
|
|
Prioritize chunks based on relevance to the query and other factors.
|
|
|
|
Args:
|
|
chunks: List of document chunks
|
|
query: Original search query
|
|
|
|
Returns:
|
|
List of chunks sorted by priority
|
|
"""
|
|
# Start with chunks already prioritized by the document processor
|
|
# Further refine based on additional criteria if needed
|
|
|
|
# Filter out chunks that have already been processed
|
|
unprocessed_chunks = [
|
|
chunk for chunk in chunks
|
|
if chunk.get('document_id') and str(chunk.get('document_id')) not in self.report_state.processed_chunks
|
|
]
|
|
|
|
# If all chunks have been processed, return an empty list
|
|
if not unprocessed_chunks:
|
|
return []
|
|
|
|
# Sort by priority score (already set by document processor)
|
|
prioritized_chunks = sorted(
|
|
unprocessed_chunks,
|
|
key=lambda x: x.get('priority_score', 0.0),
|
|
reverse=True
|
|
)
|
|
|
|
return prioritized_chunks
|
|
|
|
async def extract_information_from_chunk(self, chunk: Dict[str, Any], query: str, detail_level: str = "comprehensive", query_type: str = "exploratory") -> str:
|
|
"""
|
|
Extract key information from a document chunk.
|
|
|
|
Args:
|
|
chunk: Document chunk
|
|
query: Original search query
|
|
detail_level: Level of detail for extraction
|
|
query_type: Type of query (factual, exploratory, comparative)
|
|
|
|
Returns:
|
|
Extracted information as a string
|
|
"""
|
|
# Get the appropriate extraction prompt based on detail level and query type
|
|
extraction_prompt = self._get_extraction_prompt(detail_level, query_type)
|
|
|
|
# Create a prompt for extracting key information from the chunk
|
|
messages = [
|
|
{"role": "system", "content": extraction_prompt},
|
|
{"role": "user", "content": f"""Query: {query}
|
|
|
|
Document title: {chunk.get('title', 'Untitled')}
|
|
Document URL: {chunk.get('url', 'Unknown')}
|
|
|
|
Document chunk content:
|
|
{chunk.get('content', '')}
|
|
|
|
Extract the most relevant information from this document chunk that addresses the query."""}
|
|
]
|
|
|
|
# Process the chunk with the LLM
|
|
extracted_info = await self.generate_completion(messages)
|
|
|
|
return extracted_info
|
|
|
|
async def refine_report(self, current_report: str, new_information: List[Tuple[Dict[str, Any], str]], query: str, query_type: str, detail_level: str) -> Tuple[str, float]:
|
|
"""
|
|
Refine the current report with new information.
|
|
|
|
Args:
|
|
current_report: Current version of the report
|
|
new_information: List of tuples containing (chunk, extracted_information)
|
|
query: Original search query
|
|
query_type: Type of query (factual, exploratory, comparative)
|
|
detail_level: Level of detail for the report
|
|
|
|
Returns:
|
|
Tuple of (refined_report, improvement_score)
|
|
"""
|
|
# Prepare context with new information
|
|
context = ""
|
|
for chunk, extracted_info in new_information:
|
|
title = chunk.get('title', 'Untitled')
|
|
url = chunk.get('url', 'Unknown')
|
|
|
|
context += f"Document: {title}\n"
|
|
context += f"URL: {url}\n"
|
|
context += f"Source URL: {url}\n" # Duplicate for emphasis
|
|
context += f"Extracted information:\n{extracted_info}\n\n"
|
|
|
|
# Get template for the report
|
|
template = self._get_template_from_strings(query_type, detail_level)
|
|
|
|
if not template:
|
|
raise ValueError(f"No template found for {query_type} {detail_level}")
|
|
|
|
# Create the prompt for refining the report
|
|
messages = [
|
|
{"role": "system", "content": f"""You are an expert research assistant tasked with progressively refining a research report.
|
|
|
|
You will be given:
|
|
1. The current version of the report
|
|
2. New information extracted from additional documents
|
|
|
|
Your task is to refine and improve the report by incorporating the new information. Follow these guidelines:
|
|
|
|
1. Maintain the overall structure and format of the report
|
|
2. Add new relevant information where appropriate
|
|
3. Expand sections with new details, examples, or evidence
|
|
4. Improve analysis based on the new information
|
|
5. Add or update citations for new information
|
|
6. Ensure the report follows this template structure:
|
|
{template.template}
|
|
|
|
Format the report in Markdown with clear headings, subheadings, and bullet points where appropriate.
|
|
Make the report readable, engaging, and informative while maintaining academic rigor.
|
|
|
|
IMPORTANT FOR REFERENCES:
|
|
- Use a consistent format: [1] Title of the Article/Page. URL
|
|
- DO NOT use generic placeholders like "Document 1" for references
|
|
- ALWAYS include the actual URL from the source documents
|
|
- Each reference MUST include both the title and the URL
|
|
- Make sure all references are complete and properly formatted
|
|
- Number the references sequentially
|
|
|
|
After refining the report, rate how much the new information improved the report on a scale of 0.0 to 1.0:
|
|
- 0.0: No improvement (new information was redundant or irrelevant)
|
|
- 0.5: Moderate improvement (new information added some value)
|
|
- 1.0: Significant improvement (new information substantially enhanced the report)
|
|
|
|
End your response with a single line containing only the improvement score in this format:
|
|
IMPROVEMENT_SCORE: [score]
|
|
"""},
|
|
{"role": "user", "content": f"""Query: {query}
|
|
|
|
Current report:
|
|
{current_report}
|
|
|
|
New information from additional sources:
|
|
{context}
|
|
|
|
Please refine the report by incorporating this new information while maintaining the overall structure and format."""}
|
|
]
|
|
|
|
# Generate the refined report
|
|
response = await self.generate_completion(messages)
|
|
|
|
# Extract the improvement score
|
|
improvement_score = 0.5 # Default moderate improvement
|
|
score_line = response.strip().split('\n')[-1]
|
|
if score_line.startswith('IMPROVEMENT_SCORE:'):
|
|
try:
|
|
improvement_score = float(score_line.split(':')[1].strip())
|
|
# Remove the score line from the report
|
|
response = '\n'.join(response.strip().split('\n')[:-1])
|
|
except (ValueError, IndexError):
|
|
logger.warning("Could not parse improvement score, using default value of 0.5")
|
|
|
|
return response, improvement_score
|
|
|
|
async def initialize_report(self, initial_chunks: List[Dict[str, Any]], query: str, query_type: str, detail_level: str) -> str:
|
|
"""
|
|
Initialize the report with the first batch of chunks.
|
|
|
|
Args:
|
|
initial_chunks: Initial batch of document chunks
|
|
query: Original search query
|
|
query_type: Type of query (factual, exploratory, comparative)
|
|
detail_level: Level of detail for the report
|
|
|
|
Returns:
|
|
Initial report as a string
|
|
"""
|
|
logger.info(f"Initializing report with {len(initial_chunks)} chunks")
|
|
|
|
# Process initial chunks using the standard map-reduce approach
|
|
processed_chunks = await self.map_document_chunks(initial_chunks, query, detail_level, query_type)
|
|
|
|
# Generate initial report
|
|
initial_report = await self.reduce_processed_chunks(processed_chunks, query, query_type, detail_level)
|
|
|
|
# Update report state
|
|
self.report_state.current_report = initial_report
|
|
self.report_state.version = 1
|
|
self.report_state.last_update_time = time.time()
|
|
|
|
# Mark chunks as processed
|
|
for chunk in initial_chunks:
|
|
if chunk.get('document_id'):
|
|
self.report_state.processed_chunks.add(str(chunk.get('document_id')))
|
|
|
|
self.processed_chunk_count += len(initial_chunks)
|
|
self._report_progress()
|
|
|
|
return initial_report
|
|
|
|
def should_terminate(self, improvement_score: float) -> Tuple[bool, Optional[str]]:
|
|
"""
|
|
Determine if the progressive report generation should terminate.
|
|
|
|
Args:
|
|
improvement_score: Score indicating how much the report improved
|
|
|
|
Returns:
|
|
Tuple of (should_terminate, reason)
|
|
"""
|
|
# Check if all chunks have been processed
|
|
if self.processed_chunk_count >= self.total_chunks:
|
|
return True, "All chunks processed"
|
|
|
|
# Check if maximum iterations reached
|
|
if self.report_state.version >= self.max_iterations:
|
|
return True, "Maximum iterations reached"
|
|
|
|
# Check for diminishing returns
|
|
if improvement_score < self.min_improvement_threshold:
|
|
self.consecutive_low_improvements += 1
|
|
if self.consecutive_low_improvements >= self.max_consecutive_low_improvements:
|
|
return True, "Diminishing returns (consecutive low improvements)"
|
|
else:
|
|
self.consecutive_low_improvements = 0
|
|
|
|
return False, None
|
|
|
|
    async def synthesize_report_progressively(self, chunks: List[Dict[str, Any]], query: str, query_type: str = "exploratory", detail_level: str = "comprehensive") -> str:
        """
        Synthesize a report from document chunks using a progressive approach.

        An initial report is built from the highest-priority batch, then the
        remaining chunks are folded in batch by batch until all chunks are
        consumed, the iteration cap is hit, or improvements diminish.

        Args:
            chunks: List of document chunks
            query: Original search query
            query_type: Type of query (factual, exploratory, comparative)
            detail_level: Level of detail for the report

        Returns:
            Synthesized report as a string
        """
        if not chunks:
            logger.warning("No document chunks provided for report synthesis.")
            return "No information found for the given query."

        # Reset report state so repeated calls on this instance start fresh.
        self.report_state = ReportState()
        self.consecutive_low_improvements = 0
        self.total_chunks = len(chunks)
        self.processed_chunk_count = 0

        # Verify that a template exists for the given query type and detail level
        template = self._get_template_from_strings(query_type, detail_level)
        if not template:
            logger.warning(f"No template found for {query_type} {detail_level}, falling back to standard template")
            # Fall back to standard detail level if the requested one doesn't exist
            detail_level = "standard"

        # Determine batch size based on the model
        if "gemini" in self.model_name.lower():
            self.batch_size = 5  # Larger batch size for Gemini models with 1M token windows
        else:
            self.batch_size = 3  # Smaller batch size for other models

        logger.info(f"Using batch size of {self.batch_size} for model {self.model_name}")

        # Prioritize chunks (drops already-processed ones; none yet on the first call)
        prioritized_chunks = self.prioritize_chunks(chunks, query)

        # Initialize report with first batch of chunks
        initial_batch = prioritized_chunks[:self.batch_size]
        await self.initialize_report(initial_batch, query, query_type, detail_level)

        # Progressive refinement loop
        while True:
            # Check if we should terminate. Before any refinement iteration
            # there are no improvement scores yet, so 1.0 is passed to avoid
            # tripping the diminishing-returns check prematurely.
            should_terminate, reason = self.should_terminate(
                self.report_state.improvement_scores[-1] if self.report_state.improvement_scores else 1.0
            )

            if should_terminate:
                logger.info(f"Terminating progressive report generation: {reason}")
                self.report_state.is_complete = True
                self.report_state.termination_reason = reason
                break

            # Get next batch of chunks. Re-prioritizing here filters out the
            # chunks marked as processed in earlier iterations.
            prioritized_chunks = self.prioritize_chunks(chunks, query)
            next_batch = prioritized_chunks[:self.batch_size]

            if not next_batch:
                logger.info("No more chunks to process")
                self.report_state.is_complete = True
                self.report_state.termination_reason = "All chunks processed"
                break

            logger.info(f"Processing batch {self.report_state.version + 1} with {len(next_batch)} chunks")

            # Extract information from chunks (one LLM call per chunk)
            new_information = []
            for chunk in next_batch:
                extracted_info = await self.extract_information_from_chunk(chunk, query, detail_level, query_type)
                new_information.append((chunk, extracted_info))

                # Mark chunk as processed so it is not selected again
                if chunk.get('document_id'):
                    self.report_state.processed_chunks.add(str(chunk.get('document_id')))

            # Refine report with new information
            refined_report, improvement_score = await self.refine_report(
                self.report_state.current_report,
                new_information,
                query,
                query_type,
                detail_level
            )

            # Update report state
            self.report_state.current_report = refined_report
            self.report_state.version += 1
            self.report_state.last_update_time = time.time()
            self.report_state.improvement_scores.append(improvement_score)

            self.processed_chunk_count += len(next_batch)
            self._report_progress()

            logger.info(f"Completed iteration {self.report_state.version} with improvement score {improvement_score:.2f}")

            # Add a small delay between iterations to avoid rate limiting
            await asyncio.sleep(2)

        # Final report
        return self.report_state.current_report
|
|
|
|
async def synthesize_report(self, chunks: List[Dict[str, Any]], query: str, query_type: str = "exploratory", detail_level: str = "standard") -> str:
|
|
"""
|
|
Synthesize a report from document chunks.
|
|
|
|
This method overrides the parent method to use progressive synthesis for comprehensive
|
|
detail level and standard map-reduce for other detail levels.
|
|
|
|
Args:
|
|
chunks: List of document chunks
|
|
query: Original search query
|
|
query_type: Type of query (factual, exploratory, comparative)
|
|
detail_level: Level of detail for the report
|
|
|
|
Returns:
|
|
Synthesized report as a string
|
|
"""
|
|
# Use progressive synthesis for comprehensive detail level
|
|
if detail_level.lower() == "comprehensive":
|
|
logger.info(f"Using progressive synthesis for {detail_level} detail level")
|
|
return await self.synthesize_report_progressively(chunks, query, query_type, detail_level)
|
|
else:
|
|
# Use standard map-reduce for other detail levels
|
|
logger.info(f"Using standard map-reduce for {detail_level} detail level")
|
|
return await super().synthesize_report(chunks, query, query_type, detail_level)
|
|
|
|
|
|
# Create a singleton instance for global use.
# NOTE: instantiated at import time with the default model; use
# get_progressive_report_synthesizer() to obtain it (or replace it with a
# different model).
progressive_report_synthesizer = ProgressiveReportSynthesizer()
|
|
|
|
def get_progressive_report_synthesizer(model_name: Optional[str] = None) -> ProgressiveReportSynthesizer:
    """
    Get the global progressive report synthesizer instance or create a new one with a specific model.

    Args:
        model_name: Optional model name to use instead of the default

    Returns:
        ProgressiveReportSynthesizer instance
    """
    global progressive_report_synthesizer

    # Rebuild the singleton only when a different model is explicitly requested.
    wants_different_model = bool(model_name) and model_name != progressive_report_synthesizer.model_name
    if wants_different_model:
        progressive_report_synthesizer = ProgressiveReportSynthesizer(model_name)

    return progressive_report_synthesizer
|
|
|
|
async def test_progressive_report_synthesizer():
    """Test the progressive report synthesizer with sample document chunks."""
    # Sample document chunks covering Python's intro, features, applications,
    # and history, pre-scored so prioritization order is deterministic.
    sample_chunks = [
        {
            "document_id": "1",
            "title": "Introduction to Python",
            "url": "https://docs.python.org/3/tutorial/index.html",
            "content": "Python is an easy to learn, powerful programming language. It has efficient high-level data structures and a simple but effective approach to object-oriented programming. Python's elegant syntax and dynamic typing, together with its interpreted nature, make it an ideal language for scripting and rapid application development in many areas on most platforms.",
            "priority_score": 0.9
        },
        {
            "document_id": "2",
            "title": "Python Features",
            "url": "https://www.python.org/about/",
            "content": "Python is a programming language that lets you work quickly and integrate systems more effectively. Python is an interpreted, object-oriented, high-level programming language with dynamic semantics. Its high-level built in data structures, combined with dynamic typing and dynamic binding, make it very attractive for Rapid Application Development, as well as for use as a scripting or glue language to connect existing components together.",
            "priority_score": 0.8
        },
        {
            "document_id": "3",
            "title": "Python Applications",
            "url": "https://www.python.org/about/apps/",
            "content": "Python is used in many application domains. Here's a sampling: Web and Internet Development, Scientific and Numeric Computing, Education, Desktop GUIs, Software Development, and Business Applications. Python is also used in Data Science, Machine Learning, and Artificial Intelligence applications.",
            "priority_score": 0.7
        },
        {
            "document_id": "4",
            "title": "Python History",
            "url": "https://en.wikipedia.org/wiki/Python_(programming_language)",
            "content": "Python was conceived in the late 1980s by Guido van Rossum at Centrum Wiskunde & Informatica (CWI) in the Netherlands as a successor to the ABC language, capable of exception handling and interfacing with the Amoeba operating system. Its implementation began in December 1989.",
            "priority_score": 0.6
        }
    ]

    # Use the shared synthesizer instance.
    synth = get_progressive_report_synthesizer()

    query = "What are the key features and applications of Python programming language?"

    # Simple console progress reporter.
    def on_progress(progress, total, current_report):
        print(f"Progress: {progress:.2%} ({total} chunks)")

    synth.set_progress_callback(on_progress)

    # Generate the report progressively and show the result.
    report = await synth.synthesize_report_progressively(
        sample_chunks, query, query_type="exploratory", detail_level="comprehensive"
    )

    print("\nFinal Generated Report:")
    print(report)

    # Dump the final synthesis state for inspection.
    state = synth.report_state
    print("\nReport State:")
    print(f"Versions: {state.version}")
    print(f"Processed Chunks: {len(state.processed_chunks)}")
    print(f"Improvement Scores: {state.improvement_scores}")
    print(f"Termination Reason: {state.termination_reason}")
|
|
|
|
if __name__ == "__main__":
    # Run the demo/test entry point when executed as a script.
    asyncio.run(test_progressive_report_synthesizer())
|