# ira/report/document_scraper.py

"""
Document scraper module for the report generation module.
This module provides functionality to scrape web pages and extract clean content
using Jina Reader API or fallback methods.
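
Typical usage (a sketch, assuming the report database has already been
initialized via report.database.db_manager.initialize_database):

    scraper = get_document_scraper()
    document = await scraper.scrape_url("https://example.com/article")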
"""
import os
import re
import json
import hashlib
import logging
import asyncio
import aiohttp
import validators
import tiktoken
from typing import Dict, List, Any, Optional, Tuple, Union
from datetime import datetime
from urllib.parse import urlparse, urljoin
from bs4 import BeautifulSoup
import html2text
from config.config import get_config
from report.database.db_manager import get_db_manager, DBManager

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


class DocumentScraper:
"""
Document scraper for the report generation module.
This class provides methods to scrape web pages and extract clean content
using Jina Reader API or fallback methods.
"""
def __init__(self, use_mock: bool = False):
"""
Initialize the document scraper.
Args:
use_mock: If True, use mock data instead of making actual API calls
"""
self.config = get_config()
self.api_key = self._get_api_key()
self.endpoint = "https://api.jina.ai/v1/reader"
self.db_manager = get_db_manager()
self.tokenizer = tiktoken.get_encoding("cl100k_base") # Using OpenAI's tokenizer
self.use_mock = use_mock
self.jina_api_available = self.api_key != ""

    def _get_api_key(self) -> str:
        """
        Get the Jina AI API key.

        Returns:
            The API key, or an empty string if the key is not configured
        """
        try:
            return self.config.get_api_key('jina')
        except ValueError as e:
            logger.warning(f"Jina AI API key not found. Fallback methods will be used. {str(e)}")
            return ""

    def _count_tokens(self, text: str) -> int:
        """
        Count the number of tokens in a text.

        Args:
            text: The text to count tokens for

        Returns:
            Number of tokens in the text
        """
        return len(self.tokenizer.encode(text))

    def _compute_hash(self, content: str) -> str:
        """
        Compute a hash of the document content for deduplication.

        Args:
            content: The document content

        Returns:
            Hash of the content
        """
        return hashlib.sha256(content.encode('utf-8')).hexdigest()

    def _normalize_url(self, url: str) -> str:
        """
        Normalize a URL. Currently this removes the fragment; further rules can be added.

        Args:
            url: The URL to normalize

        Returns:
            Normalized URL
        """
        parsed = urlparse(url)
        # Remove fragment
        normalized = parsed._replace(fragment="")
        # TODO: Add more normalization rules if needed
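        # Possible extra rules (not implemented here): stripping trailing slashes,
        # lowercasing the host, or dropping tracking query parameters such as utm_*.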
        return normalized.geturl()

    def _validate_url(self, url: str) -> bool:
        """
        Validate a URL.

        Args:
            url: The URL to validate

        Returns:
            True if the URL is valid, False otherwise
        """
        return validators.url(url) is True

    async def _extract_metadata_from_html(self, html: str, url: str) -> Dict[str, str]:
        """
        Extract metadata from HTML content.

        Args:
            html: The HTML content
            url: The URL of the page

        Returns:
            Dictionary of metadata
        """
        metadata = {
            "source_url": url,
            "scrape_date": datetime.now().isoformat()
        }
        try:
            soup = BeautifulSoup(html, 'html.parser')
            # Extract title
            if soup.title and soup.title.string:
                metadata["title"] = soup.title.string
            # Extract meta tags
            for meta in soup.find_all('meta'):
                # Author
                if meta.get('name') and meta.get('name').lower() == 'author' and meta.get('content'):
                    metadata["author"] = meta.get('content')
                # Description
                if meta.get('name') and meta.get('name').lower() == 'description' and meta.get('content'):
                    metadata["description"] = meta.get('content')
                # Keywords
                if meta.get('name') and meta.get('name').lower() == 'keywords' and meta.get('content'):
                    metadata["keywords"] = meta.get('content')
                # Publication date
                if meta.get('property') and meta.get('property').lower() in ['article:published_time', 'og:published_time'] and meta.get('content'):
                    metadata["publication_date"] = meta.get('content')
                # Open Graph data
                if meta.get('property') and meta.get('property').lower().startswith('og:') and meta.get('content'):
                    og_key = meta.get('property').lower().replace('og:', 'og_')
                    metadata[og_key] = meta.get('content')
            # Extract structured data (JSON-LD)
            for script in soup.find_all('script', type='application/ld+json'):
                try:
                    ld_data = json.loads(script.string)
                    if isinstance(ld_data, dict):
                        # Extract date published
                        if ld_data.get('@type') in ['Article', 'NewsArticle', 'BlogPosting'] and ld_data.get('datePublished'):
                            metadata["publication_date"] = ld_data.get('datePublished')
                        # Extract author
                        if ld_data.get('author'):
                            author = ld_data.get('author')
                            if isinstance(author, dict) and author.get('name'):
                                metadata["author"] = author.get('name')
                            elif isinstance(author, str):
                                metadata["author"] = author
                except (json.JSONDecodeError, TypeError, AttributeError):
                    # script.string may be None or contain invalid JSON
                    pass
        except Exception as e:
            logger.warning(f"Error extracting metadata: {str(e)}")
        return metadata

    async def _html_to_markdown(self, html: str) -> str:
        """
        Convert HTML to Markdown.

        Args:
            html: The HTML content

        Returns:
            Markdown content
        """
        converter = html2text.HTML2Text()
        converter.ignore_links = False
        converter.ignore_images = False
        converter.ignore_tables = False
        converter.body_width = 0  # No wrapping
        return converter.handle(html)

    async def _get_mock_content(self, url: str) -> Tuple[str, Dict[str, str]]:
        """
        Generate mock content for testing.

        Args:
            url: The URL to generate mock content for

        Returns:
            Tuple of (content, metadata)
        """
        domain = urlparse(url).netloc
        path = urlparse(url).path
        # Generate a title based on the URL
        title = f"Mock Content for {domain}{path}"
        # Generate mock content
        content = f"""# {title}

## Introduction
This is mock content generated for testing purposes. The original URL is {url}.

## Section 1
Lorem ipsum dolor sit amet, consectetur adipiscing elit. Nullam euismod, nisl eget
aliquam ultricies, nunc nisl aliquet nunc, quis aliquam nisl nunc eu nisl.

## Section 2
Pellentesque habitant morbi tristique senectus et netus et malesuada fames ac turpis egestas.
Vestibulum tortor quam, feugiat vitae, ultricies eget, tempor sit amet, ante.

## Conclusion
This mock content was generated on {datetime.now().isoformat()}.
"""
        # Generate mock metadata
        metadata = {
            "source_url": url,
            "title": title,
            "description": "This is mock content generated for testing purposes.",
            "author": "Mock Generator",
            "scrape_date": datetime.now().isoformat(),
            "publication_date": datetime.now().isoformat()
        }
        return content, metadata

    async def _scrape_with_jina_reader(self, url: str) -> Tuple[Optional[str], Optional[Dict[str, str]]]:
        """
        Scrape a web page using Jina Reader API.

        Args:
            url: The URL to scrape

        Returns:
            Tuple of (content, metadata)
        """
        # If using mock data, return mock content
        if self.use_mock:
            logger.info(f"Using mock data for URL: {url}")
            return await self._get_mock_content(url)
        # If Jina API is not available, skip this step
        if not self.jina_api_available:
            logger.info("Jina API key not available. Using fallback method.")
            return None, None
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}",
            "Accept": "application/json"
        }
        data = {
            "url": url,
            "format": "markdown"  # Request markdown format
        }
        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(self.endpoint, headers=headers, json=data, timeout=30) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        logger.warning(f"Jina Reader API error: {response.status} - {error_text}")
                        # If we get a 404 or 429 (rate limit), mark the API as unavailable for this session
                        if response.status in [404, 429]:
                            logger.warning("Jina Reader API appears to be unavailable. Using fallback method for all subsequent requests.")
                            self.jina_api_available = False
                        return None, None
                    result = await response.json()
                    if "content" not in result:
                        logger.warning(f"Jina Reader API returned no content: {result}")
                        return None, None
                    content = result.get("content", "")
                    metadata = result.get("metadata", {})
                    # Add source URL to metadata
                    metadata["source_url"] = url
                    return content, metadata
        except asyncio.TimeoutError:
            logger.warning(f"Timeout calling Jina Reader API for URL: {url}")
            return None, None
        except Exception as e:
            logger.error(f"Error calling Jina Reader API: {str(e)}")
            return None, None

    async def _scrape_with_fallback(self, url: str) -> Tuple[Optional[str], Optional[Dict[str, str]]]:
        """
        Scrape a web page using fallback method (aiohttp + BeautifulSoup).

        Args:
            url: The URL to scrape

        Returns:
            Tuple of (content, metadata)
        """
        # If using mock data, return mock content
        if self.use_mock:
            logger.info(f"Using mock data for URL: {url}")
            return await self._get_mock_content(url)
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url, headers={"User-Agent": "Mozilla/5.0"}, timeout=30) as response:
                    if response.status != 200:
                        logger.warning(f"Failed to fetch URL: {url} - Status: {response.status}")
                        return None, None
                    html = await response.text()
                    # Extract metadata
                    metadata = await self._extract_metadata_from_html(html, url)
                    # Convert to markdown
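                    # Note: html2text converts the whole page, so navigation and other
                    # boilerplate may remain in the output; a readability-style content
                    # extraction step could be added here if cleaner text is needed.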
                    content = await self._html_to_markdown(html)
                    return content, metadata
        except asyncio.TimeoutError:
            logger.warning(f"Timeout fetching URL: {url}")
            return None, None
        except Exception as e:
            logger.error(f"Error in fallback scraping: {str(e)}")
            return None, None

    async def scrape_url(self, url: str, force_refresh: bool = False) -> Optional[Dict[str, Any]]:
        """
        Scrape a web page and store the content in the database.

        Args:
            url: The URL to scrape
            force_refresh: If True, scrape the URL even if it's already in the database

        Returns:
            Document dictionary if successful, None otherwise
        """
        # Validate URL
        if not self._validate_url(url):
            logger.warning(f"Invalid URL: {url}")
            return None
        # Normalize URL
        normalized_url = self._normalize_url(url)
        # Check if document already exists in database
        if not force_refresh and await self.db_manager.document_exists(normalized_url):
            logger.info(f"Document already exists in database: {normalized_url}")
            return await self.db_manager.get_document_by_url(normalized_url)
        # Try Jina Reader first if it's available
        content, metadata = None, None
        if self.jina_api_available:
            content, metadata = await self._scrape_with_jina_reader(normalized_url)
        # Fallback to custom scraping if Jina Reader fails or is unavailable
        if content is None:
            logger.info(f"Falling back to custom scraping for URL: {normalized_url}")
            content, metadata = await self._scrape_with_fallback(normalized_url)
        if content is None or not content.strip():
            logger.warning(f"Failed to extract content from URL: {normalized_url}")
            return None
        # Count tokens
        token_count = self._count_tokens(content)
        # Compute hash for deduplication
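        # The hash is passed to the database layer so that identical content reached
        # via different URLs can be recognized as a duplicate there.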
        doc_hash = self._compute_hash(content)
        # Get title from metadata or use the URL's host as fallback
        title = metadata.get("title") or urlparse(normalized_url).netloc
        # Store in database
        try:
            await self.db_manager.add_document(
                url=normalized_url,
                title=title,
                content=content,
                content_type="markdown",
                token_count=token_count,
                metadata=metadata,
                doc_hash=doc_hash
            )
            # Return the document
            return await self.db_manager.get_document_by_url(normalized_url)
        except Exception as e:
            logger.error(f"Error storing document in database: {str(e)}")
            return None

    async def scrape_urls(self, urls: List[str], force_refresh: bool = False) -> List[Dict[str, Any]]:
        """
        Scrape multiple URLs in parallel.

        Args:
            urls: List of URLs to scrape
            force_refresh: If True, scrape URLs even if they're already in the database

        Returns:
            List of document dictionaries
        """
        tasks = [self.scrape_url(url, force_refresh) for url in urls]
        results = await asyncio.gather(*tasks)
        # Filter out None results
        return [doc for doc in results if doc is not None]


# Create a singleton instance for global use
document_scraper = DocumentScraper()


def get_document_scraper(use_mock: bool = False) -> DocumentScraper:
    """
    Get the global document scraper instance.

    Args:
        use_mock: If True, create a new instance with mock data

    Returns:
        DocumentScraper instance
    """
    global document_scraper
    # If mock is requested, create a new instance with mock enabled
    if use_mock:
        return DocumentScraper(use_mock=True)
    return document_scraper


# Example usage
async def test_scraper(use_mock: bool = False):
    """
    Test the document scraper with sample URLs.

    Args:
        use_mock: If True, use mock data instead of making actual API calls
    """
    from report.database.db_manager import initialize_database

    # Initialize database
    await initialize_database()
    # Get a scraper instance
    scraper = get_document_scraper(use_mock=use_mock)
    # Test URLs
    test_urls = [
        "https://en.wikipedia.org/wiki/Web_scraping",
        "https://docs.python.org/3/",
        "https://www.python.org/"
    ]
    print(f"Testing scraper with {'mock data' if use_mock else 'real data'}")
    for url in test_urls:
        print(f"\nScraping URL: {url}")
        document = await scraper.scrape_url(url)
        if document:
            print(f"Successfully scraped document: {document['title']}")
            print(f"Token count: {document['token_count']}")
            print(f"Content preview: {document['content'][:200]}...")
        else:
            print(f"Failed to scrape document: {url}")


# Run test if this module is executed directly
if __name__ == "__main__":
    # Test with real data by default
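    # Pass use_mock=True to exercise the scraper without network access or a Jina API key.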
    asyncio.run(test_scraper(use_mock=False))