Implement Phase 3: Report Synthesis using Map-Reduce approach with Groq LLM

This commit is contained in:
Steve White 2025-02-27 17:59:18 -06:00
parent 34be5ce36f
commit 8ee4605522
3 changed files with 498 additions and 24 deletions

View File

@ -14,6 +14,7 @@ from typing import Dict, List, Any, Optional, Tuple, Union
from report.database.db_manager import get_db_manager, initialize_database
from report.document_scraper import get_document_scraper
from report.document_processor import get_document_processor
from report.report_synthesis import get_report_synthesizer
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
@ -33,6 +34,7 @@ class ReportGenerator:
self.db_manager = get_db_manager()
self.document_scraper = get_document_scraper()
self.document_processor = get_document_processor()
self.report_synthesizer = get_report_synthesizer()
async def initialize(self):
"""Initialize the report generator by setting up the database."""
@ -150,22 +152,8 @@ class ReportGenerator:
overlap_size
)
# TODO: Implement report synthesis using LLM
# For now, just return a placeholder report
report = f"# Report for: {query}\n\n"
report += f"Based on {len(selected_chunks)} document chunks\n\n"
# Add document summaries
for i, chunk in enumerate(selected_chunks[:5]): # Show first 5 chunks
report += f"## Document {i+1}: {chunk.get('title', 'Untitled')}\n"
report += f"Source: {chunk.get('url', 'Unknown')}\n"
report += f"Chunk type: {chunk.get('chunk_type', 'Unknown')}\n"
report += f"Priority score: {chunk.get('priority_score', 0.0):.2f}\n\n"
# Add a snippet of the content
content = chunk.get('content', '')
snippet = content[:200] + "..." if len(content) > 200 else content
report += f"{snippet}\n\n"
# Generate report using report synthesizer
report = await self.report_synthesizer.synthesize_report(selected_chunks, query)
return report
@ -203,20 +191,20 @@ async def test_report_generator(use_mock: bool = False):
search_results = [
{
'title': 'Python Documentation',
'url': 'https://docs.python.org/3/',
'snippet': 'Official Python documentation.',
'url': 'https://docs.python.org/3/tutorial/index.html',
'snippet': 'The Python Tutorial.',
'score': 0.95
},
{
'title': 'Python.org',
'url': 'https://www.python.org/',
'snippet': 'The official home of the Python Programming Language.',
'title': 'Python Requests Library',
'url': 'https://requests.readthedocs.io/en/latest/',
'snippet': 'Requests is an elegant and simple HTTP library for Python.',
'score': 0.85
},
{
'title': 'Wikipedia - Python',
'url': 'https://en.wikipedia.org/wiki/Python_(programming_language)',
'snippet': 'Python is a high-level, general-purpose programming language.',
'title': 'Real Python',
'url': 'https://realpython.com/',
'snippet': 'Python tutorials for developers of all skill levels.',
'score': 0.75
}
]

333
report/report_synthesis.py Normal file
View File

