"""
|
|
Report generator module for the intelligent research system.
|
|
|
|
This module provides functionality to generate reports from search results
|
|
by scraping documents, storing them in a database, and synthesizing them
|
|
into a comprehensive report.
|
|
"""
|
|
|
|
import asyncio
import logging
from typing import Dict, List, Any, Optional, Tuple

from report.database.db_manager import get_db_manager, initialize_database
from report.document_scraper import get_document_scraper
from report.document_processor import get_document_processor
from report.report_synthesis import get_report_synthesizer
from report.progressive_report_synthesis import get_progressive_report_synthesizer
from report.report_detail_levels import get_report_detail_level_manager, DetailLevel

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


class ReportGenerator:
    """
    Report generator for the intelligent research system.

    This class provides methods to generate reports from search results
    by scraping documents, storing them in a database, and synthesizing them
    into a comprehensive report.
    """

    def __init__(self):
        """Initialize the report generator."""
        self.db_manager = get_db_manager()
        self.document_scraper = get_document_scraper()
        self.document_processor = get_document_processor()
        self.report_synthesizer = get_report_synthesizer()
        self.progressive_report_synthesizer = get_progressive_report_synthesizer()
        self.detail_level_manager = get_report_detail_level_manager()
        self.detail_level = "standard"  # Default detail level
        self.model_name = None  # Will use default model based on detail level

    async def initialize(self):
        """Initialize the report generator by setting up the database."""
        await initialize_database()
        logger.info("Report generator initialized")

    def set_detail_level(self, detail_level: str) -> None:
        """
        Set the detail level for report generation.

        Args:
            detail_level: Detail level (brief, standard, detailed, comprehensive)
        """
        try:
            # Validate detail level
            config = self.detail_level_manager.get_detail_level_config(detail_level)
            self.detail_level = detail_level

            # Update model if needed
            model = config.get("model")
            if model and model != self.model_name:
                self.model_name = model
                self.report_synthesizer = get_report_synthesizer(model)
                self.progressive_report_synthesizer = get_progressive_report_synthesizer(model)

            logger.info(f"Detail level set to {detail_level} with model {self.model_name or 'default'}")
        except ValueError as e:
            logger.error(f"Error setting detail level: {e}")
            raise

    def get_detail_level_config(self) -> Dict[str, Any]:
        """
        Get the current detail level configuration.

        Returns:
            Dictionary of configuration parameters for the current detail level
        """
        return self.detail_level_manager.get_detail_level_config(self.detail_level)

    def get_available_detail_levels(self) -> List[Tuple[str, str]]:
        """
        Get a list of available detail levels with descriptions.

        Returns:
            List of tuples containing detail level and description
        """
        return self.detail_level_manager.get_available_detail_levels()

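    # A minimal usage sketch (illustrative; `generator` is a hypothetical
    # ReportGenerator instance, and the actual levels come from the manager):
    #
    #     for level, description in generator.get_available_detail_levels():
    #         print(f"{level}: {description}")
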
    async def process_search_results(self, search_results: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], Dict[str, float]]:
        """
        Process search results by scraping the URLs and storing them in the database.

        Args:
            search_results: List of search results, each containing at least a 'url' field

        Returns:
            Tuple of (processed documents, relevance scores keyed by URL)
        """
        # Extract URLs from search results
        urls = [result.get('url') for result in search_results if result.get('url')]

        # Extract relevance scores if available
        relevance_scores = {}
        for result in search_results:
            if result.get('url') and result.get('score') is not None:
                relevance_scores[result.get('url')] = result.get('score')

        # Scrape URLs and store in database
        documents = await self.document_scraper.scrape_urls(urls)

        # Log results
        logger.info(f"Processed {len(documents)} documents out of {len(urls)} URLs")

        return documents, relevance_scores

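    # A minimal call sketch (illustrative names; note the tuple return, so
    # callers unpack documents and relevance scores together):
    #
    #     documents, relevance_scores = await generator.process_search_results(results)
    #     for url, score in relevance_scores.items():
    #         print(f"{url}: relevance {score:.2f}")
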
    async def get_document_by_url(self, url: str) -> Optional[Dict[str, Any]]:
        """
        Get a document by its URL.

        Args:
            url: URL of the document

        Returns:
            Document as a dictionary, or None if not found
        """
        return await self.db_manager.get_document_by_url(url)

    async def search_documents(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
        """
        Search for documents in the database.

        Args:
            query: Search query
            limit: Maximum number of results to return

        Returns:
            List of matching documents
        """
        return await self.db_manager.search_documents(query, limit)

    async def prepare_documents_for_report(self,
                                           search_results: List[Dict[str, Any]],
                                           token_budget: Optional[int] = None,
                                           chunk_size: Optional[int] = None,
                                           overlap_size: Optional[int] = None) -> List[Dict[str, Any]]:
        """
        Prepare documents for report generation by processing search results,
        prioritizing documents, and chunking them to fit within the token budget.

        Args:
            search_results: List of search results
            token_budget: Maximum number of tokens to use
            chunk_size: Maximum number of tokens per chunk
            overlap_size: Number of tokens to overlap between chunks

        Returns:
            List of selected document chunks
        """
        # Get configuration from detail level if not specified
        config = self.get_detail_level_config()

        if token_budget is None:
            token_budget = config.get("token_budget")

        if chunk_size is None:
            chunk_size = config.get("chunk_size", 1000)

        if overlap_size is None:
            overlap_size = config.get("overlap_size", 100)

        logger.info(f"Preparing documents with token_budget={token_budget}, chunk_size={chunk_size}, overlap_size={overlap_size}")

        # Process search results to get documents and relevance scores
        documents, relevance_scores = await self.process_search_results(search_results)

        # Prioritize and chunk documents
        selected_chunks = self.document_processor.process_documents_for_report(
            documents,
            relevance_scores,
            token_budget,
            chunk_size,
            overlap_size
        )

        return selected_chunks

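    # A minimal call sketch (illustrative values; any argument left as None
    # falls back to the current detail level configuration):
    #
    #     chunks = await generator.prepare_documents_for_report(
    #         search_results, token_budget=8000, chunk_size=500, overlap_size=50
    #     )
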
    def set_progress_callback(self, callback):
        """
        Set the progress callback for both synthesizers.

        Args:
            callback: Function that takes (current_progress, total, current_report) as arguments
        """
        # Set the callback for both synthesizers
        if hasattr(self.report_synthesizer, 'set_progress_callback'):
            self.report_synthesizer.set_progress_callback(callback)

        if hasattr(self.progressive_report_synthesizer, 'set_progress_callback'):
            self.progressive_report_synthesizer.set_progress_callback(callback)

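    # A minimal callback sketch (illustrative; any callable matching the
    # (current_progress, total, current_report) signature described above works):
    #
    #     def log_progress(current_progress, total, current_report):
    #         percent = (current_progress / total) * 100 if total else 0.0
    #         logger.info(f"Synthesis progress: {percent:.0f}%")
    #
    #     generator.set_progress_callback(log_progress)
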
    async def generate_report(self,
                              search_results: List[Dict[str, Any]],
                              query: str,
                              token_budget: Optional[int] = None,
                              chunk_size: Optional[int] = None,
                              overlap_size: Optional[int] = None,
                              detail_level: Optional[str] = None,
                              query_type: Optional[str] = None) -> str:
        """
        Generate a report from search results.

        Args:
            search_results: List of search results
            query: Original search query
            token_budget: Maximum number of tokens to use
            chunk_size: Maximum number of tokens per chunk
            overlap_size: Number of tokens to overlap between chunks
            detail_level: Level of detail for the report (brief, standard, detailed, comprehensive)
            query_type: Type of query; if None, the query type is detected automatically

        Returns:
            Generated report as a string
        """
        # Set detail level if specified
        if detail_level:
            self.set_detail_level(detail_level)

        # Prepare documents for report
        selected_chunks = await self.prepare_documents_for_report(
            search_results,
            token_budget,
            chunk_size,
            overlap_size
        )

        # Log query type information
        if query_type:
            logger.info(f"Using specified query type: {query_type}")
        else:
            logger.info("Using automatic query type detection")

        # Choose the appropriate synthesizer based on detail level
        if self.detail_level.lower() == "comprehensive":
            # Use progressive report synthesizer for comprehensive detail level
            logger.info(f"Using progressive report synthesizer for {self.detail_level} detail level")
            report = await self.progressive_report_synthesizer.synthesize_report(
                selected_chunks,
                query,
                query_type=query_type,
                detail_level=self.detail_level
            )
        else:
            # Use standard report synthesizer for other detail levels
            logger.info(f"Using standard report synthesizer for {self.detail_level} detail level")
            report = await self.report_synthesizer.synthesize_report(
                selected_chunks,
                query,
                query_type=query_type,
                detail_level=self.detail_level
            )

        return report


# Create a singleton instance for global use
report_generator = ReportGenerator()


async def initialize_report_generator():
    """Initialize the report generator."""
    await report_generator.initialize()


def get_report_generator() -> ReportGenerator:
    """
    Get the global report generator instance.

    Returns:
        ReportGenerator instance
    """
    return report_generator


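# A minimal end-to-end sketch (illustrative; `build_report` is a hypothetical
# helper mirroring the flow exercised by test_report_generator below):
#
#     async def build_report(search_results: List[Dict[str, Any]], query: str) -> str:
#         await initialize_report_generator()
#         generator = get_report_generator()
#         generator.set_detail_level("detailed")
#         return await generator.generate_report(search_results, query)

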
async def test_report_generator(use_mock: bool = False):
    """
    Test the report generator with sample search results.

    Args:
        use_mock: If True, use mock data instead of making actual API calls
    """
    # Initialize the report generator
    await initialize_report_generator()

    # Get document scraper with mock option
    document_scraper = get_document_scraper(use_mock=use_mock)

    # Sample search results with real, accessible URLs
    search_results = [
        {
            'title': 'Python Documentation',
            'url': 'https://docs.python.org/3/tutorial/index.html',
            'snippet': 'The Python Tutorial.',
            'score': 0.95
        },
        {
            'title': 'Python Requests Library',
            'url': 'https://requests.readthedocs.io/en/latest/',
            'snippet': 'Requests is an elegant and simple HTTP library for Python.',
            'score': 0.85
        },
        {
            'title': 'Real Python',
            'url': 'https://realpython.com/',
            'snippet': 'Python tutorials for developers of all skill levels.',
            'score': 0.75
        }
    ]

    try:
        # Process search results
        documents, relevance_scores = await report_generator.process_search_results(search_results)

        # Print documents
        print(f"Processed {len(documents)} documents")
        for doc in documents:
            print(f"Document: {doc.get('title')} ({doc.get('url')})")
            print(f"Token count: {doc.get('token_count')}")
            content_preview = doc.get('content', '')[:100] + '...' if doc.get('content') else 'No content'
            print(f"Content snippet: {content_preview}")
            print()

        # Generate report
        report = await report_generator.generate_report(search_results, "Python programming")

        # Print report
        print("Generated Report:")
        print(report)
    except Exception as e:
        logger.error(f"Error during report generation test: {str(e)}")
        import traceback
        traceback.print_exc()


# Run test if this module is executed directly
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description='Test the report generator')
    parser.add_argument('--mock', action='store_true', help='Use mock data instead of making actual API calls')
    args = parser.parse_args()

    print(f"Running test with {'mock data' if args.mock else 'real data'}")
    asyncio.run(test_report_generator(use_mock=args.mock))