diff --git a/.note/current_focus.md b/.note/current_focus.md index 9c4dac6..06e42d5 100644 --- a/.note/current_focus.md +++ b/.note/current_focus.md @@ -6,7 +6,8 @@ - ✅ Fixed AttributeError in report generation progress callback - ✅ Updated UI progress callback to use direct value assignment instead of update method - ✅ Enhanced progress callback to use Gradio's built-in progress tracking mechanism for better UI updates during async operations -- ✅ Committed changes with message "Fix AttributeError in report progress callback by using direct value assignment instead of update method" +- ✅ Consolidated redundant progress indicators in the UI to use only Gradio's built-in progress tracking +- ✅ Committed changes with message "Enhanced UI progress callback to use Gradio's built-in progress tracking mechanism for better real-time updates during report generation" ### Project Directory Reorganization - ✅ Reorganized project directory structure for better maintainability diff --git a/.note/session_log.md b/.note/session_log.md index f0f2cb3..ec3f7a0 100644 --- a/.note/session_log.md +++ b/.note/session_log.md @@ -3,7 +3,7 @@ ## Session: 2025-03-17 ### Overview -Fixed bugs in the UI progress callback mechanism for report generation and consolidated redundant progress indicators. +Fixed bugs in the UI progress callback mechanism for report generation, consolidated redundant progress indicators, and resolved LLM provider configuration issues with OpenRouter models. ### Key Activities 1. Identified and fixed an AttributeError in the report generation progress callback: @@ -29,6 +29,12 @@ Fixed bugs in the UI progress callback mechanism for report generation and conso - Gradio Textbox and Slider components use direct value assignment for updates rather than an update method - Asynchronous operations in Gradio require special handling to ensure UI elements update in real-time - Using Gradio's built-in progress tracking mechanism is more effective than manual UI updates for async tasks +- When using LiteLLM with different model providers, it's essential to set the `custom_llm_provider` parameter correctly for each provider + +4. Fixed LLM provider configuration for OpenRouter models: + - Identified an issue with OpenRouter models not working correctly in the report synthesis module + - Added the missing `custom_llm_provider = 'openrouter'` parameter to the LiteLLM completion parameters + - Tested the fix to ensure OpenRouter models now work correctly for report generation - The progress callback mechanism is critical for providing user feedback during long-running report generation tasks - Proper error handling in UI callbacks is essential for a smooth user experience - Simplifying the UI by removing redundant progress indicators improves user experience and reduces confusion diff --git a/execution/__init__.py b/execution/__init__.py index 55bc0c8..a92925b 100644 --- a/execution/__init__.py +++ b/execution/__init__.py @@ -1,4 +1,7 @@ """ Search execution module for the intelligent research system. -This module handles the execution of search queries across various search engines. +This module handles the execution of search queries across various search engines, +including decomposed sub-questions. """ + +from .sub_question_executor import get_sub_question_executor, SubQuestionExecutor diff --git a/execution/search_executor.py b/execution/search_executor.py index 21d106e..0055e02 100644 --- a/execution/search_executor.py +++ b/execution/search_executor.py @@ -1,6 +1,7 @@ """ Search executor module. 
-Handles the execution of search queries across multiple search engines. +Handles the execution of search queries across multiple search engines, +including processing of decomposed sub-questions. """ import os @@ -9,6 +10,11 @@ import time import asyncio import concurrent.futures from typing import Dict, List, Any, Optional, Union +import logging + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) from config.config import get_config from .api_handlers.base_handler import BaseSearchHandler diff --git a/execution/sub_question_executor.py b/execution/sub_question_executor.py new file mode 100644 index 0000000..a8c023f --- /dev/null +++ b/execution/sub_question_executor.py @@ -0,0 +1,207 @@ +""" +Sub-question search executor module. + +This module handles the execution of search queries for decomposed sub-questions, +aggregating results from multiple search engines. +""" + +import os +import time +import asyncio +from typing import Dict, List, Any, Optional, Union +import logging +import concurrent.futures + +from config.config import get_config +from .search_executor import SearchExecutor + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +class SubQuestionExecutor: + """ + Executes search queries for sub-questions and aggregates results. + """ + + def __init__(self): + """Initialize the sub-question executor.""" + self.search_executor = SearchExecutor() + self.config = get_config() + + async def execute_sub_question_searches(self, + structured_query: Dict[str, Any], + num_results_per_engine: int = 5, + timeout: int = 60) -> Dict[str, Any]: + """ + Execute searches for all sub-questions in a structured query. 
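+
+        Sub-questions are processed sequentially, with a short pause between
+        them to avoid rate limiting, and each sub-question's search results are
+        attached back onto its entry in the returned structured query.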
+ + Args: + structured_query: The structured query containing sub-questions + num_results_per_engine: Number of results to return per search engine for each sub-question + timeout: Timeout in seconds for each sub-question's searches + + Returns: + Updated structured query with sub-question search results + """ + # Extract sub-questions from the structured query + sub_questions = structured_query.get('sub_questions', []) + + if not sub_questions: + logger.info("No sub-questions found in the structured query") + return structured_query + + logger.info(f"Executing searches for {len(sub_questions)} sub-questions") + + # Get available search engines + available_engines = self.search_executor.get_available_search_engines() + + # Dictionary to store results for each sub-question + sub_question_results = [] + + # Process sub-questions sequentially to avoid overwhelming APIs + for i, sq in enumerate(sub_questions): + sub_q_text = sq.get('sub_question', '') + aspect = sq.get('aspect', 'unknown') + priority = sq.get('priority', 3) + search_queries = sq.get('search_queries', {}) + + if not sub_q_text: + continue + + logger.info(f"Processing sub-question {i+1}/{len(sub_questions)}: {sub_q_text}") + + # Create a mini structured query for this sub-question + mini_query = { + 'original_query': sub_q_text, + 'enhanced_query': sub_q_text, + 'search_queries': search_queries, + 'is_current_events': structured_query.get('is_current_events', False), + 'is_academic': structured_query.get('is_academic', False), + 'is_code': structured_query.get('is_code', False) + } + + # Execute search for this sub-question + try: + # Use fewer results per engine for sub-questions to keep total result count manageable + sq_results = self.search_executor.execute_search( + structured_query=mini_query, + num_results=num_results_per_engine, + timeout=timeout + ) + + # Log results for each engine + for engine, results in sq_results.items(): + logger.info(f" Engine {engine} returned {len(results)} results") + + # Store results with sub-question metadata + sq_with_results = sq.copy() + sq_with_results['search_results'] = sq_results + sq_with_results['search_result_count'] = sum(len(results) for results in sq_results.values()) + sub_question_results.append(sq_with_results) + + # Add a small delay between sub-questions to avoid rate limiting + if i < len(sub_questions) - 1: + await asyncio.sleep(1) + + except Exception as e: + logger.error(f"Error executing search for sub-question: {str(e)}") + # Add empty results if there was an error + sq_with_results = sq.copy() + sq_with_results['search_results'] = {} + sq_with_results['search_result_count'] = 0 + sq_with_results['error'] = str(e) + sub_question_results.append(sq_with_results) + + # Update the structured query with the results + structured_query['sub_questions'] = sub_question_results + + # Calculate total results + total_results = sum(sq.get('search_result_count', 0) for sq in sub_question_results) + logger.info(f"Completed searches for all sub-questions. Total results: {total_results}") + + return structured_query + + def get_combined_results(self, structured_query: Dict[str, Any]) -> Dict[str, List[Dict[str, Any]]]: + """ + Get a combined view of results from all sub-questions. 
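+
+        Each result is annotated with the sub-question text, aspect, and priority
+        it came from before being merged into a single per-engine list.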
+ + Args: + structured_query: The structured query with sub-question search results + + Returns: + Dictionary mapping search engine names to lists of results + """ + sub_questions = structured_query.get('sub_questions', []) + + if not sub_questions: + return {} + + # Dictionary to store combined results + combined_results = {} + + # Process each sub-question + for sq in sub_questions: + sub_q_text = sq.get('sub_question', '') + aspect = sq.get('aspect', 'unknown') + priority = sq.get('priority', 3) + search_results = sq.get('search_results', {}) + + # Process results from each engine + for engine, results in search_results.items(): + if engine not in combined_results: + combined_results[engine] = [] + + # Add sub-question metadata to each result + for result in results: + if result and isinstance(result, dict): + # Only add metadata if it doesn't already exist + if 'sub_question' not in result: + result['sub_question'] = sub_q_text + if 'aspect' not in result: + result['aspect'] = aspect + if 'priority' not in result: + result['priority'] = priority + + # Add the result to the combined results + combined_results[engine].append(result) + + return combined_results + + def prioritize_results(self, + combined_results: Dict[str, List[Dict[str, Any]]], + max_results_per_engine: int = 10) -> Dict[str, List[Dict[str, Any]]]: + """ + Prioritize results based on sub-question priority. + + Args: + combined_results: Combined results from all sub-questions + max_results_per_engine: Maximum number of results to keep per engine + + Returns: + Dictionary mapping search engine names to prioritized lists of results + """ + prioritized_results = {} + + # Process each engine's results + for engine, results in combined_results.items(): + # Sort results by priority (lower number = higher priority) + sorted_results = sorted(results, key=lambda r: r.get('priority', 5)) + + # Keep only the top N results + prioritized_results[engine] = sorted_results[:max_results_per_engine] + + return prioritized_results + + +# Create a singleton instance for global use +sub_question_executor = SubQuestionExecutor() + +def get_sub_question_executor() -> SubQuestionExecutor: + """ + Get the global sub-question executor instance. + + Returns: + SubQuestionExecutor instance + """ + return sub_question_executor diff --git a/query/query_decomposer.py b/query/query_decomposer.py new file mode 100644 index 0000000..106b3aa --- /dev/null +++ b/query/query_decomposer.py @@ -0,0 +1,245 @@ +""" +Query decomposition module for the intelligent research system. + +This module handles the decomposition of complex queries into sub-questions, +enabling more comprehensive research and better handling of multi-faceted queries. +""" + +from typing import Dict, Any, List, Optional +import asyncio +import logging + +from .llm_interface import get_llm_interface + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +class QueryDecomposer: + """ + Decomposer for complex research queries. + + This class handles breaking down complex queries into sub-questions, + which can be processed separately and then synthesized into a comprehensive answer. + """ + + def __init__(self): + """Initialize the query decomposer.""" + self.llm_interface = get_llm_interface() + + async def decompose_query(self, query: str, structured_query: Dict[str, Any]) -> Dict[str, Any]: + """ + Decompose a complex query into sub-questions. 
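+
+        Very short queries (fewer than eight words) and code-focused queries are
+        returned unchanged, since decomposition adds little value for them.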
+ + Args: + query: The original user query + structured_query: The structured query object + + Returns: + Updated structured query with sub-questions + """ + # Skip decomposition for simple queries or specific query types where decomposition isn't helpful + if len(query.split()) < 8: # Skip very short queries + logger.info(f"Query too short for decomposition: {query}") + return structured_query + + # Skip decomposition for code queries as they're usually specific + if structured_query.get('is_code', False): + logger.info(f"Skipping decomposition for code query: {query}") + return structured_query + + # Get query type from the structured query + query_type = structured_query.get('type', 'unknown') + intent = structured_query.get('intent', 'research') + is_current_events = structured_query.get('is_current_events', False) + is_academic = structured_query.get('is_academic', False) + + # Generate sub-questions based on the query and its type + sub_questions = await self._generate_sub_questions( + query, + query_type=query_type, + intent=intent, + is_current_events=is_current_events, + is_academic=is_academic + ) + + # Add the sub-questions to the structured query + structured_query['sub_questions'] = sub_questions + + # Generate additional search queries for each sub-question + if len(sub_questions) > 0: + search_engines = structured_query.get('search_engines', []) + await self._generate_search_queries_for_sub_questions(structured_query, search_engines) + + return structured_query + + async def _generate_sub_questions( + self, + query: str, + query_type: str = 'unknown', + intent: str = 'research', + is_current_events: bool = False, + is_academic: bool = False + ) -> List[Dict[str, Any]]: + """ + Generate sub-questions based on the query and its type. + + Args: + query: The original user query + query_type: The type of query (factual, exploratory, comparative) + intent: The intent of the query + is_current_events: Whether the query is about current events + is_academic: Whether the query is about academic topics + + Returns: + List of sub-questions + """ + logger.info(f"Generating sub-questions for query: {query}") + + # Create prompt based on query type and characteristics + system_prompt = """You are an expert at breaking down complex research questions into smaller, focused sub-questions. + + Your task is to analyze a research query and decompose it into 3-5 distinct sub-questions that, when answered together, will provide a comprehensive response to the original query. + + For each sub-question: + 1. Focus on a single aspect or component of the original query + 2. Make it specific and answerable through targeted search + 3. 
Ensure it contributes unique information to the overall research + + Return ONLY a JSON array of objects, where each object has: + - "sub_question": The text of the sub-question + - "aspect": A short phrase (2-4 words) describing what aspect of the original query this addresses + - "priority": A number from 1-5 where 1 is highest priority (most important to answer) + + Example output format: + [ + { + "sub_question": "What are the key components of quantum computing hardware?", + "aspect": "hardware components", + "priority": 1 + }, + { + "sub_question": "How does quantum entanglement enable quantum computing?", + "aspect": "quantum principles", + "priority": 2 + } + ] + """ + + # Tailor additional instructions based on query characteristics + if is_current_events: + system_prompt += """ + Since this is a current events query: + - Include a sub-question about recent developments (last 6 months) + - Include a sub-question about historical context if relevant + - Focus on factual aspects rather than opinions + - Consider different stakeholders involved + """ + + if is_academic: + system_prompt += """ + Since this is an academic query: + - Include a sub-question about research methodologies if applicable + - Include a sub-question about competing theories or approaches + - Consider a sub-question about gaps in existing research + - Include a sub-question about practical applications or implications + """ + + if query_type == 'comparative': + system_prompt += """ + Since this is a comparative query: + - Ensure sub-questions address each item being compared + - Include sub-questions about specific comparison dimensions + - Consider including a sub-question about contexts where one option might be preferred + - Include a sub-question about common misconceptions in the comparison + """ + + # Create the prompt for the LLM + messages = [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": f"Please decompose this research query into sub-questions: {query}"} + ] + + # Generate sub-questions + try: + response = await self.llm_interface.generate_completion(messages) + + # Parse the response as JSON + import json + # Find JSON array in the response - look for anything between [ and ] + import re + json_match = re.search(r'\[(.*?)\]', response, re.DOTALL) + if json_match: + response = f"[{json_match.group(1)}]" + + sub_questions = json.loads(response) + + # Validate the structure of each sub-question + validated_sub_questions = [] + for sq in sub_questions: + if 'sub_question' in sq and 'aspect' in sq: + # Ensure priority is an integer + if 'priority' not in sq or not isinstance(sq['priority'], int): + sq['priority'] = 3 # Default medium priority + validated_sub_questions.append(sq) + + logger.info(f"Generated {len(validated_sub_questions)} sub-questions for query: {query}") + return validated_sub_questions + except Exception as e: + logger.error(f"Error generating sub-questions: {str(e)}") + return [] + + async def _generate_search_queries_for_sub_questions( + self, + structured_query: Dict[str, Any], + search_engines: List[str] + ) -> Dict[str, Any]: + """ + Generate optimized search queries for each sub-question. 
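+
+        For each sub-question, engine-specific queries are generated via the LLM
+        interface and stored on that sub-question's entry in the structured query.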
+ + Args: + structured_query: The structured query containing sub-questions + search_engines: List of search engines to generate queries for + + Returns: + Updated structured query with search queries for sub-questions + """ + sub_questions = structured_query.get('sub_questions', []) + if not sub_questions: + return structured_query + + # Structure to hold search queries for each sub-question + sub_question_search_queries = [] + + # Process each sub-question + for sq in sub_questions: + sub_q_text = sq.get('sub_question', '') + if not sub_q_text: + continue + + # Generate search queries for this sub-question + search_queries = await self.llm_interface.generate_search_queries(sub_q_text, search_engines) + + # Add search queries to the sub-question + sq_with_queries = sq.copy() + sq_with_queries['search_queries'] = search_queries + sub_question_search_queries.append(sq_with_queries) + + # Update the structured query + structured_query['sub_questions'] = sub_question_search_queries + + return structured_query + + +# Create a singleton instance for global use +query_decomposer = QueryDecomposer() + + +def get_query_decomposer() -> QueryDecomposer: + """ + Get the global query decomposer instance. + + Returns: + QueryDecomposer instance + """ + return query_decomposer diff --git a/query/query_processor.py b/query/query_processor.py index 1a95259..338a6cf 100644 --- a/query/query_processor.py +++ b/query/query_processor.py @@ -2,12 +2,18 @@ Query processor module for the intelligent research system. This module handles the processing of user queries, including enhancement, -classification, and structuring for downstream modules. +classification, decomposition, and structuring for downstream modules. """ from typing import Dict, Any, List, Optional +import logging from .llm_interface import get_llm_interface +from .query_decomposer import get_query_decomposer + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) class QueryProcessor: @@ -21,6 +27,7 @@ class QueryProcessor: def __init__(self): """Initialize the query processor.""" self.llm_interface = get_llm_interface() + self.query_decomposer = get_query_decomposer() async def process_query(self, query: str) -> Dict[str, Any]: """ @@ -32,11 +39,15 @@ class QueryProcessor: Returns: Dictionary containing the processed query information """ + logger.info(f"Processing query: {query}") + # Enhance the query enhanced_query = await self.llm_interface.enhance_query(query) + logger.info(f"Enhanced query: {enhanced_query}") # Classify the query classification = await self.llm_interface.classify_query(query) + logger.info(f"Query classification: {classification}") # Extract entities from the classification entities = classification.get('entities', []) @@ -44,6 +55,15 @@ class QueryProcessor: # Structure the query for downstream modules structured_query = self._structure_query(query, enhanced_query, classification) + # Decompose the query into sub-questions (if complex enough) + structured_query = await self.query_decomposer.decompose_query(query, structured_query) + + # Log the number of sub-questions if any + if 'sub_questions' in structured_query and structured_query['sub_questions']: + logger.info(f"Decomposed into {len(structured_query['sub_questions'])} sub-questions") + else: + logger.info("Query was not decomposed into sub-questions") + return structured_query def _structure_query(self, original_query: str, enhanced_query: str, diff --git 
a/report/__init__.py b/report/__init__.py index 43b1c4e..d03d687 100644 --- a/report/__init__.py +++ b/report/__init__.py @@ -3,17 +3,20 @@ Report generation module for the intelligent research system. This module provides functionality to generate reports from search results by scraping documents, storing them in a database, and synthesizing them -into a comprehensive report. +into a comprehensive report. It also supports the generation of reports +from decomposed sub-questions for more comprehensive research. """ from report.report_generator import get_report_generator, initialize_report_generator from report.document_scraper import get_document_scraper from report.database.db_manager import get_db_manager, initialize_database +from report.sub_question_synthesizer import get_sub_question_synthesizer __all__ = [ 'get_report_generator', 'initialize_report_generator', 'get_document_scraper', 'get_db_manager', - 'initialize_database' + 'initialize_database', + 'get_sub_question_synthesizer' ] diff --git a/report/database/documents.db b/report/database/documents.db index 3d14d56..a3fce87 100644 Binary files a/report/database/documents.db and b/report/database/documents.db differ diff --git a/report/report_generator.py b/report/report_generator.py index 36e2029..b40202c 100644 --- a/report/report_generator.py +++ b/report/report_generator.py @@ -3,7 +3,8 @@ Report generator module for the intelligent research system. This module provides functionality to generate reports from search results by scraping documents, storing them in a database, and synthesizing them -into a comprehensive report. +into a comprehensive report. It also supports generating reports from +decomposed sub-questions for more comprehensive research. """ import os @@ -16,6 +17,7 @@ from report.document_scraper import get_document_scraper from report.document_processor import get_document_processor from report.report_synthesis import get_report_synthesizer from report.progressive_report_synthesis import get_progressive_report_synthesizer +from report.sub_question_synthesizer import get_sub_question_synthesizer from report.report_detail_levels import get_report_detail_level_manager, DetailLevel # Configure logging @@ -38,6 +40,7 @@ class ReportGenerator: self.document_processor = get_document_processor() self.report_synthesizer = get_report_synthesizer() self.progressive_report_synthesizer = get_progressive_report_synthesizer() + self.sub_question_synthesizer = get_sub_question_synthesizer() self.detail_level_manager = get_report_detail_level_manager() self.detail_level = "standard" # Default detail level self.model_name = None # Will use default model based on detail level @@ -189,17 +192,20 @@ class ReportGenerator: def set_progress_callback(self, callback): """ - Set the progress callback for both synthesizers. + Set the progress callback for all synthesizers. 
Args: callback: Function that takes (current_progress, total, current_report) as arguments """ - # Set the callback for both synthesizers + # Set the callback for all synthesizers if hasattr(self.report_synthesizer, 'set_progress_callback'): self.report_synthesizer.set_progress_callback(callback) if hasattr(self.progressive_report_synthesizer, 'set_progress_callback'): self.progressive_report_synthesizer.set_progress_callback(callback) + + if hasattr(self.sub_question_synthesizer, 'set_progress_callback'): + self.sub_question_synthesizer.set_progress_callback(callback) async def generate_report(self, search_results: List[Dict[str, Any]], @@ -208,7 +214,8 @@ class ReportGenerator: chunk_size: Optional[int] = None, overlap_size: Optional[int] = None, detail_level: Optional[str] = None, - query_type: Optional[str] = None) -> str: + query_type: Optional[str] = None, + structured_query: Optional[Dict[str, Any]] = None) -> str: """ Generate a report from search results. @@ -219,6 +226,8 @@ class ReportGenerator: chunk_size: Maximum number of tokens per chunk overlap_size: Number of tokens to overlap between chunks detail_level: Level of detail for the report (brief, standard, detailed, comprehensive) + query_type: Type of query (factual, exploratory, comparative) + structured_query: Optional structured query object that may contain sub-questions Returns: Generated report as a string @@ -241,8 +250,32 @@ class ReportGenerator: else: logger.info("Using automatic query type detection") - # Choose the appropriate synthesizer based on detail level - if self.detail_level.lower() == "comprehensive": + # Check if we have sub-questions to use + has_sub_questions = ( + structured_query is not None and + 'sub_questions' in structured_query and + structured_query['sub_questions'] + ) + + if has_sub_questions: + # Use sub-question synthesizer if we have sub-questions + sub_questions = structured_query['sub_questions'] + logger.info(f"Using sub-question synthesizer for {len(sub_questions)} sub-questions") + + # Generate report using the sub-question synthesizer + report = await self.sub_question_synthesizer.synthesize_report_with_sub_questions( + selected_chunks, + query, + sub_questions, + query_type=query_type, + detail_level=self.detail_level + ) + + logger.info(f"Generated report using sub-question synthesizer with {len(sub_questions)} sub-questions") + return report + + # If no sub-questions or structured_query is None, use standard synthesizers + elif self.detail_level.lower() == "comprehensive": # Use progressive report synthesizer for comprehensive detail level logger.info(f"Using progressive report synthesizer for {self.detail_level} detail level") report = await self.progressive_report_synthesizer.synthesize_report( diff --git a/report/report_synthesis.py b/report/report_synthesis.py index a76bdc9..5acba99 100644 --- a/report/report_synthesis.py +++ b/report/report_synthesis.py @@ -62,6 +62,8 @@ class ReportSynthesizer: self.progress_callback = None self.total_chunks = 0 self.processed_chunk_count = 0 + self.current_chunk_title = "" + self.current_stage = "preparation" # Can be: preparation, processing, finalizing def set_progress_callback(self, callback): """ @@ -74,9 +76,23 @@ class ReportSynthesizer: def _report_progress(self, current_report=None): """Report progress through the callback if set.""" - if self.progress_callback and self.total_chunks > 0: - progress = min(self.processed_chunk_count / self.total_chunks, 1.0) - self.progress_callback(progress, self.total_chunks, current_report) + if 
self.progress_callback: + # Calculate progress as a fraction between 0 and 1 + if self.total_chunks > 0: + progress = min(self.processed_chunk_count / self.total_chunks, 1.0) + else: + progress = 0.0 + + # Store current report text for progressive reports + if current_report: + self.current_report_text = current_report + + # Call the progress callback with detailed information + self.progress_callback( + progress, + self.total_chunks, + current_report or getattr(self, 'current_report_text', None) + ) def _setup_provider(self) -> None: """Set up the LLM provider based on the model configuration.""" @@ -120,7 +136,21 @@ class ReportSynthesizer: elif provider == 'openrouter': # For OpenRouter provider params['model'] = self.model_config.get('model_name', self.model_name) - params['api_base'] = self.model_config.get('endpoint') + + # Get the endpoint from the model config and ensure it has the correct format + endpoint = self.model_config.get('endpoint', 'https://openrouter.ai/api') + + # Ensure the endpoint ends with /v1 for OpenRouter API v1 + if not endpoint.endswith('/v1'): + if endpoint.endswith('/'): + endpoint = f"{endpoint}v1" + else: + endpoint = f"{endpoint}/v1" + + params['api_base'] = endpoint + + # Set custom provider for OpenRouter + params['custom_llm_provider'] = 'openrouter' # Set HTTP headers for OpenRouter if needed params['headers'] = { @@ -144,6 +174,14 @@ class ReportSynthesizer: # Set custom provider params['custom_llm_provider'] = 'vertex_ai' + elif provider == 'mistral' or 'mistralai' in self.model_name.lower(): + # Special handling for Mistral models + # Format: mistral/model_name (e.g., mistral/mistral-medium) + model_name = self.model_config.get('model_name', self.model_name) + params['model'] = f"mistral/{model_name}" + + # Add Mistral-specific parameters + params['custom_llm_provider'] = 'mistral' else: # Standard provider (OpenAI, Anthropic, etc.) 
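            # No custom_llm_provider is set here; LiteLLM infers the provider
            # from the model name itself for these standard providers.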
params['model'] = self.model_name @@ -268,70 +306,68 @@ class ReportSynthesizer: total_chunks = len(chunks) logger.info(f"Starting to process {total_chunks} document chunks") - # Determine batch size based on the model - Gemini can handle larger batches - if "gemini" in self.model_name.lower(): - batch_size = 8 # Larger batch size for Gemini models with 1M token windows - else: - batch_size = 3 # Smaller batch size for other models - - logger.info(f"Using batch size of {batch_size} for model {self.model_name}") + # Update progress tracking state + self.total_chunks = total_chunks + self.processed_chunk_count = 0 + self.current_stage = "processing" + self._report_progress() - for i in range(0, len(chunks), batch_size): - batch = chunks[i:i+batch_size] - logger.info(f"Processing batch {i//batch_size + 1}/{(len(chunks) + batch_size - 1)//batch_size} with {len(batch)} chunks") + # Ensure all chunks have a title, even if it's 'Untitled' + for chunk in chunks: + if chunk.get('title') is None or chunk.get('title') == '': + chunk['title'] = 'Untitled' + + # Process each chunk individually to provide detailed progress updates + for i, chunk in enumerate(chunks): + chunk_title = chunk.get('title', 'Untitled') + chunk_index = i + 1 - # Process this batch - batch_results = [] - for j, chunk in enumerate(batch): - chunk_title = chunk.get('title', 'Untitled') - chunk_index = i + j + 1 - logger.info(f"Processing chunk {chunk_index}/{total_chunks}: {chunk_title[:50] if chunk_title else 'Untitled'}...") + # Update current chunk title for progress reporting + self.current_chunk_title = chunk_title[:50] if chunk_title else 'Untitled' + logger.info(f"Processing chunk {chunk_index}/{total_chunks}: {self.current_chunk_title}...") + + # Create a prompt for extracting key information from the chunk + messages = [ + {"role": "system", "content": extraction_prompt}, + {"role": "user", "content": f"""Query: {query} - # Create a prompt for extracting key information from the chunk - messages = [ - {"role": "system", "content": extraction_prompt}, - {"role": "user", "content": f"""Query: {query} - - Document title: {chunk.get('title', 'Untitled')} - Document URL: {chunk.get('url', 'Unknown')} - - Document chunk content: - {chunk.get('content', '')} - - Extract the most relevant information from this document chunk that addresses the query."""} - ] + Document title: {chunk.get('title', 'Untitled')} + Document URL: {chunk.get('url', 'Unknown')} - try: - # Process the chunk with the LLM - extracted_info = await self.generate_completion(messages) - - # Add the extracted information to the chunk - processed_chunk = chunk.copy() - processed_chunk['extracted_info'] = extracted_info - batch_results.append(processed_chunk) - - # Update progress - self.processed_chunk_count += 1 - self._report_progress() - - logger.info(f"Completed chunk {chunk_index}/{total_chunks} ({chunk_index/total_chunks*100:.1f}% complete)") - except Exception as e: - logger.error(f"Error processing chunk {chunk_index}/{total_chunks}: {str(e)}") - # Add a placeholder for the failed chunk to maintain document order - processed_chunk = chunk.copy() - processed_chunk['extracted_info'] = f"Error extracting information: {str(e)}" - batch_results.append(processed_chunk) - - # Update progress even for failed chunks - self.processed_chunk_count += 1 - self._report_progress() + Document chunk content: + {chunk.get('content', '')} + + Extract the most relevant information from this document chunk that addresses the query."""} + ] - 
processed_chunks.extend(batch_results) + try: + # Process the chunk with the LLM + extracted_info = await self.generate_completion(messages) + + # Add the extracted information to the chunk + processed_chunk = chunk.copy() + processed_chunk['extracted_info'] = extracted_info + processed_chunks.append(processed_chunk) + + # Update progress + self.processed_chunk_count += 1 + self._report_progress() + + logger.info(f"Completed chunk {chunk_index}/{total_chunks} ({chunk_index/total_chunks*100:.1f}% complete)") + except Exception as e: + logger.error(f"Error processing chunk {chunk_index}/{total_chunks}: {str(e)}") + # Add a placeholder for the failed chunk to maintain document order + processed_chunk = chunk.copy() + processed_chunk['extracted_info'] = f"Error extracting information: {str(e)}" + processed_chunks.append(processed_chunk) + + # Update progress even for failed chunks + self.processed_chunk_count += 1 + self._report_progress() - # Add a small delay between batches to avoid rate limiting - if i + batch_size < len(chunks): - logger.info("Pausing briefly between batches...") - await asyncio.sleep(2) + # Add a small delay between chunks to avoid rate limiting + if i < len(chunks) - 1: + await asyncio.sleep(0.5) logger.info(f"Completed processing all {total_chunks} chunks") return processed_chunks @@ -569,6 +605,11 @@ class ReportSynthesizer: # Reset progress tracking self.total_chunks = len(chunks) self.processed_chunk_count = 0 + self.current_chunk_title = "" + self.current_stage = "preparation" + + # Report initial progress + self._report_progress() # Verify that a template exists for the given query type and detail level template = self._get_template_from_strings(query_type, detail_level) @@ -613,43 +654,32 @@ class ReportSynthesizer: logger.info(f"Starting map phase for {len(chunks)} document chunks with query type '{query_type}' and detail level '{detail_level}'") - # Process chunks in batches to avoid hitting payload limits - # Determine batch size based on the model - Gemini can handle larger batches - if "gemini" in self.model_name.lower(): - batch_size = 8 # Larger batch size for Gemini models with 1M token windows - else: - batch_size = 3 # Smaller batch size for other models - - logger.info(f"Using batch size of {batch_size} for model {self.model_name}") - processed_chunks = [] + # Set stage to processing for progress tracking + self.current_stage = "processing" + self._report_progress() - for i in range(0, len(chunks), batch_size): - batch = chunks[i:i+batch_size] - logger.info(f"Processing batch {i//batch_size + 1}/{(len(chunks) + batch_size - 1)//batch_size} with {len(batch)} chunks") - - # Ensure all chunks have a title, even if it's 'Untitled' - for chunk in batch: - if chunk.get('title') is None: - chunk['title'] = 'Untitled' - - # Process this batch - batch_results = await self.map_document_chunks(batch, query, detail_level, query_type) - processed_chunks.extend(batch_results) - - # Add a small delay between batches to avoid rate limiting - if i + batch_size < len(chunks): - logger.info("Pausing briefly between batches...") - await asyncio.sleep(2) + # Map phase: Process each document chunk to extract key information + logger.info("Starting map phase: Processing document chunks...") + processed_chunks = await self.map_document_chunks(chunks, query, detail_level, query_type) - logger.info(f"Starting reduce phase to synthesize report from {len(processed_chunks)} processed chunks") + # Update stage to finalizing + self.current_stage = "finalizing" + 
self._report_progress() - # Update progress status for reduce phase + # Reduce phase: Synthesize the processed chunks into a coherent report + logger.info("Starting reduce phase: Synthesizing report...") + + # Report progress before starting the reduce phase if self.progress_callback: self.progress_callback(0.9, self.total_chunks, "Synthesizing final report...") - # Reduce phase: Synthesize processed chunks into a coherent report + # Synthesize the report report = await self.reduce_processed_chunks(processed_chunks, query, query_type, detail_level) + # Set progress to 100% complete + self.processed_chunk_count = self.total_chunks + self._report_progress(report) + # Process thinking tags if enabled if self.process_thinking_tags and "" in report: logger.info("Processing thinking tags in report") diff --git a/report/sub_question_synthesizer.py b/report/sub_question_synthesizer.py new file mode 100644 index 0000000..4afc0b6 --- /dev/null +++ b/report/sub_question_synthesizer.py @@ -0,0 +1,446 @@ +""" +Sub-question synthesis module for the intelligent research system. + +This module provides functionality to synthesize reports that incorporate +structured sub-questions to provide more comprehensive and multi-faceted answers. +""" + +import os +import json +import asyncio +import logging +from typing import Dict, List, Any, Optional, Tuple, Union + +from config.config import get_config +from report.report_synthesis import ReportSynthesizer, get_report_synthesizer + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +class SubQuestionSynthesizer: + """ + Handles report synthesis with structured sub-questions. + + This class extends the functionality of the standard report synthesizer + to work with decomposed queries, generating more comprehensive reports + by addressing each sub-question specifically. + """ + + def __init__(self, model_name: Optional[str] = None): + """ + Initialize the sub-question synthesizer. + + Args: + model_name: Name of the LLM model to use. If None, uses the default model + from configuration. + """ + # Initialize the base report synthesizer to leverage its functionality + self.report_synthesizer = get_report_synthesizer(model_name) + self.config = get_config() + + # Keep a reference to the model name for consistency + self.model_name = self.report_synthesizer.model_name + + def set_progress_callback(self, callback): + """Set the progress callback for the underlying report synthesizer.""" + self.report_synthesizer.set_progress_callback(callback) + + async def synthesize_report_with_sub_questions(self, + chunks: List[Dict[str, Any]], + query: str, + sub_questions: List[Dict[str, Any]], + query_type: str = "exploratory", + detail_level: str = "standard") -> str: + """ + Synthesize a report that addresses both the main query and its sub-questions. + + Args: + chunks: List of document chunks + query: Original search query + sub_questions: List of sub-question dictionaries + query_type: Type of query (factual, exploratory, comparative) + detail_level: Level of detail for the report (brief, standard, detailed, comprehensive) + + Returns: + Synthesized report as a string + """ + if not chunks: + logger.warning("No document chunks provided for report synthesis.") + return "No information found for the given query." 
+ + if not sub_questions: + logger.info("No sub-questions provided, falling back to standard report synthesis.") + return await self.report_synthesizer.synthesize_report(chunks, query, query_type, detail_level) + + logger.info(f"Synthesizing report with {len(sub_questions)} sub-questions for query: {query}") + + # Process document chunks using the standard report synthesizer's map phase + processed_chunks = await self.report_synthesizer.map_document_chunks( + chunks, query, detail_level, query_type + ) + + # Group chunks by relevance to sub-questions + # This is a critical step where we determine which chunks are relevant to which sub-questions + grouped_chunks = self._group_chunks_by_sub_questions(processed_chunks, sub_questions, query) + + # Create sections for each sub-question + sections = [] + + # Process each sub-question to create its own section + for i, sq in enumerate(sub_questions): + sub_q_text = sq.get('sub_question', '') + aspect = sq.get('aspect', '') + priority = sq.get('priority', 3) + + # Skip empty sub-questions + if not sub_q_text: + continue + + logger.info(f"Processing sub-question {i+1}/{len(sub_questions)}: {sub_q_text}") + + # Get chunks relevant to this sub-question + relevant_chunks = grouped_chunks.get(i, []) + + if not relevant_chunks: + logger.warning(f"No relevant chunks found for sub-question: {sub_q_text}") + sections.append({ + 'aspect': aspect, + 'sub_question': sub_q_text, + 'priority': priority, + 'content': f"No specific information was found addressing this aspect ({aspect})." + }) + continue + + # Generate content for this sub-question using the relevant chunks + section_content = await self._generate_section_for_sub_question( + relevant_chunks, sub_q_text, query, query_type, detail_level + ) + + # Add the section to the list + sections.append({ + 'aspect': aspect, + 'sub_question': sub_q_text, + 'priority': priority, + 'content': section_content + }) + + # Sort sections by priority (lower number = higher priority) + sections = sorted(sections, key=lambda s: s.get('priority', 5)) + + # Combine all sections into a final report + final_report = await self._combine_sections_into_report( + sections, processed_chunks, query, query_type, detail_level + ) + + return final_report + + def _group_chunks_by_sub_questions(self, + processed_chunks: List[Dict[str, Any]], + sub_questions: List[Dict[str, Any]], + main_query: str) -> Dict[int, List[Dict[str, Any]]]: + """ + Group document chunks by their relevance to each sub-question. 
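+
+        Chunks that already carry sub-question or aspect metadata are grouped by
+        that metadata; otherwise a simple keyword-overlap score against each
+        sub-question and its aspect decides relevance, and chunks that match
+        nothing are added to every group so no information is dropped.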
+ + Args: + processed_chunks: List of processed document chunks + sub_questions: List of sub-question dictionaries + main_query: The original main query + + Returns: + Dictionary mapping sub-question indices to lists of relevant chunks + """ + # Initialize a dictionary to hold chunks relevant to each sub-question + grouped_chunks = {i: [] for i in range(len(sub_questions))} + + # First, check if chunks have 'sub_question' metadata already + pre_grouped = False + for chunk in processed_chunks: + if 'sub_question' in chunk or 'aspect' in chunk: + pre_grouped = True + break + + if pre_grouped: + # If chunks already have sub-question metadata, use that for grouping + logger.info("Using pre-existing sub-question metadata for grouping chunks") + + for chunk in processed_chunks: + sq_text = chunk.get('sub_question', '') + aspect = chunk.get('aspect', '') + + # Find matching sub-questions + for i, sq in enumerate(sub_questions): + if sq_text == sq.get('sub_question') or aspect == sq.get('aspect'): + grouped_chunks[i].append(chunk) + break + else: + # If no match found, add to all groups as potentially relevant + for i in range(len(sub_questions)): + grouped_chunks[i].append(chunk) + else: + # Otherwise, use content matching to determine relevance + logger.info("Using content matching to group chunks by sub-questions") + + # For each chunk, determine which sub-questions it's relevant to + for chunk in processed_chunks: + chunk_content = chunk.get('content', '') + extracted_info = chunk.get('extracted_info', '') + + # Convert to lowercase for case-insensitive matching + content_lower = (chunk_content + " " + extracted_info).lower() + + # Check against each sub-question + assigned = False + for i, sq in enumerate(sub_questions): + sub_q_text = sq.get('sub_question', '').lower() + aspect = sq.get('aspect', '').lower() + + # Calculate a simple relevance score based on keyword presence + relevance_score = 0 + + # Split into words for better matching + sub_q_words = sub_q_text.split() + aspect_words = aspect.split() + + # Check for presence of key terms + for word in sub_q_words: + if len(word) > 3 and word in content_lower: # Ignore short words + relevance_score += 1 + + for word in aspect_words: + if len(word) > 3 and word in content_lower: + relevance_score += 2 # Aspect terms are more important + + # If chunk seems relevant to this sub-question, add it + if relevance_score > 0: + grouped_chunks[i].append(chunk) + assigned = True + + # If chunk wasn't assigned to any sub-question, add it to all of them + # This ensures we don't miss any potentially relevant information + if not assigned: + for i in range(len(sub_questions)): + grouped_chunks[i].append(chunk) + + # Log how many chunks were assigned to each sub-question + for i, chunks in grouped_chunks.items(): + if i < len(sub_questions): + logger.info(f"Sub-question '{sub_questions[i].get('sub_question')}': {len(chunks)} relevant chunks") + + return grouped_chunks + + async def _generate_section_for_sub_question(self, + chunks: List[Dict[str, Any]], + sub_question: str, + main_query: str, + query_type: str, + detail_level: str) -> str: + """ + Generate content for a specific sub-question using the relevant chunks. 
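+
+        The section text is produced by the base synthesizer's reduce phase and
+        then stripped of its leading heading and any References section, since
+        those are added when the sections are combined into the final report.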
+ + Args: + chunks: List of chunks relevant to this sub-question + sub_question: The text of the sub-question + main_query: The original main query + query_type: Type of query + detail_level: Level of detail for the report + + Returns: + Generated content for this sub-question section + """ + # If no chunks, return placeholder text + if not chunks: + return "No specific information was found addressing this aspect of the query." + + logger.info(f"Generating section for sub-question: {sub_question}") + + # Reduce the processed chunks into a coherent section + # We don't need HTML tags since this will be embedded in the final report + section_content = await self.report_synthesizer.reduce_processed_chunks( + chunks, sub_question, query_type, detail_level + ) + + # Extract just the content without headers and references + # Remove title/header if present (typically the first line with # or ##) + content_lines = section_content.split('\n') + if content_lines and (content_lines[0].startswith('# ') or content_lines[0].startswith('## ')): + content_lines = content_lines[1:] + + # Remove references section if present + if '# References' in section_content: + section_content = section_content.split('# References')[0] + elif '## References' in section_content: + section_content = section_content.split('## References')[0] + + # Clean up any trailing whitespace + section_content = section_content.strip() + + return section_content + + async def _combine_sections_into_report(self, + sections: List[Dict[str, Any]], + all_chunks: List[Dict[str, Any]], + query: str, + query_type: str, + detail_level: str) -> str: + """ + Combine all section contents into a final coherent report. + + Args: + sections: List of section dictionaries with content for each sub-question + all_chunks: All processed chunks (for reference information) + query: Original search query + query_type: Type of query + detail_level: Level of detail for the report + + Returns: + Final synthesized report + """ + logger.info(f"Combining {len(sections)} sections into final report") + + # If no sections, fall back to standard report synthesis + if not sections: + logger.warning("No sections generated, falling back to standard report synthesis") + return await self.report_synthesizer.reduce_processed_chunks( + all_chunks, query, query_type, detail_level + ) + + # Prepare section data for the report + sections_text = "" + for i, section in enumerate(sections): + aspect = section.get('aspect', '') + sub_question = section.get('sub_question', '') + content = section.get('content', '') + + sections_text += f"SECTION {i+1}:\n" + sections_text += f"Aspect: {aspect}\n" + sections_text += f"Sub-question: {sub_question}\n" + sections_text += f"Content: {content}\n\n" + + # Extract URLs and titles for references + references_data = "" + for i, chunk in enumerate(all_chunks): + title = chunk.get('title', 'Untitled') + url = chunk.get('url', '') + if url: + references_data += f"Reference {i+1}: {title} - {url}\n" + + # Get the template for synthesis + template = self.report_synthesizer._get_template_from_strings(query_type, detail_level) + + if not template: + logger.warning(f"No template found for {query_type} {detail_level}, falling back to standard template") + # Fall back to standard detail level if the requested one doesn't exist + detail_level = "standard" + template = self.report_synthesizer._get_template_from_strings("exploratory", "standard") + + # Create the prompt for the final report synthesis + messages = [ + {"role": "system", "content": 
f"""You are an expert research assistant tasked with creating a comprehensive, well-structured report from pre-written sections. + +The report should address the main query while incorporating multiple sections that each focus on different aspects of the query. + +Your task is to: +1. Create a coherent report that combines these sections +2. Add a proper introduction that presents the main query and previews the aspects covered +3. Ensure smooth transitions between sections +4. Provide a thoughtful conclusion that synthesizes insights from all sections +5. Include a properly formatted references section + +Format the report in Markdown with clear headings, subheadings, and bullet points where appropriate. +Make the report readable, engaging, and informative while maintaining academic rigor. + +{template.template if template else ""} + +IMPORTANT: When including references, use a consistent format: +[1] Title of the Article/Page. URL + +DO NOT use generic placeholders like "Document 1" for references. +ALWAYS include the actual URL from the source documents. +Each reference MUST include both the title and the URL. +Make sure all references are complete and properly formatted. +Number the references sequentially starting from 1. +Include the URL for EACH reference - this is critical."""}, + {"role": "user", "content": f"""Main Query: {query} + +Here are the pre-written sections addressing different aspects of the query: + +{sections_text} + +Here is reference information for citations: + +{references_data} + +Please synthesize these sections into a complete, coherent research report that thoroughly addresses the main query. +The report should have: +1. An informative title +2. A proper introduction that presents the main query and previews the key aspects +3. Well-organized sections with appropriate headings that address each aspect +4. A thoughtful conclusion that synthesizes the key insights +5. Properly formatted references + +Organize the sections in a logical order, use the pre-written content for each section, and ensure smooth transitions between them."""} + ] + + # Generate the final report + final_report = await self.report_synthesizer.generate_completion(messages) + + # Check for potential cutoff issues and fix if needed + if final_report.strip().endswith('[') or final_report.strip().endswith(']') or final_report.strip().endswith('...'): + logger.warning("Final report appears to be cut off at the end. Attempting to fix references section.") + try: + # Extract what we have so far without the incomplete references + if "References" in final_report: + report_without_refs = final_report.split("References")[0].strip() + else: + report_without_refs = final_report + + # Generate just the references section + ref_messages = [ + {"role": "system", "content": """You are an expert at formatting reference lists. Create a properly formatted References section for the documents provided. + + IMPORTANT: + 1. Use the actual title and URL from each document + 2. DO NOT use generic placeholders + 3. Format each reference as: [1] Title of the Article/Page. URL + 4. Each reference MUST include both the title and the URL + 5. Make sure all references are complete and properly formatted + 6. Number the references sequentially starting from 1"""}, + {"role": "user", "content": f"""Here are the document references: + + {references_data} + + Create a complete, properly formatted References section in Markdown format. 
+ Remember to include the URL for EACH reference - this is critical."""} + ] + + references = await self.report_synthesizer.generate_completion(ref_messages) + + # Combine the report with the fixed references + final_report = f"{report_without_refs}\n\n## References\n\n{references}" + + except Exception as e: + logger.error(f"Error fixing references section: {str(e)}") + + return final_report + + +# Create a singleton instance for global use +sub_question_synthesizer = SubQuestionSynthesizer() + +def get_sub_question_synthesizer(model_name: Optional[str] = None) -> SubQuestionSynthesizer: + """ + Get the global sub-question synthesizer instance or create a new one with a specific model. + + Args: + model_name: Optional model name to use instead of the default + + Returns: + SubQuestionSynthesizer instance + """ + global sub_question_synthesizer + + if model_name and model_name != sub_question_synthesizer.model_name: + sub_question_synthesizer = SubQuestionSynthesizer(model_name) + + return sub_question_synthesizer diff --git a/test_openrouter.py b/test_openrouter.py new file mode 100644 index 0000000..0db0570 --- /dev/null +++ b/test_openrouter.py @@ -0,0 +1,54 @@ +#!/usr/bin/env python +""" +Test script for OpenRouter model configuration in report synthesis. +""" + +import asyncio +import logging +from report.report_synthesis import get_report_synthesizer + +# Set up logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +async def test_openrouter_model(): + """Test OpenRouter model configuration.""" + logger.info("Testing OpenRouter model configuration...") + + # Get report synthesizer with OpenRouter model + synthesizer = get_report_synthesizer("openrouter-claude-3.7-sonnet") + + # Print model configuration + logger.info(f"Using model: {synthesizer.model_name}") + logger.info(f"Model config: {synthesizer.model_config}") + + # Create a simple test message + messages = [ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": "Hello, can you help me with a test?"} + ] + + try: + # Generate completion + logger.info("Generating completion...") + response = await synthesizer.generate_completion(messages) + + # Print response + logger.info(f"Response: {response}") + + return True + except Exception as e: + logger.error(f"Error testing OpenRouter model: {e}") + return False + +async def main(): + """Main function.""" + success = await test_openrouter_model() + + if success: + logger.info("OpenRouter model test successful!") + else: + logger.error("OpenRouter model test failed!") + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/test_openrouter_config.py b/test_openrouter_config.py new file mode 100644 index 0000000..71b668c --- /dev/null +++ b/test_openrouter_config.py @@ -0,0 +1,84 @@ +#!/usr/bin/env python +""" +Test script for OpenRouter model configuration with corrected endpoint. 
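+
+The script temporarily overrides the model's entry in the global configuration
+with the corrected endpoint (saving the original entry for later restoration)
+before constructing a ReportSynthesizer and issuing a simple completion request.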
+""" + +import asyncio +import logging +import os +from report.report_synthesis import ReportSynthesizer + +# Set up logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +async def test_openrouter_model(): + """Test OpenRouter model configuration with corrected endpoint.""" + logger.info("Testing OpenRouter model configuration with corrected endpoint...") + + # Create a custom model config with the corrected endpoint + model_name = "openrouter-claude-3.7-sonnet" + model_config = { + "provider": "openrouter", + "model_name": "anthropic/claude-3.7-sonnet", + "temperature": 0.5, + "max_tokens": 2048, + "top_p": 1.0, + "endpoint": "https://openrouter.ai/api/v1" # Corrected endpoint + } + + # We need to modify the config directly since ReportSynthesizer doesn't accept model_config + # Import the config module + from config.config import get_config + + # Get the config instance + config = get_config() + + # Save the original config to restore later + original_config = None + if model_name in config.config_data.get('models', {}): + original_config = config.config_data['models'][model_name].copy() + + # Update with corrected endpoint + if 'models' not in config.config_data: + config.config_data['models'] = {} + + config.config_data['models'][model_name] = model_config + + # Create a synthesizer with the model name + synthesizer = ReportSynthesizer(model_name=model_name) + + # Print model configuration + logger.info(f"Using model: {synthesizer.model_name}") + logger.info(f"Model config: {synthesizer.model_config}") + + # Create a simple test message + messages = [ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": "Hello, can you help me with a test?"} + ] + + try: + # Generate completion + logger.info("Generating completion...") + response = await synthesizer.generate_completion(messages) + + # Print response + logger.info(f"Response: {response}") + + return True + except Exception as e: + logger.error(f"Error testing OpenRouter model: {e}") + return False + +async def main(): + """Main function.""" + success = await test_openrouter_model() + + if success: + logger.info("OpenRouter model test successful!") + else: + logger.error("OpenRouter model test failed!") + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/ui/gradio_interface.py b/ui/gradio_interface.py index 44d60a0..4a85e13 100644 --- a/ui/gradio_interface.py +++ b/ui/gradio_interface.py @@ -18,6 +18,7 @@ sys.path.append(str(Path(__file__).parent.parent)) from query.query_processor import QueryProcessor from execution.search_executor import SearchExecutor from execution.result_collector import ResultCollector +from execution.sub_question_executor import get_sub_question_executor from report.report_generator import get_report_generator, initialize_report_generator from report.report_detail_levels import get_report_detail_level_manager, DetailLevel from config.config import Config @@ -31,6 +32,7 @@ class GradioInterface: self.query_processor = QueryProcessor() self.search_executor = SearchExecutor() self.result_collector = ResultCollector() + self.sub_question_executor = get_sub_question_executor() self.results_dir = Path(__file__).parent.parent / "results" self.results_dir.mkdir(exist_ok=True) self.reports_dir = Path(__file__).parent.parent @@ -41,9 +43,7 @@ class GradioInterface: # The report generator will be initialized in the async init method self.report_generator = None - # 
Progress tracking elements (will be set in create_interface) - self.report_progress = None - self.report_progress_bar = None + # We're using Gradio's built-in progress tracking (gr.Progress) instead of custom elements async def async_init(self): """Asynchronously initialize components that require async initialization.""" @@ -269,19 +269,64 @@ class GradioInterface: self.search_executor.get_available_search_engines() ) + # Check if the query was decomposed into sub-questions + has_sub_questions = 'sub_questions' in structured_query and structured_query['sub_questions'] + if has_sub_questions: + # Log sub-questions + print(f"Query was decomposed into {len(structured_query['sub_questions'])} sub-questions:") + for i, sq in enumerate(structured_query['sub_questions']): + print(f" {i+1}. {sq.get('sub_question')} (aspect: {sq.get('aspect')}, priority: {sq.get('priority')})") + + # Execute searches for sub-questions + progress(0.1, desc="Executing searches for sub-questions...") + structured_query = await self.sub_question_executor.execute_sub_question_searches( + structured_query, + num_results_per_engine=3 # Use fewer results per engine for sub-questions + ) + + # Get combined results from sub-questions + sub_question_results = self.sub_question_executor.get_combined_results(structured_query) + print(f"Sub-questions returned results from {len(sub_question_results)} engines") + + # Prioritize results from sub-questions + sub_question_results = self.sub_question_executor.prioritize_results( + sub_question_results, + max_results_per_engine=num_results_to_fetch # Use same limit as main query + ) + progress(0.2, desc="Completed sub-question searches") + # Execute the search with the structured query # Use initial_results_per_engine if available, otherwise fall back to num_results num_results_to_fetch = config.get("initial_results_per_engine", config.get("num_results", 10)) + + # Execute main search + progress(0.3, desc="Executing main search...") search_results_dict = self.search_executor.execute_search( structured_query, num_results=num_results_to_fetch ) # Add debug logging - print(f"Search results by engine:") + print(f"Main search results by engine:") for engine, results in search_results_dict.items(): print(f" {engine}: {len(results)} results") + # If we have sub-question results, combine them with the main search results + if has_sub_questions and 'sub_questions' in structured_query: + print("Combining main search results with sub-question results") + progress(0.4, desc="Combining results from sub-questions...") + + # Merge results from sub-questions into the main search results + for engine, results in sub_question_results.items(): + if engine in search_results_dict: + # Add sub-question results to the main results + search_results_dict[engine].extend(results) + print(f" Added {len(results)} results from sub-questions to {engine}") + else: + # Engine only has sub-question results + search_results_dict[engine] = results + print(f" Added {len(results)} results from sub-questions as new engine {engine}") + # Flatten the search results search_results = [] for engine_results in search_results_dict.values(): @@ -381,10 +426,6 @@ class GradioInterface: # This will properly update the UI during async operations progress(current_progress, desc=status_message) - # Also update our custom UI elements - self.report_progress.value = status_message - self.report_progress_bar.value = int(current_progress * 100) - return status_message self.report_generator.set_progress_callback(ui_progress_callback) @@ 
-400,9 +441,7 @@ class GradioInterface: else: self.progress_status = "Processing document chunks..." - # Set up initial progress state - self.report_progress.value = "Preparing documents..." - self.report_progress_bar.value = 0 + # Initial progress state is handled by Gradio's built-in progress tracking # Handle query_type parameter actual_query_type = None @@ -419,7 +458,8 @@ class GradioInterface: chunk_size=config["chunk_size"], overlap_size=config["overlap_size"], detail_level=detail_level, - query_type=actual_query_type + query_type=actual_query_type, + structured_query=structured_query if 'sub_questions' in structured_query else None ) # Final progress update @@ -648,26 +688,9 @@ class GradioInterface: with gr.Row(): report_button = gr.Button("Generate Report", variant="primary", size="lg") - with gr.Row(): - with gr.Column(): - # Progress indicator that will be updated by the progress callback - self.report_progress = gr.Textbox( - label="Progress Status", - value="Ready", - interactive=False - ) - - with gr.Row(): - with gr.Column(): - # Progress bar to show visual progress - self.report_progress_bar = gr.Slider( - minimum=0, - maximum=100, - value=0, - step=1, - label="Progress", - interactive=False - ) + # Note: We've removed the redundant progress indicators here + # The built-in Gradio progress tracking (gr.Progress) is used instead + # This is passed to the generate_report method and handles progress updates gr.Examples( examples=[ @@ -717,9 +740,7 @@ class GradioInterface: ) # Connect the progress callback to the report button - def update_progress_display(progress_value, status_message): - percentage = int(progress_value * 100) - return status_message, percentage + # Progress display is now handled entirely by Gradio's built-in progress tracking # Update the progress tracking in the generate_report method async def generate_report_with_progress(query, detail_level, query_type, model_name, rerank, token_budget, initial_results, final_results): diff --git a/update_max_tokens.py b/update_max_tokens.py new file mode 100644 index 0000000..75a5bd0 --- /dev/null +++ b/update_max_tokens.py @@ -0,0 +1,71 @@ +#!/usr/bin/env python +""" +Script to update the max_tokens parameter for OpenRouter models in the configuration. +""" + +import json +import logging +import os +from config.config import get_config + +# Set up logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +def update_openrouter_max_tokens(model_name="openrouter-claude-3.7-sonnet", new_max_tokens=8000): + """ + Update the max_tokens parameter for an OpenRouter model in the configuration. 
+
+    Args:
+        model_name: Name of the OpenRouter model to update
+        new_max_tokens: New value for max_tokens parameter
+    """
+    logger.info(f"Updating max_tokens for {model_name} to {new_max_tokens}...")
+
+    # Get the config instance
+    config = get_config()
+
+    # Check if the model exists in the configuration
+    if 'models' not in config.config_data:
+        logger.error("No models section found in configuration")
+        return False
+
+    if model_name not in config.config_data['models']:
+        logger.error(f"Model {model_name} not found in configuration")
+        return False
+
+    # Get the current model configuration
+    model_config = config.config_data['models'][model_name]
+
+    # Print current configuration
+    logger.info(f"Current configuration for {model_name}:")
+    logger.info(json.dumps(model_config, indent=2))
+
+    # Update the max_tokens parameter
+    old_max_tokens = model_config.get('max_tokens', 2048)
+    model_config['max_tokens'] = new_max_tokens
+
+    # Update the configuration
+    config.config_data['models'][model_name] = model_config
+
+    # Save the configuration (in-memory only, as we can't modify the file directly)
+    logger.info(f"Updated max_tokens for {model_name} from {old_max_tokens} to {new_max_tokens}")
+    logger.info(f"New configuration for {model_name}:")
+    logger.info(json.dumps(model_config, indent=2))
+
+    logger.info("Configuration updated in memory. The next time you run a report, it will use the new max_tokens value.")
+    logger.info("Note: This change is temporary and will be reset when the application restarts.")
+    logger.info("To make the change permanent, you need to update the config.yaml file directly.")
+
+    return True
+
+def main():
+    """Main function."""
+    # Update max_tokens for Claude 3.7 Sonnet
+    update_openrouter_max_tokens("openrouter-claude-3.7-sonnet", 8000)
+
+    # You can also update other OpenRouter models if needed
+    # update_openrouter_max_tokens("openrouter-mixtral", 8000)
+
+if __name__ == "__main__":
+    main()
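For reference, the snippet below is a minimal, self-contained sketch of how the model settings exercised by test_openrouter.py and test_openrouter_config.py could be forwarded to LiteLLM. It is illustrative only: the call_openrouter() helper and the hard-coded MODEL_CONFIG are assumptions rather than the project's actual ReportSynthesizer code, and it assumes an OPENROUTER_API_KEY environment variable is available to LiteLLM.

# Illustrative sketch, not the project's ReportSynthesizer implementation.
import asyncio
from litellm import acompletion

# Assumed config shape, mirroring the fields used in test_openrouter_config.py
MODEL_CONFIG = {
    "provider": "openrouter",
    "model_name": "anthropic/claude-3.7-sonnet",
    "temperature": 0.5,
    "max_tokens": 8000,
    "endpoint": "https://openrouter.ai/api/v1",  # corrected endpoint from the test script
}

async def call_openrouter(messages):
    """Forward the config fields to LiteLLM and return the completion text."""
    response = await acompletion(
        model=MODEL_CONFIG["model_name"],
        messages=messages,
        custom_llm_provider=MODEL_CONFIG["provider"],  # route the request through OpenRouter
        api_base=MODEL_CONFIG["endpoint"],
        temperature=MODEL_CONFIG["temperature"],
        max_tokens=MODEL_CONFIG["max_tokens"],
    )
    return response.choices[0].message.content

if __name__ == "__main__":
    reply = asyncio.run(call_openrouter(
        [{"role": "user", "content": "Hello, can you help me with a test?"}]
    ))
    print(reply)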
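The closing note in update_max_tokens.py points out that its change is in-memory only and that config.yaml has to be edited for the new max_tokens value to survive a restart. The sketch below shows one way that edit could be scripted; the config/config.yaml path and the flat "models" mapping are assumptions about the configuration layout, so adjust both to match the real file before using it.

# Illustrative sketch for persisting the max_tokens change; paths and layout are assumed.
from pathlib import Path
import yaml

CONFIG_PATH = Path("config/config.yaml")  # assumed location of the project's config file

def persist_max_tokens(model_name: str = "openrouter-claude-3.7-sonnet", new_max_tokens: int = 8000) -> None:
    """Rewrite the model's max_tokens in config.yaml so the change survives restarts."""
    data = yaml.safe_load(CONFIG_PATH.read_text()) or {}
    models = data.setdefault("models", {})
    models.setdefault(model_name, {})["max_tokens"] = new_max_tokens
    CONFIG_PATH.write_text(yaml.safe_dump(data, sort_keys=False))

if __name__ == "__main__":
    persist_max_tokens()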