@ -0,0 +1,333 @@
"""
Report synthesis module for the intelligent research system.
This module provides functionality to synthesize reports from document chunks
using LLMs with a map-reduce approach.
"""
import os
import json
import asyncio
import logging
from typing import Dict, List, Any, Optional, Tuple, Union
import litellm
from litellm import completion
from config.config import get_config
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class ReportSynthesizer:
"""
Report synthesizer for the intelligent research system.
This class provides methods to synthesize reports from document chunks
using LLMs with a map-reduce approach.
"""
def __init__(self, model_name: Optional[str] = None):
"""
Initialize the report synthesizer.
Args:
model_name: Name of the LLM model to use. If None, uses the default model
from configuration.
"""
self.config = get_config()
# Use specified model or default from config for report synthesis
self.model_name = model_name or self.config.config_data.get('report_synthesis', {}).get('model', 'llama-3.3-70b-versatile')
# Get model-specific configuration
self.model_config = self.config.get_model_config(self.model_name)
# Set up LiteLLM with the appropriate provider
self._setup_provider()
def _setup_provider(self) -> None:
"""Set up the LLM provider based on the model configuration."""
provider = self.model_config.get('provider', 'groq')
try:
# Get API key for the provider
api_key = self.config.get_api_key(provider)
# Set environment variable for the provider
os.environ[f"{provider.upper()}_API_KEY"] = api_key
logger.info(f"Report synthesizer initialized with model: {self.model_name} (provider: {provider})")
except ValueError as e:
logger.error(f"Error setting up LLM provider: {e}")
def _get_completion_params(self) -> Dict[str, Any]:
"""
Get parameters for LLM completion based on model configuration.
Returns:
Dictionary of parameters for LiteLLM completion
"""
params = {
'temperature': self.model_config.get('temperature', 0.3), # Lower temperature for factual reporting
'max_tokens': self.model_config.get('max_tokens', 4000), # Longer output for comprehensive reports
'top_p': self.model_config.get('top_p', 0.9)
}
# Handle different provider configurations
provider = self.model_config.get('provider', 'groq')
if provider == 'groq':
# For Groq provider
params['model'] = f"groq/{self.model_name}"
elif provider == 'openrouter':
# For OpenRouter provider
params['model'] = self.model_config.get('model_name', self.model_name)
params['api_base'] = self.model_config.get('endpoint')
# Set HTTP headers for OpenRouter if needed
params['headers'] = {
'HTTP-Referer': 'https://sim-search.app', # Replace with your actual app URL
'X-Title': 'Intelligent Research System' # Replace with your actual app name
}
else:
# Standard provider (OpenAI, Anthropic, etc.)
params['model'] = self.model_name
return params
async def generate_completion(self, messages: List[Dict[str, str]], stream: bool = False) -> Union[str, Any]:
"""
Generate a completion using the configured LLM.
Args:
messages: List of message dictionaries with 'role' and 'content' keys
stream: Whether to stream the response
Returns:
If stream is False, returns the completion text as a string
If stream is True, returns the completion response object for streaming
"""
try:
params = self._get_completion_params()
params['messages'] = messages
params['stream'] = stream
response = completion(**params)
if stream:
return response
else:
return response.choices[0].message.content
except Exception as e:
logger.error(f"Error generating completion: {e}")
return f"Error: {str(e)}"
async def map_document_chunks(self, chunks: List[Dict[str, Any]], query: str) -> List[Dict[str, Any]]:
"""
Map phase: Process individual document chunks to extract key information.
Args:
chunks: List of document chunks
query: Original search query
Returns:
List of processed chunks with extracted information
"""
processed_chunks = []
for chunk in chunks:
# Create a prompt for extracting key information from the chunk
messages = [
{"role": "system", "content": """You are an expert research assistant. Extract the most relevant information from this document chunk that addresses the user's query.
Focus on factual information, key concepts, and important details.
Include any relevant statistics, definitions, or explanations that would be valuable for a comprehensive report.
Format your response as a concise summary with bullet points for key facts."""},
{"role": "user", "content": f"""Query: {query}
Document title: {chunk.get('title', 'Untitled')}
Document URL: {chunk.get('url', 'Unknown')}
Document chunk content:
{chunk.get('content', '')}
Extract the most relevant information from this document chunk that addresses the query."""}
]
# Process the chunk with the LLM
extracted_info = await self.generate_completion(messages)
# Add the extracted information to the chunk
processed_chunk = chunk.copy()
processed_chunk['extracted_info'] = extracted_info
processed_chunks.append(processed_chunk)
return processed_chunks
async def reduce_processed_chunks(self, processed_chunks: List[Dict[str, Any]], query: str, query_type: str = "exploratory") -> str:
"""
Reduce phase: Synthesize processed chunks into a coherent report.
Args:
processed_chunks: List of processed chunks with extracted information
query: Original search query
query_type: Type of query (factual, exploratory, comparative)
Returns:
Synthesized report as a string
"""
# Prepare the context with all extracted information
context = ""
for i, chunk in enumerate(processed_chunks):
context += f"Document {i+1}: {chunk.get('title', 'Untitled')}\n"
context += f"Source: {chunk.get('url', 'Unknown')}\n"
context += f"Extracted information:\n{chunk.get('extracted_info', '')}\n\n"
# Create a template based on query type
if query_type == "factual":
template = """Create a comprehensive factual report that directly answers the query. Focus on accuracy and clarity. Include:
1. A clear, direct answer to the query
2. Supporting evidence and facts from the sources
3. Any relevant context needed to understand the answer
4. Citations for all information (use numbered citations in square brackets [1], [2], etc.)
5. A references section at the end listing all sources"""
elif query_type == "comparative":
template = """Create a comprehensive comparative report that analyzes different perspectives on the query. Include:
1. An overview of the topic and why it's significant
2. A balanced presentation of different viewpoints or approaches
3. Analysis of similarities and differences
4. Evidence supporting each perspective
5. A synthesis of the information that highlights key insights
6. Citations for all information (use numbered citations in square brackets [1], [2], etc.)
7. A references section at the end listing all sources"""
else: # exploratory (default)
template = """Create a comprehensive exploratory report that investigates the query in depth. Include:
1. An introduction that frames the topic and its significance
2. Key concepts and definitions
3. Main findings and insights from the sources
4. Analysis of the information that highlights patterns and connections
5. Implications or applications of the findings
6. Citations for all information (use numbered citations in square brackets [1], [2], etc.)
7. A references section at the end listing all sources"""
# Create the prompt for synthesizing the report
messages = [
{"role": "system", "content": f"""You are an expert research assistant tasked with creating comprehensive, well-structured reports.
{template}
Format the report in Markdown with clear headings, subheadings, and bullet points where appropriate.
Make the report readable, engaging, and informative while maintaining academic rigor."""},
{"role": "user", "content": f"""Query: {query}
Information from sources:
{context}
Synthesize this information into a comprehensive report that addresses the query. Use your own words to create a coherent narrative, but ensure all information is based on the provided sources. Include citations and a references section."""}
]
# Generate the report
report = await self.generate_completion(messages)
return report
async def synthesize_report(self, chunks: List[Dict[str, Any]], query: str, query_type: str = "exploratory") -> str:
"""
Synthesize a report from document chunks using the map-reduce approach.
Args:
chunks: List of document chunks
query: Original search query
query_type: Type of query (factual, exploratory, comparative)
Returns:
Synthesized report as a string
"""
logger.info(f"Synthesizing report for query: {query}")
logger.info(f"Using {len(chunks)} document chunks")
# Determine query type if not specified
if query_type == "exploratory":
# Try to infer query type from the query text
if any(term in query.lower() for term in ["what is", "who is", "when did", "where is", "how does"]):
query_type = "factual"
elif any(term in query.lower() for term in ["compare", "difference", "versus", "pros and cons"]):
query_type = "comparative"
logger.info(f"Query type determined as: {query_type}")
# Map phase: Process individual document chunks
logger.info("Starting map phase: Processing individual document chunks")
processed_chunks = await self.map_document_chunks(chunks, query)
logger.info(f"Map phase complete: Processed {len(processed_chunks)} chunks")
# Reduce phase: Synthesize processed chunks into a coherent report
logger.info("Starting reduce phase: Synthesizing processed chunks into a report")
report = await self.reduce_processed_chunks(processed_chunks, query, query_type)
logger.info("Reduce phase complete: Report generated")
return report
# Create a singleton instance for global use
report_synthesizer = ReportSynthesizer()
def get_report_synthesizer(model_name: Optional[str] = None) -> ReportSynthesizer:
"""
Get the global report synthesizer instance or create a new one with a specific model.
Args:
model_name: Optional model name to use instead of the default
Returns:
ReportSynthesizer instance
"""
global report_synthesizer
if model_name and model_name != report_synthesizer.model_name:
report_synthesizer = ReportSynthesizer(model_name)
return report_synthesizer
async def test_report_synthesizer():
"""Test the report synthesizer with sample document chunks."""
# Sample document chunks
chunks = [
{
"title": "Introduction to Python",
"url": "https://docs.python.org/3/tutorial/index.html",
"content": "Python is an easy to learn, powerful programming language. It has efficient high-level data structures and a simple but effective approach to object-oriented programming. Python's elegant syntax and dynamic typing, together with its interpreted nature, make it an ideal language for scripting and rapid application development in many areas on most platforms."
},
{
"title": "Python Features",
"url": "https://www.python.org/about/",
"content": "Python is a programming language that lets you work quickly and integrate systems more effectively. Python is an interpreted, object-oriented, high-level programming language with dynamic semantics. Its high-level built in data structures, combined with dynamic typing and dynamic binding, make it very attractive for Rapid Application Development, as well as for use as a scripting or glue language to connect existing components together."
}
]
# Initialize the report synthesizer
synthesizer = get_report_synthesizer()
# Test query
query = "What are the key features of Python programming language?"
# Map phase
processed_chunks = await synthesizer.map_document_chunks(chunks, query)
# Print processed chunks
print("Processed chunks:")
for i, chunk in enumerate(processed_chunks):
print(f"Chunk {i+1}: {chunk.get('title')}")
print(f"Extracted information: {chunk.get('extracted_info')}")
print()
# Reduce phase
report = await synthesizer.reduce_processed_chunks(processed_chunks, query)
# Print report
print("Generated Report:")
print(report)
if __name__ == "__main__":
asyncio.run(test_report_synthesizer())

