From fd92885a65785b07e57e9fa7d1c6af68f9a704f3 Mon Sep 17 00:00:00 2001
From: Steve White
Date: Fri, 28 Feb 2025 10:38:01 -0600
Subject: [PATCH] Fix rate limit issues with comprehensive reports and add chunk processing progress indicators

---
 report/report_detail_levels.py |  8 ++--
 report/report_synthesis.py     | 73 ++++++++++++++++++++++++++--------
 2 files changed, 60 insertions(+), 21 deletions(-)

diff --git a/report/report_detail_levels.py b/report/report_detail_levels.py
index 16da163..c317bd5 100644
--- a/report/report_detail_levels.py
+++ b/report/report_detail_levels.py
@@ -56,10 +56,10 @@ class ReportDetailLevelManager:
                 "description": "A comprehensive report with in-depth analysis, methodology, and implications."
             },
             DetailLevel.COMPREHENSIVE: {
-                "num_results": 20,
-                "token_budget": 200000,
-                "chunk_size": 1500,
-                "overlap_size": 200,
+                "num_results": 10,
+                "token_budget": 120000,
+                "chunk_size": 1000,
+                "overlap_size": 100,
                 "model": "llama-3.3-70b-versatile",
                 "description": "An exhaustive report with all available information, extensive analysis, and detailed references."
             }
diff --git a/report/report_synthesis.py b/report/report_synthesis.py
index 2e4693c..e085621 100644
--- a/report/report_synthesis.py
+++ b/report/report_synthesis.py
@@ -177,7 +177,13 @@ class ReportSynthesizer:
         # Get the appropriate extraction prompt based on detail level
         extraction_prompt = self._get_extraction_prompt(detail_level)
 
-        for chunk in chunks:
+        total_chunks = len(chunks)
+        logger.info(f"Starting to process {total_chunks} document chunks")
+
+        for i, chunk in enumerate(chunks, 1):
+            chunk_title = chunk.get('title', 'Untitled')
+            logger.info(f"Processing chunk {i}/{total_chunks}: {chunk_title[:50]}...")
+
             # Create a prompt for extracting key information from the chunk
             messages = [
                 {"role": "system", "content": extraction_prompt},
@@ -192,15 +198,24 @@
                 Extract the most relevant information from this document chunk that addresses the query."""}
             ]
 
-            # Process the chunk with the LLM
-            extracted_info = await self.generate_completion(messages)
-
-            # Add the extracted information to the chunk
-            processed_chunk = chunk.copy()
-            processed_chunk['extracted_info'] = extracted_info
-
-            processed_chunks.append(processed_chunk)
+            try:
+                # Process the chunk with the LLM
+                extracted_info = await self.generate_completion(messages)
+
+                # Add the extracted information to the chunk
+                processed_chunk = chunk.copy()
+                processed_chunk['extracted_info'] = extracted_info
+                processed_chunks.append(processed_chunk)
+
+                logger.info(f"Completed chunk {i}/{total_chunks} ({(i/total_chunks)*100:.1f}% complete)")
+            except Exception as e:
+                logger.error(f"Error processing chunk {i}/{total_chunks}: {str(e)}")
+                # Add a placeholder for the failed chunk to maintain document order
+                processed_chunk = chunk.copy()
+                processed_chunk['extracted_info'] = f"Error extracting information: {str(e)}"
+                processed_chunks.append(processed_chunk)
 
+        logger.info(f"Completed processing all {total_chunks} chunks")
         return processed_chunks
 
     def _get_extraction_prompt(self, detail_level: str) -> str:
@@ -304,9 +319,14 @@ class ReportSynthesizer:
         Returns:
             Synthesized report as a string
         """
-        logger.info(f"Synthesizing report for query: {query}")
-        logger.info(f"Using {len(chunks)} document chunks")
-        logger.info(f"Detail level: {detail_level}")
+        if not chunks:
+            logger.warning("No document chunks provided for report synthesis.")
+            return "No information found for the given query."
+
+        # Get detail level configuration
+        detail_level_manager = get_report_detail_level_manager()
+        config = detail_level_manager.get_detail_level_config(detail_level)
+        token_budget = config.get("token_budget", 100000)
 
         # Determine query type if not specified
         if query_type == "exploratory":
@@ -318,15 +338,34 @@
 
         logger.info(f"Query type determined as: {query_type}")
 
-        # Map phase: Process individual document chunks
-        logger.info("Starting map phase: Processing individual document chunks")
+        # Estimate total tokens in chunks
+        total_tokens = sum(len(chunk.get('content', '').split()) * 1.3 for chunk in chunks)  # Rough estimate
+        logger.info(f"Estimated total tokens in {len(chunks)} chunks: {total_tokens}")
+
+        # If total tokens exceeds 80% of the token budget, reduce the number of chunks
+        if total_tokens > token_budget * 0.8:
+            max_chunks = int(len(chunks) * (token_budget * 0.8 / total_tokens))
+            max_chunks = max(3, max_chunks)  # Ensure we have at least 3 chunks
+            logger.warning(f"Token count ({total_tokens}) exceeds 80% of budget ({token_budget}). Reducing chunks from {len(chunks)} to {max_chunks}.")
+            chunks = chunks[:max_chunks]
+            # Recalculate estimated tokens
+            total_tokens = sum(len(chunk.get('content', '').split()) * 1.3 for chunk in chunks)
+            logger.info(f"Reduced to {len(chunks)} chunks with estimated {total_tokens} tokens")
+
+        logger.info(f"Starting map phase for {len(chunks)} document chunks with query type '{query_type}' and detail level '{detail_level}'")
+
+        # Map phase: Process individual chunks to extract key information
         processed_chunks = await self.map_document_chunks(chunks, query, detail_level)
-        logger.info(f"Map phase complete: Processed {len(processed_chunks)} chunks")
+
+        logger.info(f"Starting reduce phase to synthesize report from {len(processed_chunks)} processed chunks")
 
         # Reduce phase: Synthesize processed chunks into a coherent report
-        logger.info("Starting reduce phase: Synthesizing processed chunks into a report")
         report = await self.reduce_processed_chunks(processed_chunks, query, query_type, detail_level)
-        logger.info("Reduce phase complete: Report generated")
+
+        # Process thinking tags if enabled
+        if self.process_thinking_tags and "<thinking>" in report:
+            logger.info("Processing thinking tags in report")
+            report = self._process_thinking_tags(report)
 
         return report
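
Note for reviewers: a minimal standalone sketch of the truncation guard this patch adds to synthesize_report, replaying the same arithmetic (word count times 1.3 as a rough token estimate, cutoff at 80% of the budget, floor of 3 chunks) with the new 120,000-token comprehensive budget. The make_chunk helper and the sample sizes are illustrative assumptions, not part of the patch.

    # Sketch of the chunk-truncation guard, outside the class for easy testing.
    def make_chunk(words: int) -> dict:
        """Build a fake chunk with the given number of words."""
        return {"content": " ".join(["word"] * words)}

    def truncate_chunks(chunks: list, token_budget: int) -> list:
        """Estimate ~1.3 tokens per word and drop chunks past 80% of budget."""
        total_tokens = sum(len(c.get("content", "").split()) * 1.3 for c in chunks)
        if total_tokens > token_budget * 0.8:
            max_chunks = max(3, int(len(chunks) * (token_budget * 0.8 / total_tokens)))
            chunks = chunks[:max_chunks]
        return chunks

    # Ten 20,000-word chunks estimate to 260,000 tokens; against the new
    # 120,000-token budget (80% = 96,000) the guard keeps
    # int(10 * 96000 / 260000) = 3 chunks.
    chunks = [make_chunk(20_000) for _ in range(10)]
    print(len(truncate_chunks(chunks, token_budget=120_000)))  # -> 3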