#!/usr/bin/env python
"""
Test script that simulates an MCP client using the SSH server.

The script starts ``python -m mcpssh`` as a subprocess, sends it a single
``ssh_connect`` request on stdin, waits briefly, then closes stdin to simulate
a client disconnect. The server is given a bounded amount of time to exit on
its own before it is killed, so the script always terminates.

Whatever the server wrote to stdout/stderr is printed afterwards, followed by
a sample Claude Desktop configuration snippet.
"""

import json
import os
import sys
import subprocess
import tempfile
import time
import signal
import traceback

# Environment variables the server subprocess is started with. The same
# values are reused for the printed Claude Desktop configuration so the two
# cannot drift apart.
SSH_ENV = {
    "MCP_SSH_HOSTNAME": "10.0.1.232",
    "MCP_SSH_USERNAME": "stwhite",
    "MCP_SSH_KEY_FILENAME": "~/.ssh/id_ed25519",
}

# Interpreter path shown in the sample Claude Desktop configuration.
DESKTOP_PYTHON = "/Volumes/SAM2/CODE/MCP/mcpssh/venv/bin/python"

# Seconds to wait after launching the server before closing its stdin.
STARTUP_DELAY_SECONDS = 2
# Seconds the server gets to exit on its own after stdin is closed.
EXIT_TIMEOUT_SECONDS = 10

NO_RESPONSE_MESSAGE = (
    "No response received from server. "
    "This likely indicates an initialization issue."
)


def build_server_env(base_env=None):
    """Return a copy of *base_env* with the SSH settings applied.

    Args:
        base_env: Mapping to start from; defaults to ``os.environ``.
            The mapping itself is never modified.

    Returns:
        A new ``dict`` containing *base_env* plus every entry of ``SSH_ENV``
        (``SSH_ENV`` wins on conflicts).
    """
    env = dict(os.environ if base_env is None else base_env)
    env.update(SSH_ENV)
    return env


def build_connect_request():
    """Return the MCP request that asks the server to open the SSH connection."""
    return {
        "type": "request",
        "id": "1",
        "method": "ssh_connect",
        "params": {},
    }


def format_response_lines(lines):
    """Turn raw server stdout lines into the messages to print.

    Args:
        lines: Iterable of text lines as read from the server's stdout.

    Returns:
        A list of strings. Each JSON line yields ``"Server response:"``
        followed by the pretty-printed JSON; any other non-blank line yields
        a ``"Non-JSON response: ..."`` entry. Blank lines are ignored. If
        nothing remains, the list holds the single "no response" message.
    """
    messages = []
    for raw_line in lines:
        text = raw_line.strip()
        if not text:
            # Blank lines carry no information; skip them instead of
            # reporting them as malformed JSON.
            continue
        try:
            response = json.loads(text)
        except json.JSONDecodeError:
            messages.append(f"Non-JSON response: {text}")
        else:
            messages.append("Server response:")
            messages.append(json.dumps(response, indent=2))
    return messages or [NO_RESPONSE_MESSAGE]


def format_desktop_config():
    """Return the sample Claude Desktop configuration entry as text."""
    entry = {
        "command": DESKTOP_PYTHON,
        "args": ["-m", "mcpssh"],
        "env": dict(SSH_ENV),
    }
    return '\n"mcpssh": ' + json.dumps(entry, indent=2) + "\n"


def run_server_session(request, env,
                       startup_delay=STARTUP_DELAY_SECONDS,
                       exit_timeout=EXIT_TIMEOUT_SECONDS):
    """Run the MCP server once, send *request*, then simulate a disconnect.

    Args:
        request: JSON-serialisable request sent as one line on stdin.
        env: Environment mapping for the server subprocess.
        startup_delay: Seconds to wait before closing the server's stdin.
        exit_timeout: Seconds to wait for the server to exit before it is
            killed.

    Returns:
        A ``(returncode, stdout_lines, stderr_text)`` tuple.
    """
    request_line = json.dumps(request) + "\n"

    # stdout and stderr go to temporary files rather than pipes: nobody reads
    # them while the server runs, and an unread pipe could fill up and block
    # the server, making wait() hang.
    with tempfile.TemporaryFile("w+") as stdout_file, \
            tempfile.TemporaryFile("w+") as stderr_file:
        process = subprocess.Popen(
            [sys.executable, "-m", "mcpssh"],
            env=env,
            stdin=subprocess.PIPE,
            stdout=stdout_file,
            stderr=stderr_file,
            text=True,
        )
        try:
            try:
                process.stdin.write(request_line)
                process.stdin.flush()
            except OSError as exc:
                # The server may have exited before reading its input.
                print(f"Could not send request to server: {exc}")

            # Give it a moment to start
            print("Waiting for server to start...")
            time.sleep(startup_delay)

            # Closing stdin is what the server sees as a client disconnect.
            print("Simulating client disconnect by closing stdin pipe...")
            try:
                process.stdin.close()
            except OSError:
                # Flushing into an already-dead server fails; the pipe is
                # closed regardless, which is all that is needed here.
                pass

            print("Waiting for server to detect disconnect and terminate...")
            try:
                process.wait(timeout=exit_timeout)
                print(f"Server exited with code: {process.returncode}")
            except subprocess.TimeoutExpired:
                print("Server did not terminate within timeout, killing it...")
                process.kill()
                process.wait()
                print(f"Server killed, exit code: {process.returncode}")
        finally:
            # Never leave the server running if anything above failed.
            if process.poll() is None:
                process.kill()
                process.wait()

        # The child wrote through the same open files; rewind before reading.
        stdout_file.seek(0)
        stderr_file.seek(0)
        return process.returncode, stdout_file.readlines(), stderr_file.read()


def main():
    """Run the simulated client session and print everything the server said."""
    try:
        print("Starting MCP SSH server...")
        _, response_lines, stderr_output = run_server_session(
            build_connect_request(), build_server_env()
        )

        if stderr_output:
            print("Server stderr output:")
            print(stderr_output)

        for message in format_response_lines(response_lines):
            print(message)
    except Exception as e:
        # Top-level boundary of a diagnostic script: report the failure and
        # still print the configuration help below.
        print(f"Error running MCP server: {e}")
        traceback.print_exc()

    print("\nTest complete.")
    print("For Claude Desktop, use this configuration:")
    print(format_desktop_config())


if __name__ == "__main__":
    main()