"""
Document scraper module for the report generation module.

This module provides functionality to scrape web pages and extract clean content
using the Jina Reader API or fallback methods.
"""

import os
import re
import json
import hashlib
import logging
import asyncio
import aiohttp
import validators
import tiktoken
from typing import Dict, List, Any, Optional, Tuple, Union
from datetime import datetime
from urllib.parse import urlparse, urljoin
from bs4 import BeautifulSoup
import html2text

from config.config import get_config
from report.database.db_manager import get_db_manager, DBManager

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


class DocumentScraper:
    """
    Document scraper for the report generation module.

    This class provides methods to scrape web pages and extract clean content
    using the Jina Reader API or fallback methods.
    """

    def __init__(self, use_mock: bool = False):
        """
        Initialize the document scraper.

        Args:
            use_mock: If True, use mock data instead of making actual API calls
        """
        self.config = get_config()
        self.api_key = self._get_api_key()
        self.endpoint = "https://api.jina.ai/v1/reader"
        self.db_manager = get_db_manager()
        self.tokenizer = tiktoken.get_encoding("cl100k_base")  # Using OpenAI's tokenizer
        self.use_mock = use_mock
        self.jina_api_available = self.api_key != ""
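        # Note: this flag can be flipped to False at runtime (see
        # _scrape_with_jina_reader) when the API responds with 404 or 429,
        # after which all subsequent scrapes use the fallback path.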

    def _get_api_key(self) -> str:
        """
        Get the Jina AI API key.

        Returns:
            The API key as a string, or an empty string if the key is not
            configured (fallback scraping will be used instead)
        """
        try:
            return self.config.get_api_key('jina')
        except ValueError as e:
            logger.warning(f"Jina AI API key not found. Fallback methods will be used. {str(e)}")
            return ""

    def _count_tokens(self, text: str) -> int:
        """
        Count the number of tokens in a text.

        Args:
            text: The text to count tokens for

        Returns:
            Number of tokens in the text
        """
        return len(self.tokenizer.encode(text))

    def _compute_hash(self, content: str) -> str:
        """
        Compute a hash of the document content for deduplication.

        Args:
            content: The document content

        Returns:
            Hash of the content
        """
        return hashlib.sha256(content.encode('utf-8')).hexdigest()

    def _normalize_url(self, url: str) -> str:
        """
        Normalize a URL by removing the fragment. Further rules (such as
        stripping unnecessary query parameters) may be added later.

        Args:
            url: The URL to normalize

        Returns:
            Normalized URL
        """
        parsed = urlparse(url)
        # Remove fragment
        normalized = parsed._replace(fragment="")

        # TODO: Add more normalization rules if needed

        return normalized.geturl()

    def _validate_url(self, url: str) -> bool:
        """
        Validate a URL.

        Args:
            url: The URL to validate

        Returns:
            True if the URL is valid, False otherwise
        """
        return validators.url(url) is True
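        # Note: validators.url returns a falsy ValidationFailure/ValidationError
        # object rather than False for invalid input, so the comparison above is
        # an explicit `is True`.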

    async def _extract_metadata_from_html(self, html: str, url: str) -> Dict[str, str]:
        """
        Extract metadata from HTML content.

        Args:
            html: The HTML content
            url: The URL of the page

        Returns:
            Dictionary of metadata. Always contains source_url and scrape_date;
            title, author, description, keywords, publication_date and og_* keys
            are added when they can be found in the page.
        """
        metadata = {
            "source_url": url,
            "scrape_date": datetime.now().isoformat()
        }

        try:
            soup = BeautifulSoup(html, 'html.parser')

            # Extract title
            if soup.title and soup.title.string:
                metadata["title"] = soup.title.string.strip()

            # Extract meta tags
            for meta in soup.find_all('meta'):
                # Author
                if meta.get('name') and meta.get('name').lower() == 'author' and meta.get('content'):
                    metadata["author"] = meta.get('content')

                # Description
                if meta.get('name') and meta.get('name').lower() == 'description' and meta.get('content'):
                    metadata["description"] = meta.get('content')

                # Keywords
                if meta.get('name') and meta.get('name').lower() == 'keywords' and meta.get('content'):
                    metadata["keywords"] = meta.get('content')

                # Publication date
                if meta.get('property') and meta.get('property').lower() in ['article:published_time', 'og:published_time'] and meta.get('content'):
                    metadata["publication_date"] = meta.get('content')

                # Open Graph data
                if meta.get('property') and meta.get('property').lower().startswith('og:') and meta.get('content'):
                    og_key = meta.get('property').lower().replace('og:', 'og_')
                    metadata[og_key] = meta.get('content')

            # Extract structured data (JSON-LD)
            for script in soup.find_all('script', type='application/ld+json'):
                try:
                    ld_data = json.loads(script.string)
                    if isinstance(ld_data, dict):
                        # Extract date published
                        if ld_data.get('@type') in ['Article', 'NewsArticle', 'BlogPosting'] and ld_data.get('datePublished'):
                            metadata["publication_date"] = ld_data.get('datePublished')

                        # Extract author
                        if ld_data.get('author'):
                            author = ld_data.get('author')
                            if isinstance(author, dict) and author.get('name'):
                                metadata["author"] = author.get('name')
                            elif isinstance(author, str):
                                metadata["author"] = author
                except (json.JSONDecodeError, AttributeError, TypeError):
                    pass

        except Exception as e:
            logger.warning(f"Error extracting metadata: {str(e)}")

        return metadata

    async def _html_to_markdown(self, html: str) -> str:
        """
        Convert HTML to Markdown.

        Args:
            html: The HTML content

        Returns:
            Markdown content
        """
        converter = html2text.HTML2Text()
        converter.ignore_links = False
        converter.ignore_images = False
        converter.ignore_tables = False
        converter.body_width = 0  # No wrapping

        return converter.handle(html)

    async def _get_mock_content(self, url: str) -> Tuple[str, Dict[str, str]]:
        """
        Generate mock content for testing.

        Args:
            url: The URL to generate mock content for

        Returns:
            Tuple of (content, metadata)
        """
        domain = urlparse(url).netloc
        path = urlparse(url).path

        # Generate a title based on the URL
        title = f"Mock Content for {domain}{path}"

        # Generate mock content
        content = f"""# {title}

## Introduction

This is mock content generated for testing purposes. The original URL is {url}.

## Section 1

Lorem ipsum dolor sit amet, consectetur adipiscing elit. Nullam euismod, nisl eget
aliquam ultricies, nunc nisl aliquet nunc, quis aliquam nisl nunc eu nisl.

## Section 2

Pellentesque habitant morbi tristique senectus et netus et malesuada fames ac turpis egestas.
Vestibulum tortor quam, feugiat vitae, ultricies eget, tempor sit amet, ante.

## Conclusion

This mock content was generated on {datetime.now().isoformat()}.
"""

        # Generate mock metadata
        metadata = {
            "source_url": url,
            "title": title,
            "description": "This is mock content generated for testing purposes.",
            "author": "Mock Generator",
            "scrape_date": datetime.now().isoformat(),
            "publication_date": datetime.now().isoformat()
        }

        return content, metadata

    async def _scrape_with_jina_reader(self, url: str) -> Tuple[Optional[str], Optional[Dict[str, str]]]:
        """
        Scrape a web page using the Jina Reader API.

        Args:
            url: The URL to scrape

        Returns:
            Tuple of (content, metadata), or (None, None) if the API call fails
        """
        # If using mock data, return mock content
        if self.use_mock:
            logger.info(f"Using mock data for URL: {url}")
            return await self._get_mock_content(url)

        # If the Jina API is not available, skip this step
        if not self.jina_api_available:
            logger.info("Jina API key not available. Using fallback method.")
            return None, None

        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}",
            "Accept": "application/json"
        }

        data = {
            "url": url,
            "format": "markdown"  # Request markdown format
        }

        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(self.endpoint, headers=headers, json=data, timeout=30) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        logger.warning(f"Jina Reader API error: {response.status} - {error_text}")

                        # If we get a 404 or 429 (rate limit), mark the API as unavailable for this session
                        if response.status in [404, 429]:
                            logger.warning("Jina Reader API appears to be unavailable. Using fallback method for all subsequent requests.")
                            self.jina_api_available = False

                        return None, None

                    result = await response.json()

                    if "content" not in result:
                        logger.warning(f"Jina Reader API returned no content: {result}")
                        return None, None

                    content = result.get("content", "")
                    metadata = result.get("metadata", {})

                    # Add source URL to metadata
                    metadata["source_url"] = url

                    return content, metadata

        except asyncio.TimeoutError:
            logger.warning(f"Timeout calling Jina Reader API for URL: {url}")
            return None, None
        except Exception as e:
            logger.error(f"Error calling Jina Reader API: {str(e)}")
            return None, None

    async def _scrape_with_fallback(self, url: str) -> Tuple[Optional[str], Optional[Dict[str, str]]]:
        """
        Scrape a web page using the fallback method (aiohttp + BeautifulSoup).

        Args:
            url: The URL to scrape

        Returns:
            Tuple of (content, metadata), or (None, None) on failure
        """
        # If using mock data, return mock content
        if self.use_mock:
            logger.info(f"Using mock data for URL: {url}")
            return await self._get_mock_content(url)

        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url, headers={"User-Agent": "Mozilla/5.0"}, timeout=30) as response:
                    if response.status != 200:
                        logger.warning(f"Failed to fetch URL: {url} - Status: {response.status}")
                        return None, None

                    html = await response.text()

                    # Extract metadata
                    metadata = await self._extract_metadata_from_html(html, url)

                    # Convert to markdown
                    content = await self._html_to_markdown(html)

                    return content, metadata

        except asyncio.TimeoutError:
            logger.warning(f"Timeout fetching URL: {url}")
            return None, None
        except Exception as e:
            logger.error(f"Error in fallback scraping: {str(e)}")
            return None, None

    async def scrape_url(self, url: str, force_refresh: bool = False) -> Optional[Dict[str, Any]]:
        """
        Scrape a web page and store the content in the database.

        Args:
            url: The URL to scrape
            force_refresh: If True, scrape the URL even if it's already in the database

        Returns:
            Document dictionary if successful, None otherwise
        """
        # Validate URL
        if not self._validate_url(url):
            logger.warning(f"Invalid URL: {url}")
            return None

        # Normalize URL
        normalized_url = self._normalize_url(url)

        # Check if document already exists in database
        if not force_refresh and await self.db_manager.document_exists(normalized_url):
            logger.info(f"Document already exists in database: {normalized_url}")
            return await self.db_manager.get_document_by_url(normalized_url)

        # Try Jina Reader first if it's available
        content, metadata = None, None
        if self.jina_api_available:
            content, metadata = await self._scrape_with_jina_reader(normalized_url)

        # Fall back to custom scraping if Jina Reader fails or is unavailable
        if content is None:
            logger.info(f"Falling back to custom scraping for URL: {normalized_url}")
            content, metadata = await self._scrape_with_fallback(normalized_url)

        if content is None or not content.strip():
            logger.warning(f"Failed to extract content from URL: {normalized_url}")
            return None

        # Count tokens
        token_count = self._count_tokens(content)

        # Compute hash for deduplication
        doc_hash = self._compute_hash(content)

        # Get title from metadata or use the URL's domain as a fallback
        title = metadata.get("title") or urlparse(normalized_url).netloc

        # Store in database
        try:
            document_id = await self.db_manager.add_document(
                url=normalized_url,
                title=title,
                content=content,
                content_type="markdown",
                token_count=token_count,
                metadata=metadata,
                doc_hash=doc_hash
            )

            # Return the document
            return await self.db_manager.get_document_by_url(normalized_url)

        except Exception as e:
            logger.error(f"Error storing document in database: {str(e)}")
            return None

    async def scrape_urls(self, urls: List[str], force_refresh: bool = False) -> List[Dict[str, Any]]:
        """
        Scrape multiple URLs in parallel.

        Args:
            urls: List of URLs to scrape
            force_refresh: If True, scrape URLs even if they're already in the database

        Returns:
            List of document dictionaries (URLs that failed to scrape are omitted)
        """
        tasks = [self.scrape_url(url, force_refresh) for url in urls]
        results = await asyncio.gather(*tasks)

        # Filter out None results
        return [doc for doc in results if doc is not None]
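
    # Note: asyncio.gather launches every scrape_url call at once. If a caller
    # needs to limit concurrency for large URL lists, one possible approach
    # (illustrative sketch, not used by this class) is an asyncio.Semaphore:
    #
    #     sem = asyncio.Semaphore(5)
    #
    #     async def limited(url):
    #         async with sem:
    #             return await scraper.scrape_url(url)
    #
    #     docs = [d for d in await asyncio.gather(*(limited(u) for u in urls))
    #             if d is not None]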


# Create a singleton instance for global use
document_scraper = DocumentScraper()


def get_document_scraper(use_mock: bool = False) -> DocumentScraper:
    """
    Get the global document scraper instance.

    Args:
        use_mock: If True, return a fresh instance that uses mock data instead
            of the shared global instance

    Returns:
        DocumentScraper instance
    """
    global document_scraper

    # If mock is requested, create a new instance with mock enabled
    if use_mock:
        return DocumentScraper(use_mock=True)

    return document_scraper
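

# Illustrative usage (assumes an async caller and an initialized database;
# "https://example.com" is just a placeholder URL):
#
#     scraper = get_document_scraper()
#     docs = await scraper.scrape_urls(["https://example.com"])
#     for doc in docs:
#         print(doc["title"], doc["token_count"])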


# Example usage
async def test_scraper(use_mock: bool = False):
    """
    Test the document scraper with a few sample URLs.

    Args:
        use_mock: If True, use mock data instead of making actual API calls
    """
    from report.database.db_manager import initialize_database

    # Initialize the database
    await initialize_database()

    # Get the document scraper
    scraper = get_document_scraper(use_mock=use_mock)

    # Test URLs
    test_urls = [
        "https://en.wikipedia.org/wiki/Web_scraping",
        "https://docs.python.org/3/",
        "https://www.python.org/"
    ]

    print(f"Testing scraper with {'mock data' if use_mock else 'real data'}")

    for url in test_urls:
        print(f"\nScraping URL: {url}")
        document = await scraper.scrape_url(url)

        if document:
            print(f"Successfully scraped document: {document['title']}")
            print(f"Token count: {document['token_count']}")
            print(f"Content preview: {document['content'][:200]}...")
        else:
            print(f"Failed to scrape document: {url}")


# Run the test if this module is executed directly
if __name__ == "__main__":
    # Test with real data by default
    asyncio.run(test_scraper(use_mock=False))
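    # To exercise the scraper without making real HTTP requests, run:
    #     asyncio.run(test_scraper(use_mock=True))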