New server shutdown feature added.

This commit is contained in:
Steve White 2025-04-22 12:31:35 -05:00
parent 24952c6748
commit 121600194c
20 changed files with 1499 additions and 1 deletions

2
.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
.aider*
.ra-aid

9
.note/code_structure.md Normal file
View File

@ -0,0 +1,9 @@
# Code Structure
- `src/mcpssh/`: Main server and protocol logic.
- `tests/`: Test modules.
- `test_mcp.py`, `test_ssh.py`: Standalone test scripts.
- `.note/`: Project memory bank and documentation.
- `scp_plan.md`: Implementation plan for SCP features.
*Update this file as the codebase evolves or new modules are added.*

15
.note/current_focus.md Normal file
View File

@ -0,0 +1,15 @@
# Current Focus
## Active Work
- Implemented SCP/SFTP support in SSHSession and SSHServerMCP per `scp_plan.md`.
- Added file transfer, directory listing, removal, and disk usage methods.
- Updated interfaces documentation.
## Recent Changes
- Extended `SSHSession` and `SSHServerMCP` with SFTP/SCP logic and new tool methods.
- Added FileTransferParams Pydantic model.
## Next Steps
- Add/expand tests for file transfer edge cases (size, space, conflict, errors).
- Update README and user documentation.
- Review and refactor as needed after initial testing.

6
.note/decision_log.md Normal file
View File

@ -0,0 +1,6 @@
# Decision Log
- 2025-04-22: Adopted MEMORY BANK workflow for persistent project context and continuity.
- 2025-04-22: Prioritized creation of `.note/` documentation before implementing new features.
*Log all significant technical and architectural decisions here with rationale and date.*

View File

@ -0,0 +1,16 @@
# Development Standards
## Coding Conventions
- Follow PEP8 for Python code.
- Use descriptive variable and function names.
- Write clear, concise docstrings for all public functions and classes.
## Patterns
- Modular design: keep logic separated by responsibility.
- Use dependency injection where possible.
- Write tests for all new features and bug fixes.
## Documentation
- Update `.note/` files with every significant change.
- Log all key decisions in `decision_log.md`.
- Maintain up-to-date interfaces documentation.

26
.note/interfaces.md Normal file
View File

@ -0,0 +1,26 @@
# Interfaces
## SSHSession (src/mcpssh/server.py)
- connect() -> bool
- close() -> None
- execute_command(command: str) -> Dict[str, Any]
- upload_file(local_path: str, remote_path: str, max_size: int = 1GB, on_conflict: str = 'FAIL') -> Dict[str, Any]
- download_file(remote_path: str, local_path: str, max_size: int = 1GB, on_conflict: str = 'FAIL') -> Dict[str, Any]
- list_directory(remote_path: str) -> Dict[str, Any]
- remove_file(remote_path: str) -> Dict[str, Any]
- get_disk_usage(remote_path: str = '.') -> Dict[str, Any]
## SSHServerMCP (src/mcpssh/server.py)
- ssh_connect(params: SSHConnectionParams) -> Dict[str, Any]
- ssh_execute(params: CommandParams) -> Dict[str, Any]
- ssh_disconnect() -> Dict[str, Any]
- scp_upload(params: FileTransferParams) -> Dict[str, Any]
- scp_download(params: FileTransferParams) -> Dict[str, Any]
- scp_listdir(remote_path: str) -> Dict[str, Any]
- scp_remove(remote_path: str) -> Dict[str, Any]
- scp_disk_usage(remote_path: str = '.') -> Dict[str, Any]
## Pydantic Models
- SSHConnectionParams
- CommandParams
- FileTransferParams (new)

51
.note/mcp_review.md Normal file
View File

@ -0,0 +1,51 @@
You are an expert code reviewer specializing in MCP (Multi-Capability Provider) servers. Your task is to evaluate MCP server implementations and provide comprehensive, actionable feedback based on the checklist below.
## MCP Server Review Checklist
### 1. Functionality & Correctness
- [ ] **Core Logic:** Does the server correctly implement its intended functionality (e.g., SSH connection, command execution, file transfer)?
- [ ] **Tool Implementation:** Are the methods backing the MCP tools implemented correctly and handle expected inputs/outputs?
- [ ] **Edge Cases:** Are potential edge cases and failure modes handled gracefully (e.g., connection errors, file not found, invalid commands)?
### 2. MCP Integration & Tool Definition
- [ ] **Tool Registration:** **CRITICAL:** Are *all* intended MCP tools explicitly registered with the MCP server framework (e.g., using `server.add_tool`)?
- [ ] **Tool Naming & Descriptions:** Are tool names clear, descriptive, and potentially prefixed correctly if required?
- [ ] **Parameter Definition:** Are Pydantic models (or equivalent) used effectively to define clear input parameters for each tool?
- [ ] **Return Values:** Do tools return clear and consistent success/failure indicators and results?
### 3. Security
- [ ] **Credential Handling:** Are sensitive credentials (API keys, passwords, SSH keys) handled securely (e.g., read from environment variables or secure configuration, *not* hardcoded or accepted directly via tool parameters)?
- [ ] **Input Validation:** Are inputs received via tool parameters properly validated to prevent injection attacks or unexpected behavior?
- [ ] **Resource Access:** Does the server appropriately limit access to system resources based on its intended scope?
### 4. Configuration & Deployment
- [ ] **Configuration:** Is the server configuration (e.g., hostname, port, keys) managed effectively (env vars, config files)? Is it clearly documented?
- [ ] **Dependencies:** Are dependencies managed correctly (e.g., `pyproject.toml`, `requirements.txt`)?
- [ ] **Logging:** Is logging implemented effectively for debugging and monitoring?
### 5. Code Quality & Maintainability
- [ ] **Readability:** Is the code well-structured, readable, and appropriately commented?
- [ ] **Error Handling:** Is error handling robust? Are errors logged effectively and reported back to the MCP client appropriately?
- [ ] **State Management:** If the server maintains state (e.g., connections), is it managed correctly (setup, teardown, concurrency)?
- [ ] **Testing:** Are there sufficient unit or integration tests for the core logic and tool interactions?
## Feedback Structure
Provide your review feedback structured as follows:
1. **Overall Assessment**: Brief summary of the server's quality and readiness.
2. **Checks Passed**: List checklist items that are well-implemented.
3. **Areas for Improvement**: Detail checklist items that need attention, categorized by severity (Critical, Major, Minor), with specific examples and suggestions.
4. **Questions**: Any clarifying questions.
## Guidelines
- Be thorough and constructive.
- Prioritize feedback based on impact (Security > Correctness > Maintainability).
- Provide specific code references where applicable.
- Maintain a professional and respectful tone.

17
.note/project_overview.md Normal file
View File

@ -0,0 +1,17 @@
# Project Overview
## Purpose
This project is for MCP SSH management and automation. It includes server code, tests, and plans for SCP (Secure Copy Protocol) functionality, as referenced in `scp_plan.md`.
## Goals
- Implement robust SSH and SCP automation for MCP.
- Follow structured development and documentation practices, as outlined in the MEMORY BANK rules.
## High-Level Architecture
- `src/mcpssh/`: Main server and logic implementation.
- `tests/`, `test_mcp.py`, `test_ssh.py`: Testing suites.
- `.note/`: Memory Bank and documentation for development continuity.
---
*This overview will be updated as the project evolves and as more details from `scp_plan.md` and other documentation are integrated.*

View File

