ira/ui/gradio_interface.py

1632 lines
76 KiB
Python

"""
Gradio interface for the intelligent research system.
This module provides a web interface for users to interact with the research system.
"""
import os
import json
import gradio as gr
import sys
import time
import asyncio
from pathlib import Path
from datetime import datetime
# Add the parent directory to the path to allow importing from other modules
sys.path.append(str(Path(__file__).parent.parent))
from query.query_processor import QueryProcessor
from execution.search_executor import SearchExecutor
from execution.result_collector import ResultCollector
from execution.sub_question_executor import get_sub_question_executor
from report.report_generator import get_report_generator, initialize_report_generator
from report.report_detail_levels import get_report_detail_level_manager, DetailLevel
from config.config import Config
class GradioInterface:
"""Gradio interface for the intelligent research system."""
def __init__(self):
    """Set up pipeline components and the on-disk results/reports layout."""
    # Core search/report pipeline components.
    self.query_processor = QueryProcessor()
    self.search_executor = SearchExecutor()
    self.result_collector = ResultCollector()
    self.sub_question_executor = get_sub_question_executor()

    # Directory layout: raw search results, plus a reports tree with one
    # subdirectory per calendar day for easier browsing.
    base_dir = Path(__file__).parent.parent
    self.results_dir = base_dir / "results"
    self.results_dir.mkdir(exist_ok=True)
    self.reports_dir = base_dir / "reports"
    self.reports_dir.mkdir(exist_ok=True)
    self.reports_daily_dir = self.reports_dir / datetime.now().strftime("%Y-%m-%d")
    self.reports_daily_dir.mkdir(exist_ok=True)

    # JSON index tracking every generated report; seeded empty on first run.
    self.reports_metadata_file = self.reports_dir / "reports_metadata.json"
    if not self.reports_metadata_file.exists():
        with open(self.reports_metadata_file, "w") as f:
            json.dump({"reports": []}, f, indent=2)

    self.detail_level_manager = get_report_detail_level_manager()
    self.config = Config()
    # Deferred: the report generator needs an event loop, see async_init().
    self.report_generator = None
    # Progress display relies on Gradio's built-in gr.Progress tracking,
    # so no custom progress UI elements are created here.
async def async_init(self):
    """Finish initialization that must run inside an event loop.

    Initializes the shared report generator, then returns ``self`` so the
    call can be chained directly after construction.
    """
    await initialize_report_generator()
    self.report_generator = get_report_generator()
    return self
def process_query(self, query, num_results=10, use_reranker=True):
    """
    Process a query and return the results.

    Runs the synchronous search pipeline: query processing, multi-engine
    search, optional semantic reranking via the result collector, and
    persistence of the processed results to a timestamped JSON file.

    Args:
        query (str): The query to process
        num_results (int): Number of results to request from each engine
        use_reranker (bool): Whether to use the Jina Reranker for semantic ranking

    Returns:
        tuple: (markdown_results, json_results_path); the path is None when
            no results were found or an error occurred
    """
    try:
        # Process the query.
        # NOTE(review): generate_report() awaits self.query_processor.process_query()
        # (see that method), while here the same call is made synchronously —
        # confirm the processor exposes a synchronous variant, otherwise this
        # binds a coroutine object instead of a dict.
        print(f"Processing query: {query}")
        processed_query = self.query_processor.process_query(query)
        print(f"Processed query: {processed_query}")
        # Get available search engines and print their status
        available_engines = self.search_executor.get_available_search_engines()
        print(f"Available search engines: {available_engines}")
        # Check which handlers are actually available (missing API keys make
        # a handler report unavailable).
        for engine_name, handler in self.search_executor.available_handlers.items():
            print(f"Handler {engine_name} available: {handler.is_available()}")
            if not handler.is_available():
                print(f" - Reason: API key may be missing for {engine_name}")
        # Add search engines if the processor did not specify any
        if 'search_engines' not in processed_query:
            processed_query['search_engines'] = available_engines
            print(f"Using search engines: {available_engines}")
        # Execute the search - request num_results from each engine
        print(f"Executing search...")
        search_results = self.search_executor.execute_search(
            structured_query=processed_query,
            num_results=num_results
        )
        # Print which engines returned results
        for engine, results in search_results.items():
            print(f"Engine {engine} returned {len(results)} results")
        # Query text used for reranking: prefer the enhanced form, then the
        # original, finally the raw user input.
        enhanced_query = processed_query.get("enhanced_query", processed_query.get("original_query", query))
        # Flatten per-engine results into one list for easier manipulation
        flattened_results = []
        for engine, results in search_results.items():
            for result in results:
                # Tag each result with the query and originating engine so the
                # reranker and display code can use them later.
                result["query"] = enhanced_query
                result["engine"] = engine
                flattened_results.append(result)
        # Process the results - max_results=None keeps everything after dedup
        print(f"Processing results...")
        processed_results = self.result_collector.process_results(
            {"combined": flattened_results}, dedup=True, max_results=None, use_reranker=use_reranker
        )
        print(f"Processed {len(processed_results)} results")
        # Save results to a timestamped JSON file
        timestamp = int(time.time())
        results_file = self.results_dir / f"results_{timestamp}.json"
        # Only write the file when there is something to save
        if processed_results:
            with open(results_file, "w") as f:
                json.dump(processed_results, f, indent=2)
            print(f"Results saved to {results_file}")
            file_path = str(results_file)
        else:
            error_message = "No results found. Please try a different query or check API keys."
            print(error_message)
            file_path = None
            return f"## No Results Found\n\n{error_message}", file_path
        # Format results for display in the UI
        markdown_results = self._format_results_as_markdown(processed_results)
        return markdown_results, file_path
    except Exception as e:
        # Top-level boundary: surface the error in the UI instead of crashing.
        error_message = f"Error processing query: {str(e)}"
        print(f"ERROR: {error_message}")
        import traceback
        traceback.print_exc()
        return f"## Error\n\n{error_message}", None
