""" Sub-question synthesis module for the intelligent research system. This module provides functionality to synthesize reports that incorporate structured sub-questions to provide more comprehensive and multi-faceted answers. """ import os import json import asyncio import logging from typing import Dict, List, Any, Optional, Tuple, Union from config.config import get_config from report.report_synthesis import ReportSynthesizer, get_report_synthesizer # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) class SubQuestionSynthesizer: """ Handles report synthesis with structured sub-questions. This class extends the functionality of the standard report synthesizer to work with decomposed queries, generating more comprehensive reports by addressing each sub-question specifically. """ def __init__(self, model_name: Optional[str] = None): """ Initialize the sub-question synthesizer. Args: model_name: Name of the LLM model to use. If None, uses the default model from configuration. """ # Initialize the base report synthesizer to leverage its functionality self.report_synthesizer = get_report_synthesizer(model_name) self.config = get_config() # Keep a reference to the model name for consistency self.model_name = self.report_synthesizer.model_name def set_progress_callback(self, callback): """Set the progress callback for the underlying report synthesizer.""" self.report_synthesizer.set_progress_callback(callback) async def synthesize_report_with_sub_questions(self, chunks: List[Dict[str, Any]], query: str, sub_questions: List[Dict[str, Any]], query_type: str = "exploratory", detail_level: str = "standard") -> str: """ Synthesize a report that addresses both the main query and its sub-questions. Args: chunks: List of document chunks query: Original search query sub_questions: List of sub-question dictionaries query_type: Type of query (factual, exploratory, comparative) detail_level: Level of detail for the report (brief, standard, detailed, comprehensive) Returns: Synthesized report as a string """ if not chunks: logger.warning("No document chunks provided for report synthesis.") return "No information found for the given query." if not sub_questions: logger.info("No sub-questions provided, falling back to standard report synthesis.") return await self.report_synthesizer.synthesize_report(chunks, query, query_type, detail_level) logger.info(f"Synthesizing report with {len(sub_questions)} sub-questions for query: {query}") # Process document chunks using the standard report synthesizer's map phase processed_chunks = await self.report_synthesizer.map_document_chunks( chunks, query, detail_level, query_type ) # Group chunks by relevance to sub-questions # This is a critical step where we determine which chunks are relevant to which sub-questions grouped_chunks = self._group_chunks_by_sub_questions(processed_chunks, sub_questions, query) # Create sections for each sub-question sections = [] # Process each sub-question to create its own section for i, sq in enumerate(sub_questions): sub_q_text = sq.get('sub_question', '') aspect = sq.get('aspect', '') priority = sq.get('priority', 3) # Skip empty sub-questions if not sub_q_text: continue logger.info(f"Processing sub-question {i+1}/{len(sub_questions)}: {sub_q_text}") # Get chunks relevant to this sub-question relevant_chunks = grouped_chunks.get(i, []) if not relevant_chunks: logger.warning(f"No relevant chunks found for sub-question: {sub_q_text}") sections.append({ 'aspect': aspect, 'sub_question': sub_q_text, 'priority': priority, 'content': f"No specific information was found addressing this aspect ({aspect})." }) continue # Generate content for this sub-question using the relevant chunks section_content = await self._generate_section_for_sub_question( relevant_chunks, sub_q_text, query, query_type, detail_level ) # Add the section to the list sections.append({ 'aspect': aspect, 'sub_question': sub_q_text, 'priority': priority, 'content': section_content }) # Sort sections by priority (lower number = higher priority) sections = sorted(sections, key=lambda s: s.get('priority', 5)) # Combine all sections into a final report final_report = await self._combine_sections_into_report( sections, processed_chunks, query, query_type, detail_level ) return final_report def _group_chunks_by_sub_questions(self, processed_chunks: List[Dict[str, Any]], sub_questions: List[Dict[str, Any]], main_query: str) -> Dict[int, List[Dict[str, Any]]]: """ Group document chunks by their relevance to each sub-question. Args: processed_chunks: List of processed document chunks sub_questions: List of sub-question dictionaries main_query: The original main query Returns: Dictionary mapping sub-question indices to lists of relevant chunks """ # Initialize a dictionary to hold chunks relevant to each sub-question grouped_chunks = {i: [] for i in range(len(sub_questions))} # First, check if chunks have 'sub_question' metadata already pre_grouped = False for chunk in processed_chunks: if 'sub_question' in chunk or 'aspect' in chunk: pre_grouped = True break if pre_grouped: # If chunks already have sub-question metadata, use that for grouping logger.info("Using pre-existing sub-question metadata for grouping chunks") for chunk in processed_chunks: sq_text = chunk.get('sub_question', '') aspect = chunk.get('aspect', '') # Find matching sub-questions for i, sq in enumerate(sub_questions): if sq_text == sq.get('sub_question') or aspect == sq.get('aspect'): grouped_chunks[i].append(chunk) break else: # If no match found, add to all groups as potentially relevant for i in range(len(sub_questions)): grouped_chunks[i].append(chunk) else: # Otherwise, use content matching to determine relevance logger.info("Using content matching to group chunks by sub-questions") # For each chunk, determine which sub-questions it's relevant to for chunk in processed_chunks: chunk_content = chunk.get('content', '') extracted_info = chunk.get('extracted_info', '') # Convert to lowercase for case-insensitive matching content_lower = (chunk_content + " " + extracted_info).lower() # Check against each sub-question assigned = False for i, sq in enumerate(sub_questions): sub_q_text = sq.get('sub_question', '').lower() aspect = sq.get('aspect', '').lower() # Calculate a simple relevance score based on keyword presence relevance_score = 0 # Split into words for better matching sub_q_words = sub_q_text.split() aspect_words = aspect.split() # Check for presence of key terms for word in sub_q_words: if len(word) > 3 and word in content_lower: # Ignore short words relevance_score += 1 for word in aspect_words: if len(word) > 3 and word in content_lower: relevance_score += 2 # Aspect terms are more important # If chunk seems relevant to this sub-question, add it if relevance_score > 0: grouped_chunks[i].append(chunk) assigned = True # If chunk wasn't assigned to any sub-question, add it to all of them # This ensures we don't miss any potentially relevant information if not assigned: for i in range(len(sub_questions)): grouped_chunks[i].append(chunk) # Log how many chunks were assigned to each sub-question for i, chunks in grouped_chunks.items(): if i < len(sub_questions): logger.info(f"Sub-question '{sub_questions[i].get('sub_question')}': {len(chunks)} relevant chunks") return grouped_chunks async def _generate_section_for_sub_question(self, chunks: List[Dict[str, Any]], sub_question: str, main_query: str, query_type: str, detail_level: str) -> str: """ Generate content for a specific sub-question using the relevant chunks. Args: chunks: List of chunks relevant to this sub-question sub_question: The text of the sub-question main_query: The original main query query_type: Type of query detail_level: Level of detail for the report Returns: Generated content for this sub-question section """ # If no chunks, return placeholder text if not chunks: return "No specific information was found addressing this aspect of the query." logger.info(f"Generating section for sub-question: {sub_question}") # Reduce the processed chunks into a coherent section # We don't need HTML tags since this will be embedded in the final report section_content = await self.report_synthesizer.reduce_processed_chunks( chunks, sub_question, query_type, detail_level ) # Extract just the content without headers and references # Remove title/header if present (typically the first line with # or ##) content_lines = section_content.split('\n') if content_lines and (content_lines[0].startswith('# ') or content_lines[0].startswith('## ')): content_lines = content_lines[1:] # Remove references section if present if '# References' in section_content: section_content = section_content.split('# References')[0] elif '## References' in section_content: section_content = section_content.split('## References')[0] # Clean up any trailing whitespace section_content = section_content.strip() return section_content async def _combine_sections_into_report(self, sections: List[Dict[str, Any]], all_chunks: List[Dict[str, Any]], query: str, query_type: str, detail_level: str) -> str: """ Combine all section contents into a final coherent report. Args: sections: List of section dictionaries with content for each sub-question all_chunks: All processed chunks (for reference information) query: Original search query query_type: Type of query detail_level: Level of detail for the report Returns: Final synthesized report """ logger.info(f"Combining {len(sections)} sections into final report") # If no sections, fall back to standard report synthesis if not sections: logger.warning("No sections generated, falling back to standard report synthesis") return await self.report_synthesizer.reduce_processed_chunks( all_chunks, query, query_type, detail_level ) # Prepare section data for the report sections_text = "" for i, section in enumerate(sections): aspect = section.get('aspect', '') sub_question = section.get('sub_question', '') content = section.get('content', '') sections_text += f"SECTION {i+1}:\n" sections_text += f"Aspect: {aspect}\n" sections_text += f"Sub-question: {sub_question}\n" sections_text += f"Content: {content}\n\n" # Extract URLs and titles for references references_data = "" for i, chunk in enumerate(all_chunks): title = chunk.get('title', 'Untitled') url = chunk.get('url', '') if url: references_data += f"Reference {i+1}: {title} - {url}\n" # Get the template for synthesis template = self.report_synthesizer._get_template_from_strings(query_type, detail_level) if not template: logger.warning(f"No template found for {query_type} {detail_level}, falling back to standard template") # Fall back to standard detail level if the requested one doesn't exist detail_level = "standard" template = self.report_synthesizer._get_template_from_strings("exploratory", "standard") # Create the prompt for the final report synthesis messages = [ {"role": "system", "content": f"""You are an expert research assistant tasked with creating a comprehensive, well-structured report from pre-written sections. The report should address the main query while incorporating multiple sections that each focus on different aspects of the query. Your task is to: 1. Create a coherent report that combines these sections 2. Add a proper introduction that presents the main query and previews the aspects covered 3. Ensure smooth transitions between sections 4. Provide a thoughtful conclusion that synthesizes insights from all sections 5. Include a properly formatted references section Format the report in Markdown with clear headings, subheadings, and bullet points where appropriate. Make the report readable, engaging, and informative while maintaining academic rigor. {template.template if template else ""} IMPORTANT: When including references, use a consistent format: [1] Title of the Article/Page. URL DO NOT use generic placeholders like "Document 1" for references. ALWAYS include the actual URL from the source documents. Each reference MUST include both the title and the URL. Make sure all references are complete and properly formatted. Number the references sequentially starting from 1. Include the URL for EACH reference - this is critical."""}, {"role": "user", "content": f"""Main Query: {query} Here are the pre-written sections addressing different aspects of the query: {sections_text} Here is reference information for citations: {references_data} Please synthesize these sections into a complete, coherent research report that thoroughly addresses the main query. The report should have: 1. An informative title 2. A proper introduction that presents the main query and previews the key aspects 3. Well-organized sections with appropriate headings that address each aspect 4. A thoughtful conclusion that synthesizes the key insights 5. Properly formatted references Organize the sections in a logical order, use the pre-written content for each section, and ensure smooth transitions between them."""} ] # Generate the final report final_report = await self.report_synthesizer.generate_completion(messages) # Check for potential cutoff issues and fix if needed if final_report.strip().endswith('[') or final_report.strip().endswith(']') or final_report.strip().endswith('...'): logger.warning("Final report appears to be cut off at the end. Attempting to fix references section.") try: # Extract what we have so far without the incomplete references if "References" in final_report: report_without_refs = final_report.split("References")[0].strip() else: report_without_refs = final_report # Generate just the references section ref_messages = [ {"role": "system", "content": """You are an expert at formatting reference lists. Create a properly formatted References section for the documents provided. IMPORTANT: 1. Use the actual title and URL from each document 2. DO NOT use generic placeholders 3. Format each reference as: [1] Title of the Article/Page. URL 4. Each reference MUST include both the title and the URL 5. Make sure all references are complete and properly formatted 6. Number the references sequentially starting from 1"""}, {"role": "user", "content": f"""Here are the document references: {references_data} Create a complete, properly formatted References section in Markdown format. Remember to include the URL for EACH reference - this is critical."""} ] references = await self.report_synthesizer.generate_completion(ref_messages) # Combine the report with the fixed references final_report = f"{report_without_refs}\n\n## References\n\n{references}" except Exception as e: logger.error(f"Error fixing references section: {str(e)}") return final_report # Create a singleton instance for global use sub_question_synthesizer = SubQuestionSynthesizer() def get_sub_question_synthesizer(model_name: Optional[str] = None) -> SubQuestionSynthesizer: """ Get the global sub-question synthesizer instance or create a new one with a specific model. Args: model_name: Optional model name to use instead of the default Returns: SubQuestionSynthesizer instance """ global sub_question_synthesizer if model_name and model_name != sub_question_synthesizer.model_name: sub_question_synthesizer = SubQuestionSynthesizer(model_name) return sub_question_synthesizer