@ -0,0 +1,82 @@
# Review: SCP/SFTP Implementation vs Test Plan
## 1. Summary Assessment
The implementation of SCP/SFTP functionality in the codebase generally aligns well with the requirements and scenarios described in `scp_plan.md`. The core file transfer operations, pre-transfer validations, disk space checks, conflict resolution strategies, and Pydantic models are all implemented as specified. There is evidence of error handling for disk space, file size, and transfer interruptions. However, some important planned test cases—particularly for failures, edge cases, and cleanup behaviors—are not yet present or comprehensive in the test suite. The implementation quality is solid, though a few opportunities for further alignment and robustness exist around partial transfer recovery and negative paths.
## 2. Alignment Strengths
- **Core Feature Fulfillment:** All planned file transfer operations (`upload_file`, `download_file`, `list_directory`, `remove_file`, `get_disk_usage`) are implemented with proper hooks via `SSHSession` and `SSHServerMCP` tool methods.
- **Validation Logic:** Pre-transfer file size and filesystem space checks are present and generally follow the plan (including use of `df -k` remotely and stat calls locally).
- **Conflict Resolution:** The three planned behaviors (`FAIL`, `OVERWRITE`, `RENAME`) are implemented and configurable via parameters.
- **Parameter Model:** Enhanced Pydantic models for transfer parameters match the plan.
- **Disk and Size Error Handling:** Clear, explicit error returns for disk/size problems, with informative messages.
- **Partial Transfer Cleanup:** Mechanism exists for removing partial files on failure, in line with the plan.
- **Documentation:** The README and code comments provide good coverage of usage and behavior.
## 3. Alignment Gaps
### Critical
- **Test Coverage for Edge Cases:**
- Tests for file transfer edge cases (size limits, disk space, conflict strategies, interruptions, etc.) are missing or incomplete in the provided test files (`test_server.py`).
- No tests for partial transfer recovery or correct cleanup/notification on interruption/failure.
### Major
- **Automatic Resume:**
- Resume of interrupted transfers is mentioned in the plan as "if feasible," but there is no implementation or related documentation explaining the limitation or fallback behavior.
- **Negative Path Robustness:**
- Not all negative/exception paths (e.g., SFTP disconnects, mid-transfer interruptions, permission errors) are fully tested.
### Minor
- **Partial File Extension Option:**
- Plan mentions moving incomplete files to `.partial` or temp, but the code simply deletes partials without offering retention/rename as a config option.
- **Error Details:**
- While errors are returned, sometimes they could include additional actionable detail (how many bytes were successfully transferred, or guidance for retry/cleanup).
- **Environmental & Config Testing:**
- Tests relying only on environment/config variable injection are limited; more direct parameterization could improve flexibility.
## 4. Implementation Quality
- **Code Structure:** Clean separation of concerns (SSH session vs. API/server glue); logical organization.
- **Readability:** Good use of logging, helpful error messages, and Pydantic models.
- **Extensibility:** Facility for prefixed tool names and multi-server operation is robust.
- **Error Handling:** Generally graceful; improves reliability and usability for clients.
- **Documentation:** Above average both in code and README.
## 5. Coverage Analysis
- **Positive Paths:** Connection and command execution have solid but basic tests (mocked, deterministic).
- **File Transfer:** No dedicated direct tests for SCP/SFTP upload/download behavior with parameter edge cases.
- **Cleanup and Edge Cases:** No automated tests for transfer interruption, disk full, precondition fail, or partial files.
- **API/Model Alignment:** Parameters (`max_size`, `on_conflict`, etc.) align, though tests do not fully exercise them.
## 6. Recommendations
1. **Test Expansion:**
- Add test cases for all file transfers, explicitly testing:
- Size limit enforcement (upload/download fails as expected for large files)
- Disk space checks on both source and destination
- All three conflict modes (FAIL, OVERWRITE, RENAME)
- Partial transfer cleanup after interruption (can mock/force error during transfer)
- Return values and error messages for negative scenarios
- Handling and explicit notification of interruptions/errors
2. **Partial/Resumed Transfers:**
- If not planning to implement resume support, explicitly document the limitation and ensure client is notified accordingly.
- Optionally, enable configurable retention or renaming of incomplete files rather than always deleting.
3. **Negative Path Coverage:**
- Test and handle remote permission errors, disconnects, and underlying SFTP failures, especially during file manipulations.
4. **Parameterization:**
- Consider adding more flexible ways to set parameters for transfers/tests (beyond environment only).
5. **Documentation:**
- Expand documentation on error behaviors, retry/cleanup advice, and feature limitations.
## 7. Questions
1. Is partial transfer resume intended to be supported, or should this be explicitly documented as a limitation?
2. Should the cleanup of partial files always be enforced, or made configurable (as in the plan)?
3. Are there plans to more fully test disk space, conflict, and interruption scenarios via integration or system tests?
4. Will negative/exception-path error reporting be enhanced to include more diagnostic and recovery information for the client?

95
.note/review_scp_plan.md Normal file
View File

@ -0,0 +1,95 @@
### **Review of `scp_plan.md`**
---
#### 1. **Summary Assessment**
The plan to implement secure file transfer (SFTP-based "SCP") features in the `SSHSession` class demonstrates solid technical grounding and attention to both robustness and usability. The proposal comprehensively addresses the core file transfer operations, introduces essential safety checks (file size, disk space), and provides flexible conflict resolution strategies. The use of existing dependencies (Paramiko) is efficient, and focus areas such as API design, configurability, and thorough test coverage indicate a professional approach. Some refinement in validation logic, error handling, and edge case management could further strengthen the plan.
---
#### 2. **Strengths**
- **Comprehensive Scope:** Covers upload, download, listing, removal, and disk usage across both local and remote endpoints.
- **Safety First:** Pre-transfer validation for file size and disk usage on both sides is well considered.
- **Configurable Limits:** Sensible defaults but clear support for overrides via parameters, environment, and global config.
- **Good API Design:** Strong Pydantic models for parameter validation and concise endpoint design.
- **Flexible Conflict Resolution:** Defaults to fail-safety; supports overwrite and auto-renaming—addresses real user needs.
- **Testing & Documentation:** Explicit commitment for thorough tests (including recovery scenarios) and user-facing documentation.
---
#### 3. **Concerns**
**Critical**
- **Security:**
- Use of shell commands (e.g., `df` via SSH): Ensure that command inputs are never user-controlled to avoid command injection vulnerabilities.
- SFTP inherently avoids remote code execution, but any SSH command interaction must be sanitized.
- **Partial Transfer Handling:**
- The plan mentions "partial transfer recovery" in tests but lacks detail. Does implementation include automatic resume, cleanup, or user notification of failed/incomplete transfers?
**Major**
- **Concurrency/Race Conditions:**
- If remote or local filesystems change between validation and actual transfer, there may be unexpected failures. Plan should acknowledge and, if possible, mitigate these race conditions.
- **Atomicity of Overwrite/Remove:**
- "Overwrite" mode on download/upload might briefly delete and recreate files, potentially exposing users to partial file states or data loss if the transfer fails mid-way.
- **API Usability:**
- The abstraction between "local" and "remote" is clean, but how does the API surface or communicate detailed errors (e.g., which check failed, size vs. quota, etc.) to clients?
**Minor**
- **Performance:**
- Invoking `df` for every transfer may introduce overhead for bulk/batch transfers. Consider caching or batching where feasible.
- **Default Path Handling:**
- The design should clarify how relative paths, symlinks, or remote home directories are handled and validated for both security and usability.
- **Test Case Breadth:**
- Include non-ASCII file names, permission errors, and edge cases (remote root vs user paths) in test coverage.
---
#### 4. **Verification Gaps**
- **No explicit description of retry or recovery strategies** for interrupted transfers.
- **No examples of error messages or API response formats** for common failures.
- **No confirmation of support for large file/streaming transfers, or binary vs. text modes.**
- **No mention of handling platform-dependent remote filesystems** (e.g., Windows paths if used in mixed environments).
---
#### 5. **Suggestions**
- **Expand error handling design:**
- Provide a clear error model/API contract describing possible failure modes and how clients should handle them.
- **Clarify recovery from transfer interruptions:**
- If resume is not supported, specify cleanup and retry semantics.
- **Enhance atomicity for critical operations:**
- Consider using temporary filenames or staging directories for overwrite/rename modes, followed by atomic rename, to minimize data loss risk.
- **Security hardening:**
- Explicitly sanitize and/or hard-code all remote command invocations.
- **Test coverage:**
- Add explicit test cases for edge conditions (unusual paths, permissions, symlinks, high latency, etc.).
- **Performance optimization:**
- If expecting many sequential transfers, optionally support limiting `df` checks to once per batch, not per file (with clear documentation).
- **Documentation:**
- Include a migration/compatibility note if this alters or replaces any previous transfer behavior.
---
#### 6. **Questions**
1. **Partial Transfer Handling:** Will interrupted or failed transfers leave incomplete files? Is "resume" supported, or does the process always start from scratch?
2. **User Feedback:** How are clients notified of specific failures? Are errors granular enough to allow automated remediation?
3. **Symlink/Special File Handling:** What is the intended behavior when transferring symlinks, devices, or other non-regular files?
4. **Concurrency:** Is there support for parallel transfers, and if yes, how do resource checks and race conditions factor in?
5. **Remote File System Variance:** Any plans for handling environments where `df` output may not be consistent (e.g., non-Linux UNIX, restricted shells)?
---
*Overall, this is a robust and well-structured implementation roadmap. Addressing noted concerns—especially around atomicity, recovery, and security—will help ensure high-quality delivery and user trust.*

