"""
Search executor module.

Handles the execution of search queries across multiple search engines,
including processing of decomposed sub-questions.
"""

import os
import json
import time
import asyncio
import concurrent.futures
from typing import Dict, List, Any, Optional, Union
import logging

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

from config.config import get_config
from .api_handlers.base_handler import BaseSearchHandler
from .api_handlers.serper_handler import SerperSearchHandler
from .api_handlers.scholar_handler import ScholarSearchHandler
from .api_handlers.arxiv_handler import ArxivSearchHandler
from .api_handlers.news_handler import NewsSearchHandler
from .api_handlers.openalex_handler import OpenAlexSearchHandler
from .api_handlers.core_handler import CoreSearchHandler
from .api_handlers.github_handler import GitHubSearchHandler
from .api_handlers.stackexchange_handler import StackExchangeSearchHandler
from .result_enrichers.unpaywall_enricher import UnpaywallEnricher


class SearchExecutor:
    """
    Executes search queries across multiple search engines.

    Selects engines based on the query type (current events, academic, code),
    dispatches the searches concurrently with a timeout, and aggregates the
    per-engine results. Errors and timeouts for one engine yield an empty
    result list for that engine instead of failing the whole search.
    (Rate limiting, if any, is presumably done inside the individual
    handlers -- it is not visible in this class.)
    """

    # General-purpose web engines, in order of preference; used as a backup
    # engine for specialized (news/academic/code) queries.
    _GENERAL_ENGINES = ("serper", "google")
    # Academic search engines, in order of priority.
    _ACADEMIC_ENGINES = ("openalex", "core", "arxiv", "scholar")
    # Code/programming search engines, in order of priority.
    _CODE_ENGINES = ("github", "stackexchange")
    # Queries are truncated to this length (Serper API has a 2048 character limit).
    _MAX_QUERY_LENGTH = 2000

    def __init__(self):
        """Initialize the search executor with available search handlers."""
        self.config = get_config()
        self.handlers = self._initialize_handlers()
        # Only handlers whose is_available() returns True are ever dispatched to.
        self.available_handlers = {name: handler for name, handler in self.handlers.items()
                                   if handler.is_available()}

        # Initialize result enrichers
        self.unpaywall_enricher = UnpaywallEnricher()

    def _initialize_handlers(self) -> Dict[str, BaseSearchHandler]:
        """
        Initialize all search handlers.

        Returns:
            Dictionary mapping handler names to handler instances
        """
        return {
            "serper": SerperSearchHandler(),
            "scholar": ScholarSearchHandler(),
            "arxiv": ArxivSearchHandler(),
            "news": NewsSearchHandler(),
            "openalex": OpenAlexSearchHandler(),
            "core": CoreSearchHandler(),
            "github": GitHubSearchHandler(),
            "stackexchange": StackExchangeSearchHandler()
        }

    def get_available_search_engines(self) -> List[str]:
        """
        Get a list of available search engines.

        Returns:
            List of available search engine names
        """
        return list(self.available_handlers.keys())

    # ------------------------------------------------------------------
    # Shared helpers (used by both the sync and async entry points)
    # ------------------------------------------------------------------

    def _resolve_query(self, structured_query: Dict[str, Any]) -> str:
        """
        Return the query text to send to the engines.

        Uses "enhanced_query" when present and non-empty, otherwise the raw
        query ("raw_query", falling back to "original_query"). The result is
        truncated to _MAX_QUERY_LENGTH characters.
        """
        raw_query = structured_query.get("raw_query") or structured_query.get("original_query") or ""
        query = structured_query.get("enhanced_query") or raw_query
        if len(query) > self._MAX_QUERY_LENGTH:
            query = query[:self._MAX_QUERY_LENGTH]
        return query

    def _prioritize_specialized(self, engines: List[str], specialized: tuple,
                                available_general: List[str], specific: bool) -> List[str]:
        """
        Pick engines for a specialized (academic/code) query.

        Args:
            engines: Engines selected so far (returned unchanged when the caller
                asked for specific engines and specialized ones are available)
            specialized: Specialized engine names in priority order
            available_general: Available general engines in preference order
            specific: True when the caller requested specific engines

        Returns:
            The engine list to use for this query
        """
        available_specialized = [e for e in specialized if e in self.available_handlers]
        if available_specialized and not specific:
            # All available specialized engines plus one general engine as backup.
            return available_specialized + available_general[:1]
        if not available_specialized:
            # No specialized engine available: fall back to general search.
            return list(available_general)
        return engines

    def _select_search_engines(self, structured_query: Dict[str, Any],
                               search_engines: Optional[List[str]]) -> List[str]:
        """
        Decide which engines to query, based on the query-type flags.

        Never mutates the caller's list. The returned list contains only
        engines with an available handler, without duplicates, in order.

        Args:
            structured_query: Structured query (reads "is_current_events",
                "is_academic", "is_code" and "specific_engines")
            search_engines: Requested engines, or None for all available

        Returns:
            Ordered list of engine names to dispatch to
        """
        # Work on a copy so the caller's list is never modified.
        if search_engines is None:
            engines = list(self.available_handlers.keys())
        else:
            engines = list(search_engines)

        specific = structured_query.get("specific_engines", False)
        available_general = [e for e in self._GENERAL_ENGINES if e in self.available_handlers]

        if structured_query.get("is_current_events", False) and "news" in self.available_handlers:
            logger.info("Current events query detected, prioritizing news search")
            if "news" not in engines:
                engines.append("news")
            # Unless specific engines were requested, limit to news plus one
            # general engine for a faster, more relevant response.
            if not specific:
                engines = ["news"] + available_general[:1]
        elif structured_query.get("is_academic", False):
            logger.info("Academic query detected, prioritizing academic search engines")
            engines = self._prioritize_specialized(engines, self._ACADEMIC_ENGINES,
                                                   available_general, specific)
            logger.info("Selected engines for academic query: %s", engines)
        elif structured_query.get("is_code", False):
            logger.info("Code/programming query detected, prioritizing code search engines")
            engines = self._prioritize_specialized(engines, self._CODE_ENGINES,
                                                   available_general, specific)
            logger.info("Selected engines for code query: %s", engines)

        # dict.fromkeys de-duplicates while preserving order; drop unavailable engines.
        return [e for e in dict.fromkeys(engines) if e in self.available_handlers]

    @staticmethod
    def _build_news_params(structured_query: Dict[str, Any], engines: List[str]) -> Dict[str, Any]:
        """Return extra keyword arguments for the news handler on current-events queries."""
        if "news" in engines and structured_query.get("is_current_events", False):
            return {
                "days_back": 7,             # Limit to 7 days for current events
                "sort_by": "publishedAt",   # Sort by publication date
            }
        return {}

    # ------------------------------------------------------------------
    # Public entry points
    # ------------------------------------------------------------------

    def execute_search(self, structured_query: Dict[str, Any],
                       search_engines: Optional[List[str]] = None,
                       num_results: int = 10,
                       timeout: int = 30) -> Dict[str, List[Dict[str, Any]]]:
        """
        Execute a search query across multiple search engines in parallel threads.

        Args:
            structured_query: Structured query from the query processor
            search_engines: List of search engines to use (if None, use all available).
                The list is not modified.
            num_results: Number of results to return per search engine
            timeout: Time in seconds to wait for all engines; engines that have
                not finished by then get an empty result list

        Returns:
            Dictionary mapping search engine names to lists of search results
        """
        query = self._resolve_query(structured_query)
        engines = self._select_search_engines(structured_query, search_engines)
        search_queries = structured_query.get("search_queries") or {}
        news_params = self._build_news_params(structured_query, engines)

        results: Dict[str, List[Dict[str, Any]]] = {}
        # Managed manually (not `with`) so a timeout does not block on
        # shutdown(wait=True) until every straggler thread finishes.
        executor = concurrent.futures.ThreadPoolExecutor()
        try:
            future_to_engine = {}
            for engine in engines:
                kwargs = news_params if engine == "news" else {}
                future = executor.submit(
                    self._execute_single_search,
                    engine=engine,
                    query=search_queries.get(engine, query),
                    num_results=num_results,
                    **kwargs
                )
                future_to_engine[future] = engine

            # wait() never raises on timeout; it reports what finished.
            done, _ = concurrent.futures.wait(future_to_engine, timeout=timeout)
            for future, engine in future_to_engine.items():
                if future in done:
                    try:
                        results[engine] = future.result()
                    except Exception as e:
                        logger.error("Error executing search for %s: %s", engine, e)
                        results[engine] = []
                else:
                    # Cancel if not yet started; a running thread keeps going
                    # but its result is discarded.
                    future.cancel()
                    logger.warning("Search timed out for %s", engine)
                    results[engine] = []
        finally:
            executor.shutdown(wait=False)

        return results

    def _execute_single_search(self, engine: str, query: str, num_results: int, **kwargs) -> List[Dict[str, Any]]:
        """
        Execute a search on a single search engine.

        Args:
            engine: Name of the search engine
            query: Query to execute
            num_results: Number of results to return
            **kwargs: Additional parameters to pass to the search handler

        Returns:
            List of search results; empty if the engine is unavailable or the
            handler raised
        """
        handler = self.available_handlers.get(engine)
        if not handler:
            return []

        try:
            # Execute the search with any additional parameters
            return handler.search(query, num_results=num_results, **kwargs)
        except Exception as e:
            logger.error("Error executing search for %s: %s", engine, e)
            return []

    async def execute_search_async(self, structured_query: Dict[str, Any],
                                   search_engines: Optional[List[str]] = None,
                                   num_results: int = 10,
                                   timeout: int = 30) -> Dict[str, List[Dict[str, Any]]]:
        """
        Execute a search query across search engines concurrently (asyncio).

        Engine selection and query resolution are identical to execute_search.

        Args:
            structured_query: The structured query from the query processor
            search_engines: List of search engines to use (if None, use all available).
                The list is not modified.
            num_results: Number of results to return per search engine
            timeout: Timeout in seconds for each search engine; because the
                engines run concurrently this also bounds the total wait

        Returns:
            Dictionary mapping search engine names to lists of search results
        """
        query = self._resolve_query(structured_query)
        engines = self._select_search_engines(structured_query, search_engines)
        search_queries = structured_query.get("search_queries") or {}
        news_params = self._build_news_params(structured_query, engines)

        async def run_engine(engine: str) -> List[Dict[str, Any]]:
            """Run one engine with a timeout; errors/timeouts yield []."""
            kwargs = news_params if engine == "news" else {}
            try:
                return await asyncio.wait_for(
                    self._execute_single_search_async(
                        engine, search_queries.get(engine, query), num_results, **kwargs),
                    timeout=timeout)
            except asyncio.TimeoutError:
                logger.warning("Search timed out for %s", engine)
            except Exception as e:
                logger.error("Error executing search for %s: %s", engine, e)
            return []

        # gather() starts all searches at once instead of awaiting them one by one.
        engine_results = await asyncio.gather(*(run_engine(engine) for engine in engines))
        return dict(zip(engines, engine_results))

    async def _execute_single_search_async(self, engine: str, query: str, num_results: int, **kwargs) -> List[Dict[str, Any]]:
        """
        Execute a search on a single search engine asynchronously.

        The handler call is blocking, so it runs in the event loop's default
        thread-pool executor.

        Args:
            engine: Name of the search engine
            query: Query to execute
            num_results: Number of results to return
            **kwargs: Additional parameters to pass to the search handler

        Returns:
            List of search results
        """
        loop = asyncio.get_running_loop()

        def execute_search():
            return self._execute_single_search(engine, query, num_results, **kwargs)

        return await loop.run_in_executor(None, execute_search)