Fix rate limit issues with comprehensive reports and add chunk processing progress indicators

This commit is contained in:
Steve White 2025-02-28 10:38:01 -06:00
parent 9d9fea8b5b
commit fd92885a65
2 changed files with 60 additions and 21 deletions

View File

@ -56,10 +56,10 @@ class ReportDetailLevelManager:
"description": "A comprehensive report with in-depth analysis, methodology, and implications." "description": "A comprehensive report with in-depth analysis, methodology, and implications."
}, },
DetailLevel.COMPREHENSIVE: { DetailLevel.COMPREHENSIVE: {
"num_results": 20, "num_results": 10,
"token_budget": 200000, "token_budget": 120000,
"chunk_size": 1500, "chunk_size": 1000,
"overlap_size": 200, "overlap_size": 100,
"model": "llama-3.3-70b-versatile", "model": "llama-3.3-70b-versatile",
"description": "An exhaustive report with all available information, extensive analysis, and detailed references." "description": "An exhaustive report with all available information, extensive analysis, and detailed references."
} }

View File

@ -177,7 +177,13 @@ class ReportSynthesizer:
# Get the appropriate extraction prompt based on detail level # Get the appropriate extraction prompt based on detail level
extraction_prompt = self._get_extraction_prompt(detail_level) extraction_prompt = self._get_extraction_prompt(detail_level)
for chunk in chunks: total_chunks = len(chunks)
logger.info(f"Starting to process {total_chunks} document chunks")
for i, chunk in enumerate(chunks, 1):
chunk_title = chunk.get('title', 'Untitled')
logger.info(f"Processing chunk {i}/{total_chunks}: {chunk_title[:50]}...")
# Create a prompt for extracting key information from the chunk # Create a prompt for extracting key information from the chunk
messages = [ messages = [
{"role": "system", "content": extraction_prompt}, {"role": "system", "content": extraction_prompt},
@ -192,15 +198,24 @@ class ReportSynthesizer:
Extract the most relevant information from this document chunk that addresses the query."""} Extract the most relevant information from this document chunk that addresses the query."""}
] ]
# Process the chunk with the LLM try:
extracted_info = await self.generate_completion(messages) # Process the chunk with the LLM
extracted_info = await self.generate_completion(messages)
# Add the extracted information to the chunk
processed_chunk = chunk.copy() # Add the extracted information to the chunk
processed_chunk['extracted_info'] = extracted_info processed_chunk = chunk.copy()
processed_chunk['extracted_info'] = extracted_info
processed_chunks.append(processed_chunk) processed_chunks.append(processed_chunk)
logger.info(f"Completed chunk {i}/{total_chunks} ({(i/total_chunks)*100:.1f}% complete)")
except Exception as e:
logger.error(f"Error processing chunk {i}/{total_chunks}: {str(e)}")
# Add a placeholder for the failed chunk to maintain document order
processed_chunk = chunk.copy()
processed_chunk['extracted_info'] = f"Error extracting information: {str(e)}"
processed_chunks.append(processed_chunk)
logger.info(f"Completed processing all {total_chunks} chunks")
return processed_chunks return processed_chunks
def _get_extraction_prompt(self, detail_level: str) -> str: def _get_extraction_prompt(self, detail_level: str) -> str:
@ -304,9 +319,14 @@ class ReportSynthesizer:
Returns: Returns:
Synthesized report as a string Synthesized report as a string
""" """
logger.info(f"Synthesizing report for query: {query}") if not chunks:
logger.info(f"Using {len(chunks)} document chunks") logger.warning("No document chunks provided for report synthesis.")
logger.info(f"Detail level: {detail_level}") return "No information found for the given query."
# Get detail level configuration
detail_level_manager = get_report_detail_level_manager()
config = detail_level_manager.get_detail_level_config(detail_level)
token_budget = config.get("token_budget", 100000)
# Determine query type if not specified # Determine query type if not specified
if query_type == "exploratory": if query_type == "exploratory":
@ -318,15 +338,34 @@ class ReportSynthesizer:
logger.info(f"Query type determined as: {query_type}") logger.info(f"Query type determined as: {query_type}")
# Map phase: Process individual document chunks # Estimate total tokens in chunks
logger.info("Starting map phase: Processing individual document chunks") total_tokens = sum(len(chunk.get('content', '').split()) * 1.3 for chunk in chunks) # Rough estimate
logger.info(f"Estimated total tokens in {len(chunks)} chunks: {total_tokens}")
# If total tokens exceeds 80% of the token budget, reduce the number of chunks
if total_tokens > token_budget * 0.8:
max_chunks = int(len(chunks) * (token_budget * 0.8 / total_tokens))
max_chunks = max(3, max_chunks) # Ensure we have at least 3 chunks
logger.warning(f"Token count ({total_tokens}) exceeds 80% of budget ({token_budget}). Reducing chunks from {len(chunks)} to {max_chunks}.")
chunks = chunks[:max_chunks]
# Recalculate estimated tokens
total_tokens = sum(len(chunk.get('content', '').split()) * 1.3 for chunk in chunks)
logger.info(f"Reduced to {len(chunks)} chunks with estimated {total_tokens} tokens")
logger.info(f"Starting map phase for {len(chunks)} document chunks with query type '{query_type}' and detail level '{detail_level}'")
# Map phase: Process individual chunks to extract key information
processed_chunks = await self.map_document_chunks(chunks, query, detail_level) processed_chunks = await self.map_document_chunks(chunks, query, detail_level)
logger.info(f"Map phase complete: Processed {len(processed_chunks)} chunks")
logger.info(f"Starting reduce phase to synthesize report from {len(processed_chunks)} processed chunks")
# Reduce phase: Synthesize processed chunks into a coherent report # Reduce phase: Synthesize processed chunks into a coherent report
logger.info("Starting reduce phase: Synthesizing processed chunks into a report")
report = await self.reduce_processed_chunks(processed_chunks, query, query_type, detail_level) report = await self.reduce_processed_chunks(processed_chunks, query, query_type, detail_level)
logger.info("Reduce phase complete: Report generated")
# Process thinking tags if enabled
if self.process_thinking_tags and "<thinking>" in report:
logger.info("Processing thinking tags in report")
report = self._process_thinking_tags(report)
return report return report