def _format_results_as_markdown(self, results):
"""
Format results as markdown.
Args:
results (list): List of result dictionaries
Returns:
str: Markdown formatted results
"""
if not results:
return "## No Results Found\n\nNo results were found for your query."
# Count results by source
source_counts = {}
for result in results:
source = result.get("source", "unknown")
source_counts[source] = source_counts.get(source, 0) + 1
# Create source distribution string
source_distribution = ", ".join([f"{source}: {count}" for source, count in source_counts.items()])
markdown = f"## Search Results\n\n"
markdown += f"*Sources: {source_distribution}*\n\n"
for i, result in enumerate(results):
title = result.get("title", "Untitled")
url = result.get("url", "")
snippet = result.get("snippet", "No snippet available")
source = result.get("source", "unknown")
authors = result.get("authors", "Unknown")
year = result.get("year", "Unknown")
score = result.get("relevance_score", 0)
markdown += f"### {i+1}. {title}\n\n"
markdown += f"**Source**: {source}\n\n"
markdown += f"**URL**: [{url}]({url})\n\n"
markdown += f"**Snippet**: {snippet}\n\n"
markdown += f"**Authors**: {authors}\n\n"
markdown += f"**Year**: {year}\n\n"
markdown += f"**Score**: {score}\n\n"
markdown += "---\n\n"
return markdown
async def generate_report(self, query, detail_level="standard", query_type="auto-detect", custom_model=None,
                          results_file=None, process_thinking_tags=False, initial_results=10, final_results=7,
                          progress=gr.Progress()):
    """
    Generate a report for the given query.

    Either loads previously saved search results from ``results_file`` or
    performs a fresh search (including optional sub-question decomposition
    and two fallback strategies), then drives the report generator and
    records the finished report in the metadata index.

    Args:
        query: The query to generate a report for
        detail_level: The level of detail for the report (brief, standard, detailed, comprehensive)
        query_type: Query type to use, or "auto-detect" to let the generator decide
        custom_model: Custom model to use for report generation; may be the raw
            model name or the display form "model_name (provider: model_display)"
        results_file: Path to a file containing search results
        process_thinking_tags: Whether to strip <thinking> tags from the model output
        initial_results: Results to request per engine before reranking
        final_results: Results to keep after reranking
        progress: Gradio progress indicator (gr.Progress() default is the
            documented Gradio injection pattern)

    Returns:
        tuple: (report text, path to the saved report file); on error the
            first element is an error markdown string and the path is None
    """
    try:
        # --- Output location -------------------------------------------------
        # Create a timestamped output file in the daily directory
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        model_suffix = ""
        # Extract the actual model name from the description if selected
        if custom_model:
            # If the model is in the format "model_name (provider: model_display)"
            original_custom_model = custom_model
            if "(" in custom_model:
                custom_model = custom_model.split(" (")[0]
            model_name = custom_model.split('/')[-1]
            model_suffix = f"_{model_name}"
            # Log the model selection for debugging
            print(f"Selected model from UI: {original_custom_model}")
            print(f"Extracted model name: {custom_model}")
            print(f"Using model suffix: {model_suffix}")
        # Create a unique report ID (timestamp + short hash of the query)
        import hashlib
        report_id = f"{timestamp}_{hashlib.md5(query.encode()).hexdigest()[:8]}"
        # Define the output file path in the daily directory
        output_file = self.reports_daily_dir / f"report_{report_id}{model_suffix}.md"
        # --- Configuration ---------------------------------------------------
        # Get detail level configuration
        config = self.detail_level_manager.get_detail_level_config(detail_level)
        # Override initial results per engine if provided
        if initial_results:
            config["initial_results_per_engine"] = initial_results
        # Set final results after reranking if provided
        if final_results:
            config["final_results_after_reranking"] = final_results
        # If custom model is provided, use it
        if custom_model:
            # Extract the actual model name from the display name format if needed
            model_name = custom_model.split(" (")[0] if " (" in custom_model else custom_model
            config["model"] = model_name
            print(f"Using custom model: {model_name}")
        # Ensure report generator is initialized (lazy path when async_init
        # was not awaited before this call)
        if self.report_generator is None:
            print("Initializing report generator...")
            await initialize_report_generator()
            self.report_generator = get_report_generator()
        # Debug: Print initial model configuration based on detail level
        detail_config = self.detail_level_manager.get_detail_level_config(detail_level)
        default_model = detail_config.get("model", "unknown")
        print(f"Default model for {detail_level} detail level: {default_model}")
        # Then explicitly override with custom model if provided
        if custom_model:
            # Extract the actual model name from the display name format
            # The format is "model_name (provider: model_display)"
            model_name = custom_model.split(" (")[0] if " (" in custom_model else custom_model
            print(f"Setting report generator to use custom model: {model_name}")
            # Look for a set_model method in the report generator
            if hasattr(self.report_generator, 'set_model'):
                self.report_generator.set_model(model_name)
                print(f"After setting custom model, report generator model is: {self.report_generator.model_name}")
            else:
                print("Warning: Report generator does not have set_model method. Using alternative approach.")
                # Update the config with the model as a fallback
                current_config = self.report_generator.get_detail_level_config()
                if current_config:
                    current_config["model"] = model_name
                    print(f"Updated config model to: {model_name}")
        print(f"Generating report with detail level: {detail_level}")
        print(f"Detail level configuration: {config}")
        print(f"Using model: {config['model']}")
        print(f"Processing thinking tags: {process_thinking_tags}")
        # --- Search results --------------------------------------------------
        # If results file is provided, load results from it
        search_results = []
        if results_file and os.path.exists(results_file):
            with open(results_file, 'r') as f:
                search_results = json.load(f)
            print(f"Loaded {len(search_results)} results from {results_file}")
        else:
            # If no results file is provided, perform a search
            print(f"No results file provided, performing search for: {query}")
            # Process the query to create a structured query
            structured_query = await self.query_processor.process_query(query)
            # Generate search queries for different engines
            structured_query = await self.query_processor.generate_search_queries(
                structured_query,
                self.search_executor.get_available_search_engines()
            )
            # Set the number of results to fetch per engine early so it's available throughout the function
            num_results_to_fetch = config.get("initial_results_per_engine", config.get("num_results", 10))
            # Initialize sub_question_results as an empty dict in case there are no sub-questions
            sub_question_results = {}
            # Check if the query was decomposed into sub-questions
            has_sub_questions = 'sub_questions' in structured_query and structured_query['sub_questions']
            if has_sub_questions:
                # Log sub-questions
                print(f"Query was decomposed into {len(structured_query['sub_questions'])} sub-questions:")
                for i, sq in enumerate(structured_query['sub_questions']):
                    print(f" {i+1}. {sq.get('sub_question')} (aspect: {sq.get('aspect')}, priority: {sq.get('priority')})")
                # Execute searches for sub-questions
                progress(0.1, desc="Executing searches for sub-questions...")
                structured_query = await self.sub_question_executor.execute_sub_question_searches(
                    structured_query,
                    num_results_per_engine=3 # Use fewer results per engine for sub-questions
                )
                # Get combined results from sub-questions
                sub_question_results = self.sub_question_executor.get_combined_results(structured_query)
                print(f"Sub-questions returned results from {len(sub_question_results)} engines")
                # Prioritize results from sub-questions
                sub_question_results = self.sub_question_executor.prioritize_results(
                    sub_question_results,
                    max_results_per_engine=num_results_to_fetch # Use same limit as main query
                )
                progress(0.2, desc="Completed sub-question searches")
            # Execute main search
            progress(0.3, desc="Executing main search...")
            search_results_dict = self.search_executor.execute_search(
                structured_query,
                num_results=num_results_to_fetch
            )
            # Add debug logging
            print(f"Main search results by engine:")
            for engine, results in search_results_dict.items():
                print(f" {engine}: {len(results)} results")
            # If we have sub-question results, combine them with the main search results
            if has_sub_questions and 'sub_questions' in structured_query:
                print("Combining main search results with sub-question results")
                progress(0.4, desc="Combining results from sub-questions...")
                # Merge results from sub-questions into the main search results
                for engine, results in sub_question_results.items():
                    if engine in search_results_dict:
                        # Add sub-question results to the main results
                        search_results_dict[engine].extend(results)
                        print(f" Added {len(results)} results from sub-questions to {engine}")
                    else:
                        # Engine only has sub-question results
                        search_results_dict[engine] = results
                        print(f" Added {len(results)} results from sub-questions as new engine {engine}")
            # Flatten the search results
            search_results = []
            for engine_results in search_results_dict.values():
                search_results.extend(engine_results)
            print(f"Total flattened search results: {len(search_results)}")
            # Fallback mechanism if no search results are found
            if len(search_results) == 0:
                print("WARNING: No search results found. Using fallback search mechanism...")
                # Try a simplified version of the query
                simplified_query = query.split(" ")[:10] # Take first 10 words
                simplified_query = " ".join(simplified_query)
                if simplified_query != query:
                    print(f"Trying simplified query: {simplified_query}")
                    # Create a basic structured query
                    basic_structured_query = {
                        "original_query": simplified_query,
                        "enhanced_query": simplified_query,
                        "type": "unknown",
                        "intent": "research"
                    }
                    # Try search again with simplified query
                    # NOTE(review): this indexes config["num_results"] directly —
                    # if the detail-level config has no "num_results" key this
                    # raises KeyError (caught by the outer handler); the rest of
                    # the method uses config.get("num_results", ...) — confirm.
                    search_results_dict = self.search_executor.execute_search(
                        basic_structured_query,
                        num_results=config["num_results"]
                    )
                    # Flatten the search results
                    search_results = []
                    for engine_results in search_results_dict.values():
                        search_results.extend(engine_results)
                    print(f"Fallback search returned {len(search_results)} results")
                # Second fallback: If still no results, create a mock result to prevent report generation failure
                if len(search_results) == 0:
                    print("WARNING: Fallback search also failed. Creating mock search result...")
                    # Create a mock search result with the query as the title
                    search_results = [{
                        "title": f"Information about: {query}",
                        "url": "https://example.com/search-result",
                        "snippet": f"This is a placeholder result for the query: {query}. " +
                                   "The search system was unable to find relevant results. " +
                                   "Please try refining your query or check your search API configuration.",
                        "source": "mock_result",
                        "score": 1.0
                    }]
                    print("Created mock search result to allow report generation to proceed")
            # Rerank results if we have a reranker
            # NOTE(review): __init__ (as visible here) never assigns self.reranker,
            # so this branch appears to run only if something else sets it — confirm.
            if hasattr(self, 'reranker') and self.reranker:
                # Use final_results_after_reranking if available, otherwise fall back to num_results
                top_n_results = config.get("final_results_after_reranking", config.get("num_results", 7))
                search_results = self.reranker.rerank_with_metadata(
                    query,
                    search_results,
                    document_key='snippet',
                    top_n=top_n_results
                )
        # --- Progress plumbing -----------------------------------------------
        # Define progress callback function (builds a human-readable status line)
        def progress_callback(current_progress, total_chunks, current_report):
            # Calculate current chunk number
            current_chunk = int(current_progress * total_chunks) if total_chunks > 0 else 0
            # Determine the status message based on progress
            if current_progress == 0:
                status_message = "Preparing documents..."
            elif current_progress >= 1.0:
                status_message = "Finalizing report..."
            else:
                status_message = f"Processing chunk {current_chunk}/{total_chunks}..."
            # Add current chunk title if available
            if hasattr(self.report_generator, 'current_chunk_title'):
                chunk_title = self.report_generator.current_chunk_title
                if chunk_title:
                    status_message += f" ({chunk_title})"
            # Add model information to status message
            if hasattr(self.report_generator, 'model_name') and self.report_generator.model_name:
                model_display = self.report_generator.model_name.split('/')[-1] # Extract model name without provider
                status_message += f" (Using model: {model_display})"
            return status_message
        # Set the progress callback for the report generator
        if hasattr(self.report_generator, 'set_progress_callback'):
            # Create a wrapper function that updates the UI elements
            def ui_progress_callback(current_progress, total_chunks, current_report):
                status_message = progress_callback(current_progress, total_chunks, current_report)
                # Use Gradio's built-in progress tracking mechanism
                # This will properly update the UI during async operations
                progress(current_progress, desc=status_message)
                return status_message
            self.report_generator.set_progress_callback(ui_progress_callback)
        # --- Report generation -----------------------------------------------
        print(f"Generating report with {len(search_results)} search results")
        if len(search_results) == 0:
            print("WARNING: No search results found. Report generation may fail.")
        # Log the current model being used by the report generator
        print(f"Report generator is using model: {self.report_generator.model_name}")
        # Update progress status based on detail level
        if detail_level.lower() == "comprehensive":
            self.progress_status = "Generating progressive report..."
        else:
            self.progress_status = "Processing document chunks..."
        # Initial progress state is handled by Gradio's built-in progress tracking
        # Handle query_type parameter ("auto-detect" means let the generator decide)
        actual_query_type = None
        if query_type != "auto-detect":
            actual_query_type = query_type
            print(f"Using user-selected query type: {actual_query_type}")
        else:
            print("Using auto-detection for query type")
        # Ensure structured_query is defined: it is only bound in the search
        # branch above, not when results were loaded from a file.
        if not locals().get('structured_query'):
            structured_query = None
        report = await self.report_generator.generate_report(
            search_results=search_results,
            query=query,
            token_budget=config["token_budget"],
            chunk_size=config["chunk_size"],
            overlap_size=config["overlap_size"],
            detail_level=detail_level,
            query_type=actual_query_type,
            structured_query=structured_query if structured_query and 'sub_questions' in structured_query else None
        )
        # Final progress update
        progress(1.0)
        # Process thinking tags if requested
        if process_thinking_tags:
            report = self._process_thinking_tags(report)
        # --- Persistence -----------------------------------------------------
        # Save report to file
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write(report)
        print(f"Report saved to: {output_file}")
        # Update report metadata index
        self._update_report_metadata(report_id, {
            "id": report_id,
            "timestamp": timestamp,
            "query": query,
            "detail_level": detail_level,
            "query_type": query_type,
            "model": custom_model if custom_model else config.get("model", "default"),
            "file_path": str(output_file),
            "file_size": output_file.stat().st_size,
            "creation_date": datetime.now().isoformat()
        })
        return report, str(output_file)
    except Exception as e:
        # Top-level boundary: surface the error in the UI instead of crashing.
        error_message = f"Error generating report: {str(e)}"
        print(f"ERROR: {error_message}")
        import traceback
        traceback.print_exc()
        return f"## Error\n\n{error_message}", None