88
.note/scp_plan.md Normal file
View File

@ -0,0 +1,88 @@
# SCP Feature Implementation Plan
## 1. Core Functionality
- [ ] Add SFTP client to `SSHSession` class using **Paramiko's SFTPClient** (already a dependency)
- [ ] Implement file transfer methods with enhanced safety:
- `upload_file(local_path, remote_path)`
- Check local file size before transfer
- Verify remote filesystem capacity (using `df` command via SSH)
- Default 1GB transfer limit (configurable)
- Conflict resolution: auto-rename (default) / overwrite / fail options
- `download_file(remote_path, local_path)`
- Check remote file size first
- Verify local disk space
- Same conflict resolution options
- `list_directory(remote_path)`
- `remove_file(remote_path)`
- `get_disk_usage()` - Helper for capacity checks
## 2. File Size Handling
- [ ] Pre-transfer validation:
- Local: `os.stat()` for size
- Remote: Execute `df -k` via SSH and parse output
- [ ] Configurable limits:
- Max single file size (default: 1GB)
- Min free space requirement (default: 2x file size)
- [ ] Graceful error handling for:
- Insufficient space
- Size limit exceeded
- Transfer interruptions
## 3. Conflict Resolution
- [ ] Transfer modes:
- `FAIL` (default): Raise error on existing file
- `OVERWRITE`: Replace existing file
- `RENAME`: Add timestamp suffix (e.g., `file(20250422).ext`)
- [ ] Configurable via:
- Transfer parameters
- Environment variables
- Global defaults
## 4. API Endpoints
- [ ] Enhanced Pydantic models:
```python
class FileTransferParams(BaseModel):
local_path: str
remote_path: str
max_size: Optional[int] = None # bytes
on_conflict: Literal["FAIL", "OVERWRITE", "RENAME"] = "FAIL"
```
## 5. Testing Strategy
- [ ] Add test cases for:
- Size limit enforcement
- Disk space verification
- Conflict resolution modes
- Partial transfer recovery
## 6. Partial Transfer Handling
- [ ] **Automatic Resume:**
- If feasible, support resuming interrupted transfers using SFTP partial file support (seek, append). If not supported, document this limitation clearly.
- [ ] **Cleanup of Incomplete Files:**
- On transfer failure, automatically remove any partially transferred files (or move to a `.partial` or temp location). Provide a configuration option to retain or clean up partial files.
- [ ] **User Notification:**
- Always return clear, explicit error status if a transfer does not complete successfully.
- Include details such as bytes transferred, error message, and suggested next steps (retry, cleanup, etc.).
- [ ] **Testing:**
- Add test cases for interrupted connections, disk full scenarios, and manual aborts to verify correct cleanup and notification.
## 7. Documentation
- [ ] Update README with new SCP capabilities
- [ ] Add example usage
## Implementation Steps
1. Extend `SSHSession` with SFTP support using Paramiko
2. Add new tool methods to `SSHServerMCP`
3. Implement tests for edge cases (size, conflicts, errors)
4. Update documentation
## Estimated Timeline
- Core implementation: 2 days
- Testing: 1 day
- Documentation: 0.5 day
---
**Notes:**
- SFTP operations will use Paramiko's `SFTPClient`.
- File size and free space will be checked before transfer on both local and remote filesystems.
- Conflict behavior (fail, overwrite, rename) is configurable per transfer.

7
.note/session_log.md Normal file
View File

@ -0,0 +1,7 @@
# Session Log
## 2025-04-22
- Initialized MEMORY BANK `.note/` directory and required documentation files.
- Preparing to review and implement `scp_plan.md`.
*Add a new entry for each development session.*

View File

@ -0,0 +1,54 @@
# System Prompt for Test Plan and Code Implementation Reviewer
You are an expert code and test plan reviewer with extensive experience in software engineering, quality assurance, and test-driven development. Your task is to evaluate both a test plan document and the associated codebase to ensure alignment, completeness, and effectiveness of the implementation against the planned changes.
## Your Analysis Process
1. **Test Plan Understanding**: First, carefully analyze the provided test plan document to understand:
- The testing objectives and scope
- Test scenarios and cases outlined
- Expected outcomes and success criteria
- Testing methodologies and approaches
- Edge cases and error handling considerations
2. **Code Implementation Evaluation**: Examine the codebase to assess:
- Implementation completeness relative to the test plan requirements
- Proper test coverage of all specified scenarios
- Alignment between code functionality and test plan objectives
- Adherence to testing best practices
- Actual handling of edge cases compared to planned handling
3. **Alignment Assessment**: Evaluate the correspondence between:
- Planned test scenarios and implemented test cases
- Expected behaviors in the plan and actual code implementation
- Test coverage breadth and depth compared to outlined requirements
- Error handling approaches versus implemented error management
## Feedback Structure
Provide your analysis in the following format:
1. **Summary Assessment**: Brief overview of the alignment between test plan and implementation
2. **Alignment Strengths**: Areas where implementation effectively fulfills the test plan requirements
3. **Alignment Gaps**: Identify discrepancies between plan and implementation, categorized by severity:
- Critical: Missing or incorrectly implemented test cases for core functionality
- Major: Significant deviations from planned testing approaches
- Minor: Small differences or optimization opportunities
4. **Implementation Quality**: Assessment of the code quality independent of the test plan
5. **Coverage Analysis**: Evaluation of test coverage against the outlined requirements
6. **Recommendations**: Specific, actionable suggestions to improve alignment and implementation
7. **Questions**: Clarifying questions about ambiguous aspects of either the plan or implementation
## Guidelines
- Perform a systematic comparison between test plan requirements and actual implementation
- Identify both present elements and missing components
- Evaluate the effectiveness of implemented tests in validating the intended functionality
- Consider both functional correctness and adherence to testing best practices
- Assess whether the implementation goes beyond the test plan in beneficial ways
- Identify any implementation shortcuts that compromise the test plan's intent
- Prioritize feedback based on potential impact on software quality and reliability
Always maintain a professional, analytical tone that focuses on improving the alignment between planned testing and actual implementation, rather than criticizing the authors of either component.
Please review scp_plan.md and the codebase for SCP/SFTP implementation, and save your comments to review_implementation.md.

View File

