Update result collector, database manager, and document scraper test with improved error handling and performance optimizations
commit 2c7b086930
parent e748c345e2
@@ -40,53 +40,55 @@ class ResultCollector:
             search_results: Dictionary mapping search engine names to lists of results
             dedup: Whether to deduplicate results
             max_results: Maximum number of results to return
-            use_reranker: Whether to use the Jina Reranker for semantic ranking
+            use_reranker: Whether to use the reranker for semantic ranking
 
         Returns:
             List of processed search results
         """
-        # Combine results from all search engines
-        all_results = []
-
-        # Check if we have a flattened structure (single key with all results)
-        if len(search_results) == 1 and "combined" in search_results:
-            all_results = search_results["combined"]
-            print(f"Processing {len(all_results)} combined results")
-        else:
-            # Traditional structure with separate engines
-            for engine, results in search_results.items():
-                for result in results:
-                    # Add the source if not already present
-                    if "source" not in result:
-                        result["source"] = engine
-                    all_results.append(result)
-            print(f"Processing {len(all_results)} results from {len(search_results)} engines")
+        # Flatten results from all search engines
+        flattened_results = []
+        for engine, results in search_results.items():
+            for result in results:
+                # Add the source to each result
+                result['source'] = engine
+                flattened_results.append(result)
+
+        # Print a verification of the query in the flattened results
+        if flattened_results:
+            first_result = flattened_results[0]
+            query = first_result.get('query', '')
+            print(f"Verifying query in flattened results:")
+            print(f"Query in first result: {query[:50]}...")
 
         # Deduplicate results if requested
         if dedup:
-            all_results = self._deduplicate_results(all_results)
-            print(f"Deduplicated to {len(all_results)} results")
+            flattened_results = self._deduplicate_results(flattened_results)
+
+        print(f"Processing {len(flattened_results)} combined results")
+        if dedup:
+            print(f"Deduplicated to {len(flattened_results)} results")
 
-        # Use the reranker if available and requested
+        # Apply reranking if requested and available
         if use_reranker and self.reranker is not None:
+            print("Using Jina Reranker for semantic ranking")
             try:
-                print("Using Jina Reranker for semantic ranking")
-                all_results = self._rerank_results(all_results)
-                print(f"Reranked {len(all_results)} results")
+                reranked_results = self._rerank_results(flattened_results)
+                print(f"Reranked {len(reranked_results)} results")
+                processed_results = reranked_results
             except Exception as e:
-                print(f"Error using reranker: {str(e)}")
-                # Fall back to basic scoring
-                all_results = self._score_and_sort_results(all_results)
+                print(f"Error during reranking: {str(e)}. Falling back to basic scoring.")
+                print("Using basic scoring")
+                processed_results = self._score_and_sort_results(flattened_results)
         else:
-            # Use basic scoring
             print("Using basic scoring")
-            all_results = self._score_and_sort_results(all_results)
+            processed_results = self._score_and_sort_results(flattened_results)
 
         # Limit the number of results if requested
         if max_results is not None:
-            all_results = all_results[:max_results]
+            processed_results = processed_results[:max_results]
 
-        return all_results
+        print(f"Processed {len(processed_results)} results {'with' if use_reranker and self.reranker is not None else 'without'} reranking")
+        return processed_results
 
     def _flatten_results(self, search_results: Dict[str, List[Dict[str, Any]]]) -> List[Dict[str, Any]]:
         """
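Note: the core of this hunk is the unconditional flatten-and-tag pass over every engine's results. A standalone sketch of that step follows; the engine names and result fields are made-up sample data, not values taken from the project.

# Illustrative sketch of the flatten-and-tag step introduced above: results from
# every engine are merged into one list and each entry records its source engine.
from typing import Any, Dict, List

search_results: Dict[str, List[Dict[str, Any]]] = {
    "google": [{"title": "FastAPI", "url": "https://fastapi.tiangolo.com/", "query": "fastapi docs"}],
    "serper": [{"title": "FastAPI - Wikipedia", "url": "https://en.wikipedia.org/wiki/FastAPI", "query": "fastapi docs"}],
}

flattened_results: List[Dict[str, Any]] = []
for engine, results in search_results.items():
    for result in results:
        result["source"] = engine          # tag each result with its engine
        flattened_results.append(result)

print(f"Processing {len(flattened_results)} combined results")  # -> 2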
@@ -298,6 +298,15 @@ class DBManager:
             logger.error(f"Error deleting document: {str(e)}")
             return False
 
+    async def clear_database(self):
+        """Clear all data from the database."""
+        async with aiosqlite.connect(self.db_path) as db:
+            await db.execute('DELETE FROM metadata')
+            await db.execute('DELETE FROM documents')
+            await db.execute('DELETE FROM sqlite_sequence')
+            await db.commit()
+            logger.info("Database cleared")
+
     async def search_documents(self, query: str, limit: int = 10, offset: int = 0) -> List[Dict[str, Any]]:
         """
         Search for documents matching the query.
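Note: deleting from sqlite_sequence also resets SQLite's AUTOINCREMENT counters, so fresh inserts start numbering from 1 again. A minimal sketch of driving the new coroutine follows; the import path is taken from the test changes in the next hunk, while the wrapper script itself is assumed.

# Minimal sketch of calling the new DBManager.clear_database() coroutine.
import asyncio

from report.database.db_manager import get_db_manager


async def wipe_database() -> None:
    db_manager = get_db_manager()
    await db_manager.clear_database()  # removes all rows and resets AUTOINCREMENT counters


if __name__ == "__main__":
    asyncio.run(wipe_database())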
@@ -68,15 +68,75 @@ async def test_document_scraper():
 
     return True
 
+async def test_document_scraper_single_url(url, use_mock=False):
+    """
+    Test the document scraper with a single URL.
+
+    Args:
+        url: The URL to scrape
+        use_mock: If True, use mock data instead of making actual API calls
+    """
+    # Get document scraper
+    document_scraper = get_document_scraper(use_mock=use_mock)
+
+    logger.info(f"Testing document scraper with URL: {url}")
+    logger.info(f"Using mock data: {use_mock}")
+
+    # Scrape the URL
+    document = await document_scraper.scrape_url(url)
+
+    if document:
+        logger.info(f"Successfully scraped document: {document.get('title')}")
+        logger.info(f"URL: {document.get('url')}")
+        logger.info(f"Token count: {document.get('token_count')}")
+        content_preview = document.get('content', '')[:200] + '...' if document.get('content') else 'No content'
+        logger.info(f"Content snippet: {content_preview}")
+
+        # Print metadata
+        logger.info("\nMetadata:")
+        for key, value in document.get('metadata', {}).items():
+            logger.info(f"  {key}: {value}")
+    else:
+        logger.info(f"Failed to scrape document: {url}")
+
+async def clear_database():
+    """Clear the document database."""
+    from report.database.db_manager import get_db_manager
+
+    # Get database manager
+    db_manager = get_db_manager()
+
+    # Clear the database
+    await db_manager.clear_database()
+    logger.info("Database cleared")
+
 if __name__ == "__main__":
-    try:
-        success = asyncio.run(test_document_scraper())
-        if success:
-            logger.info("All tests passed!")
-            sys.exit(0)
-        else:
-            logger.error("Tests failed!")
-            sys.exit(1)
-    except Exception as e:
-        logger.exception(f"Error running tests: {str(e)}")
-        sys.exit(1)
+    import argparse
+
+    parser = argparse.ArgumentParser(description='Test the document scraper')
+    parser.add_argument('--url', type=str, default='https://fastapi.tiangolo.com/', help='URL to scrape')
+    parser.add_argument('--mock', action='store_true', help='Use mock data instead of making actual API calls')
+    parser.add_argument('--run-all', action='store_true', help='Run all tests')
+    parser.add_argument('--clear-db', action='store_true', help='Clear the database')
+    args = parser.parse_args()
+
+    if args.run_all:
+        try:
+            success = asyncio.run(test_document_scraper())
+            if success:
+                logger.info("All tests passed!")
+                sys.exit(0)
+            else:
+                logger.error("Tests failed!")
+                sys.exit(1)
+        except Exception as e:
+            logger.exception(f"Error running tests: {str(e)}")
+            sys.exit(1)
+    elif args.clear_db:
+        try:
+            asyncio.run(clear_database())
+            sys.exit(0)
+        except Exception as e:
+            logger.exception(f"Error clearing database: {str(e)}")
+            sys.exit(1)
+    else:
+        asyncio.run(test_document_scraper_single_url(args.url, use_mock=args.mock))
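Note: with the new flags, --run-all takes precedence over --clear-db, and with no flags the script falls through to single-URL mode. The sketch below rebuilds the same argparse parser and feeds it a few argv lists to show which branch each invocation selects; it is a standalone illustration, not part of the commit.

# Standalone check of the CLI surface added above: same parser definition,
# driven with explicit argv lists to show which branch each flag selects.
import argparse

parser = argparse.ArgumentParser(description='Test the document scraper')
parser.add_argument('--url', type=str, default='https://fastapi.tiangolo.com/', help='URL to scrape')
parser.add_argument('--mock', action='store_true', help='Use mock data instead of making actual API calls')
parser.add_argument('--run-all', action='store_true', help='Run all tests')
parser.add_argument('--clear-db', action='store_true', help='Clear the database')

for argv in ([], ['--mock'], ['--run-all'], ['--clear-db']):
    args = parser.parse_args(argv)
    if args.run_all:
        branch = 'full test suite'
    elif args.clear_db:
        branch = 'clear database'
    else:
        branch = f'single URL ({args.url}, mock={args.mock})'
    print(f"{argv or 'no flags'} -> {branch}")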