def _process_thinking_tags(self, text):
"""
Process thinking tags in the text.
Args:
text (str): Text to process
Returns:
str: Processed text
"""
# Remove content between <thinking> and </thinking> tags
import re
return re.sub(r'<thinking>.*?</thinking>', '', text, flags=re.DOTALL)
def _update_report_metadata(self, report_id, metadata):
"""
Update the report metadata file with new report information.
Args:
report_id (str): Unique identifier for the report
metadata (dict): Report metadata to store
"""
try:
# Load existing metadata
with open(self.reports_metadata_file, 'r') as f:
all_metadata = json.load(f)
# Check if report already exists
existing_report = None
for i, report in enumerate(all_metadata.get('reports', [])):
if report.get('id') == report_id:
existing_report = i
break
# Update or add the report metadata
if existing_report is not None:
all_metadata['reports'][existing_report] = metadata
else:
all_metadata['reports'].append(metadata)
# Save updated metadata
with open(self.reports_metadata_file, 'w') as f:
json.dump(all_metadata, f, indent=2)
print(f"Updated metadata for report {report_id}")
except Exception as e:
print(f"Error updating report metadata: {str(e)}")
def get_all_reports(self):
"""
Get all report metadata.
Returns:
list: List of report metadata dictionaries
"""
try:
# Load metadata
with open(self.reports_metadata_file, 'r') as f:
all_metadata = json.load(f)
# Return reports sorted by creation date (newest first)
reports = all_metadata.get('reports', [])
return sorted(reports, key=lambda x: x.get('creation_date', ''), reverse=True)
except Exception as e:
print(f"Error getting report metadata: {str(e)}")
return []
def delete_report(self, report_id):
"""
Delete a report and its metadata.
Args:
report_id (str): ID of the report to delete
Returns:
bool: True if successful, False otherwise
"""
try:
# Load metadata
with open(self.reports_metadata_file, 'r') as f:
all_metadata = json.load(f)
# Find the report
report_to_delete = None
for report in all_metadata.get('reports', []):
if report.get('id') == report_id:
report_to_delete = report
break
if not report_to_delete:
print(f"Report {report_id} not found")
return False
# Delete the report file
file_path = report_to_delete.get('file_path')
if file_path and Path(file_path).exists():
Path(file_path).unlink()
print(f"Deleted report file: {file_path}")
# Remove from metadata
all_metadata['reports'] = [r for r in all_metadata.get('reports', []) if r.get('id') != report_id]
# Save updated metadata
with open(self.reports_metadata_file, 'w') as f:
json.dump(all_metadata, f, indent=2)
print(f"Deleted report {report_id} from metadata")
return True
except Exception as e:
print(f"Error deleting report: {str(e)}")
return False
def get_available_models(self):
"""
Get a list of available models for report generation.
Returns:
list: List of available model names
"""
# Get models from config
models = []
# Extract all model names from the config file
if 'models' in self.config.config_data:
models = list(self.config.config_data['models'].keys())
# If no models found, provide some defaults
if not models:
models = [
"llama-3.1-8b-instant",
"llama-3.3-70b-versatile",
"groq/deepseek-r1-distill-llama-70b-specdec",
"openrouter-mixtral",
"openrouter-claude",
"gemini-2.0-flash-lite"
]
return models
def get_model_descriptions(self):
"""
Get descriptions for available models.
Returns:
dict: Dictionary mapping model names to descriptions
"""
descriptions = {}
model_name_to_description = {}
if 'models' in self.config.config_data:
for model_name, model_config in self.config.config_data['models'].items():
provider = model_config.get('provider', 'unknown')
model_display = model_config.get('model_name', model_name)
max_tokens = model_config.get('max_tokens', 'unknown')
temperature = model_config.get('temperature', 'unknown')
# Create a description that includes the provider and actual model name
display_name = f"{model_name} ({provider}: {model_display})"
descriptions[model_name] = display_name
# Create a more detailed description for the dropdown tooltip
detailed_info = f"{display_name} - Max tokens: {max_tokens}, Temperature: {temperature}"
model_name_to_description[display_name] = detailed_info
self.model_name_to_description = model_name_to_description
return descriptions
def _get_reports_for_display(self):
"""Get reports formatted for display in the UI"""
reports = self.get_all_reports()
display_data = []
for report in reports:
# Format timestamp for display
timestamp = report.get('timestamp', '')
creation_date = report.get('creation_date', '')
if creation_date:
try:
# Convert ISO format to datetime and format for display
dt = datetime.fromisoformat(creation_date)
formatted_date = dt.strftime('%Y-%m-%d %H:%M:%S')
except:
formatted_date = creation_date
else:
formatted_date = timestamp
# Format file size
file_size = report.get('file_size', 0)
if file_size < 1024:
formatted_size = f"{file_size} B"
elif file_size < 1024 * 1024:
formatted_size = f"{file_size / 1024:.1f} KB"
else:
formatted_size = f"{file_size / (1024 * 1024):.1f} MB"
# Add row to display data
display_data.append([
report.get('id', ''),
report.get('query', '')[:50] + ('...' if len(report.get('query', '')) > 50 else ''),
report.get('model', '').split('/')[-1], # Show only the model name without provider
report.get('detail_level', ''),
formatted_date,
formatted_size,
Path(report.get('file_path', '')).name, # Just the filename
])
return display_data
def _delete_selected_reports(self, selected_choices):
"""Delete selected reports
Args:
selected_choices (list): List of selected checkbox values in format "ID: Query (Model)"
Returns:
tuple: Updated reports table data and updated checkbox choices
"""
if not selected_choices:
# If no reports are selected, just refresh the display
reports_data = self._get_reports_for_display()
choices = self._get_report_choices(reports_data)
return reports_data, choices, []
print(f"Selected choices for deletion: {selected_choices}")
# Extract report IDs from selected choices
selected_report_ids = []
for choice in selected_choices:
try:
# Split at the first colon to get the ID
if ':' in choice:
report_id = choice.split(':', 1)[0].strip()
selected_report_ids.append(report_id)
else:
print(f"Warning: Invalid choice format: {choice}")
except Exception as e:
print(f"Error processing choice {choice}: {e}")
print(f"Deleting report IDs: {selected_report_ids}")
# Delete selected reports
deleted_count = 0
for report_id in selected_report_ids:
if self.delete_report(report_id):
deleted_count += 1
print(f"Successfully deleted report: {report_id}")
else:
print(f"Failed to delete report: {report_id}")
print(f"Deleted {deleted_count} reports")
# Refresh the table and choices
reports_data = self._get_reports_for_display()
choices = self._get_report_choices(reports_data)
return reports_data, choices, []
def _download_selected_reports(self, selected_choices):
"""Prepare selected reports for download
Args:
selected_choices (list): List of selected checkbox values in format "ID: Query (Model)"
Returns:
list: List of file paths to download
"""
if not selected_choices:
return []
print(f"Selected choices for download: {selected_choices}")
# Extract report IDs from selected choices
selected_report_ids = []
for choice in selected_choices:
try:
# Split at the first colon to get the ID
if ':' in choice:
report_id = choice.split(':', 1)[0].strip()
selected_report_ids.append(report_id)
else:
print(f"Warning: Invalid choice format: {choice}")
except Exception as e:
print(f"Error processing choice {choice}: {e}")
print(f"Extracted report IDs: {selected_report_ids}")
# Get file paths for selected reports
all_reports = self.get_all_reports()
files_to_download = []
for report_id in selected_report_ids:
report = next((r for r in all_reports if r.get('id') == report_id), None)
if report and "file_path" in report:
file_path = report["file_path"]
# Verify the file exists
if os.path.exists(file_path):
files_to_download.append(file_path)
print(f"Added file for download: {file_path}")
else:
print(f"Warning: File does not exist: {file_path}")
else:
print(f"Warning: Could not find report with ID {report_id}")
return files_to_download
def _get_report_choices(self, reports_data):
"""Generate choices for the checkbox group based on reports data
Args:
reports_data (list): List of report data rows
Returns:
list: List of choices for the checkbox group in format "ID: Query (Model)"
"""
choices = []
# If reports_data is empty, return an empty list
if not reports_data:
return []
# Get all reports from the metadata file to ensure IDs are available
all_reports = self.get_all_reports()
# Create a mapping of report IDs to their full data
report_map = {report.get('id', ''): report for report in all_reports}
for row in reports_data:
try:
report_id = row[0]
if not report_id:
continue
# Get data from the table row
query = row[1]
model = row[2]
# Format: "ID: Query (Model)"
choice_text = f"{report_id}: {query} ({model})"
choices.append(choice_text)
except (IndexError, TypeError) as e:
print(f"Error processing report row: {e}")
continue
return choices
def _cleanup_old_reports(self, days):
"""Delete reports older than the specified number of days
Args:
days (int): Number of days to keep reports for
Returns:
list: Updated reports table data
"""
try:
if days <= 0:
print("Cleanup skipped - days parameter is 0 or negative")
return self._get_reports_for_display()
# Calculate cutoff date
from datetime import timedelta
cutoff_date = datetime.now() - timedelta(days=days)
cutoff_str = cutoff_date.isoformat()
print(f"Cleaning up reports older than {cutoff_date.strftime('%Y-%m-%d %H:%M:%S')}")
# Get all reports
all_reports = self.get_all_reports()
print(f"Found {len(all_reports)} total reports")
reports_to_delete = []
# Find reports older than cutoff date
for report in all_reports:
creation_date = report.get('creation_date', '')
if not creation_date:
print(f"Warning: Report {report.get('id')} has no creation date")
continue
if creation_date < cutoff_str:
reports_to_delete.append(report.get('id'))
print(f"Marking report {report.get('id')} from {creation_date} for deletion")
print(f"Found {len(reports_to_delete)} reports to delete")
# Delete old reports
deleted_count = 0
for report_id in reports_to_delete:
if self.delete_report(report_id):
deleted_count += 1
print(f"Successfully deleted {deleted_count} reports")
# Refresh the table
updated_display = self._get_reports_for_display()
print(f"Returning updated display with {len(updated_display)} reports")
return updated_display
except Exception as e:
print(f"Error in cleanup_old_reports: {e}")
import traceback
traceback.print_exc()
# Return current display data in case of error
return self._get_reports_for_display()
def migrate_existing_reports(self):
    """Migrate existing reports from the root directory to the reports directory structure.

    Scans the package root for files matching the historical naming scheme
    (e.g. ``report_20250317_122351_llama-3.3-70b-versatile.md``), copies each
    one into a per-date subdirectory of ``self.reports_dir``, extracts a best-effort
    query string from the file contents, and records metadata for each
    migrated report via ``self._update_report_metadata``.

    Returns:
        str: Status message indicating the result of the migration
    """
    import re
    import shutil
    import os

    # Pattern to match report files like report_20250317_122351_llama-3.3-70b-versatile.md
    report_pattern = re.compile(r'report_(?P<date>\d{8})_(?P<time>\d{6})_?(?P<model>.*?)?\.md$')

    # Get the root directory (one level above this package's ui/ directory)
    root_dir = Path(__file__).parent.parent

    # Find all report files in the root directory
    migrated_count = 0
    for file_path in root_dir.glob('report_*.md'):
        if not file_path.is_file():
            continue

        # Extract information from the filename
        match = report_pattern.match(file_path.name)
        if not match:
            continue
        date_str = match.group('date')
        time_str = match.group('time')
        model = match.group('model') or 'unknown'

        # Format date for directory structure (YYYY-MM-DD)
        try:
            year = date_str[:4]
            month = date_str[4:6]
            day = date_str[6:8]
            formatted_date = f"{year}-{month}-{day}"
            # Create timestamp for metadata
            timestamp = f"{year}-{month}-{day} {time_str[:2]}:{time_str[2:4]}:{time_str[4:6]}"
            creation_date = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S").isoformat()
        except ValueError:
            # If date parsing fails, use current date. Previously only
            # creation_date was reset here, so the metadata could record an
            # unparseable timestamp inconsistent with creation_date; reset
            # both so they always agree.
            formatted_date = datetime.now().strftime("%Y-%m-%d")
            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            creation_date = datetime.now().isoformat()

        # Create directory for the date if it doesn't exist
        date_dir = self.reports_dir / formatted_date
        date_dir.mkdir(exist_ok=True)

        # Generate a unique report ID from the filename's date/time parts
        report_id = f"{date_str}_{time_str}"

        # Copy (not move) the file to the new location, preserving mtimes
        new_file_path = date_dir / file_path.name
        shutil.copy2(file_path, new_file_path)

        # Read the report content to extract query if possible
        query = ""
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read(1000)  # Read just the beginning to find the query
                # Try to extract query from title or first few lines
                title_match = re.search(r'#\s*(.+?)\n', content)
                if title_match:
                    query = title_match.group(1).strip()
                else:
                    # Just use the first line as query
                    query = content.split('\n')[0].strip()
        except Exception as e:
            # Best-effort: a report with an unreadable body still migrates,
            # just with an empty query field.
            print(f"Error reading file {file_path}: {e}")

        # Create metadata for the report
        file_size = os.path.getsize(file_path)
        metadata = {
            "id": report_id,
            "query": query,
            "model": model,
            "detail_level": "unknown",  # We don't know the detail level from the filename
            "timestamp": timestamp,
            "creation_date": creation_date,
            "file_path": str(new_file_path),
            "file_size": file_size
        }

        # Update the metadata file
        self._update_report_metadata(report_id, metadata)
        migrated_count += 1

    return f"Migrated {migrated_count} existing reports to the new directory structure."