@ -0,0 +1,3 @@
"""MCP SSH Server module."""
__version__ = "0.1.0"

View File

@ -0,0 +1,6 @@
"""Main entry point for the MCP SSH server."""
from .server import main
if __name__ == "__main__":
main()

501
build/lib/mcpssh/server.py Normal file
View File

@ -0,0 +1,501 @@
"""MCP SSH Server implementation."""
import json
import logging
import os
import sys
from typing import Dict, List, Optional, Any, Iterator
import paramiko
from pydantic import BaseModel, Field
from mcp import types
from mcp.server import FastMCP
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class SSHSession:
"""SSH Session that connects to a remote server and supports SFTP file operations."""
def __init__(self, hostname: str, port: int, username: str, key_filename: str):
self.hostname = hostname
self.port = port
self.username = username
self.key_filename = key_filename
self.client = paramiko.SSHClient()
self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.connected = False
self.sftp = None
def connect(self) -> bool:
try:
self.client.connect(
hostname=self.hostname,
port=self.port,
username=self.username,
key_filename=self.key_filename
)
self.sftp = self.client.open_sftp()
self.connected = True
return True
except Exception as e:
logger.error(f"SSH connection error: {e}")
return False
def execute_command(self, command: str) -> Dict[str, Any]:
if not self.connected:
if not self.connect():
return {"stdout": "", "stderr": "Not connected to SSH server", "exit_code": -1}
try:
stdin, stdout, stderr = self.client.exec_command(command)
exit_code = stdout.channel.recv_exit_status()
return {
"stdout": stdout.read().decode("utf-8"),
"stderr": stderr.read().decode("utf-8"),
"exit_code": exit_code
}
except Exception as e:
logger.error(f"Command execution error: {e}")
return {"stdout": "", "stderr": str(e), "exit_code": -1}
def close(self) -> None:
if self.sftp:
self.sftp.close()
self.sftp = None
if self.connected:
self.client.close()
self.connected = False
# --- SFTP/SCP Methods ---
def get_disk_usage(self, remote_path: str = ".") -> Dict[str, Any]:
"""Get disk usage on the remote system using 'df -k'."""
result = self.execute_command(f"df -k {remote_path}")
if result["exit_code"] != 0:
return {"success": False, "error": result["stderr"]}
try:
lines = result["stdout"].splitlines()
if len(lines) < 2:
return {"success": False, "error": "Unexpected df output"}
parts = lines[1].split()
total_kb = int(parts[1])
used_kb = int(parts[2])
avail_kb = int(parts[3])
return {"success": True, "total": total_kb * 1024, "used": used_kb * 1024, "avail": avail_kb * 1024}
except Exception as e:
return {"success": False, "error": str(e)}
def list_directory(self, remote_path: str) -> Dict[str, Any]:
if not self.sftp:
return {"success": False, "error": "Not connected to SFTP"}
try:
files = self.sftp.listdir_attr(remote_path)
return {"success": True, "files": [{"filename": f.filename, "size": f.st_size, "mtime": f.st_mtime, "mode": f.st_mode} for f in files]}
except Exception as e:
return {"success": False, "error": str(e)}
def remove_file(self, remote_path: str) -> Dict[str, Any]:
if not self.sftp:
return {"success": False, "error": "Not connected to SFTP"}
try:
self.sftp.remove(remote_path)
return {"success": True}
except Exception as e:
return {"success": False, "error": str(e)}
def upload_file(self, local_path: str, remote_path: str, max_size: int = 1073741824, on_conflict: str = "FAIL") -> Dict[str, Any]:
import os, time
if not self.sftp:
return {"success": False, "error": "Not connected to SFTP"}
try:
local_size = os.stat(local_path).st_size
if local_size > max_size:
return {"success": False, "error": f"File too large: {local_size} bytes > {max_size} bytes"}
disk = self.get_disk_usage(os.path.dirname(remote_path) or ".")
if not disk.get("success"):
return {"success": False, "error": "Failed to check remote disk space: " + disk.get("error", "")}
if disk["avail"] < local_size * 2:
return {"success": False, "error": f"Insufficient remote disk space: need {local_size*2} bytes, have {disk['avail']} bytes"}
# Conflict resolution
try:
self.sftp.stat(remote_path)
file_exists = True
except FileNotFoundError:
file_exists = False
if file_exists:
if on_conflict == "FAIL":
return {"success": False, "error": "Remote file exists"}
elif on_conflict == "OVERWRITE":
pass # allow overwrite
elif on_conflict == "RENAME":
base, ext = os.path.splitext(remote_path)
timestamp = time.strftime("(%Y%m%d%H%M%S)")
remote_path = f"{base}{timestamp}{ext}"
# Upload
try:
self.sftp.put(local_path, remote_path)
return {"success": True, "remote_path": remote_path, "bytes_transferred": local_size}
except Exception as e:
# Cleanup partial file
try:
self.sftp.remove(remote_path)
except Exception:
pass
return {"success": False, "error": f"Transfer failed: {e}"}
except Exception as e:
return {"success": False, "error": str(e)}
def download_file(self, remote_path: str, local_path: str, max_size: int = 1073741824, on_conflict: str = "FAIL") -> Dict[str, Any]:
import os, time
if not self.sftp:
return {"success": False, "error": "Not connected to SFTP"}
try:
fileinfo = self.sftp.stat(remote_path)
remote_size = fileinfo.st_size
if remote_size > max_size:
return {"success": False, "error": f"Remote file too large: {remote_size} bytes > {max_size} bytes"}
st = os.statvfs(os.path.dirname(local_path) or ".")
local_free = st.f_bavail * st.f_frsize
if local_free < remote_size * 2:
return {"success": False, "error": f"Insufficient local disk space: need {remote_size*2} bytes, have {local_free} bytes"}
# Conflict resolution
if os.path.exists(local_path):
if on_conflict == "FAIL":
return {"success": False, "error": "Local file exists"}
elif on_conflict == "OVERWRITE":
pass
elif on_conflict == "RENAME":
base, ext = os.path.splitext(local_path)
timestamp = time.strftime("(%Y%m%d%H%M%S)")
local_path = f"{base}{timestamp}{ext}"
# Download
try:
self.sftp.get(remote_path, local_path)
return {"success": True, "local_path": local_path, "bytes_transferred": remote_size}
except Exception as e:
# Cleanup partial file
try:
os.remove(local_path)
except Exception:
pass
return {"success": False, "error": f"Download failed: {e}"}
except Exception as e:
return {"success": False, "error": str(e)}
"""SSH Session that connects to a remote server."""
def __init__(self, hostname: str, port: int, username: str, key_filename: str):
"""Initialize SSH session.
Args:
hostname: Remote server hostname
port: SSH port
username: SSH username
key_filename: Path to SSH key file
"""
self.hostname = hostname
self.port = port
self.username = username
self.key_filename = key_filename
self.client = paramiko.SSHClient()
self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.connected = False
def connect(self) -> bool:
"""Connect to SSH server.
Returns:
True if connection successful, False otherwise
"""
try:
self.client.connect(
hostname=self.hostname,
port=self.port,
username=self.username,
key_filename=self.key_filename
)
self.connected = True
return True
except Exception as e:
logger.error(f"SSH connection error: {e}")
return False
def execute_command(self, command: str) -> Dict[str, Any]:
"""Execute a command on the remote server.
Args:
command: Command to execute
Returns:
Dictionary with stdout, stderr, and exit_code
"""
if not self.connected:
if not self.connect():
return {"stdout": "", "stderr": "Not connected to SSH server", "exit_code": -1}
try:
stdin, stdout, stderr = self.client.exec_command(command)
exit_code = stdout.channel.recv_exit_status()
return {
"stdout": stdout.read().decode("utf-8"),
"stderr": stderr.read().decode("utf-8"),
"exit_code": exit_code
}
except Exception as e:
logger.error(f"Command execution error: {e}")
return {"stdout": "", "stderr": str(e), "exit_code": -1}
def close(self) -> None:
"""Close SSH connection."""
if self.connected:
self.client.close()
self.connected = False
class SSHConnectionParams(BaseModel):
"""Parameters for SSH connection."""
hostname: Optional[str] = Field(None, description="SSH server hostname or IP address")
port: Optional[int] = Field(None, description="SSH server port")
username: Optional[str] = Field(None, description="SSH username")
key_filename: Optional[str] = Field(None, description="Path to SSH private key file")
class FileTransferParams(BaseModel):
local_path: str
remote_path: str
max_size: Optional[int] = 1073741824 # 1GB default
on_conflict: str = "FAIL" # FAIL, OVERWRITE, RENAME
class ListDirParams(BaseModel):
remote_path: str
class RemoveFileParams(BaseModel):
remote_path: str
class DiskUsageParams(BaseModel):
remote_path: Optional[str] = "."
class CommandParams(BaseModel):
"""Parameters for executing a command."""
command: str = Field(..., description="Command to execute on remote server")
class SSHServerMCP(FastMCP):
"""MCP server that provides SSH and SFTP access to remote systems."""
def __init__(self, hostname=None, port=None, username=None, key_filename=None, server_name=None, tool_prefix=None):
# Get server name from environment variable or parameter
server_name = server_name or os.environ.get("MCP_SSH_SERVER_NAME")
# Get configuration from environment variables or passed parameters
self.default_hostname = hostname or os.environ.get("MCP_SSH_HOSTNAME")
self.default_port = port or int(os.environ.get("MCP_SSH_PORT", 22))
self.default_username = username or os.environ.get("MCP_SSH_USERNAME")
self.default_key_filename = key_filename or os.environ.get("MCP_SSH_KEY_FILENAME")
self.tool_prefix = tool_prefix or os.environ.get("MCP_SSH_TOOL_PREFIX", "")
connect_name = f"{self.tool_prefix}ssh_connect"
execute_name = f"{self.tool_prefix}ssh_execute"
disconnect_name = f"{self.tool_prefix}ssh_disconnect"
upload_name = f"{self.tool_prefix}scp_upload"
download_name = f"{self.tool_prefix}scp_download"
listdir_name = f"{self.tool_prefix}scp_listdir"
remove_name = f"{self.tool_prefix}scp_remove"
diskusage_name = f"{self.tool_prefix}scp_disk_usage"
self.add_tool(self.ssh_connect, name=connect_name, description=f"Connect to SSH server: {server_name}")
self.add_tool(self.ssh_execute, name=execute_name, description=f"Execute a command on SSH server: {server_name}")
self.add_tool(self.ssh_disconnect, name=disconnect_name, description=f"Disconnect from SSH server: {server_name}")
self.add_tool(self.scp_upload, name=upload_name, description=f"Upload a file via SCP/SFTP: {server_name}")
self.add_tool(self.scp_download, name=download_name, description=f"Download a file via SCP/SFTP: {server_name}")
self.add_tool(self.scp_listdir, name=listdir_name, description=f"List remote directory via SFTP: {server_name}")
self.add_tool(self.scp_remove, name=remove_name, description=f"Remove remote file via SFTP: {server_name}")
self.add_tool(self.scp_disk_usage, name=diskusage_name, description=f"Get remote disk usage via SFTP: {server_name}")
self.ssh_session = None
def ssh_connect(self, params: SSHConnectionParams) -> Dict[str, Any]:
hostname = self.default_hostname
username = self.default_username
key_filename = self.default_key_filename
port = self.default_port if self.default_port else params.port
if not all([hostname, username, key_filename]):
missing = []
if not hostname: missing.append("hostname")
if not username: missing.append("username")
if not key_filename: missing.append("key_filename")
return {"success": False, "message": f"Missing required parameters: {', '.join(missing)}"}
self.ssh_session = SSHSession(
hostname=hostname,
port=port,
username=username,
key_filename=key_filename
)
if self.ssh_session.connect():
return {"success": True, "message": f"Connected to {hostname}"}
else:
return {"success": False, "message": "Failed to connect to SSH server"}
def ssh_execute(self, params: CommandParams) -> Dict[str, Any]:
if not self.ssh_session or not self.ssh_session.connected:
return {"success": False, "message": "Not connected to SSH server"}
return self.ssh_session.execute_command(params.command)
def ssh_disconnect(self) -> Dict[str, Any]:
if not self.ssh_session:
return {"success": True, "message": "Not connected to SSH server"}
self.ssh_session.close()
return {"success": True, "message": "Disconnected from SSH server"}
# --- SFTP/SCP Tool Methods ---
def scp_upload(self, params: FileTransferParams) -> Dict[str, Any]:
if not self.ssh_session or not self.ssh_session.connected:
return {"success": False, "message": "Not connected to SSH server"}
return self.ssh_session.upload_file(
local_path=params.local_path,
remote_path=params.remote_path,
max_size=params.max_size or 1073741824,
on_conflict=params.on_conflict
)
def scp_download(self, params: FileTransferParams) -> Dict[str, Any]:
if not self.ssh_session or not self.ssh_session.connected:
return {"success": False, "message": "Not connected to SSH server"}
return self.ssh_session.download_file(
remote_path=params.remote_path,
local_path=params.local_path,
max_size=params.max_size or 1073741824,
on_conflict=params.on_conflict
)
def scp_listdir(self, params: ListDirParams) -> Dict[str, Any]:
if not self.ssh_session or not self.ssh_session.connected:
return {"success": False, "message": "Not connected to SSH server"}
return self.ssh_session.list_directory(params.remote_path)
def scp_remove(self, params: RemoveFileParams) -> Dict[str, Any]:
if not self.ssh_session or not self.ssh_session.connected:
return {"success": False, "message": "Not connected to SSH server"}
return self.ssh_session.remove_file(params.remote_path)
def scp_disk_usage(self, params: DiskUsageParams) -> Dict[str, Any]:
if not self.ssh_session or not self.ssh_session.connected:
return {"success": False, "message": "Not connected to SSH server"}
return self.ssh_session.get_disk_usage(params.remote_path)
"""MCP server that provides SSH access to remote systems."""
def __init__(self, hostname=None, port=None, username=None, key_filename=None, server_name=None, tool_prefix=None):
"""Initialize SSH server.
Args:
hostname: SSH server hostname from environment or config
port: SSH server port from environment or config
username: SSH username from environment or config
key_filename: Path to SSH key file from environment or config
server_name: Custom name for the server instance (for multiple servers)
tool_prefix: Prefix for tool names (e.g., 'server1_' for 'server1_ssh_connect')
"""
# Get server name from environment variable or parameter
server_name = server_name or os.environ.get("MCP_SSH_SERVER_NAME", "SSH Server")
super().__init__(name=server_name)
self.ssh_session: Optional[SSHSession] = None
# Get configuration from environment variables or passed parameters
self.default_hostname = hostname or os.environ.get("MCP_SSH_HOSTNAME")
self.default_port = port or int(os.environ.get("MCP_SSH_PORT", 22))
self.default_username = username or os.environ.get("MCP_SSH_USERNAME")
self.default_key_filename = key_filename or os.environ.get("MCP_SSH_KEY_FILENAME")
# Get tool prefix from parameter or environment variable
self.tool_prefix = tool_prefix or os.environ.get("MCP_SSH_TOOL_PREFIX", "")
# Register tools with optional prefix
connect_name = f"{self.tool_prefix}ssh_connect"
execute_name = f"{self.tool_prefix}ssh_execute"
disconnect_name = f"{self.tool_prefix}ssh_disconnect"
self.add_tool(self.ssh_connect, name=connect_name, description=f"Connect to SSH server: {server_name}")
self.add_tool(self.ssh_execute, name=execute_name, description=f"Execute a command on SSH server: {server_name}")
self.add_tool(self.ssh_disconnect, name=disconnect_name, description=f"Disconnect from SSH server: {server_name}")
def ssh_connect(self, params: SSHConnectionParams) -> Dict[str, Any]:
"""Connect to an SSH server.
Args:
params: SSH connection parameters (ignored for security-critical fields)
Returns:
Result of connection attempt
"""
# Always use config/environment variables for security-critical fields
hostname = self.default_hostname
username = self.default_username
key_filename = self.default_key_filename
# Only allow port to be optionally specified in the request
port = self.default_port if self.default_port else params.port
# Validate that we have all required parameters
if not all([hostname, username, key_filename]):
missing = []
if not hostname: missing.append("hostname")
if not username: missing.append("username")
if not key_filename: missing.append("key_filename")
return {"success": False, "message": f"Missing required parameters: {', '.join(missing)}"}
self.ssh_session = SSHSession(
hostname=hostname,
port=port,
username=username,
key_filename=key_filename
)
if self.ssh_session.connect():
return {"success": True, "message": f"Connected to {hostname}"}
else:
return {"success": False, "message": "Failed to connect to SSH server"}
def ssh_execute(self, params: CommandParams) -> Dict[str, Any]:
"""Execute a command on the SSH server.
Args:
params: Command parameters
Returns:
Command execution result
"""
if not self.ssh_session or not self.ssh_session.connected:
return {"success": False, "message": "Not connected to SSH server"}
return self.ssh_session.execute_command(params.command)
def ssh_disconnect(self) -> Dict[str, Any]:
"""Disconnect from the SSH server.
Returns:
Disconnect result
"""
if not self.ssh_session:
return {"success": True, "message": "Not connected to SSH server"}
self.ssh_session.close()
return {"success": True, "message": "Disconnected from SSH server"}
def main():
"""Run the MCP SSH server."""
# Get custom server name and tool prefix from environment if specified
server_name = os.environ.get("MCP_SSH_SERVER_NAME")
tool_prefix = os.environ.get("MCP_SSH_TOOL_PREFIX")
# Create server with config from environment variables
server = SSHServerMCP(
server_name=server_name,
tool_prefix=tool_prefix
)
# Run the server with stdio transport
server.run(transport="stdio")
if __name__ == "__main__":
main()

