"""
StackExchange API handler for programming question search.

This module implements a search handler for the StackExchange API,
focusing on Stack Overflow and related programming Q&A sites.
"""

import html
import logging
import os
import re
import time
from typing import Dict, List, Any, Optional
from urllib.parse import quote

import requests

from config.config import get_config
from ..api_handlers.base_handler import BaseSearchHandler

logger = logging.getLogger(__name__)

# Compiled once at import time; both are applied to every question body.
_HTML_TAG_RE = re.compile(r'<[^>]+>')
_WHITESPACE_RE = re.compile(r'\s+')


class StackExchangeSearchHandler(BaseSearchHandler):
    """Handler for StackExchange/Stack Overflow search."""

    # Upper bound applied to the ``pagesize`` parameter of a search request.
    MAX_PAGE_SIZE = 30
    # Seconds to wait for the API before giving up; without a timeout a
    # stalled connection would block the caller indefinitely.
    REQUEST_TIMEOUT = 10
    # Fallback daily quotas, used only when the API response does not report
    # ``quota_max`` itself (values taken from the API throttle documentation).
    DEFAULT_QUOTA_WITH_KEY = 10000
    DEFAULT_QUOTA_WITHOUT_KEY = 300

    def __init__(self):
        """Initialize the StackExchange search handler.

        The API key is read from the ``STACKEXCHANGE_API_KEY`` environment
        variable first and from the ``api_keys.stackexchange`` config entry
        otherwise. It may end up as ``None``; the handler still works then.
        """
        self.config = get_config()
        self.api_key = (
            os.environ.get('STACKEXCHANGE_API_KEY')
            or self.config.config_data.get('api_keys', {}).get('stackexchange')
        )
        self.api_url = "https://api.stackexchange.com/2.3"
        self.search_endpoint = "/search/advanced"
        # Timestamp of the previous request on the ``time.monotonic()`` clock.
        self.last_request_time = 0
        # Seconds between requests to avoid throttling.
        self.min_request_interval = 1.0

    def search(self, query: str, num_results: int = 10, **kwargs) -> List[Dict[str, Any]]:
        """
        Execute a search on StackExchange.

        Args:
            query: The search query
            num_results: Number of results to return (capped at MAX_PAGE_SIZE)
            **kwargs: Additional search parameters
                - site: StackExchange site to search (default: stackoverflow)
                - sort: Sort by (relevance, votes, creation, activity)
                - tags: List of tags to filter by (a single tag string is
                  also accepted)
                - accepted: Only return questions with accepted answers

        Returns:
            List of search results; empty when nothing was requested, the
            request failed, or the response could not be decoded.
        """
        if not self.is_available():
            return []

        # Nothing to fetch: skip the network round trip entirely.
        if num_results <= 0:
            return []

        site = kwargs.get("site", "stackoverflow")
        params = self._build_search_params(query, num_results, site, kwargs)

        # Rate limiting to avoid API restrictions
        self._respect_rate_limit()

        try:
            response = requests.get(
                f"{self.api_url}{self.search_endpoint}",
                params=params,
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            data = response.json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers a body that is not valid JSON.
            logger.error("StackExchange API error: %s", e)
            return []

        return [self._format_result(item, site) for item in data.get("items") or []]

    def _build_search_params(self, query: str, num_results: int, site: str,
                             options: Dict[str, Any]) -> Dict[str, Any]:
        """Build the query-string parameters for a search request."""
        params = {
            "q": query,
            "site": site,
            "pagesize": min(num_results, self.MAX_PAGE_SIZE),
            "page": 1,
            "filter": "withbody",  # Include question body
        }

        # Send the key only when one is configured, matching
        # get_rate_limit_info().
        if self.api_key:
            params["key"] = self.api_key

        if options.get("sort"):
            params["sort"] = options["sort"]

        tags = options.get("tags")
        if tags:
            # A bare string would otherwise be joined character by character.
            if isinstance(tags, str):
                tags = [tags]
            params["tagged"] = ";".join(str(tag) for tag in tags)

        if options.get("accepted"):
            params["accepted"] = "True"

        return params

    def _format_result(self, item: Dict[str, Any], site: str) -> Dict[str, Any]:
        """Convert one API question item into a standardized result entry."""
        answer_count = item.get("answer_count", 0)
        score = item.get("score", 0)
        is_answered = bool(item.get("is_answered", False))
        # ``is_answered`` is also true for questions that merely have an
        # upvoted answer; an accepted answer is signalled by the presence of
        # ``accepted_answer_id``.
        has_accepted = item.get("accepted_answer_id") is not None

        tags = item.get("tags") or []
        tag_str = ", ".join(tags)

        snippet = self._extract_snippet(item.get("body", ""), max_length=300)

        meta_info = f"Score: {score} | Answers: {answer_count}"
        if has_accepted:
            meta_info += " | Has accepted answer"

        full_snippet = f"{snippet}\n\nTags: {tag_str}\n{meta_info}"

        # Titles arrive HTML-entity-encoded (e.g. ``&#39;``), so decode them.
        title = html.unescape(item.get("title") or "Unnamed Question")

        return {
            "title": title,
            "url": item.get("link", ""),
            "snippet": full_snippet,
            "source": f"stackexchange_{site}",
            "metadata": {
                "score": score,
                "answer_count": answer_count,
                "has_accepted": has_accepted,
                "is_answered": is_answered,
                "tags": tags,
                "question_id": item.get("question_id", ""),
                "creation_date": item.get("creation_date", ""),
            },
        }

    def _extract_snippet(self, html_content: str, max_length: int = 300) -> str:
        """
        Extract a readable snippet from HTML content.

        Args:
            html_content: HTML content from Stack Overflow
            max_length: Maximum length of the snippet

        Returns:
            A plain text snippet, with "..." appended when it was truncated.
            Returns "Snippet extraction failed" when the input is not a
            string.
        """
        if not isinstance(html_content, str):
            logger.warning(
                "Error extracting snippet: expected str, got %s",
                type(html_content).__name__,
            )
            return "Snippet extraction failed"

        # Basic HTML tag removal (a more robust solution would use a library
        # like BeautifulSoup). Tags are stripped BEFORE entities are decoded
        # so escaped markup inside code samples (``&lt;div&gt;``) survives as
        # text instead of being removed as a tag.
        text = _HTML_TAG_RE.sub(' ', html_content)
        text = html.unescape(text)

        # Remove excessive whitespace
        text = _WHITESPACE_RE.sub(' ', text).strip()

        # Truncate to max_length
        if len(text) > max_length:
            text = text[:max_length] + "..."

        return text

    def _respect_rate_limit(self):
        """
        Ensure we don't exceed StackExchange API rate limits.

        Sleeps until at least ``min_request_interval`` seconds have passed
        since the previous request, then records the new request time. A
        monotonic clock is used so wall-clock adjustments cannot produce a
        negative elapsed time and an oversized sleep.
        """
        elapsed = time.monotonic() - self.last_request_time

        if elapsed < self.min_request_interval:
            # Clamp so the sleep never exceeds one full interval.
            sleep_time = min(self.min_request_interval - elapsed, self.min_request_interval)
            time.sleep(sleep_time)

        self.last_request_time = time.monotonic()

    def get_name(self) -> str:
        """
        Get the name of the search handler.

        Returns:
            Name of the search handler
        """
        return "stackexchange"

    def is_available(self) -> bool:
        """
        Check if the StackExchange API is available.

        Note: StackExchange API can be used without an API key with reduced
        quotas.

        Returns:
            True if the API is available
        """
        return True  # Can be used with or without API key

    def get_rate_limit_info(self) -> Dict[str, Any]:
        """
        Get information about StackExchange API rate limits.

        Issues one request to the ``/info`` endpoint and reads the quota
        fields from the response.

        Returns:
            Dictionary with rate limit information. When the request fails
            or the response is not valid JSON, the dictionary carries an
            "error" message and the fallback quota instead of current usage.
        """
        quota_max = (
            self.DEFAULT_QUOTA_WITH_KEY if self.api_key
            else self.DEFAULT_QUOTA_WITHOUT_KEY
        )

        params = {"site": "stackoverflow"}
        if self.api_key:
            params["key"] = self.api_key

        # This call counts against the same throttle as searches.
        self._respect_rate_limit()

        try:
            response = requests.get(
                f"{self.api_url}/info",
                params=params,
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            data = response.json()
        except (requests.RequestException, ValueError) as e:
            logger.error("Error getting rate limit info: %s", e)
            return {
                "error": str(e),
                "requests_per_minute": 30,
                "requests_per_day": quota_max,
            }

        # Prefer the limits the API reports over the built-in fallbacks.
        quota_max = data.get("quota_max", quota_max)
        quota_remaining = data.get("quota_remaining", quota_max)

        return {
            "requests_per_minute": 30,  # Conservative estimate
            "requests_per_day": quota_max,
            "current_usage": {
                "remaining": quota_remaining,
                "max": quota_max,
                "reset_time": "Daily",  # SE resets quotas daily
            },
        }