""" Document processor module for the report generation module. This module provides functionality to prioritize documents based on relevance scores, chunk long documents into manageable pieces, and select the most relevant chunks to stay within token budget limits. """ import re import math import logging import tiktoken from typing import Dict, List, Any, Optional, Tuple, Union, Set from datetime import datetime from report.database.db_manager import get_db_manager # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) class DocumentProcessor: """ Document processor for the report generation module. This class provides methods to prioritize documents based on relevance scores, chunk long documents into manageable pieces, and select the most relevant chunks to stay within token budget limits. """ def __init__(self, default_token_limit: int = 120000): """ Initialize the document processor. Args: default_token_limit: Default token limit for the context window """ self.db_manager = get_db_manager() self.default_token_limit = default_token_limit self.tokenizer = tiktoken.get_encoding("cl100k_base") # Using OpenAI's tokenizer def _count_tokens(self, text: str) -> int: """ Count the number of tokens in a text. Args: text: The text to count tokens for Returns: Number of tokens in the text """ return len(self.tokenizer.encode(text)) def prioritize_documents(self, documents: List[Dict[str, Any]], relevance_scores: Optional[Dict[str, float]] = None, recency_weight: float = 0.3, token_count_weight: float = 0.2) -> List[Dict[str, Any]]: """ Prioritize documents based on relevance scores, recency, and token count. Args: documents: List of documents to prioritize relevance_scores: Dictionary mapping document URLs to relevance scores recency_weight: Weight for recency in the prioritization score token_count_weight: Weight for token count in the prioritization score Returns: List of documents sorted by priority score """ # If no relevance scores provided, use equal scores for all documents if relevance_scores is None: relevance_scores = {doc['url']: 1.0 for doc in documents} # Get current time for recency calculation current_time = datetime.now() # Calculate priority scores for doc in documents: # Relevance score (normalized to 0-1) relevance_score = relevance_scores.get(doc['url'], 0.0) # Recency score (normalized to 0-1) try: doc_time = datetime.fromisoformat(doc['scrape_date']) time_diff = (current_time - doc_time).total_seconds() / 86400 # Convert to days recency_score = 1.0 / (1.0 + time_diff) # Newer documents get higher scores except (KeyError, ValueError): recency_score = 0.5 # Default if scrape_date is missing or invalid # Token count score (normalized to 0-1) # Prefer documents with more tokens, but not too many token_count = doc.get('token_count', 0) token_count_score = min(token_count / 5000, 1.0) # Normalize to 0-1 # Calculate final priority score relevance_weight = 1.0 - recency_weight - token_count_weight priority_score = ( relevance_weight * relevance_score + recency_weight * recency_score + token_count_weight * token_count_score ) # Add priority score to document doc['priority_score'] = priority_score # Sort documents by priority score (descending) return sorted(documents, key=lambda x: x.get('priority_score', 0.0), reverse=True) def chunk_document_by_sections(self, document: Dict[str, Any], max_chunk_tokens: int = 1000, overlap_tokens: int = 100) -> List[Dict[str, Any]]: """ Chunk a 

    def chunk_document_by_sections(self,
                                   document: Dict[str, Any],
                                   max_chunk_tokens: int = 1000,
                                   overlap_tokens: int = 100) -> List[Dict[str, Any]]:
        """
        Chunk a document by sections based on Markdown headers.

        Args:
            document: Document to chunk
            max_chunk_tokens: Maximum number of tokens per chunk
            overlap_tokens: Number of tokens to overlap between chunks

        Returns:
            List of document chunks
        """
        content = document.get('content', '')

        # If content is empty, return empty list
        if not content.strip():
            return []

        # Find all headers in the content
        header_pattern = re.compile(r'^(#{1,6})\s+(.+)$', re.MULTILINE)
        headers = list(header_pattern.finditer(content))

        # If no headers found, use fixed-size chunking
        if not headers:
            return self.chunk_document_fixed_size(document, max_chunk_tokens, overlap_tokens)

        chunks = []

        # Process each section (from one header to the next)
        for i in range(len(headers)):
            start_pos = headers[i].start()

            # Determine end position (next header or end of content)
            if i < len(headers) - 1:
                end_pos = headers[i + 1].start()
            else:
                end_pos = len(content)

            section_content = content[start_pos:end_pos]
            section_tokens = self._count_tokens(section_content)

            # If section is small enough, add it as a single chunk
            if section_tokens <= max_chunk_tokens:
                chunks.append({
                    'document_id': document.get('id'),
                    'url': document.get('url'),
                    'title': document.get('title'),
                    'content': section_content,
                    'token_count': section_tokens,
                    'chunk_type': 'section',
                    'section_title': headers[i].group(2),
                    'section_level': len(headers[i].group(1)),
                    'priority_score': document.get('priority_score', 0.0)
                })
            else:
                # If section is too large, split it into fixed-size chunks
                section_chunks = self._split_text_fixed_size(
                    section_content, max_chunk_tokens, overlap_tokens
                )

                for j, chunk_content in enumerate(section_chunks):
                    chunk_tokens = self._count_tokens(chunk_content)
                    chunks.append({
                        'document_id': document.get('id'),
                        'url': document.get('url'),
                        'title': document.get('title'),
                        'content': chunk_content,
                        'token_count': chunk_tokens,
                        'chunk_type': 'section_part',
                        'section_title': headers[i].group(2),
                        'section_level': len(headers[i].group(1)),
                        'section_part': j + 1,
                        'total_parts': len(section_chunks),
                        'priority_score': document.get('priority_score', 0.0)
                    })

        return chunks

    def chunk_document_fixed_size(self,
                                  document: Dict[str, Any],
                                  max_chunk_tokens: int = 1000,
                                  overlap_tokens: int = 100) -> List[Dict[str, Any]]:
        """
        Chunk a document into fixed-size chunks with overlap.

        Args:
            document: Document to chunk
            max_chunk_tokens: Maximum number of tokens per chunk
            overlap_tokens: Number of tokens to overlap between chunks

        Returns:
            List of document chunks
        """
        content = document.get('content', '')

        # If content is empty, return empty list
        if not content.strip():
            return []

        # Split content into fixed-size chunks
        content_chunks = self._split_text_fixed_size(content, max_chunk_tokens, overlap_tokens)

        chunks = []

        # Create chunk objects
        for i, chunk_content in enumerate(content_chunks):
            chunk_tokens = self._count_tokens(chunk_content)
            chunks.append({
                'document_id': document.get('id'),
                'url': document.get('url'),
                'title': document.get('title'),
                'content': chunk_content,
                'token_count': chunk_tokens,
                'chunk_type': 'fixed',
                'chunk_index': i + 1,
                'total_chunks': len(content_chunks),
                'priority_score': document.get('priority_score', 0.0)
            })

        return chunks

    def _split_text_fixed_size(self,
                               text: str,
                               max_chunk_tokens: int = 1000,
                               overlap_tokens: int = 100) -> List[str]:
        """
        Split text into fixed-size chunks with overlap.

        Args:
            text: Text to split
            max_chunk_tokens: Maximum number of tokens per chunk
            overlap_tokens: Number of tokens to overlap between chunks

        Returns:
            List of text chunks
        """
        # Encode text into tokens
        tokens = self.tokenizer.encode(text)

        # If text is small enough, return as a single chunk
        if len(tokens) <= max_chunk_tokens:
            return [text]

        # Calculate number of chunks needed
        num_chunks = math.ceil((len(tokens) - overlap_tokens) / (max_chunk_tokens - overlap_tokens))

        chunks = []

        # Split tokens into chunks
        for i in range(num_chunks):
            # Calculate start and end positions
            start_pos = i * (max_chunk_tokens - overlap_tokens)
            end_pos = min(start_pos + max_chunk_tokens, len(tokens))

            # Extract chunk tokens
            chunk_tokens = tokens[start_pos:end_pos]

            # Decode chunk tokens back to text
            chunk_text = self.tokenizer.decode(chunk_tokens)
            chunks.append(chunk_text)

        return chunks
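
    # Worked example for _split_text_fixed_size (illustrative numbers only,
    # assuming the defaults max_chunk_tokens=1000 and overlap_tokens=100):
    # a 2,500-token text needs ceil((2500 - 100) / (1000 - 100)) = 3 chunks,
    # covering token windows [0, 1000), [900, 1900) and [1800, 2500), so each
    # consecutive pair of chunks shares 100 tokens of overlap.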

    def chunk_document_hierarchical(self,
                                    document: Dict[str, Any],
                                    max_chunk_tokens: int = 1000,
                                    overlap_tokens: int = 100) -> List[Dict[str, Any]]:
        """
        Chunk a very large document using a hierarchical approach.

        This method first chunks the document by sections (oversized sections
        are split further into fixed-size pieces). If the resulting chunks are
        large in total, it prepends a summary chunk and reorders the section
        chunks by importance.

        Args:
            document: Document to chunk
            max_chunk_tokens: Maximum number of tokens per chunk
            overlap_tokens: Number of tokens to overlap between chunks

        Returns:
            List of document chunks
        """
        # First, chunk by sections
        section_chunks = self.chunk_document_by_sections(document, max_chunk_tokens, overlap_tokens)

        # If the document is small enough, return section chunks
        if sum(chunk.get('token_count', 0) for chunk in section_chunks) <= max_chunk_tokens * 3:
            return section_chunks

        # Otherwise, create a summary chunk and reorder sections by importance
        content = document.get('content', '')
        title = document.get('title', '')

        # Extract first paragraph as summary
        first_para_match = re.search(r'^(.*?)\n\n', content, re.DOTALL)
        summary = first_para_match.group(1) if first_para_match else content[:500]

        # Create summary chunk
        summary_content = f"# {title}\n\n{summary}\n\n(This is a summary of a large document)"
        summary_chunk = {
            'document_id': document.get('id'),
            'url': document.get('url'),
            'title': title,
            'content': summary_content,
            'token_count': self._count_tokens(summary_content),
            'chunk_type': 'summary',
            'priority_score': document.get('priority_score', 0.0) * 1.2  # Boost summary priority
        }

        # Sort section chunks by priority (section level and position)
        def section_priority(chunk):
            # Prioritize by section level (lower is more important)
            level_score = 6 - chunk.get('section_level', 3)
            # Prioritize earlier sections
            position_score = 1.0 / (1.0 + chunk.get('chunk_index', 0) + chunk.get('section_part', 0))
            return level_score * position_score

        sorted_sections = sorted(section_chunks, key=section_priority, reverse=True)

        # Return the summary chunk followed by the sections ordered by importance
        return [summary_chunk] + sorted_sections
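
    # Illustrative ranking under section_priority (hypothetical chunks): an
    # intact H2 section has level_score = 6 - 2 = 4 and
    # position_score = 1 / (1 + 0 + 0) = 1.0, scoring 4.0, while the third
    # part of a split H2 section scores 4 * 1 / (1 + 3) = 1.0 and is
    # therefore ranked lower.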

    def select_chunks_for_context(self,
                                  chunks: List[Dict[str, Any]],
                                  token_budget: int,
                                  min_chunks_per_doc: int = 1) -> List[Dict[str, Any]]:
        """
        Select chunks to include in the context window based on token budget.

        Args:
            chunks: List of document chunks
            token_budget: Maximum number of tokens to use
            min_chunks_per_doc: Minimum number of chunks to include per document

        Returns:
            List of selected chunks
        """
        # Group chunks by document
        doc_chunks = {}
        for chunk in chunks:
            doc_id = chunk.get('document_id')
            if doc_id not in doc_chunks:
                doc_chunks[doc_id] = []
            doc_chunks[doc_id].append(chunk)

        # Sort chunks within each document by priority
        for doc_id in doc_chunks:
            doc_chunks[doc_id] = sorted(
                doc_chunks[doc_id],
                key=lambda x: x.get('priority_score', 0.0),
                reverse=True
            )

        # Select at least min_chunks_per_doc from each document
        selected_chunks = []
        remaining_budget = token_budget

        # First pass: select minimum chunks from each document
        for doc_id, doc_chunk_list in doc_chunks.items():
            for i in range(min(min_chunks_per_doc, len(doc_chunk_list))):
                chunk = doc_chunk_list[i]
                selected_chunks.append(chunk)
                remaining_budget -= chunk.get('token_count', 0)

        # If we've exceeded the budget, sort selected chunks and trim
        if remaining_budget <= 0:
            selected_chunks = sorted(
                selected_chunks,
                key=lambda x: x.get('priority_score', 0.0),
                reverse=True
            )

            # Keep adding chunks until we exceed the budget
            current_budget = 0
            for i, chunk in enumerate(selected_chunks):
                current_budget += chunk.get('token_count', 0)
                if current_budget > token_budget:
                    selected_chunks = selected_chunks[:i]
                    break

            return selected_chunks

        # Second pass: add more chunks based on priority until budget is exhausted
        # Flatten remaining chunks from all documents
        remaining_chunks = []
        for doc_id, doc_chunk_list in doc_chunks.items():
            if len(doc_chunk_list) > min_chunks_per_doc:
                remaining_chunks.extend(doc_chunk_list[min_chunks_per_doc:])

        # Sort remaining chunks by priority
        remaining_chunks = sorted(
            remaining_chunks,
            key=lambda x: x.get('priority_score', 0.0),
            reverse=True
        )

        # Add chunks until budget is exhausted
        for chunk in remaining_chunks:
            if chunk.get('token_count', 0) <= remaining_budget:
                selected_chunks.append(chunk)
                remaining_budget -= chunk.get('token_count', 0)

            if remaining_budget <= 0:
                break

        return selected_chunks
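
    # Illustrative selection pass (hypothetical chunk sizes): with a
    # token_budget of 2000, min_chunks_per_doc=1 and three documents whose
    # top-priority chunks are 600, 700 and 500 tokens, the first pass selects
    # all three (1800 tokens); the second pass then adds any remaining
    # chunks, in priority order, that fit into the 200 tokens left over.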

    def process_documents_for_report(self,
                                     documents: List[Dict[str, Any]],
                                     relevance_scores: Optional[Dict[str, float]] = None,
                                     token_budget: Optional[int] = None,
                                     chunk_size: int = 1000,
                                     overlap_size: int = 100) -> List[Dict[str, Any]]:
        """
        Process documents for report generation.

        This method prioritizes documents, chunks them, and selects the most
        relevant chunks to stay within the token budget.

        Args:
            documents: List of documents to process
            relevance_scores: Dictionary mapping document URLs to relevance scores
            token_budget: Maximum number of tokens to use (default: self.default_token_limit)
            chunk_size: Maximum number of tokens per chunk
            overlap_size: Number of tokens to overlap between chunks

        Returns:
            List of selected document chunks
        """
        if token_budget is None:
            token_budget = self.default_token_limit

        # Prioritize documents
        prioritized_docs = self.prioritize_documents(documents, relevance_scores)

        # Chunk documents
        all_chunks = []
        for doc in prioritized_docs:
            # Choose chunking strategy based on document size
            token_count = doc.get('token_count', 0)

            if token_count > chunk_size * 10:
                # Very large document: use hierarchical chunking
                chunks = self.chunk_document_hierarchical(doc, chunk_size, overlap_size)
            elif token_count > chunk_size:
                # Medium document: use section-based chunking
                chunks = self.chunk_document_by_sections(doc, chunk_size, overlap_size)
            else:
                # Small document: keep as a single chunk
                chunks = [{
                    'document_id': doc.get('id'),
                    'url': doc.get('url'),
                    'title': doc.get('title'),
                    'content': doc.get('content', ''),
                    'token_count': token_count,
                    'chunk_type': 'full',
                    'priority_score': doc.get('priority_score', 0.0)
                }]

            all_chunks.extend(chunks)

        # Select chunks based on token budget
        selected_chunks = self.select_chunks_for_context(all_chunks, token_budget)

        # Log statistics
        total_docs = len(documents)
        total_chunks = len(all_chunks)
        selected_chunk_count = len(selected_chunks)
        selected_token_count = sum(chunk.get('token_count', 0) for chunk in selected_chunks)

        logger.info(f"Processed {total_docs} documents into {total_chunks} chunks")
        logger.info(f"Selected {selected_chunk_count} chunks with {selected_token_count} tokens")

        return selected_chunks


# Create a singleton instance for global use
document_processor = DocumentProcessor()


def get_document_processor() -> DocumentProcessor:
    """
    Get the global document processor instance.

    Returns:
        DocumentProcessor instance
    """
    return document_processor
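

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only). The sample documents, URLs,
# relevance scores and token counts below are hypothetical placeholders, and
# running this also requires a working get_db_manager() backend.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    sample_documents = [
        {
            'id': 1,
            'url': 'https://example.com/article-a',
            'title': 'Article A',
            'content': '# Article A\n\nIntroduction.\n\n## Details\n\nMore text here.',
            'scrape_date': '2024-01-01T12:00:00',
            'token_count': 1200,
        },
        {
            'id': 2,
            'url': 'https://example.com/note-b',
            'title': 'Note B',
            'content': 'A short note without headers.',
            'scrape_date': '2024-01-02T08:30:00',
            'token_count': 40,
        },
    ]
    sample_scores = {
        'https://example.com/article-a': 0.9,
        'https://example.com/note-b': 0.4,
    }

    processor = get_document_processor()
    context_chunks = processor.process_documents_for_report(
        sample_documents,
        relevance_scores=sample_scores,
        token_budget=8000,
        chunk_size=1000,
        overlap_size=100,
    )
    for selected in context_chunks:
        logger.info("Selected chunk from %s (%s tokens)",
                    selected.get('url'), selected.get('token_count'))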