View File

@ -15,6 +15,172 @@ logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class SSHSession:
"""SSH Session that connects to a remote server and supports SFTP file operations."""
def __init__(self, hostname: str, port: int, username: str, key_filename: str):
self.hostname = hostname
self.port = port
self.username = username
self.key_filename = key_filename
self.client = paramiko.SSHClient()
self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.connected = False
self.sftp = None
def connect(self) -> bool:
try:
self.client.connect(
hostname=self.hostname,
port=self.port,
username=self.username,
key_filename=self.key_filename
)
self.sftp = self.client.open_sftp()
self.connected = True
return True
except Exception as e:
logger.error(f"SSH connection error: {e}")
return False
def execute_command(self, command: str) -> Dict[str, Any]:
if not self.connected:
if not self.connect():
return {"stdout": "", "stderr": "Not connected to SSH server", "exit_code": -1}
try:
stdin, stdout, stderr = self.client.exec_command(command)
exit_code = stdout.channel.recv_exit_status()
return {
"stdout": stdout.read().decode("utf-8"),
"stderr": stderr.read().decode("utf-8"),
"exit_code": exit_code
}
except Exception as e:
logger.error(f"Command execution error: {e}")
return {"stdout": "", "stderr": str(e), "exit_code": -1}
def close(self) -> None:
if self.sftp:
self.sftp.close()
self.sftp = None
if self.connected:
self.client.close()
self.connected = False
# --- SFTP/SCP Methods ---
def get_disk_usage(self, remote_path: str = ".") -> Dict[str, Any]:
"""Get disk usage on the remote system using 'df -k'."""
result = self.execute_command(f"df -k {remote_path}")
if result["exit_code"] != 0:
return {"success": False, "error": result["stderr"]}
try:
lines = result["stdout"].splitlines()
if len(lines) < 2:
return {"success": False, "error": "Unexpected df output"}
parts = lines[1].split()
total_kb = int(parts[1])
used_kb = int(parts[2])
avail_kb = int(parts[3])
return {"success": True, "total": total_kb * 1024, "used": used_kb * 1024, "avail": avail_kb * 1024}
except Exception as e:
return {"success": False, "error": str(e)}
def list_directory(self, remote_path: str) -> Dict[str, Any]:
if not self.sftp:
return {"success": False, "error": "Not connected to SFTP"}
try:
files = self.sftp.listdir_attr(remote_path)
return {"success": True, "files": [{"filename": f.filename, "size": f.st_size, "mtime": f.st_mtime, "mode": f.st_mode} for f in files]}
except Exception as e:
return {"success": False, "error": str(e)}
def remove_file(self, remote_path: str) -> Dict[str, Any]:
if not self.sftp:
return {"success": False, "error": "Not connected to SFTP"}
try:
self.sftp.remove(remote_path)
return {"success": True}
except Exception as e:
return {"success": False, "error": str(e)}
def upload_file(self, local_path: str, remote_path: str, max_size: int = 1073741824, on_conflict: str = "FAIL") -> Dict[str, Any]:
import os, time
if not self.sftp:
return {"success": False, "error": "Not connected to SFTP"}
try:
local_size = os.stat(local_path).st_size
if local_size > max_size:
return {"success": False, "error": f"File too large: {local_size} bytes > {max_size} bytes"}
disk = self.get_disk_usage(os.path.dirname(remote_path) or ".")
if not disk.get("success"):
return {"success": False, "error": "Failed to check remote disk space: " + disk.get("error", "")}
if disk["avail"] < local_size * 2:
return {"success": False, "error": f"Insufficient remote disk space: need {local_size*2} bytes, have {disk['avail']} bytes"}
# Conflict resolution
try:
self.sftp.stat(remote_path)
file_exists = True
except FileNotFoundError:
file_exists = False
if file_exists:
if on_conflict == "FAIL":
return {"success": False, "error": "Remote file exists"}
elif on_conflict == "OVERWRITE":
pass # allow overwrite
elif on_conflict == "RENAME":
base, ext = os.path.splitext(remote_path)
timestamp = time.strftime("(%Y%m%d%H%M%S)")
remote_path = f"{base}{timestamp}{ext}"
# Upload
try:
self.sftp.put(local_path, remote_path)
return {"success": True, "remote_path": remote_path, "bytes_transferred": local_size}
except Exception as e:
# Cleanup partial file
try:
self.sftp.remove(remote_path)
except Exception:
pass
return {"success": False, "error": f"Transfer failed: {e}"}
except Exception as e:
return {"success": False, "error": str(e)}
def download_file(self, remote_path: str, local_path: str, max_size: int = 1073741824, on_conflict: str = "FAIL") -> Dict[str, Any]:
import os, time
if not self.sftp:
return {"success": False, "error": "Not connected to SFTP"}
try:
fileinfo = self.sftp.stat(remote_path)
remote_size = fileinfo.st_size
if remote_size > max_size:
return {"success": False, "error": f"Remote file too large: {remote_size} bytes > {max_size} bytes"}
st = os.statvfs(os.path.dirname(local_path) or ".")
local_free = st.f_bavail * st.f_frsize
if local_free < remote_size * 2:
return {"success": False, "error": f"Insufficient local disk space: need {remote_size*2} bytes, have {local_free} bytes"}
# Conflict resolution
if os.path.exists(local_path):
if on_conflict == "FAIL":
return {"success": False, "error": "Local file exists"}
elif on_conflict == "OVERWRITE":
pass
elif on_conflict == "RENAME":
base, ext = os.path.splitext(local_path)
timestamp = time.strftime("(%Y%m%d%H%M%S)")
local_path = f"{base}{timestamp}{ext}"
# Download
try:
self.sftp.get(remote_path, local_path)
return {"success": True, "local_path": local_path, "bytes_transferred": remote_size}
except Exception as e:
# Cleanup partial file
try:
os.remove(local_path)
except Exception:
pass
return {"success": False, "error": f"Download failed: {e}"}
except Exception as e:
return {"success": False, "error": str(e)}
"""SSH Session that connects to a remote server."""
def __init__(self, hostname: str, port: int, username: str, key_filename: str):
@ -95,6 +261,21 @@ class SSHConnectionParams(BaseModel):
key_filename: Optional[str] = Field(None, description="Path to SSH private key file")
class FileTransferParams(BaseModel):
local_path: str
remote_path: str
max_size: Optional[int] = 1073741824 # 1GB default
on_conflict: str = "FAIL" # FAIL, OVERWRITE, RENAME
class ListDirParams(BaseModel):
remote_path: str
class RemoveFileParams(BaseModel):
remote_path: str
class DiskUsageParams(BaseModel):
remote_path: Optional[str] = "."
class CommandParams(BaseModel):
"""Parameters for executing a command."""
@ -102,6 +283,104 @@ class CommandParams(BaseModel):
class SSHServerMCP(FastMCP):
"""MCP server that provides SSH and SFTP access to remote systems."""
def __init__(self, hostname=None, port=None, username=None, key_filename=None, server_name=None, tool_prefix=None):
# Get server name from environment variable or parameter
server_name = server_name or os.environ.get("MCP_SSH_SERVER_NAME")
# Get configuration from environment variables or passed parameters
self.default_hostname = hostname or os.environ.get("MCP_SSH_HOSTNAME")
self.default_port = port or int(os.environ.get("MCP_SSH_PORT", 22))
self.default_username = username or os.environ.get("MCP_SSH_USERNAME")
self.default_key_filename = key_filename or os.environ.get("MCP_SSH_KEY_FILENAME")
self.tool_prefix = tool_prefix or os.environ.get("MCP_SSH_TOOL_PREFIX", "")
connect_name = f"{self.tool_prefix}ssh_connect"
execute_name = f"{self.tool_prefix}ssh_execute"
disconnect_name = f"{self.tool_prefix}ssh_disconnect"
upload_name = f"{self.tool_prefix}scp_upload"
download_name = f"{self.tool_prefix}scp_download"
listdir_name = f"{self.tool_prefix}scp_listdir"
remove_name = f"{self.tool_prefix}scp_remove"
diskusage_name = f"{self.tool_prefix}scp_diskusage"
self.add_tool(self.ssh_connect, name=connect_name, description=f"Connect to SSH server: {server_name}")
self.add_tool(self.ssh_execute, name=execute_name, description=f"Execute a command on SSH server: {server_name}")
self.add_tool(self.ssh_disconnect, name=disconnect_name, description=f"Disconnect from SSH server: {server_name}")
self.add_tool(self.scp_upload, name=upload_name, description=f"Upload a file via SFTP to {server_name}")
self.add_tool(self.scp_download, name=download_name, description=f"Download a file via SFTP from {server_name}")
self.add_tool(self.scp_listdir, name=listdir_name, description=f"List directory contents via SFTP on {server_name}")
self.add_tool(self.scp_remove, name=remove_name, description=f"Remove a file via SFTP on {server_name}")
self.add_tool(self.scp_diskusage, name=diskusage_name, description=f"Get disk usage via SFTP on {server_name}")
self.ssh_session = None
def ssh_connect(self, params: SSHConnectionParams) -> Dict[str, Any]:
hostname = self.default_hostname
username = self.default_username
key_filename = self.default_key_filename
port = self.default_port if self.default_port else params.port
if not all([hostname, username, key_filename]):
missing = []
if not hostname: missing.append("hostname")
if not username: missing.append("username")
if not key_filename: missing.append("key_filename")
return {"success": False, "message": f"Missing required parameters: {', '.join(missing)}"}
self.ssh_session = SSHSession(
hostname=hostname,
port=port,
username=username,
key_filename=key_filename
)
if self.ssh_session.connect():
return {"success": True, "message": f"Connected to {hostname}"}
else:
return {"success": False, "message": "Failed to connect to SSH server"}
def ssh_execute(self, params: CommandParams) -> Dict[str, Any]:
if not self.ssh_session or not self.ssh_session.connected:
return {"success": False, "message": "Not connected to SSH server"}
return self.ssh_session.execute_command(params.command)
def ssh_disconnect(self) -> Dict[str, Any]:
if not self.ssh_session:
return {"success": True, "message": "Not connected to SSH server"}
self.ssh_session.close()
return {"success": True, "message": "Disconnected from SSH server"}
# --- SFTP/SCP Tool Methods ---
def scp_upload(self, params: FileTransferParams) -> Dict[str, Any]:
if not self.ssh_session or not self.ssh_session.connected:
return {"success": False, "message": "Not connected to SSH server"}
return self.ssh_session.upload_file(
local_path=params.local_path,
remote_path=params.remote_path,
max_size=params.max_size or 1073741824,
on_conflict=params.on_conflict
)
def scp_download(self, params: FileTransferParams) -> Dict[str, Any]:
if not self.ssh_session or not self.ssh_session.connected:
return {"success": False, "message": "Not connected to SSH server"}
return self.ssh_session.download_file(
remote_path=params.remote_path,
local_path=params.local_path,
max_size=params.max_size or 1073741824,
on_conflict=params.on_conflict
)
def scp_listdir(self, params: ListDirParams) -> Dict[str, Any]:
if not self.ssh_session or not self.ssh_session.connected:
return {"success": False, "message": "Not connected to SSH server"}
return self.ssh_session.list_directory(params.remote_path)
def scp_remove(self, params: RemoveFileParams) -> Dict[str, Any]:
if not self.ssh_session or not self.ssh_session.connected:
return {"success": False, "message": "Not connected to SSH server"}
return self.ssh_session.remove_file(params.remote_path)
def scp_diskusage(self, params: DiskUsageParams) -> Dict[str, Any]:
if not self.ssh_session or not self.ssh_session.connected:
return {"success": False, "message": "Not connected to SSH server"}
return self.ssh_session.get_disk_usage(params.remote_path)
"""MCP server that provides SSH access to remote systems."""
def __init__(self, hostname=None, port=None, username=None, key_filename=None, server_name=None, tool_prefix=None):
@ -133,10 +412,20 @@ class SSHServerMCP(FastMCP):
connect_name = f"{self.tool_prefix}ssh_connect"
execute_name = f"{self.tool_prefix}ssh_execute"
disconnect_name = f"{self.tool_prefix}ssh_disconnect"
upload_name = f"{self.tool_prefix}scp_upload"
download_name = f"{self.tool_prefix}scp_download"
listdir_name = f"{self.tool_prefix}scp_listdir"
remove_name = f"{self.tool_prefix}scp_remove"
diskusage_name = f"{self.tool_prefix}scp_diskusage"
self.add_tool(self.ssh_connect, name=connect_name, description=f"Connect to SSH server: {server_name}")
self.add_tool(self.ssh_execute, name=execute_name, description=f"Execute a command on SSH server: {server_name}")
self.add_tool(self.ssh_disconnect, name=disconnect_name, description=f"Disconnect from SSH server: {server_name}")
self.add_tool(self.scp_upload, name=upload_name, description=f"Upload a file via SFTP to {server_name}")
self.add_tool(self.scp_download, name=download_name, description=f"Download a file via SFTP from {server_name}")
self.add_tool(self.scp_listdir, name=listdir_name, description=f"List directory contents via SFTP on {server_name}")
self.add_tool(self.scp_remove, name=remove_name, description=f"Remove a file via SFTP on {server_name}")
self.add_tool(self.scp_diskusage, name=diskusage_name, description=f"Get disk usage via SFTP on {server_name}")
def ssh_connect(self, params: SSHConnectionParams) -> Dict[str, Any]:
"""Connect to an SSH server.
@ -201,9 +490,121 @@ class SSHServerMCP(FastMCP):
self.ssh_session.close()
return {"success": True, "message": "Disconnected from SSH server"}
async def run_stdio_async(self) -> None:
"""Run the server using stdio transport and exit when client disconnects.
The server detects client disconnection by monitoring polling messages.
If several consecutive polling messages are missed, the server assumes
the client has disconnected and shuts down.
"""
from mcp.server.stdio import stdio_server
import asyncio
import sys
import time
import signal
# Register signal handlers for clean shutdown
def handle_signal(signum, frame):
logger.info(f"Received signal {signum}, shutting down")
sys.exit(0)
signal.signal(signal.SIGTERM, handle_signal)
signal.signal(signal.SIGINT, handle_signal)
# Track the last time we received a polling message
last_poll_time = time.time()
MISSED_POLLS_THRESHOLD = 3 # Number of missed polls before shutdown
POLL_INTERVAL = 60 # Expected polling interval in seconds
# Create a message interceptor to track polling messages
class MessageTracker:
def __init__(self):
self.last_poll_time = time.time()
def track_message(self, message_data):
try:
# Check if this is a polling message
if isinstance(message_data, dict) and "method" in message_data:
method = message_data.get("method", "")
if method in ["resources/list", "prompts/list"]:
self.last_poll_time = time.time()
logger.debug(f"Received polling message: {method}")
except Exception as e:
logger.error(f"Error tracking message: {e}")
message_tracker = MessageTracker()
# Patch the MCP server to track messages
original_handle_message = self._mcp_server._handle_message
async def patched_handle_message(*args, **kwargs):
# Intercept and track polling messages
# message_data may be the first or second positional argument
message_data = None
if len(args) > 1:
message_data = args[1]
elif args:
message_data = args[0]
else:
message_data = kwargs.get("message_data")
if isinstance(message_data, dict):
message_tracker.track_message(message_data)
# Forward all args to the original handler
return await original_handle_message(*args, **kwargs)
self._mcp_server._handle_message = patched_handle_message
async with stdio_server() as (read_stream, write_stream):
# Create a task for the server run
server_task = asyncio.create_task(
self._mcp_server.run(
read_stream,
write_stream,
self._mcp_server.create_initialization_options(),
raise_exceptions=True,
)
)
# Create a task to monitor for missed polls
async def monitor_polls():
# Give the client time to connect and start polling
await asyncio.sleep(POLL_INTERVAL * 2)
while True:
current_time = time.time()
time_since_last_poll = current_time - message_tracker.last_poll_time
# If we've missed too many polls, shut down
if time_since_last_poll > POLL_INTERVAL * MISSED_POLLS_THRESHOLD:
logger.info(f"No polling messages for {time_since_last_poll:.1f} seconds "
f"(threshold: {POLL_INTERVAL * MISSED_POLLS_THRESHOLD} seconds), "
f"assuming client disconnected")
sys.exit(0)
# Check every 10 seconds
await asyncio.sleep(10)
# Start the poll monitor
monitor_task = asyncio.create_task(monitor_polls())
try:
# Wait for the server task to complete
await server_task
logger.info("MCP server completed normally")
except Exception as e:
# If we get an exception, it's likely because the client disconnected
logger.info(f"MCP server exiting: {e}")
sys.exit(0)
def main():
"""Run the MCP SSH server."""
"""Run the MCP SSH server.
The server will automatically terminate when the client disconnects.
It detects disconnection by monitoring client polling messages,
which normally occur once per minute. If several consecutive polls
are missed, the server assumes the client has disconnected and exits.
"""
# Get custom server name and tool prefix from environment if specified
server_name = os.environ.get("MCP_SSH_SERVER_NAME")
tool_prefix = os.environ.get("MCP_SSH_TOOL_PREFIX")
@ -215,6 +616,8 @@ def main():
)
# Run the server with stdio transport
# The server will automatically terminate when the client disconnects
logger.info("Starting MCP SSH server (will terminate after 3 missed client polls)")
server.run(transport="stdio")

117
test_mcp_timeout.py Executable file
View File

@ -0,0 +1,117 @@
#!/usr/bin/env python
"""
Test script that simulates an MCP client using the SSH server.
This helps verify that the server properly responds to MCP requests.
Includes a timeout to automatically exit after a few seconds.
"""
import json
import os
import sys
import subprocess
import tempfile
import time
import signal
# Set required environment variables for the subprocess
env = os.environ.copy()
env["MCP_SSH_HOSTNAME"] = "10.0.1.232"
env["MCP_SSH_USERNAME"] = "stwhite"
env["MCP_SSH_KEY_FILENAME"] = "~/.ssh/id_ed25519"
# Create temporary files for input/output
with tempfile.NamedTemporaryFile('w+') as input_file, tempfile.NamedTemporaryFile('w+') as output_file:
# Write an MCP request to connect to SSH server
mcp_request = {
"type": "request",
"id": "1",
"method": "ssh_connect",
"params": {}
}
input_file.write(json.dumps(mcp_request) + "\n")
input_file.flush()
# Run the MCP server process with stdin/stdout redirected
try:
print("Starting MCP SSH server...")
# Create a pipe for stdin
r, w = os.pipe()
r_file = os.fdopen(r, 'r')
w_file = os.fdopen(w, 'w')
# Copy the input file content to the pipe
with open(input_file.name, 'r') as f:
content = f.read()
w_file.write(content)
w_file.flush()
process = subprocess.Popen(
[sys.executable, "-m", "mcpssh"],
env=env,
stdin=r_file,
stdout=open(output_file.name, 'w'),
stderr=subprocess.PIPE,
text=True
)
# Give it a moment to start
print("Waiting for server to start...")
time.sleep(2)
# Now simulate client disconnect by closing the write end of the pipe
print("Simulating client disconnect by closing stdin pipe...")
w_file.close()
# Wait for the server to detect the disconnect and terminate
print("Waiting for server to detect disconnect and terminate...")
try:
process.wait(timeout=10)
print(f"Server exited with code: {process.returncode}")
except subprocess.TimeoutExpired:
print("Server did not terminate within timeout, killing it...")
process.kill()
process.wait()
print(f"Server killed, exit code: {process.returncode}")
# Read stderr output
stderr_output = process.stderr.read()
if stderr_output:
print("Server stderr output:")
print(stderr_output)
# Read server's response
output_file.seek(0)
response_lines = output_file.readlines()
if not response_lines:
print("No response received from server. This likely indicates an initialization issue.")
else:
for line in response_lines:
try:
response = json.loads(line.strip())
print("Server response:")
print(json.dumps(response, indent=2))
except json.JSONDecodeError:
print(f"Non-JSON response: {line.strip()}")
except Exception as e:
print(f"Error running MCP server: {e}")
import traceback
traceback.print_exc()
print("\nTest complete.")
print("For Claude Desktop, use this configuration:")
print("""
"mcpssh": {
"command": "/Volumes/SAM2/CODE/MCP/mcpssh/venv/bin/python",
"args": [
"-m",
"mcpssh"
],
"env": {
"MCP_SSH_HOSTNAME": "10.0.1.232",
"MCP_SSH_USERNAME": "stwhite",
"MCP_SSH_KEY_FILENAME": "~/.ssh/id_ed25519"
}
}
""")