ira/execution/search_executor.py

384 lines
17 KiB
Python

"""
Search executor module.
Handles the execution of search queries across multiple search engines.
"""
import os
import json
import time
import asyncio
import concurrent.futures
from typing import Dict, List, Any, Optional, Union
from config.config import get_config
from .api_handlers.base_handler import BaseSearchHandler
from .api_handlers.serper_handler import SerperSearchHandler
from .api_handlers.scholar_handler import ScholarSearchHandler
from .api_handlers.arxiv_handler import ArxivSearchHandler
from .api_handlers.news_handler import NewsSearchHandler
from .api_handlers.openalex_handler import OpenAlexSearchHandler
from .api_handlers.core_handler import CoreSearchHandler
from .api_handlers.github_handler import GitHubSearchHandler
from .api_handlers.stackexchange_handler import StackExchangeSearchHandler
from .result_enrichers.unpaywall_enricher import UnpaywallEnricher
class SearchExecutor:
    """
    Executes search queries across multiple search engines.

    Chooses which engines to query from the flags on the structured query,
    runs the per-engine searches concurrently and aggregates the results.
    An engine that fails or does not answer in time contributes an empty
    result list instead of raising.
    """

    # Maximum length of the shared query text. The original author notes a
    # 2048 character limit on the Serper API; 2000 leaves some headroom.
    MAX_QUERY_LENGTH = 2000
    # General-purpose engines in order of preference. "google" has no handler
    # registered in _initialize_handlers, so it only matters if one is added.
    GENERAL_ENGINES = ("serper", "google")
    # Specialised engines in order of priority.
    ACADEMIC_ENGINES = ("openalex", "core", "arxiv", "scholar")
    CODE_ENGINES = ("github", "stackexchange")
    # Extra handler parameters used for news searches on current-events queries.
    NEWS_DAYS_BACK = 7
    NEWS_SORT_BY = "publishedAt"

    def __init__(self):
        """Initialize the search executor with available search handlers."""
        self.config = get_config()
        self.handlers = self._initialize_handlers()
        # Only handlers that report themselves as available are ever queried.
        self.available_handlers = {
            name: handler
            for name, handler in self.handlers.items()
            if handler.is_available()
        }
        # Initialize result enrichers
        self.unpaywall_enricher = UnpaywallEnricher()

    def _initialize_handlers(self) -> "Dict[str, BaseSearchHandler]":
        """
        Initialize all search handlers.

        Returns:
            Dictionary mapping handler names to handler instances
        """
        return {
            "serper": SerperSearchHandler(),
            "scholar": ScholarSearchHandler(),
            "arxiv": ArxivSearchHandler(),
            "news": NewsSearchHandler(),
            "openalex": OpenAlexSearchHandler(),
            "core": CoreSearchHandler(),
            "github": GitHubSearchHandler(),
            "stackexchange": StackExchangeSearchHandler(),
        }

    def get_available_search_engines(self) -> List[str]:
        """
        Get a list of available search engines.

        Returns:
            List of available search engine names
        """
        return list(self.available_handlers)

    def _resolve_query(self, structured_query: Dict[str, Any]) -> str:
        """
        Pick the query text used for engines without a dedicated query.

        Prefers the enhanced query and falls back to the raw/original query.
        Missing, None and empty values are skipped so that a blank enhanced
        query does not hide a usable raw query.

        Args:
            structured_query: Structured query from the query processor

        Returns:
            Query text, truncated to MAX_QUERY_LENGTH characters
        """
        query = (
            structured_query.get("enhanced_query")
            or structured_query.get("raw_query")
            or structured_query.get("original_query")
            or ""
        )
        return query[:self.MAX_QUERY_LENGTH]

    @staticmethod
    def _prioritize(requested: List[str],
                    specialized: List[str],
                    general: List[str],
                    honor_requested: bool) -> List[str]:
        """
        Combine specialised and general engines for a specialised query.

        Args:
            requested: Engines requested by the caller
            specialized: Available specialised engines, in priority order
            general: Available general engines, in priority order
            honor_requested: True when the caller's engine choice must be kept

        Returns:
            Engines to query
        """
        if not specialized:
            # Nothing specialised is available: use general search only.
            return list(general)
        if honor_requested:
            return requested
        # Specialised engines plus one general engine as a backup.
        return list(specialized) + general[:1]

    def _select_engines(self,
                        structured_query: Dict[str, Any],
                        search_engines: Optional[List[str]],
                        log_suffix: str = "") -> List[str]:
        """
        Decide which engines to query for a structured query.

        Args:
            structured_query: Structured query from the query processor
            search_engines: Engines requested by the caller (None means all
                available); the list is never modified
            log_suffix: Text appended to the "... detected" log messages

        Returns:
            Available engine names to query, without duplicates
        """
        available = self.available_handlers
        # Always work on a copy so the caller's list is left untouched.
        engines = list(available) if search_engines is None else list(search_engines)
        honor_requested = structured_query.get("specific_engines", False)
        general = [name for name in self.GENERAL_ENGINES if name in available]

        if structured_query.get("is_current_events", False) and "news" in available:
            print(f"Current events query detected, prioritizing news search{log_suffix}")
            if honor_requested:
                # Keep the caller's engines but make sure news is among them.
                if "news" not in engines:
                    engines.append("news")
            else:
                # News plus one general engine: faster and more relevant.
                engines = ["news"] + general[:1]
        elif structured_query.get("is_academic", False):
            print(f"Academic query detected, prioritizing academic search engines{log_suffix}")
            academic = [name for name in self.ACADEMIC_ENGINES if name in available]
            engines = self._prioritize(engines, academic, general, honor_requested)
            print(f"Selected engines for academic query: {engines}")
        elif structured_query.get("is_code", False):
            print(f"Code/programming query detected, prioritizing code search engines{log_suffix}")
            code = [name for name in self.CODE_ENGINES if name in available]
            engines = self._prioritize(engines, code, general, honor_requested)
            print(f"Selected engines for code query: {engines}")

        # Drop engines without an available handler and collapse duplicates
        # (order preserved) so that every engine is queried at most once.
        return list(dict.fromkeys(name for name in engines if name in available))

    def _plan_searches(self,
                       structured_query: Dict[str, Any],
                       search_engines: Optional[List[str]],
                       log_suffix: str = "") -> List[tuple]:
        """
        Build the list of searches to run.

        Args:
            structured_query: Structured query from the query processor
            search_engines: Engines requested by the caller (None means all)
            log_suffix: Text appended to the "... detected" log messages

        Returns:
            List of (engine, query, extra_handler_kwargs) tuples
        """
        default_query = self._resolve_query(structured_query)
        engines = self._select_engines(structured_query, search_engines, log_suffix)
        # Engine-specific queries take precedence over the shared query.
        per_engine_queries = structured_query.get("search_queries") or {}
        is_current_events = structured_query.get("is_current_events", False)

        plan = []
        for engine in engines:
            extra = {}
            if engine == "news" and is_current_events:
                # Current events: recent articles only, sorted by publication date.
                extra = {"days_back": self.NEWS_DAYS_BACK, "sort_by": self.NEWS_SORT_BY}
            plan.append((engine, per_engine_queries.get(engine, default_query), extra))
        return plan

    def execute_search(self,
                       structured_query: Dict[str, Any],
                       search_engines: Optional[List[str]] = None,
                       num_results: int = 10,
                       timeout: int = 30) -> Dict[str, List[Dict[str, Any]]]:
        """
        Execute a search query across multiple search engines.

        The searches run in parallel on a thread pool.

        Args:
            structured_query: Structured query from the query processor
            search_engines: List of search engines to use (if None, use all available)
            num_results: Number of results to return per search engine
            timeout: Overall time in seconds to wait for the engines; engines
                that have not answered by then get an empty result list

        Returns:
            Dictionary mapping search engine names to lists of search results
        """
        plan = self._plan_searches(structured_query, search_engines)
        results: Dict[str, List[Dict[str, Any]]] = {}
        if not plan:
            return results

        # The pool is managed by hand instead of with a "with" block: leaving a
        # "with" block waits for every worker, which would defeat the timeout.
        executor = concurrent.futures.ThreadPoolExecutor(max_workers=len(plan))
        try:
            future_to_engine = {
                executor.submit(
                    self._execute_single_search,
                    engine=engine,
                    query=engine_query,
                    num_results=num_results,
                    **extra
                ): engine
                for engine, engine_query, extra in plan
            }
            try:
                # Collect results as they complete
                for future in concurrent.futures.as_completed(future_to_engine, timeout=timeout):
                    engine = future_to_engine[future]
                    try:
                        results[engine] = future.result()
                    except Exception as e:
                        print(f"Error executing search for {engine}: {e}")
                        results[engine] = []
            except concurrent.futures.TimeoutError:
                # Keep what already arrived; give up on the remaining engines.
                for future, engine in future_to_engine.items():
                    if engine in results:
                        continue
                    future.cancel()
                    print(f"Search timed out for {engine}")
                    results[engine] = []
        finally:
            executor.shutdown(wait=False)
        return results

    def _execute_single_search(self, engine: str, query: str, num_results: int, **kwargs) -> List[Dict[str, Any]]:
        """
        Execute a search on a single search engine.

        Args:
            engine: Name of the search engine
            query: Query to execute
            num_results: Number of results to return
            **kwargs: Additional parameters to pass to the search handler

        Returns:
            List of search results; empty when the engine is unavailable or
            the handler raises
        """
        handler = self.available_handlers.get(engine)
        if not handler:
            return []
        try:
            # Execute the search with any additional parameters
            return handler.search(query, num_results=num_results, **kwargs)
        except Exception as e:
            # Best effort: one failing engine must not break the whole search.
            print(f"Error executing search for {engine}: {e}")
            return []

    async def execute_search_async(self,
                                   structured_query: Dict[str, Any],
                                   search_engines: Optional[List[str]] = None,
                                   num_results: int = 10,
                                   timeout: int = 30) -> Dict[str, List[Dict[str, Any]]]:
        """
        Execute a search query across specified search engines asynchronously.

        Engine selection and query handling are shared with execute_search.
        All searches are started together and awaited concurrently.

        Args:
            structured_query: The structured query from the query processor
            search_engines: List of search engines to use (if None, use all available)
            num_results: Number of results to return per search engine
            timeout: Timeout in seconds for each search engine

        Returns:
            Dictionary mapping search engine names to lists of search results
        """
        plan = self._plan_searches(structured_query, search_engines, log_suffix=" (async)")

        async def run_one(engine: str, engine_query: str, extra: Dict[str, Any]) -> List[Dict[str, Any]]:
            # Turn a timeout or failure of one engine into an empty result.
            try:
                return await asyncio.wait_for(
                    self._execute_single_search_async(engine, engine_query, num_results, **extra),
                    timeout=timeout,
                )
            except asyncio.TimeoutError:
                print(f"Search timed out for {engine}")
            except Exception as e:
                print(f"Error executing search for {engine}: {e}")
            return []

        # gather() schedules every search at once; awaiting them one after
        # another would run the engines sequentially.
        outcomes = await asyncio.gather(*(run_one(*job) for job in plan))
        return {job[0]: outcome for job, outcome in zip(plan, outcomes)}

    async def _execute_single_search_async(self, engine: str, query: str, num_results: int, **kwargs) -> List[Dict[str, Any]]:
        """
        Execute a search on a single search engine asynchronously.

        Args:
            engine: Name of the search engine
            query: Query to execute
            num_results: Number of results to return
            **kwargs: Additional parameters to pass to the search handler

        Returns:
            List of search results
        """
        # Execute in a thread pool since most API calls are blocking
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(
            None,
            lambda: self._execute_single_search(engine, query, num_results, **kwargs),
        )