"""
|
|
Search executor module.
|
|
Handles the execution of search queries across multiple search engines.
|
|
"""

import os
import json
import time
import asyncio
import concurrent.futures
from typing import Dict, List, Any, Optional, Union

from config.config import get_config
from .api_handlers.base_handler import BaseSearchHandler
from .api_handlers.serper_handler import SerperSearchHandler
from .api_handlers.scholar_handler import ScholarSearchHandler
from .api_handlers.arxiv_handler import ArxivSearchHandler
from .api_handlers.news_handler import NewsSearchHandler
from .api_handlers.openalex_handler import OpenAlexSearchHandler
from .api_handlers.core_handler import CoreSearchHandler
from .api_handlers.github_handler import GitHubSearchHandler
from .api_handlers.stackexchange_handler import StackExchangeSearchHandler
from .result_enrichers.unpaywall_enricher import UnpaywallEnricher

class SearchExecutor:
    """
    Executes search queries across multiple search engines.

    Manages engine selection, parallel execution, timeouts, error handling,
    and result aggregation.  A failing or slow engine never aborts the whole
    search: its entry in the result dict is simply an empty list.
    """

    # General-purpose engines, in preference order, used as a backup for
    # specialized (news/academic/code) queries.
    GENERAL_ENGINES = ["serper", "google"]
    # Academic engines in priority order.
    ACADEMIC_ENGINES = ["openalex", "core", "arxiv", "scholar"]
    # Code/programming engines in priority order.
    CODE_ENGINES = ["github", "stackexchange"]
    # The Serper API rejects queries above 2048 characters; stay safely below.
    MAX_QUERY_LENGTH = 2000

    def __init__(self):
        """Initialize the search executor with available search handlers."""
        self.config = get_config()
        self.handlers = self._initialize_handlers()
        # Keep only the handlers whose API keys / configuration are present.
        self.available_handlers = {
            name: handler
            for name, handler in self.handlers.items()
            if handler.is_available()
        }

        # Initialize result enrichers
        self.unpaywall_enricher = UnpaywallEnricher()

    def _initialize_handlers(self) -> Dict[str, "BaseSearchHandler"]:
        """
        Initialize all search handlers.

        Returns:
            Dictionary mapping handler names to handler instances
        """
        return {
            "serper": SerperSearchHandler(),
            "scholar": ScholarSearchHandler(),
            "arxiv": ArxivSearchHandler(),
            "news": NewsSearchHandler(),
            "openalex": OpenAlexSearchHandler(),
            "core": CoreSearchHandler(),
            "github": GitHubSearchHandler(),
            "stackexchange": StackExchangeSearchHandler(),
        }

    def get_available_search_engines(self) -> List[str]:
        """
        Get a list of available search engines.

        Returns:
            List of available search engine names
        """
        return list(self.available_handlers.keys())

    def _resolve_query(self, structured_query: Dict[str, Any]) -> str:
        """
        Pick the best query string from a structured query and truncate it.

        Prefers "enhanced_query", falling back to "raw_query" and then
        "original_query" (the sync and async entry points previously used
        different fallback keys; both now accept either).

        Returns:
            The query string, truncated to MAX_QUERY_LENGTH characters.
        """
        fallback = structured_query.get(
            "raw_query", structured_query.get("original_query", ""))
        query = structured_query.get("enhanced_query", fallback)
        # Truncate over-long queries (the Serper API has a 2048 char limit).
        return query[:self.MAX_QUERY_LENGTH]

    def _prioritize_news(self,
                         structured_query: Dict[str, Any],
                         search_engines: List[str],
                         log_suffix: str = "") -> List[str]:
        """
        Engine selection for current-events queries.

        Ensures "news" is included; unless the caller explicitly requested
        specific engines, narrows the set to news plus one general engine
        for a faster response with more relevant results.

        Args:
            structured_query: Structured query from the query processor
            search_engines: Private (mutable) copy of the engine list
            log_suffix: Suffix appended to the log line (e.g. " (async)")

        Returns:
            The adjusted engine list.
        """
        print(f"Current events query detected, prioritizing news search{log_suffix}")
        # Make sure news is in the search engines.
        if "news" not in search_engines:
            search_engines.append("news")
        # If specific engines were requested, honor that - otherwise limit to
        # news + a general search engine.
        if not structured_query.get("specific_engines", False):
            general_engine = next(
                (e for e in self.GENERAL_ENGINES if e in self.available_handlers),
                None)
            # Fall back to just news when no general engine is configured.
            search_engines = ["news", general_engine] if general_engine else ["news"]
        return search_engines

    def _prioritize_specialized(self,
                                structured_query: Dict[str, Any],
                                search_engines: List[str],
                                priority_engines: List[str],
                                label: str) -> List[str]:
        """
        Engine selection shared by academic and code queries.

        Uses the available specialized engines plus one general backup engine,
        honors explicitly requested engines, and falls back to general search
        when no specialized engine is configured.

        Args:
            structured_query: Structured query from the query processor
            search_engines: Private (mutable) copy of the engine list
            priority_engines: Specialized engines in priority order
            label: Query-type label used in the log line ("academic"/"code")

        Returns:
            The adjusted engine list.
        """
        available_specialized = [e for e in priority_engines
                                 if e in self.available_handlers]
        available_general = [e for e in self.GENERAL_ENGINES
                             if e in self.available_handlers]

        if available_specialized and not structured_query.get("specific_engines", False):
            # Use available specialized engines plus one general engine.
            search_engines = available_specialized
            if available_general:
                search_engines.append(available_general[0])
        elif not available_specialized:
            # Just use general search if no specialized engines are available.
            search_engines = available_general

        print(f"Selected engines for {label} query: {search_engines}")
        return search_engines

    def _select_engines(self,
                        structured_query: Dict[str, Any],
                        search_engines: List[str]) -> List[str]:
        """
        Choose the final engine list for a query, honoring specialized query
        types (current events, academic, code).

        Args:
            structured_query: Structured query from the query processor
            search_engines: Private copy of the engine list; may be mutated

        Returns:
            The final engine list (only engines known to this executor are
            later submitted; unknown names are skipped at dispatch time).
        """
        # Current events queries.
        if (structured_query.get("is_current_events", False)
                and "news" in self.available_handlers):
            return self._prioritize_news(structured_query, search_engines)

        # Academic queries.
        if structured_query.get("is_academic", False):
            print("Academic query detected, prioritizing academic search engines")
            return self._prioritize_specialized(
                structured_query, search_engines, self.ACADEMIC_ENGINES, "academic")

        # Code/programming queries.
        if structured_query.get("is_code", False):
            print("Code/programming query detected, prioritizing code search engines")
            return self._prioritize_specialized(
                structured_query, search_engines, self.CODE_ENGINES, "code")

        # Filter to only include available search engines.
        search_engines = [e for e in search_engines
                          if e in self.available_handlers]

        # Add specialized handlers based on query type.
        # NOTE(review): given the guard clauses above, these branches appear
        # unreachable; kept verbatim from the original to preserve behavior.
        if (structured_query.get("is_current_events", False)
                and "news" in self.available_handlers
                and "news" not in search_engines):
            print("Current events query detected, adding news search")
            search_engines.append("news")
        elif structured_query.get("is_academic", False):
            for engine in self.ACADEMIC_ENGINES:
                if engine in self.available_handlers and engine not in search_engines:
                    print(f"Academic query detected, adding {engine} search")
                    search_engines.append(engine)
        elif structured_query.get("is_code", False):
            for engine in self.CODE_ENGINES:
                if engine in self.available_handlers and engine not in search_engines:
                    print(f"Code query detected, adding {engine} search")
                    search_engines.append(engine)

        return search_engines

    def _news_params(self,
                     structured_query: Dict[str, Any],
                     search_engines: List[str]) -> Dict[str, Any]:
        """Extra handler parameters for news searches on current-events queries."""
        if "news" in search_engines and structured_query.get("is_current_events", False):
            return {
                "days_back": 7,            # Limit to 7 days for current events
                "sort_by": "publishedAt",  # Sort by publication date
            }
        return {}

    def execute_search(self,
                       structured_query: Dict[str, Any],
                       search_engines: Optional[List[str]] = None,
                       num_results: int = 10,
                       timeout: int = 30) -> Dict[str, List[Dict[str, Any]]]:
        """
        Execute a search query across multiple search engines in parallel.

        Args:
            structured_query: Structured query from the query processor
            search_engines: List of search engines to use (if None, use all
                available); the caller's list is never mutated
            num_results: Number of results to return per search engine
            timeout: Timeout in seconds for the batch of searches

        Returns:
            Dictionary mapping search engine names to lists of search results;
            engines that failed or timed out map to empty lists.
        """
        query = self._resolve_query(structured_query)

        # Work on a private copy so the caller's list is never mutated.
        if search_engines is None:
            search_engines = list(self.available_handlers.keys())
        else:
            search_engines = list(search_engines)

        # Handle specialized query types (current events / academic / code).
        search_engines = self._select_engines(structured_query, search_engines)

        # Per-engine query overrides from the query processor.
        search_queries = structured_query.get("search_queries", {})

        # Special parameters for news searches on current-events queries.
        news_params = self._news_params(structured_query, search_engines)

        # Execute searches in parallel.
        results: Dict[str, List[Dict[str, Any]]] = {}
        with concurrent.futures.ThreadPoolExecutor() as executor:
            future_to_engine = {}
            for engine in search_engines:
                if engine not in self.available_handlers:
                    continue
                engine_query = search_queries.get(engine, query)
                kwargs = news_params if (engine == "news" and news_params) else {}
                future = executor.submit(
                    self._execute_single_search,
                    engine=engine,
                    query=engine_query,
                    num_results=num_results,
                    **kwargs
                )
                future_to_engine[future] = engine

            # Collect results as they complete.  Previously the TimeoutError
            # raised by as_completed() escaped this method and discarded every
            # already-collected result; now slow engines just yield [].
            try:
                for future in concurrent.futures.as_completed(future_to_engine,
                                                              timeout=timeout):
                    engine = future_to_engine[future]
                    try:
                        results[engine] = future.result()
                    except Exception as e:
                        print(f"Error executing search for {engine}: {e}")
                        results[engine] = []
            except concurrent.futures.TimeoutError:
                for future, engine in future_to_engine.items():
                    if engine not in results:
                        future.cancel()  # best effort; running tasks finish
                        print(f"Search timed out for {engine}")
                        results[engine] = []

        return results

    def _execute_single_search(self, engine: str, query: str, num_results: int,
                               **kwargs) -> List[Dict[str, Any]]:
        """
        Execute a search on a single search engine.

        Args:
            engine: Name of the search engine
            query: Query to execute
            num_results: Number of results to return
            **kwargs: Additional parameters to pass to the search handler

        Returns:
            List of search results (empty on error or unknown engine)
        """
        handler = self.available_handlers.get(engine)
        if not handler:
            return []

        try:
            # Execute the search with any additional parameters.
            return handler.search(query, num_results=num_results, **kwargs)
        except Exception as e:
            # A failing engine must not break the overall search.
            print(f"Error executing search for {engine}: {e}")
            return []

    async def execute_search_async(self,
                                   structured_query: Dict[str, Any],
                                   search_engines: Optional[List[str]] = None,
                                   num_results: int = 10,
                                   timeout: int = 30) -> Dict[str, List[Dict[str, Any]]]:
        """
        Execute a search query across specified search engines asynchronously.

        Args:
            structured_query: The structured query from the query processor
            search_engines: List of search engines to use (if None, use all
                available); the caller's list is never mutated
            num_results: Number of results to return per search engine
            timeout: Timeout in seconds for each search engine

        Returns:
            Dictionary mapping search engine names to lists of search results;
            engines that failed or timed out map to empty lists.
        """
        query = self._resolve_query(structured_query)

        # Work on a private copy so the caller's list is never mutated.
        if search_engines is None:
            search_engines = list(self.available_handlers.keys())
        else:
            search_engines = list(search_engines)

        # The async path only specializes current-events queries.
        if (structured_query.get("is_current_events", False)
                and "news" in self.available_handlers):
            search_engines = self._prioritize_news(
                structured_query, search_engines, " (async)")
        else:
            # Filter to only include available search engines.
            search_engines = [e for e in search_engines
                              if e in self.available_handlers]
            # NOTE(review): unreachable given the guard above; kept verbatim
            # from the original to preserve behavior.
            if (structured_query.get("is_current_events", False)
                    and "news" in self.available_handlers
                    and "news" not in search_engines):
                print("Current events query detected, adding news search (async)")
                search_engines.append("news")

        # Per-engine query overrides from the query processor.
        search_queries = structured_query.get("search_queries", {})

        # Special parameters for news searches on current-events queries.
        news_params = self._news_params(structured_query, search_engines)

        # Wrap each coroutine in a Task so all searches run concurrently
        # (previously the coroutines were awaited one after another, which
        # serialized the "async" path).
        tasks = []
        for engine in search_engines:
            if engine not in self.available_handlers:
                continue
            engine_query = search_queries.get(engine, query)
            kwargs = news_params if (engine == "news" and news_params) else {}
            coro = self._execute_single_search_async(
                engine, engine_query, num_results, **kwargs)
            tasks.append((engine, asyncio.ensure_future(coro)))

        # Collect results, applying a per-engine timeout.
        results: Dict[str, List[Dict[str, Any]]] = {}
        for engine, task in tasks:
            try:
                results[engine] = await asyncio.wait_for(task, timeout=timeout)
            except asyncio.TimeoutError:
                print(f"Search timed out for {engine}")
                results[engine] = []
            except Exception as e:
                print(f"Error executing search for {engine}: {e}")
                results[engine] = []

        return results

    async def _execute_single_search_async(self, engine: str, query: str,
                                           num_results: int,
                                           **kwargs) -> List[Dict[str, Any]]:
        """
        Execute a search on a single search engine asynchronously.

        The underlying handlers are blocking, so the call is delegated to the
        event loop's default thread pool.

        Args:
            engine: Name of the search engine
            query: Query to execute
            num_results: Number of results to return
            **kwargs: Additional parameters to pass to the search handler

        Returns:
            List of search results
        """
        # get_event_loop() is deprecated inside coroutines; use the
        # guaranteed-running loop instead.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None,
            lambda: self._execute_single_search(engine, query, num_results, **kwargs))
|