""" SQLite database manager for the report generation module. This module provides functionality to create, manage, and query the SQLite database for storing scraped documents and their metadata. """ import os import json import aiosqlite import asyncio import logging from datetime import datetime from typing import Dict, List, Any, Optional, Tuple, Union # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) class DBManager: """ Database manager for the report generation module. This class provides methods to create, manage, and query the SQLite database for storing scraped documents and their metadata. """ def __init__(self, db_path: str = "report/database/documents.db"): """ Initialize the database manager. Args: db_path: Path to the SQLite database file """ self.db_path = db_path self._ensure_dir_exists() def _ensure_dir_exists(self): """Ensure the directory for the database file exists.""" db_dir = os.path.dirname(self.db_path) if not os.path.exists(db_dir): os.makedirs(db_dir) logger.info(f"Created directory: {db_dir}") async def initialize_db(self): """ Initialize the database by creating necessary tables if they don't exist. This method creates the documents and metadata tables. """ async with aiosqlite.connect(self.db_path) as db: # Create documents table await db.execute(''' CREATE TABLE IF NOT EXISTS documents ( id INTEGER PRIMARY KEY AUTOINCREMENT, url TEXT UNIQUE NOT NULL, title TEXT, content TEXT NOT NULL, scrape_date TIMESTAMP NOT NULL, content_type TEXT, token_count INTEGER, hash TEXT UNIQUE ) ''') # Create metadata table await db.execute(''' CREATE TABLE IF NOT EXISTS metadata ( id INTEGER PRIMARY KEY AUTOINCREMENT, document_id INTEGER NOT NULL, key TEXT NOT NULL, value TEXT, FOREIGN KEY (document_id) REFERENCES documents (id) ON DELETE CASCADE, UNIQUE (document_id, key) ) ''') # Create index on url for faster lookups await db.execute('CREATE INDEX IF NOT EXISTS idx_documents_url ON documents (url)') # Create index on document_id for faster metadata lookups await db.execute('CREATE INDEX IF NOT EXISTS idx_metadata_document_id ON metadata (document_id)') await db.commit() logger.info("Database initialized successfully") async def document_exists(self, url: str) -> bool: """ Check if a document with the given URL already exists in the database. Args: url: URL of the document to check Returns: True if the document exists, False otherwise """ async with aiosqlite.connect(self.db_path) as db: db.row_factory = aiosqlite.Row cursor = await db.execute('SELECT id FROM documents WHERE url = ?', (url,)) result = await cursor.fetchone() return result is not None async def get_document_by_url(self, url: str) -> Optional[Dict[str, Any]]: """ Get a document by its URL. Args: url: URL of the document to retrieve Returns: Document as a dictionary, or None if not found """ async with aiosqlite.connect(self.db_path) as db: db.row_factory = aiosqlite.Row cursor = await db.execute(''' SELECT id, url, title, content, scrape_date, content_type, token_count, hash FROM documents WHERE url = ? ''', (url,)) document = await cursor.fetchone() if not document: return None # Convert to dictionary doc_dict = dict(document) # Get metadata cursor = await db.execute(''' SELECT key, value FROM metadata WHERE document_id = ? ''', (doc_dict['id'],)) metadata = await cursor.fetchall() doc_dict['metadata'] = {row['key']: row['value'] for row in metadata} return doc_dict async def add_document(self, url: str, title: str, content: str, content_type: str, token_count: int, metadata: Dict[str, str], doc_hash: str) -> int: """ Add a document to the database. Args: url: URL of the document title: Title of the document content: Content of the document content_type: Type of content (e.g., 'markdown', 'html', 'text') token_count: Number of tokens in the document metadata: Dictionary of metadata key-value pairs doc_hash: Hash of the document content for deduplication Returns: ID of the added document Raises: aiosqlite.Error: If there's an error adding the document """ async with aiosqlite.connect(self.db_path) as db: try: # Begin transaction await db.execute('BEGIN TRANSACTION') # Insert document cursor = await db.execute(''' INSERT INTO documents (url, title, content, scrape_date, content_type, token_count, hash) VALUES (?, ?, ?, ?, ?, ?, ?) ''', (url, title, content, datetime.now().isoformat(), content_type, token_count, doc_hash)) document_id = cursor.lastrowid # Insert metadata for key, value in metadata.items(): await db.execute(''' INSERT INTO metadata (document_id, key, value) VALUES (?, ?, ?) ''', (document_id, key, value)) # Commit transaction await db.commit() logger.info(f"Added document: {url} (ID: {document_id})") return document_id except aiosqlite.Error as e: # Rollback transaction on error await db.execute('ROLLBACK') logger.error(f"Error adding document: {str(e)}") raise async def update_document(self, document_id: int, content: str = None, title: str = None, token_count: int = None, metadata: Dict[str, str] = None) -> bool: """ Update an existing document in the database. Args: document_id: ID of the document to update content: New content (optional) title: New title (optional) token_count: New token count (optional) metadata: New or updated metadata (optional) Returns: True if the document was updated, False otherwise Raises: aiosqlite.Error: If there's an error updating the document """ async with aiosqlite.connect(self.db_path) as db: try: # Begin transaction await db.execute('BEGIN TRANSACTION') # Update document fields if provided update_parts = [] params = [] if content is not None: update_parts.append("content = ?") params.append(content) if title is not None: update_parts.append("title = ?") params.append(title) if token_count is not None: update_parts.append("token_count = ?") params.append(token_count) if update_parts: update_query = f"UPDATE documents SET {', '.join(update_parts)} WHERE id = ?" params.append(document_id) await db.execute(update_query, params) # Update metadata if provided if metadata: for key, value in metadata.items(): # Check if metadata key exists cursor = await db.execute(''' SELECT id FROM metadata WHERE document_id = ? AND key = ? ''', (document_id, key)) result = await cursor.fetchone() if result: # Update existing metadata await db.execute(''' UPDATE metadata SET value = ? WHERE document_id = ? AND key = ? ''', (value, document_id, key)) else: # Insert new metadata await db.execute(''' INSERT INTO metadata (document_id, key, value) VALUES (?, ?, ?) ''', (document_id, key, value)) # Commit transaction await db.commit() logger.info(f"Updated document ID: {document_id}") return True except aiosqlite.Error as e: # Rollback transaction on error await db.execute('ROLLBACK') logger.error(f"Error updating document: {str(e)}") raise async def delete_document(self, document_id: int) -> bool: """ Delete a document from the database. Args: document_id: ID of the document to delete Returns: True if the document was deleted, False otherwise """ async with aiosqlite.connect(self.db_path) as db: try: # Begin transaction await db.execute('BEGIN TRANSACTION') # Delete document (metadata will be deleted via ON DELETE CASCADE) await db.execute('DELETE FROM documents WHERE id = ?', (document_id,)) # Commit transaction await db.commit() logger.info(f"Deleted document ID: {document_id}") return True except aiosqlite.Error as e: # Rollback transaction on error await db.execute('ROLLBACK') logger.error(f"Error deleting document: {str(e)}") return False async def search_documents(self, query: str, limit: int = 10, offset: int = 0) -> List[Dict[str, Any]]: """ Search for documents matching the query. Args: query: Search query (will be matched against title and content) limit: Maximum number of results to return offset: Number of results to skip Returns: List of matching documents as dictionaries """ async with aiosqlite.connect(self.db_path) as db: db.row_factory = aiosqlite.Row # Search documents cursor = await db.execute(''' SELECT id, url, title, content, scrape_date, content_type, token_count FROM documents WHERE title LIKE ? OR content LIKE ? ORDER BY scrape_date DESC LIMIT ? OFFSET ? ''', (f'%{query}%', f'%{query}%', limit, offset)) documents = await cursor.fetchall() results = [] # Get metadata for each document for doc in documents: doc_dict = dict(doc) cursor = await db.execute(''' SELECT key, value FROM metadata WHERE document_id = ? ''', (doc_dict['id'],)) metadata = await cursor.fetchall() doc_dict['metadata'] = {row['key']: row['value'] for row in metadata} results.append(doc_dict) return results async def get_documents_by_urls(self, urls: List[str]) -> List[Dict[str, Any]]: """ Get multiple documents by their URLs. Args: urls: List of URLs to retrieve Returns: List of documents as dictionaries """ results = [] for url in urls: doc = await self.get_document_by_url(url) if doc: results.append(doc) return results async def count_documents(self) -> int: """ Get the total number of documents in the database. Returns: Number of documents """ async with aiosqlite.connect(self.db_path) as db: cursor = await db.execute('SELECT COUNT(*) as count FROM documents') result = await cursor.fetchone() return result[0] if result else 0 # Create a singleton instance for global use db_manager = DBManager() async def initialize_database(): """Initialize the database.""" await db_manager.initialize_db() def get_db_manager() -> DBManager: """ Get the global database manager instance. Returns: DBManager instance """ return db_manager # Run database initialization if this module is executed directly if __name__ == "__main__": asyncio.run(initialize_database())