"""
|
|
Test script for the document scraper module.
|
|
|
|
This script tests the functionality of the document scraper module
|
|
by scraping a few sample URLs and storing them in the database.
|
|
"""

import os
import sys
import asyncio
import logging

# Add parent directory to path to allow importing modules
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from report.database.db_manager import initialize_database, get_db_manager
from report.document_scraper import get_document_scraper

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Sample URLs for testing
TEST_URLS = [
    "https://en.wikipedia.org/wiki/Web_scraping",
    "https://en.wikipedia.org/wiki/Natural_language_processing",
    "https://en.wikipedia.org/wiki/SQLite"
]


async def test_document_scraper():
    """Test the document scraper with sample URLs."""
    # Initialize database
    await initialize_database()
    logger.info("Database initialized")

    # Get document scraper
    scraper = get_document_scraper()

    # Scrape URLs
    logger.info(f"Scraping {len(TEST_URLS)} URLs...")
    documents = await scraper.scrape_urls(TEST_URLS)
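
    # Print results. Each scraped document is assumed to be a dict with at
    # least 'title', 'url', 'token_count', and 'content' keys (the fields
    # logged below).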
    logger.info(f"Successfully scraped {len(documents)} documents")
    for doc in documents:
        logger.info(f"Title: {doc['title']}")
        logger.info(f"URL: {doc['url']}")
        logger.info(f"Token count: {doc['token_count']}")
        logger.info(f"Content preview: {doc['content'][:200]}...")
        logger.info("-" * 80)

    # Test database search
    db_manager = get_db_manager()
    search_results = await db_manager.search_documents("scraping")
    logger.info(f"Found {len(search_results)} documents matching 'scraping'")

    # Test document retrieval by URL
    doc = await db_manager.get_document_by_url(TEST_URLS[0])
    if doc:
        logger.info(f"Retrieved document by URL: {doc['title']}")
    else:
        logger.error(f"Failed to retrieve document by URL: {TEST_URLS[0]}")

    # Count documents in database
    count = await db_manager.count_documents()
    logger.info(f"Total documents in database: {count}")

    return True


async def test_document_scraper_single_url(url, use_mock=False):
    """
    Test the document scraper with a single URL.

    Args:
        url: The URL to scrape
        use_mock: If True, use mock data instead of making actual API calls
    """
    # Get document scraper (optionally backed by mock data)
    document_scraper = get_document_scraper(use_mock=use_mock)

    logger.info(f"Testing document scraper with URL: {url}")
    logger.info(f"Using mock data: {use_mock}")

    # Scrape the URL
    document = await document_scraper.scrape_url(url)

    if document:
        logger.info(f"Successfully scraped document: {document.get('title')}")
        logger.info(f"URL: {document.get('url')}")
        logger.info(f"Token count: {document.get('token_count')}")
        content_preview = document.get('content', '')[:200] + '...' if document.get('content') else 'No content'
        logger.info(f"Content snippet: {content_preview}")

        # Print metadata
        logger.info("\nMetadata:")
        for key, value in document.get('metadata', {}).items():
            logger.info(f"  {key}: {value}")
    else:
        logger.error(f"Failed to scrape document: {url}")


async def clear_database():
    """Clear the document database."""
    # Get database manager
    db_manager = get_db_manager()

    # Clear the database
    await db_manager.clear_database()
    logger.info("Database cleared")


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description='Test the document scraper')
    parser.add_argument('--url', type=str, default='https://fastapi.tiangolo.com/', help='URL to scrape')
    parser.add_argument('--mock', action='store_true', help='Use mock data instead of making actual API calls')
    parser.add_argument('--run-all', action='store_true', help='Run all tests')
    parser.add_argument('--clear-db', action='store_true', help='Clear the database')
    args = parser.parse_args()
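
    # Dispatch on the flags: --run-all takes precedence over --clear-db, which
    # takes precedence over the default single-URL test.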
    if args.run_all:
        try:
            success = asyncio.run(test_document_scraper())
            if success:
                logger.info("All tests passed!")
                sys.exit(0)
            else:
                logger.error("Tests failed!")
                sys.exit(1)
        except Exception as e:
            logger.exception(f"Error running tests: {e}")
            sys.exit(1)
    elif args.clear_db:
        try:
            asyncio.run(clear_database())
            sys.exit(0)
        except Exception as e:
            logger.exception(f"Error clearing database: {e}")
            sys.exit(1)
    else:
        asyncio.run(test_document_scraper_single_url(args.url, use_mock=args.mock))