ira/execution/api_handlers/stackexchange_handler.py

231 lines
7.9 KiB
Python

"""
StackExchange API handler for programming question search.
This module implements a search handler for the StackExchange API,
focusing on Stack Overflow and related programming Q&A sites.
"""
import os
import requests
import time
from typing import Dict, List, Any, Optional
from urllib.parse import quote
from config.config import get_config
from ..api_handlers.base_handler import BaseSearchHandler
class StackExchangeSearchHandler(BaseSearchHandler):
    """Handler for StackExchange/Stack Overflow search.

    Each search result is a plain dictionary with ``title``, ``url``,
    ``snippet``, ``source`` and ``metadata`` keys. Request failures are
    printed and turned into an empty result list instead of being raised.
    """

    # Largest ``pagesize`` the StackExchange API accepts for one page
    # (30 is only the API's default, not its limit).
    MAX_PAGE_SIZE = 100

    # Seconds to wait for the API. Without an explicit timeout ``requests``
    # can block forever on a stalled connection.
    REQUEST_TIMEOUT = 10

    # Daily quotas documented by StackExchange; used only as a fallback when
    # the API response does not report ``quota_max`` itself.
    DEFAULT_QUOTA_WITH_KEY = 10000
    DEFAULT_QUOTA_WITHOUT_KEY = 300

    def __init__(self):
        """Initialize the StackExchange search handler.

        The API key is taken from the ``STACKEXCHANGE_API_KEY`` environment
        variable, falling back to ``api_keys.stackexchange`` in the project
        configuration. A missing key is allowed (see ``is_available``).
        """
        self.config = get_config()
        # ``or {}`` guards against an ``api_keys`` entry that is present but null.
        configured_keys = self.config.config_data.get('api_keys') or {}
        self.api_key = os.environ.get('STACKEXCHANGE_API_KEY') or configured_keys.get('stackexchange')
        self.api_url = "https://api.stackexchange.com/2.3"
        self.search_endpoint = "/search/advanced"
        self.last_request_time = 0
        self.min_request_interval = 1.0  # seconds between requests to avoid throttling

    def search(self, query: str, num_results: int = 10, **kwargs) -> List[Dict[str, Any]]:
        """
        Execute a search on StackExchange.

        Args:
            query: The search query
            num_results: Number of results to return (a single page is
                fetched, so at most ``MAX_PAGE_SIZE`` results come back)
            **kwargs: Additional search parameters
                - site: StackExchange site to search (default: stackoverflow)
                - sort: Sort by (relevance, votes, creation, activity)
                - tags: List of tags to filter by (a single tag string is
                  also accepted)
                - accepted: Only return questions with accepted answers

        Returns:
            List of search results; empty when nothing was requested, the
            handler is unavailable, or the request failed.
        """
        if not self.is_available():
            return []

        # Nothing to fetch: skip the request so it does not consume quota.
        if num_results <= 0:
            return []

        # Rate limiting to avoid API restrictions
        self._respect_rate_limit()

        site = kwargs.get("site", "stackoverflow")
        params = {
            "q": query,
            "site": site,
            "pagesize": min(num_results, self.MAX_PAGE_SIZE),
            "page": 1,
            "filter": "withbody",  # Include question body
        }
        # Only send a key when one is configured (keyless use is allowed).
        if self.api_key:
            params["key"] = self.api_key

        if kwargs.get("sort"):
            params["sort"] = kwargs["sort"]

        tags = kwargs.get("tags")
        if tags:
            # A bare string would otherwise be joined character by character.
            if isinstance(tags, str):
                tags = [tags]
            params["tagged"] = ";".join(tags)

        if kwargs.get("accepted"):
            params["accepted"] = "True"

        try:
            response = requests.get(
                f"{self.api_url}{self.search_endpoint}",
                params=params,
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            data = response.json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers JSON decoding failures on ``requests``
            # versions where that error is not a RequestException.
            print(f"StackExchange API error: {e}")
            return []

        items = data.get("items") or []
        return [self._format_result(item, site) for item in items]

    def _format_result(self, item: Dict[str, Any], site: str) -> Dict[str, Any]:
        """Convert one API question item into the standardized result entry."""
        answer_count = item.get("answer_count", 0)
        score = item.get("score", 0)
        # ``is_answered`` is also true for any positively scored answer, so
        # acceptance is detected via ``accepted_answer_id`` instead.
        has_accepted = item.get("accepted_answer_id") is not None

        tags = item.get("tags", [])
        tag_str = ", ".join(tags)

        snippet = self._extract_snippet(item.get("body", ""), max_length=300)

        # Additional metadata for result display
        meta_info = f"Score: {score} | Answers: {answer_count}"
        if has_accepted:
            meta_info += " | Has accepted answer"

        full_snippet = f"{snippet}\n\nTags: {tag_str}\n{meta_info}"

        return {
            "title": item.get("title", "Unnamed Question"),
            "url": item.get("link", ""),
            "snippet": full_snippet,
            "source": f"stackexchange_{site}",
            "metadata": {
                "score": score,
                "answer_count": answer_count,
                "has_accepted": has_accepted,
                "tags": tags,
                "question_id": item.get("question_id", ""),
                "creation_date": item.get("creation_date", ""),
            },
        }

    def _extract_snippet(self, html_content: str, max_length: int = 300) -> str:
        """
        Extract a readable snippet from HTML content.

        Args:
            html_content: HTML content from Stack Overflow
            max_length: Maximum length of the snippet (before the "..."
                marker that is appended when the text is cut)

        Returns:
            A plain text snippet, or "Snippet extraction failed" when the
            input is not a string.
        """
        import html
        import re

        try:
            # Basic HTML tag removal (a more robust solution would use a library like BeautifulSoup)
            text = re.sub(r'<[^>]+>', ' ', html_content)
        except TypeError as e:
            print(f"Error extracting snippet: {e}")
            return "Snippet extraction failed"

        # Decode entities only after the tags are gone, so escaped markup
        # such as "&lt;b&gt;" survives as literal text instead of being stripped.
        text = html.unescape(text)

        # Remove excessive whitespace
        text = re.sub(r'\s+', ' ', text).strip()

        if len(text) > max_length:
            text = text[:max_length] + "..."
        return text

    def _respect_rate_limit(self):
        """
        Ensure we don't exceed StackExchange API rate limits.

        Sleeps just long enough that consecutive calls are at least
        ``min_request_interval`` seconds apart, then records the call time.
        """
        elapsed = time.time() - self.last_request_time
        # A negative value means the wall clock stepped backwards; sleeping
        # for ``interval - elapsed`` would then stall for an arbitrary time.
        if 0 <= elapsed < self.min_request_interval:
            time.sleep(self.min_request_interval - elapsed)
        self.last_request_time = time.time()

    def get_name(self) -> str:
        """
        Get the name of the search handler.

        Returns:
            Name of the search handler
        """
        return "stackexchange"

    def is_available(self) -> bool:
        """
        Check if the StackExchange API is available.

        Note: StackExchange API can be used without an API key with reduced quotas.

        Returns:
            True if the API is available
        """
        return True  # Can be used with or without API key

    def get_rate_limit_info(self) -> Dict[str, Any]:
        """
        Get information about StackExchange API rate limits.

        Issues one request to the ``/info`` endpoint and reads the quota
        fields from the response. This method never raises: on any failure
        the error text is returned under the ``error`` key.

        Returns:
            Dictionary with rate limit information
        """
        if self.api_key:
            quota_max = self.DEFAULT_QUOTA_WITH_KEY
        else:
            quota_max = self.DEFAULT_QUOTA_WITHOUT_KEY

        params = {"site": "stackoverflow"}
        if self.api_key:
            params["key"] = self.api_key

        try:
            response = requests.get(
                f"{self.api_url}/info",
                params=params,
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            data = response.json()
            # Prefer the quota the API reports over the documented defaults.
            quota_max = data.get("quota_max", quota_max)
            quota_remaining = data.get("quota_remaining", quota_max)
        except Exception as e:
            # Deliberately broad: this is a best-effort status query and the
            # failure is surfaced to the caller in the returned dictionary.
            print(f"Error getting rate limit info: {e}")
            return {
                "error": str(e),
                "requests_per_minute": 30,
                "requests_per_day": quota_max,
            }

        return {
            "requests_per_minute": 30,  # Conservative estimate
            "requests_per_day": quota_max,
            "current_usage": {
                "remaining": quota_remaining,
                "max": quota_max,
                "reset_time": "Daily",  # SE resets quotas daily
            },
        }