153
tests/test_report_synthesis.py Executable file
View File

@ -0,0 +1,153 @@
#!/usr/bin/env python3
"""
Test script for the report synthesis functionality.
This script tests the report synthesis functionality by generating a report
from sample document chunks.
"""
import os
import sys
import asyncio
import json
import argparse
from typing import List, Dict, Any, Optional
# Add the parent directory to the path so we can import the modules
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from report.report_synthesis import get_report_synthesizer
from report.document_processor import get_document_processor
from report.document_scraper import get_document_scraper
from report.database.db_manager import get_db_manager, initialize_database
async def test_with_sample_chunks():
"""Test report synthesis with sample document chunks."""
# Sample document chunks
chunks = [
{
"title": "Introduction to Python",
"url": "https://docs.python.org/3/tutorial/index.html",
"content": "Python is an easy to learn, powerful programming language. It has efficient high-level data structures and a simple but effective approach to object-oriented programming. Python's elegant syntax and dynamic typing, together with its interpreted nature, make it an ideal language for scripting and rapid application development in many areas on most platforms.",
"chunk_type": "introduction",
"priority_score": 0.95
},
{
"title": "Python Features",
"url": "https://www.python.org/about/",
"content": "Python is a programming language that lets you work quickly and integrate systems more effectively. Python is an interpreted, object-oriented, high-level programming language with dynamic semantics. Its high-level built in data structures, combined with dynamic typing and dynamic binding, make it very attractive for Rapid Application Development, as well as for use as a scripting or glue language to connect existing components together.",
"chunk_type": "features",
"priority_score": 0.90
},
{
"title": "Python Applications",
"url": "https://www.python.org/about/apps/",
"content": "Python is used in many application domains. Here's a sampling: Web and Internet Development, Scientific and Numeric Computing, Education, Desktop GUIs, Software Development, and Business Applications. Python is also used as a scripting language for web applications, e.g. via mod_wsgi for the Apache webserver. With Web Server Gateway Interface support, it has become the language of choice for many web developers.",
"chunk_type": "applications",
"priority_score": 0.85
}
]
# Initialize the report synthesizer
synthesizer = get_report_synthesizer()
# Test query
query = "What are the key features and applications of Python programming language?"
# Generate report
print(f"Generating report for query: '{query}'")
print("-" * 50)
report = await synthesizer.synthesize_report(chunks, query)
print("\nGenerated Report:")
print("=" * 50)
print(report)
print("=" * 50)
async def test_with_real_urls(urls: List[str], query: str, use_mock: bool = False):
"""
Test report synthesis with real URLs.
Args:
urls: List of URLs to scrape
query: Query to use for the report
use_mock: Whether to use mock data for document scraping
"""
# Initialize the database
await initialize_database()
# Get document scraper with mock option
document_scraper = get_document_scraper(use_mock=use_mock)
# Get document processor
document_processor = get_document_processor()
# Get report synthesizer
report_synthesizer = get_report_synthesizer()
# Scrape URLs
print(f"Scraping {len(urls)} URLs...")
documents = await document_scraper.scrape_urls(urls)
print(f"Scraped {len(documents)} documents")
# Create relevance scores (mock scores for this test)
relevance_scores = {}
for i, doc in enumerate(documents):
relevance_scores[doc.get('url')] = 1.0 - (i * 0.1) # Simple decreasing scores
# Process documents for report
print("Processing documents for report...")
selected_chunks = document_processor.process_documents_for_report(
documents,
relevance_scores,
token_budget=4000,
chunk_size=1000,
overlap_size=100
)
print(f"Selected {len(selected_chunks)} chunks for report")
# Generate report
print(f"Generating report for query: '{query}'")
print("-" * 50)
report = await report_synthesizer.synthesize_report(selected_chunks, query)
print("\nGenerated Report:")
print("=" * 50)
print(report)
print("=" * 50)
# Save the report to a file
output_file = f"report_{int(asyncio.get_event_loop().time())}.md"
with open(output_file, "w") as f:
f.write(report)
print(f"Report saved to {output_file}")
async def main():
"""Main function to run the test."""
parser = argparse.ArgumentParser(description="Test report synthesis functionality")
parser.add_argument("--sample", action="store_true", help="Use sample document chunks")
parser.add_argument("--urls", nargs="+", help="URLs to scrape")
parser.add_argument("--query", type=str, default="What are the key features and applications of Python programming language?", help="Query to use for the report")
parser.add_argument("--mock", action="store_true", help="Use mock data for document scraping")
args = parser.parse_args()
if args.sample:
await test_with_sample_chunks()
elif args.urls:
await test_with_real_urls(args.urls, args.query, args.mock)
else:
# Default test with some Python-related URLs
default_urls = [
"https://docs.python.org/3/tutorial/index.html",
"https://www.python.org/about/",
"https://www.python.org/about/apps/",
"https://realpython.com/python-introduction/"
]
await test_with_real_urls(default_urls, args.query, args.mock)
if __name__ == "__main__":
asyncio.run(main())