From 8ee4605522d600a76053f78764de5a91bb322744 Mon Sep 17 00:00:00 2001
From: Steve White
Date: Thu, 27 Feb 2025 17:59:18 -0600
Subject: [PATCH] Implement Phase 3: Report Synthesis using Map-Reduce approach with Groq LLM

---
 report/report_generator.py     |  36 ++--
 report/report_synthesis.py     | 333 +++++++++++++++++++++++++++++++++
 tests/test_report_synthesis.py | 153 +++++++++++++++
 3 files changed, 498 insertions(+), 24 deletions(-)
 create mode 100644 report/report_synthesis.py
 create mode 100755 tests/test_report_synthesis.py

diff --git a/report/report_generator.py b/report/report_generator.py
index 41d656d..3d93336 100644
--- a/report/report_generator.py
+++ b/report/report_generator.py
@@ -14,6 +14,7 @@ from typing import Dict, List, Any, Optional, Tuple, Union
 from report.database.db_manager import get_db_manager, initialize_database
 from report.document_scraper import get_document_scraper
 from report.document_processor import get_document_processor
+from report.report_synthesis import get_report_synthesizer
 
 # Configure logging
 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
@@ -33,6 +34,7 @@ class ReportGenerator:
         self.db_manager = get_db_manager()
         self.document_scraper = get_document_scraper()
         self.document_processor = get_document_processor()
+        self.report_synthesizer = get_report_synthesizer()
 
     async def initialize(self):
         """Initialize the report generator by setting up the database."""
@@ -150,22 +152,8 @@ class ReportGenerator:
             overlap_size
         )
 
-        # TODO: Implement report synthesis using LLM
-        # For now, just return a placeholder report
-        report = f"# Report for: {query}\n\n"
-        report += f"Based on {len(selected_chunks)} document chunks\n\n"
-
-        # Add document summaries
-        for i, chunk in enumerate(selected_chunks[:5]):  # Show first 5 chunks
-            report += f"## Document {i+1}: {chunk.get('title', 'Untitled')}\n"
-            report += f"Source: {chunk.get('url', 'Unknown')}\n"
-            report += f"Chunk type: {chunk.get('chunk_type', 'Unknown')}\n"
-            report += f"Priority score: {chunk.get('priority_score', 0.0):.2f}\n\n"
-
-            # Add a snippet of the content
-            content = chunk.get('content', '')
-            snippet = content[:200] + "..." if len(content) > 200 else content
-            report += f"{snippet}\n\n"
+        # Generate report using report synthesizer
+        report = await self.report_synthesizer.synthesize_report(selected_chunks, query)
 
         return report
 
@@ -203,20 +191,20 @@ async def test_report_generator(use_mock: bool = False):
     search_results = [
         {
             'title': 'Python Documentation',
-            'url': 'https://docs.python.org/3/',
-            'snippet': 'Official Python documentation.',
+            'url': 'https://docs.python.org/3/tutorial/index.html',
+            'snippet': 'The Python Tutorial.',
             'score': 0.95
         },
         {
-            'title': 'Python.org',
-            'url': 'https://www.python.org/',
-            'snippet': 'The official home of the Python Programming Language.',
+            'title': 'Python Requests Library',
+            'url': 'https://requests.readthedocs.io/en/latest/',
+            'snippet': 'Requests is an elegant and simple HTTP library for Python.',
             'score': 0.85
         },
         {
-            'title': 'Wikipedia - Python',
-            'url': 'https://en.wikipedia.org/wiki/Python_(programming_language)',
-            'snippet': 'Python is a high-level, general-purpose programming language.',
+            'title': 'Real Python',
+            'url': 'https://realpython.com/',
+            'snippet': 'Python tutorials for developers of all skill levels.',
             'score': 0.75
         }
     ]
