#!/usr/bin/env python
"""
Query to Report Script

This script demonstrates the full workflow from query to report, taking a user
query and generating a comprehensive report saved in Markdown format.
"""

import os
import sys
import json
import asyncio
import logging
import argparse
from datetime import datetime
from typing import Dict, List, Any, Optional

# Add parent directory to path to import modules
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from query.query_processor import get_query_processor
from execution.search_executor import SearchExecutor
from ranking.jina_reranker import get_jina_reranker
from report.report_generator import get_report_generator, initialize_report_generator
from report.report_detail_levels import get_report_detail_level_manager

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


async def query_to_report(
    query: str,
    output_file: str,
    search_engines: Optional[List[str]] = None,
    num_results: int = 10,
    token_budget: Optional[int] = None,
    chunk_size: Optional[int] = None,
    overlap_size: Optional[int] = None,
    detail_level: str = "standard",
    use_mock: bool = False
) -> str:
    """
    Execute the full workflow from query to report.

    Args:
        query: User query
        output_file: Path to save the report
        search_engines: List of search engines to use
        num_results: Number of results to return per search engine
        token_budget: Maximum number of tokens to use for report generation
        chunk_size: Maximum number of tokens per chunk
        overlap_size: Number of tokens to overlap between chunks
        detail_level: Level of detail for the report (brief, standard, detailed, comprehensive)
        use_mock: If True, use mock data instead of making actual API calls

    Returns:
        Path to the generated report
    """
    logger.info(f"Processing query: {query}")
    logger.info(f"Detail level: {detail_level}")

    # Step 1: Process the query
    query_processor = get_query_processor()
    structured_query = await query_processor.process_query(query)

    # Add timestamp
    structured_query['timestamp'] = datetime.now().isoformat()

    logger.info(f"Query processed. Type: {structured_query['type']}, Intent: {structured_query['intent']}")
    logger.info(f"Enhanced query: {structured_query['enhanced_query']}")

    # Step 2: Generate search queries for different engines
    if search_engines is None:
        search_executor = SearchExecutor()
        search_engines = search_executor.get_available_search_engines()

    if not use_mock:
        # Generate search queries for each engine
        search_queries = await query_processor.generate_search_queries(
            structured_query,
            search_engines or list(search_executor.available_handlers.keys())
        )
        structured_query['search_queries'] = search_queries
        logger.info(f"Generated search queries for engines: {', '.join(search_queries.keys())}")
    else:
        # Use mock data
        structured_query = await query_processor.generate_search_queries(structured_query, search_engines)
        logger.info(f"Generated search queries for engines: {', '.join(search_engines)}")

    # Step 3: Execute search
    search_executor = SearchExecutor()

    # If detail level is specified, adjust num_results based on the detail level
    if detail_level and not num_results:
        detail_level_manager = get_report_detail_level_manager()
        config = detail_level_manager.get_detail_level_config(detail_level)
        num_results = config.get("num_results", 10)
        logger.info(f"Using {num_results} results per search engine based on detail level: {detail_level}")

    search_results = search_executor.execute_search(
        structured_query,
        search_engines=search_engines,
        num_results=num_results
    )

    # Flatten search results
    flattened_results = []
    for engine, results in search_results.items():
        for result in results:
            # Add the search engine to the result
            result['engine'] = engine
            flattened_results.append(result)

    logger.info(f"Search executed. Got {len(flattened_results)} results from {len(search_results)} engines")

    # Step 4: Rerank results
    reranker = get_jina_reranker()

    # Extract text from results for reranking
    documents_for_reranking = []
    for result in flattened_results:
        # Use snippet or title as the document text
        doc_text = result.get('snippet', result.get('title', ''))
        documents_for_reranking.append(doc_text)

    # Rerank the documents
    reranked_indices = reranker.rerank(
        query=structured_query['enhanced_query'],
        documents=documents_for_reranking
    )

    # Map the reranked indices back to the original results
    reranked_results = []
    for item in reranked_indices:
        if 'index' in item and item['index'] < len(flattened_results):
            original_result = flattened_results[item['index']]
            # Add the reranking score to the result
            original_result['score'] = item['score']
            reranked_results.append(original_result)

    logger.info(f"Results reranked. Got {len(reranked_results)} reranked results")

    # Step 5: Initialize report generator
    await initialize_report_generator()
    report_generator = get_report_generator()

    # Step 6: Generate report
    logger.info(f"Generating report with detail level: {detail_level}...")
    report = await report_generator.generate_report(
        search_results=reranked_results,
        query=query,
        token_budget=token_budget,
        chunk_size=chunk_size,
        overlap_size=overlap_size,
        detail_level=detail_level
    )

    logger.info(f"Report generated. Length: {len(report)} characters")

    # Step 7: Save report to file
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(report)

    logger.info(f"Report saved to: {output_file}")

    return output_file


def main():
    """Main function to parse arguments and run the workflow."""
    parser = argparse.ArgumentParser(description='Generate a report from a query')
    parser.add_argument('query', help='The query to process')
    parser.add_argument('--output', '-o', default='report.md', help='Output file path')
    parser.add_argument('--search-engines', '-s', nargs='+', help='Search engines to use')
    parser.add_argument('--num-results', '-n', type=int, help='Number of results per search engine')
    parser.add_argument('--token-budget', '-t', type=int, help='Maximum number of tokens for report generation')
    parser.add_argument('--chunk-size', '-c', type=int, help='Maximum tokens per chunk')
    parser.add_argument('--overlap-size', '-l', type=int, help='Tokens to overlap between chunks')
    parser.add_argument('--detail-level', '-d', type=str, default='standard',
                        choices=['brief', 'standard', 'detailed', 'comprehensive'],
                        help='Level of detail for the report')
    parser.add_argument('--use-mock', '-m', action='store_true', help='Use mock data instead of API calls')
    parser.add_argument('--verbose', '-v', action='store_true', help='Enable verbose logging')
    parser.add_argument('--list-detail-levels', action='store_true',
                        help='List available detail levels with descriptions and exit')

    args = parser.parse_args()

    # Set log level
    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    # List detail levels if requested
    if args.list_detail_levels:
        detail_level_manager = get_report_detail_level_manager()
        detail_levels = detail_level_manager.get_available_detail_levels()
        print("Available detail levels:")
        for level, description in detail_levels:
            print(f"  {level}: {description}")
        return

    # Run the workflow
    asyncio.run(query_to_report(
        query=args.query,
        output_file=args.output,
        search_engines=args.search_engines,
        num_results=args.num_results,
        token_budget=args.token_budget,
        chunk_size=args.chunk_size,
        overlap_size=args.overlap_size,
        detail_level=args.detail_level,
        use_mock=args.use_mock
    ))


if __name__ == "__main__":
    main()