def create_interface(self):
    """
    Create and return the Gradio interface.

    Builds two tabs — "Generate Report" and "Manage Reports" — and wires up
    all of their event handlers. Report selection in the management tab is
    implemented with a custom HTML checkbox list whose JavaScript mirrors the
    selection into a hidden Textbox as a JSON array.

    Returns:
        gr.Blocks: The Gradio interface
    """
    with gr.Blocks(title="Intelligent Research System") as interface:
        gr.Markdown("# Intelligent Research System")
        gr.Markdown(
            """
            This system helps you research topics by searching across multiple sources
            including Google (via Serper), Google Scholar, arXiv, and news sources.

            You can either search for results or generate a comprehensive report.

            **Special Capabilities:**
            - Automatically detects and optimizes current events queries
            - Specialized search handlers for different types of information
            - Semantic ranking for the most relevant results
            """
        )

        # Create tabs for different sections
        with gr.Tabs() as tabs:
            # Report Generation Tab
            with gr.TabItem("Generate Report"):
                with gr.Row():
                    with gr.Column(scale=4):
                        report_query_input = gr.Textbox(
                            label="Research Query",
                            placeholder="Enter your research question here...",
                            lines=3
                        )
                    with gr.Column(scale=1):
                        report_detail_level = gr.Dropdown(
                            choices=["brief", "standard", "detailed", "comprehensive"],
                            value="standard",
                            label="Detail Level",
                            info="Controls the depth and breadth of the report"
                        )
                        report_query_type = gr.Dropdown(
                            choices=["auto-detect", "factual", "exploratory", "comparative", "code"],
                            value="auto-detect",
                            label="Query Type",
                            info="Type of query determines the report structure"
                        )
                        # NOTE(review): the return value is unused here; the call
                        # presumably populates self.model_name_to_description as a
                        # side effect — confirm against get_model_descriptions().
                        model_descriptions = self.get_model_descriptions()
                        report_custom_model = gr.Dropdown(
                            choices=list(self.model_name_to_description.keys()),
                            value=None,
                            label="Custom Model (Optional)",
                            info="Select a custom model for report generation"
                        )

                with gr.Row():
                    with gr.Column():
                        gr.Markdown("### Advanced Settings")

                        with gr.Row():
                            with gr.Column():
                                with gr.Accordion("Search Parameters", open=False):
                                    with gr.Row():
                                        initial_results_slider = gr.Slider(
                                            minimum=5,
                                            maximum=50,
                                            value=10,
                                            step=5,
                                            label="Initial Results Per Engine",
                                            info="Number of results to fetch from each search engine"
                                        )
                                        final_results_slider = gr.Slider(
                                            minimum=3,
                                            maximum=30,
                                            value=7,
                                            step=1,
                                            label="Final Results After Reranking",
                                            info="Number of results to keep after reranking"
                                        )

                                with gr.Accordion("Processing Options", open=False):
                                    with gr.Row():
                                        report_process_thinking = gr.Checkbox(
                                            label="Process Thinking Tags",
                                            value=False,
                                            info="Process <thinking> tags in model output"
                                        )

                with gr.Row():
                    report_button = gr.Button("Generate Report", variant="primary", size="lg")

                # Note: We've removed the redundant progress indicators here
                # The built-in Gradio progress tracking (gr.Progress) is used instead
                # This is passed to the generate_report method and handles progress updates
                gr.Examples(
                    examples=[
                        ["What are the latest advancements in quantum computing?"],
                        ["Compare transformer and RNN architectures for NLP tasks"],
                        ["Explain the environmental impact of electric vehicles"],
                        ["Explain the potential relationship between creatine supplementation and muscle loss due to GLP1-ar drugs for weight loss."],
                        ["What recent actions has Trump taken regarding tariffs?"],
                        ["What are the recent papers on large language model alignment?"],
                        ["What are the main research findings on climate change adaptation strategies in agriculture?"]
                    ],
                    inputs=report_query_input
                )

                with gr.Row():
                    with gr.Column():
                        report_output = gr.Markdown(label="Generated Report")

                with gr.Row():
                    with gr.Column():
                        report_file_output = gr.Textbox(
                            label="Report saved to file",
                            interactive=False
                        )

                # Add information about detail levels and query types
                detail_levels_info = ""
                for level, description in self.detail_level_manager.get_available_detail_levels():
                    detail_levels_info += f"- **{level}**: {description}\n"

                query_types_info = """
                - **auto-detect**: Automatically determine the query type based on the query text
                - **factual**: For queries seeking specific information (e.g., "What is...", "How does...")
                - **exploratory**: For queries investigating a topic broadly (e.g., "Tell me about...")
                - **comparative**: For queries comparing multiple items (e.g., "Compare X and Y", "Differences between...")
                - **code**: For queries related to programming, software development, or technical implementation
                """

                gr.Markdown(f"### Detail Levels\n{detail_levels_info}")
                gr.Markdown(f"### Query Types\n{query_types_info}")

            # Report Management Tab
            with gr.TabItem("Manage Reports"):
                with gr.Row():
                    gr.Markdown("## Report Management")

                with gr.Row():
                    gr.Markdown("Select reports to download or delete. You can also filter and sort the reports.")

                # Create a state to store the current reports
                reports_state = gr.State([])

                # Only include one view of the reports with a clean selection interface
                with gr.Row():
                    with gr.Column():
                        gr.Markdown("### Reports")

                        # Get the reports data
                        reports_data = self._get_reports_for_display()

                        # This hidden table is just used to store the data
                        reports_table = gr.Dataframe(
                            headers=["ID", "Query", "Model", "Detail Level", "Created", "Size", "Filename"],
                            datatype=["str", "str", "str", "str", "str", "str", "str"],
                            value=reports_data,
                            visible=False,  # Hide this table
                            interactive=False
                        )

                        # Get the choices for the checkbox group
                        initial_choices = self._get_report_choices(reports_data)
                        print(f"Initial choices generated: {len(initial_choices)}")
                        if not initial_choices:
                            initial_choices = ["No reports available"]

                        # Use a cleaner component approach with better styling
                        gr.Markdown("##### Select reports below for download or deletion")

                        # Create a completely custom HTML solution for maximum control
                        # Prepare the HTML for the checkboxes
                        html_choices = []
                        for i, choice in enumerate(initial_choices):
                            html_choices.append(f'<div style="padding: 5px; margin-bottom: 8px;">')
                            html_choices.append(f'<label style="display: block; width: 100%; cursor: pointer; color: #eee;">')
                            html_choices.append(f'<input type="checkbox" id="report-{i}" name="report" value="{choice}"> {choice}')
                            html_choices.append('</label>')
                            html_choices.append('</div>')

                        # Create the HTML string with all the checkbox markup and JavaScript functionality
                        html_content = f"""
                        <div style="border: 1px solid #555; border-radius: 5px; margin-bottom: 15px; background-color: #2d2d2d; color: #eee;">
                            <div style="padding: 10px; border-bottom: 1px solid #555; background-color: #3a3a3a;">
                                <label style="display: block; font-weight: bold; cursor: pointer;">
                                    <input type="checkbox" id="select-all-checkbox" onclick="toggleAllReports()"> Check/Uncheck All
                                </label>
                            </div>
                            <div id="reports-container" style="max-height: 500px; overflow-y: auto; padding: 10px;">
                                {''.join(html_choices)}
                            </div>
                        </div>
                        <script>
                        // Toggle all checkboxes
                        function toggleAllReports() {{
                            const checkAll = document.getElementById('select-all-checkbox');
                            const checkboxes = document.getElementsByName('report');
                            for (let i = 0; i < checkboxes.length; i++) {{
                                checkboxes[i].checked = checkAll.checked;
                            }}
                            updateHiddenField();
                        }}

                        // Get selected values and update the hidden field
                        function updateHiddenField() {{
                            const checkboxes = document.getElementsByName('report');
                            const selected = [];
                            for (let i = 0; i < checkboxes.length; i++) {{
                                if (checkboxes[i].checked) {{
                                    selected.push(checkboxes[i].value);
                                }}
                            }}
                            // Find the hidden field and set its value
                            // This needs to match the ID we give to the gr.CheckboxGroup below
                            const hiddenField = document.querySelector('#reports-hidden-value textarea');
                            if (hiddenField) {{
                                // Make sure we always have valid JSON, even if empty
                                hiddenField.value = JSON.stringify(selected);
                                console.log('Updated hidden field with: ' + hiddenField.value);
                                // Trigger a change event to notify Gradio
                                const event = new Event('input', {{ bubbles: true }});
                                hiddenField.dispatchEvent(event);
                            }}
                        }}

                        // Add event listeners to all checkbox changes
                        document.addEventListener('DOMContentLoaded', function() {{
                            const checkboxes = document.getElementsByName('report');
                            for (let i = 0; i < checkboxes.length; i++) {{
                                checkboxes[i].addEventListener('change', updateHiddenField);
                            }}
                        }});
                        </script>
                        """

                        # Create HTML component with our custom checkbox implementation
                        custom_html = gr.HTML(html_content)

                        # Create a hidden Textbox to store the selected values as JSON.
                        # The elem_id must match the querySelector in the JS above.
                        reports_checkboxes = gr.Textbox(
                            value="[]",  # Empty array as initial value
                            visible=False,  # Hide this
                            elem_id="reports-hidden-value"
                        )

                        gr.Markdown("*Check the boxes next to the reports you want to manage*")

                # Buttons for report management
                with gr.Row():
                    with gr.Column(scale=1):
                        refresh_button = gr.Button("Refresh List")
                    with gr.Column(scale=1):
                        download_button = gr.Button("Download Selected")
                    with gr.Column(scale=1):
                        delete_button = gr.Button("Delete Selected", variant="stop")
                    with gr.Column(scale=2):
                        cleanup_days = gr.Slider(
                            minimum=0,
                            maximum=90,
                            value=30,
                            step=1,
                            label="Delete Reports Older Than (Days)",
                            info="Set to 0 to disable automatic cleanup"
                        )
                        cleanup_button = gr.Button("Clean Up Old Reports")

                # File download component
                with gr.Row():
                    file_output = gr.File(
                        label="Downloaded Reports",
                        file_count="multiple",
                        type="filepath",
                        interactive=False
                    )

                # Status message
                with gr.Row():
                    status_message = gr.Markdown("")

                # Migration button for existing reports
                with gr.Row():
                    with gr.Column():
                        gr.Markdown("### Migrate Existing Reports")
                        gr.Markdown("Use this button to migrate existing reports from the root directory to the new reports directory structure.")
                        migrate_button = gr.Button("Migrate Existing Reports", variant="primary")

        # Set up event handlers

        # Update the progress tracking in the generate_report method
        async def generate_report_with_progress(query, detail_level, query_type, model_name, process_thinking, initial_results, final_results):
            # Set up progress tracking
            progress_data = gr.Progress(track_tqdm=True)

            # Debug the model selection
            print(f"Model selected from UI dropdown: {model_name}")

            # Call the original generate_report method
            result = await self.generate_report(
                query,
                detail_level,
                query_type,
                model_name,
                None,  # results_file is now None since we removed the search tab
                process_thinking,
                initial_results,
                final_results
            )
            return result

        # NOTE(review): asyncio.run() inside a sync handler blocks the worker
        # thread until the report finishes — confirm this is acceptable here.
        report_button.click(
            fn=lambda q, d, t, m, p, i, f: asyncio.run(generate_report_with_progress(q, d, t, m, p, i, f)),
            inputs=[report_query_input, report_detail_level, report_query_type, report_custom_model,
                    report_process_thinking, initial_results_slider, final_results_slider],
            outputs=[report_output, report_file_output]
        )

        # Report Management Tab Event Handlers
        def refresh_reports():
            reports_data = self._get_reports_for_display()
            choices = self._get_report_choices(reports_data)
            return reports_data, choices

        refresh_button.click(
            fn=refresh_reports,
            inputs=[],
            outputs=[reports_table, reports_checkboxes]
        )

        # Add wrapper to parse JSON and handle download
        def download_with_logging(selected_json):
            try:
                # Parse the JSON string from the hidden textbox
                import json
                print(f"Raw selected_json: '{selected_json}'")

                # Make sure we have valid JSON before parsing
                if not selected_json or selected_json.strip() == "":
                    selected = []
                else:
                    # Handle potential edge cases by cleaning up the input
                    cleaned_json = selected_json.strip()
                    if not (cleaned_json.startswith('[') and cleaned_json.endswith(']')):
                        cleaned_json = f"[{cleaned_json}]"
                    selected = json.loads(cleaned_json)

                print(f"Download button clicked with selections: {selected}")
                files = self._download_selected_reports(selected)
                print(f"Files prepared for download: {len(files)}")
                return files
            except Exception as e:
                print(f"Error processing selections for download: {e}")
                import traceback
                traceback.print_exc()
                return []

        # Connect download button directly to our handler
        download_button.click(
            fn=download_with_logging,
            inputs=reports_checkboxes,  # Now contains JSON string of selections
            outputs=file_output
        )

        # No need for toggle functionality as it's handled by JavaScript in the HTML component

        # Add logging wrapper for delete function
        # NOTE(review): delete_with_logging is never wired to a button below
        # (delete_with_reset is used instead) — appears to be dead code.
        def delete_with_logging(selected):
            print(f"Delete button clicked with selections: {selected}")
            updated_table, updated_choices, message = self._delete_selected_reports(selected)
            print(f"After deletion: {len(updated_table)} reports, {len(updated_choices)} choices")
            return updated_table, updated_choices, message

        # Update delete handler to parse JSON with improved error handling
        def delete_with_reset(selected_json):
            try:
                # Parse the JSON string from the hidden textbox
                import json
                print(f"Raw selected_json for delete: '{selected_json}'")

                # Make sure we have valid JSON before parsing
                if not selected_json or selected_json.strip() == "":
                    selected = []
                else:
                    # Handle potential edge cases by cleaning up the input
                    cleaned_json = selected_json.strip()
                    if not (cleaned_json.startswith('[') and cleaned_json.endswith(']')):
                        cleaned_json = f"[{cleaned_json}]"
                    selected = json.loads(cleaned_json)

                print(f"Delete button clicked with selections: {selected}")
                updated_table, updated_choices, message = self._delete_selected_reports(selected)
                print(f"After deletion: {len(updated_table)} reports, {len(updated_choices)} choices")

                # Generate new HTML after deletion
                html_choices = []
                for i, choice in enumerate(updated_choices):
                    html_choices.append(f'<div style="padding: 5px; margin-bottom: 8px;">')
                    html_choices.append(f'<label style="display: block; width: 100%; cursor: pointer;">')
                    html_choices.append(f'<input type="checkbox" id="report-{i}" name="report" value="{choice}"> {choice}')
                    html_choices.append('</label>')
                    html_choices.append('</div>')

                html_content = f"""
                <div style="border: 1px solid #ddd; border-radius: 5px; margin-bottom: 15px;">
                    <div style="padding: 10px; border-bottom: 1px solid #eee; background-color: #f8f8f8;">
                        <label style="display: block; font-weight: bold; cursor: pointer;">
                            <input type="checkbox" id="select-all-checkbox" onclick="toggleAllReports()"> Check/Uncheck All
                        </label>
                    </div>
                    <div id="reports-container" style="max-height: 500px; overflow-y: auto; padding: 10px;">
                        {''.join(html_choices)}
                    </div>
                </div>
                <script>
                // Toggle all checkboxes
                function toggleAllReports() {{
                    const checkAll = document.getElementById('select-all-checkbox');
                    const checkboxes = document.getElementsByName('report');
                    for (let i = 0; i < checkboxes.length; i++) {{
                        checkboxes[i].checked = checkAll.checked;
                    }}
                    updateHiddenField();
                }}

                // Get selected values and update the hidden field
                function updateHiddenField() {{
                    const checkboxes = document.getElementsByName('report');
                    const selected = [];
                    for (let i = 0; i < checkboxes.length; i++) {{
                        if (checkboxes[i].checked) {{
                            selected.push(checkboxes[i].value);
                        }}
                    }}
                    // Find the hidden field and set its value
                    const hiddenField = document.querySelector('#reports-hidden-value textarea');
                    if (hiddenField) {{
                        hiddenField.value = JSON.stringify(selected);
                        // Trigger a change event to notify Gradio
                        const event = new Event('input', {{ bubbles: true }});
                        hiddenField.dispatchEvent(event);
                    }}
                }}

                // Add event listeners to all checkbox changes
                document.addEventListener('DOMContentLoaded', function() {{
                    const checkboxes = document.getElementsByName('report');
                    for (let i = 0; i < checkboxes.length; i++) {{
                        checkboxes[i].addEventListener('change', updateHiddenField);
                    }}
                }});
                </script>
                """

                # Reset hidden field
                return updated_table, html_content, "[]", message
            except Exception as e:
                print(f"Error processing selections: {e}")
                return reports_table, custom_html.value, "[]", f"Error: {str(e)}"

        delete_button.click(
            fn=delete_with_reset,
            inputs=reports_checkboxes,
            outputs=[reports_table, custom_html, reports_checkboxes, status_message]
        ).then(
            fn=lambda msg: f"{msg} Selected reports deleted successfully.",
            inputs=[status_message],
            outputs=[status_message]
        )

        def cleanup_with_refresh(days):
            updated_table = self._cleanup_old_reports(days)
            choices = self._get_report_choices(updated_table)
            message = f"Reports older than {days} days have been deleted."
            print(message)
            return updated_table, choices, message

        # Note: We need to make sure this runs properly and updates both the table and checkboxes
        # The built-in Gradio progress tracking (gr.Progress) is used instead
        # This is passed to the generate_report method and handles progress updates
        cleanup_button.click(
            fn=cleanup_with_refresh,
            inputs=cleanup_days,
            outputs=[reports_table, reports_checkboxes, status_message]
        ).then(
            # Add a then function to ensure the UI updates properly
            fn=lambda: "Report list has been refreshed.",
            inputs=[],
            outputs=[status_message]
        )

        # Migration button event handler
        def migrate_and_refresh():
            print("Starting migration of existing reports...")
            status = self.migrate_existing_reports()
            print("Migration completed, refreshing display...")
            reports_data = self._get_reports_for_display()
            print(f"Got {len(reports_data)} reports for display")
            choices = self._get_report_choices(reports_data)
            print(f"Generated {len(choices)} choices for selection")
            return status, reports_data, choices

        migrate_button.click(
            fn=migrate_and_refresh,
            inputs=[],
            outputs=[status_message, reports_table, reports_checkboxes]
        ).then(
            # Add a confirmation message after migration completes
            fn=lambda msg: f"{msg} Report list has been refreshed.",
            inputs=[status_message],
            outputs=[status_message]
        )

        # Initialize the checkboxes when the table is first loaded
        # reports_table.change(
        #     fn=lambda table: self._get_report_choices(table),
        #     inputs=reports_table,
        #     outputs=reports_checkboxes
        # )

        # Initialize both the table and checkboxes on page load
        def init_reports_ui():
            print("Initializing reports UI...")
            reports_data = self._get_reports_for_display()
            choices = self._get_report_choices(reports_data)

            # Log the actual choices for debugging
            print(f"Initializing reports UI with {len(reports_data)} reports and {len(choices)} choices")
            for i, choice in enumerate(choices[:5]):
                print(f"Sample choice {i}: {choice}")
            if len(choices) > 5:
                print(f"...and {len(choices) - 5} more choices")

            status = "Reports management initialized successfully."
            return reports_data, choices, status

        interface.load(
            fn=init_reports_ui,
            inputs=[],
            outputs=[reports_table, reports_checkboxes, status_message]
        )

    return interface
def launch(self, **kwargs):
    """
    Launch the Gradio interface.

    Builds the interface via :meth:`create_interface` and starts its server.

    Args:
        **kwargs: Keyword arguments to pass to gr.Interface.launch()
    """
    self.create_interface().launch(**kwargs)
def main():
    """Main function to launch the Gradio interface.

    Creates the interface, runs its asynchronous initialization, then starts
    the Gradio server with public sharing enabled.
    """
    # Create interface and initialize async components
    interface = GradioInterface()

    # asyncio.run() creates, runs, and closes a fresh event loop.
    # (asyncio.get_event_loop() + run_until_complete is deprecated for this
    # use since Python 3.10 and fails on 3.12+ when no loop exists.)
    asyncio.run(interface.async_init())

    # Launch the interface
    interface.launch(share=True)
# Script entry point: only launch the UI when run directly, not on import.
if __name__ == "__main__":
    main()