diff --git a/report/report_synthesis.py b/report/report_synthesis.py
new file mode 100644
index 0000000..0d5b64c
--- /dev/null
+++ b/report/report_synthesis.py
@@ -0,0 +1,333 @@
+"""
+Report synthesis module for the intelligent research system.
+
+This module provides functionality to synthesize reports from document chunks
+using LLMs with a map-reduce approach.
+"""
+
+import os
+import json
+import asyncio
+import logging
+from typing import Dict, List, Any, Optional, Tuple, Union
+
+import litellm
+from litellm import completion
+
+from config.config import get_config
+
+# Configure logging
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+logger = logging.getLogger(__name__)
+
+class ReportSynthesizer:
+    """
+    Report synthesizer for the intelligent research system.
+
+    This class provides methods to synthesize reports from document chunks
+    using LLMs with a map-reduce approach.
+    """
+
+    def __init__(self, model_name: Optional[str] = None):
+        """
+        Initialize the report synthesizer.
+
+        Args:
+            model_name: Name of the LLM model to use. If None, uses the default model
+                        from configuration.
+        """
+        self.config = get_config()
+
+        # Use specified model or default from config for report synthesis
+        self.model_name = model_name or self.config.config_data.get('report_synthesis', {}).get('model', 'llama-3.3-70b-versatile')
+
+        # Get model-specific configuration
+        self.model_config = self.config.get_model_config(self.model_name)
+
+        # Set up LiteLLM with the appropriate provider
+        self._setup_provider()
+
+    def _setup_provider(self) -> None:
+        """Set up the LLM provider based on the model configuration."""
+        provider = self.model_config.get('provider', 'groq')
+
+        try:
+            # Get API key for the provider
+            api_key = self.config.get_api_key(provider)
+
+            # Set environment variable for the provider
+            os.environ[f"{provider.upper()}_API_KEY"] = api_key
+
+            logger.info(f"Report synthesizer initialized with model: {self.model_name} (provider: {provider})")
+        except ValueError as e:
+            logger.error(f"Error setting up LLM provider: {e}")
+
+    def _get_completion_params(self) -> Dict[str, Any]:
+        """
+        Get parameters for LLM completion based on model configuration.
+
+        Returns:
+            Dictionary of parameters for LiteLLM completion
+        """
+        params = {
+            'temperature': self.model_config.get('temperature', 0.3),  # Lower temperature for factual reporting
+            'max_tokens': self.model_config.get('max_tokens', 4000),  # Longer output for comprehensive reports
+            'top_p': self.model_config.get('top_p', 0.9)
+        }
+
+        # Handle different provider configurations
+        provider = self.model_config.get('provider', 'groq')
+
+        if provider == 'groq':
+            # For Groq provider
+            params['model'] = f"groq/{self.model_name}"
+        elif provider == 'openrouter':
+            # For OpenRouter provider
+            params['model'] = self.model_config.get('model_name', self.model_name)
+            params['api_base'] = self.model_config.get('endpoint')
+
+            # Set HTTP headers for OpenRouter if needed
+            params['headers'] = {
+                'HTTP-Referer': 'https://sim-search.app',  # Replace with your actual app URL
+                'X-Title': 'Intelligent Research System'  # Replace with your actual app name
+            }
+        else:
+            # Standard provider (OpenAI, Anthropic, etc.)
+            params['model'] = self.model_name
+
+        return params
+
+    async def generate_completion(self, messages: List[Dict[str, str]], stream: bool = False) -> Union[str, Any]:
+        """
+        Generate a completion using the configured LLM.
+
+        Args:
+            messages: List of message dictionaries with 'role' and 'content' keys
+            stream: Whether to stream the response
+
+        Returns:
+            If stream is False, returns the completion text as a string
+            If stream is True, returns the completion response object for streaming
+        """
+        try:
+            params = self._get_completion_params()
+            params['messages'] = messages
+            params['stream'] = stream
+            # completion() is a blocking network call; run it in the default
+            # executor so other coroutines keep running while it waits.
+            response = await asyncio.get_running_loop().run_in_executor(None, lambda: completion(**params))
+            if stream:
+                return response
+            else:
+                return response.choices[0].message.content
+        except Exception as e:
+            logger.error(f"Error generating completion: {e}")
+            return f"Error: {str(e)}"
+
+    async def map_document_chunks(self, chunks: List[Dict[str, Any]], query: str) -> List[Dict[str, Any]]:
+        """
+        Map phase: Process individual document chunks to extract key information.
+
+        Args:
+            chunks: List of document chunks
+            query: Original search query
+
+        Returns:
+            List of processed chunks with extracted information
+        """
+        processed_chunks = []
+
+        for chunk in chunks:
+            # Create a prompt for extracting key information from the chunk
+            messages = [
+                {"role": "system", "content": """You are an expert research assistant. Extract the most relevant information from this document chunk that addresses the user's query.
+                Focus on factual information, key concepts, and important details.
+                Include any relevant statistics, definitions, or explanations that would be valuable for a comprehensive report.
+                Format your response as a concise summary with bullet points for key facts."""},
+                {"role": "user", "content": f"""Query: {query}
+
+                Document title: {chunk.get('title', 'Untitled')}
+                Document URL: {chunk.get('url', 'Unknown')}
+
+                Document chunk content:
+                {chunk.get('content', '')}
+
+                Extract the most relevant information from this document chunk that addresses the query."""}
+            ]
+
+            # Process the chunk with the LLM
+            extracted_info = await self.generate_completion(messages)
+
+            # Add the extracted information to the chunk
+            processed_chunk = chunk.copy()
+            processed_chunk['extracted_info'] = extracted_info
+
+            processed_chunks.append(processed_chunk)
+
+        return processed_chunks
+
+    async def reduce_processed_chunks(self, processed_chunks: List[Dict[str, Any]], query: str, query_type: str = "exploratory") -> str:
+        """
+        Reduce phase: Synthesize processed chunks into a coherent report.
+
+        Args:
+            processed_chunks: List of processed chunks with extracted information
+            query: Original search query
+            query_type: Type of query (factual, exploratory, comparative)
+
+        Returns:
+            Synthesized report as a string
+        """
+        # Prepare the context with all extracted information
+        context = ""
+        for i, chunk in enumerate(processed_chunks):
+            context += f"Document {i+1}: {chunk.get('title', 'Untitled')}\n"
+            context += f"Source: {chunk.get('url', 'Unknown')}\n"
+            context += f"Extracted information:\n{chunk.get('extracted_info', '')}\n\n"
+
+        # Create a template based on query type
+        if query_type == "factual":
+            template = """Create a comprehensive factual report that directly answers the query. Focus on accuracy and clarity. Include:
+            1. A clear, direct answer to the query
+            2. Supporting evidence and facts from the sources
+            3. Any relevant context needed to understand the answer
+            4. Citations for all information (use numbered citations in square brackets [1], [2], etc.)
+            5. A references section at the end listing all sources"""
+        elif query_type == "comparative":
+            template = """Create a comprehensive comparative report that analyzes different perspectives on the query. Include:
+            1. An overview of the topic and why it's significant
+            2. A balanced presentation of different viewpoints or approaches
+            3. Analysis of similarities and differences
+            4. Evidence supporting each perspective
+            5. A synthesis of the information that highlights key insights
+            6. Citations for all information (use numbered citations in square brackets [1], [2], etc.)
+            7. A references section at the end listing all sources"""
+        else:  # exploratory (default)
+            template = """Create a comprehensive exploratory report that investigates the query in depth. Include:
+            1. An introduction that frames the topic and its significance
+            2. Key concepts and definitions
+            3. Main findings and insights from the sources
+            4. Analysis of the information that highlights patterns and connections
+            5. Implications or applications of the findings
+            6. Citations for all information (use numbered citations in square brackets [1], [2], etc.)
+            7. A references section at the end listing all sources"""
+
+        # Create the prompt for synthesizing the report
+        messages = [
+            {"role": "system", "content": f"""You are an expert research assistant tasked with creating comprehensive, well-structured reports.
+            {template}
+
+            Format the report in Markdown with clear headings, subheadings, and bullet points where appropriate.
+            Make the report readable, engaging, and informative while maintaining academic rigor."""},
+            {"role": "user", "content": f"""Query: {query}
+
+            Information from sources:
+            {context}
+
+            Synthesize this information into a comprehensive report that addresses the query. Use your own words to create a coherent narrative, but ensure all information is based on the provided sources. Include citations and a references section."""}
+        ]
+
+        # Generate the report
+        report = await self.generate_completion(messages)
+
+        return report
+
+    async def synthesize_report(self, chunks: List[Dict[str, Any]], query: str, query_type: str = "exploratory") -> str:
+        """
+        Synthesize a report from document chunks using the map-reduce approach.
+
+        Args:
+            chunks: List of document chunks
+            query: Original search query
+            query_type: Type of query (factual, exploratory, comparative)
+
+        Returns:
+            Synthesized report as a string
+        """
+        logger.info(f"Synthesizing report for query: {query}")
+        logger.info(f"Using {len(chunks)} document chunks")
+
+        # Determine query type if not specified
+        if query_type == "exploratory":
+            # Try to infer query type from the query text
+            if any(term in query.lower() for term in ["what is", "who is", "when did", "where is", "how does"]):
+                query_type = "factual"
+            elif any(term in query.lower() for term in ["compare", "difference", "versus", "pros and cons"]):
+                query_type = "comparative"
+
+        logger.info(f"Query type determined as: {query_type}")
+
+        # Map phase: Process individual document chunks
+        logger.info("Starting map phase: Processing individual document chunks")
+        processed_chunks = await self.map_document_chunks(chunks, query)
+        logger.info(f"Map phase complete: Processed {len(processed_chunks)} chunks")
+
+        # Reduce phase: Synthesize processed chunks into a coherent report
+        logger.info("Starting reduce phase: Synthesizing processed chunks into a report")
+        report = await self.reduce_processed_chunks(processed_chunks, query, query_type)
+        logger.info("Reduce phase complete: Report generated")
+
+        return report
+
+
+# Create a singleton instance for global use
+report_synthesizer = ReportSynthesizer()
+
+def get_report_synthesizer(model_name: Optional[str] = None) -> ReportSynthesizer:
+    """
+    Get the global report synthesizer instance or create a new one with a specific model.
+
+    Args:
+        model_name: Optional model name to use instead of the default
+
+    Returns:
+        ReportSynthesizer instance
+    """
+    global report_synthesizer
+
+    if model_name and model_name != report_synthesizer.model_name:
+        report_synthesizer = ReportSynthesizer(model_name)
+
+    return report_synthesizer
+
+async def test_report_synthesizer():
+    """Test the report synthesizer with sample document chunks."""
+    # Sample document chunks
+    chunks = [
+        {
+            "title": "Introduction to Python",
+            "url": "https://docs.python.org/3/tutorial/index.html",
+            "content": "Python is an easy to learn, powerful programming language. It has efficient high-level data structures and a simple but effective approach to object-oriented programming. Python's elegant syntax and dynamic typing, together with its interpreted nature, make it an ideal language for scripting and rapid application development in many areas on most platforms."
+        },
+        {
+            "title": "Python Features",
+            "url": "https://www.python.org/about/",
+            "content": "Python is a programming language that lets you work quickly and integrate systems more effectively. Python is an interpreted, object-oriented, high-level programming language with dynamic semantics. Its high-level built in data structures, combined with dynamic typing and dynamic binding, make it very attractive for Rapid Application Development, as well as for use as a scripting or glue language to connect existing components together."
+        }
+    ]
+
+    # Initialize the report synthesizer
+    synthesizer = get_report_synthesizer()
+
+    # Test query
+    query = "What are the key features of Python programming language?"
+
+    # Map phase
+    processed_chunks = await synthesizer.map_document_chunks(chunks, query)
+
+    # Print processed chunks
+    print("Processed chunks:")
+    for i, chunk in enumerate(processed_chunks):
+        print(f"Chunk {i+1}: {chunk.get('title')}")
+        print(f"Extracted information: {chunk.get('extracted_info')}")
+        print()
+
+    # Reduce phase
+    report = await synthesizer.reduce_processed_chunks(processed_chunks, query)
+
+    # Print report
+    print("Generated Report:")
+    print(report)
+
+if __name__ == "__main__":
+    asyncio.run(test_report_synthesizer())
diff --git a/tests/test_report_synthesis.py b/tests/test_report_synthesis.py
new file mode 100755
index 0000000..8fcd611
--- /dev/null
+++ b/tests/test_report_synthesis.py
@@ -0,0 +1,153 @@
+#!/usr/bin/env python3
+"""
+Test script for the report synthesis functionality.
+
+This script tests the report synthesis functionality by generating a report
+from sample document chunks.
+"""
+
+import os
+import sys
+import asyncio
+import json
+import argparse
+from typing import List, Dict, Any, Optional
+
+# Add the parent directory to the path so we can import the modules
+sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from report.report_synthesis import get_report_synthesizer
+from report.document_processor import get_document_processor
+from report.document_scraper import get_document_scraper
+from report.database.db_manager import get_db_manager, initialize_database
+
+async def test_with_sample_chunks():
+    """Test report synthesis with sample document chunks."""
+    # Sample document chunks
+    chunks = [
+        {
+            "title": "Introduction to Python",
+            "url": "https://docs.python.org/3/tutorial/index.html",
+            "content": "Python is an easy to learn, powerful programming language. It has efficient high-level data structures and a simple but effective approach to object-oriented programming. Python's elegant syntax and dynamic typing, together with its interpreted nature, make it an ideal language for scripting and rapid application development in many areas on most platforms.",
+            "chunk_type": "introduction",
+            "priority_score": 0.95
+        },
+        {
+            "title": "Python Features",
+            "url": "https://www.python.org/about/",
+            "content": "Python is a programming language that lets you work quickly and integrate systems more effectively. Python is an interpreted, object-oriented, high-level programming language with dynamic semantics. Its high-level built in data structures, combined with dynamic typing and dynamic binding, make it very attractive for Rapid Application Development, as well as for use as a scripting or glue language to connect existing components together.",
+            "chunk_type": "features",
+            "priority_score": 0.90
+        },
+        {
+            "title": "Python Applications",
+            "url": "https://www.python.org/about/apps/",
+            "content": "Python is used in many application domains. Here's a sampling: Web and Internet Development, Scientific and Numeric Computing, Education, Desktop GUIs, Software Development, and Business Applications. Python is also used as a scripting language for web applications, e.g. via mod_wsgi for the Apache webserver. With Web Server Gateway Interface support, it has become the language of choice for many web developers.",
+            "chunk_type": "applications",
+            "priority_score": 0.85
+        }
+    ]
+
+    # Initialize the report synthesizer
+    synthesizer = get_report_synthesizer()
+
+    # Test query
+    query = "What are the key features and applications of Python programming language?"
+
+    # Generate report
+    print(f"Generating report for query: '{query}'")
+    print("-" * 50)
+
+    report = await synthesizer.synthesize_report(chunks, query)
+
+    print("\nGenerated Report:")
+    print("=" * 50)
+    print(report)
+    print("=" * 50)
+
+async def test_with_real_urls(urls: List[str], query: str, use_mock: bool = False):
+    """
+    Test report synthesis with real URLs.
+
+    Args:
+        urls: List of URLs to scrape
+        query: Query to use for the report
+        use_mock: Whether to use mock data for document scraping
+    """
+    # Initialize the database
+    await initialize_database()
+
+    # Get document scraper with mock option
+    document_scraper = get_document_scraper(use_mock=use_mock)
+
+    # Get document processor
+    document_processor = get_document_processor()
+
+    # Get report synthesizer
+    report_synthesizer = get_report_synthesizer()
+
+    # Scrape URLs
+    print(f"Scraping {len(urls)} URLs...")
+    documents = await document_scraper.scrape_urls(urls)
+    print(f"Scraped {len(documents)} documents")
+
+    # Create relevance scores (mock scores for this test)
+    relevance_scores = {}
+    for i, doc in enumerate(documents):
+        relevance_scores[doc.get('url')] = 1.0 - (i * 0.1)  # Simple decreasing scores
+
+    # Process documents for report
+    print("Processing documents for report...")
+    selected_chunks = document_processor.process_documents_for_report(
+        documents,
+        relevance_scores,
+        token_budget=4000,
+        chunk_size=1000,
+        overlap_size=100
+    )
+    print(f"Selected {len(selected_chunks)} chunks for report")
+
+    # Generate report
+    print(f"Generating report for query: '{query}'")
+    print("-" * 50)
+
+    report = await report_synthesizer.synthesize_report(selected_chunks, query)
+
+    print("\nGenerated Report:")
+    print("=" * 50)
+    print(report)
+    print("=" * 50)
+
+    # Save the report to a file
+    output_file = f"report_{int(asyncio.get_event_loop().time())}.md"
+    with open(output_file, "w", encoding="utf-8") as f:
+        f.write(report)
+
+    print(f"Report saved to {output_file}")
+
+async def main():
+    """Main function to run the test."""
+    parser = argparse.ArgumentParser(description="Test report synthesis functionality")
+    parser.add_argument("--sample", action="store_true", help="Use sample document chunks")
+    parser.add_argument("--urls", nargs="+", help="URLs to scrape")
+    parser.add_argument("--query", type=str, default="What are the key features and applications of Python programming language?", help="Query to use for the report")
+    parser.add_argument("--mock", action="store_true", help="Use mock data for document scraping")
+
+    args = parser.parse_args()
+
+    if args.sample:
+        await test_with_sample_chunks()
+    elif args.urls:
+        await test_with_real_urls(args.urls, args.query, args.mock)
+    else:
+        # Default test with some Python-related URLs
+        default_urls = [
+            "https://docs.python.org/3/tutorial/index.html",
+            "https://www.python.org/about/",
+            "https://www.python.org/about/apps/",
+            "https://realpython.com/python-introduction/"
+        ]
+        await test_with_real_urls(default_urls, args.query, args.mock)
+
+if __name__ == "__main__":
+    asyncio.run(main())