asyncssh is a pure-Python asyncio SSH client and server library. pip install asyncssh. Connect: async with asyncssh.connect("host", username="user", password="pass") as conn:. Run command: result = await conn.run("ls -la"); result.stdout. Key auth: asyncssh.connect(host, username="user", client_keys=["~/.ssh/id_rsa"]). known_hosts: asyncssh.connect(host, known_hosts=None) (disable) or known_hosts="~/.ssh/known_hosts". SSHClient: subclass with connection_made/connection_lost/auth_completed. SSHServer: asyncssh.create_server(MyServer, "", 8022). SSHServerSession: handle_input/eof_received/exit_status_requested. SFTP get: async with conn.start_sftp_client() as sftp: await sftp.get("remote.txt", "local.txt"). SFTP put: await sftp.put("local.txt", "remote.txt"). listdir: await sftp.listdir("path"). mkdir/remove/stat/rename: sftp methods. forward_local_port: await conn.forward_local_port("", local_port, remote_host, remote_port). Tunnel: async with conn.forward_local_port(...) as listener:. process: proc = await conn.create_process("cmd"); await proc.communicate(input_data). Multiple hosts: open one connection per host. gather: await asyncio.gather(*[run_cmd(h) for h in hosts]). Timeout: asyncio.wait_for(conn.run("cmd"), timeout=30). Claude Code generates asyncssh automation scripts, parallel SSH runners, SFTP sync tools, and SSH tunnel managers.
CLAUDE.md for asyncssh
## asyncssh Stack
- Version: asyncssh >= 2.14 | pip install asyncssh
- Connect: async with asyncssh.connect(host, username=u, password=p) as conn:
- Run: result = await conn.run("cmd") | result.stdout | result.exit_status
- SFTP: async with conn.start_sftp_client() as sftp: await sftp.get/put/listdir
- Key auth: client_keys=["~/.ssh/id_rsa"] | known_hosts=None to skip verification
- Parallel: asyncio.gather(*[run_on(host, cmd) for host in hosts])
asyncssh SSH Automation Pipeline
# app/ssh.py — asyncssh connect, run, SFTP, tunnel, parallel, server
from __future__ import annotations

import asyncio
import logging
import os
import shlex
import stat
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, AsyncIterator, Callable

import asyncssh
log = logging.getLogger(__name__)
# ─────────────────────────────────────────────────────────────────────────────
# 1. Connection helpers
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class SSHConfig:
    """Connection settings for a single SSH host.

    Passed to ssh_connect(); fields map 1:1 onto asyncssh.connect() keyword
    arguments.
    """

    host: str
    username: str
    port: int = 22
    # Credentials: password and/or key files. When both are left empty,
    # whatever defaults asyncssh applies (e.g. agent keys) are used —
    # ssh_connect() only forwards these when they are set.
    password: str | None = None
    client_keys: list[str] = field(default_factory=list)
    known_hosts: str | None = None  # None = disable verification
    connect_timeout: float = 15.0
    keepalive_interval: float = 30.0
@asynccontextmanager
async def ssh_connect(cfg: SSHConfig) -> AsyncIterator[asyncssh.SSHClientConnection]:
    """Yield an open asyncssh connection built from *cfg*; close it on exit.

    Example:
        async with ssh_connect(SSHConfig("myhost", "deploy")) as conn:
            result = await conn.run("hostname")
            print(result.stdout.strip())
    """
    options: dict[str, Any] = dict(
        host=cfg.host,
        port=cfg.port,
        username=cfg.username,
        known_hosts=cfg.known_hosts,
        connect_timeout=cfg.connect_timeout,
        keepalive_interval=cfg.keepalive_interval,
    )
    # Forward credentials only when actually provided, so asyncssh can fall
    # back on its own defaults otherwise.
    for key, value in (("password", cfg.password), ("client_keys", cfg.client_keys)):
        if value:
            options[key] = value
    async with asyncssh.connect(**options) as conn:
        yield conn
# ─────────────────────────────────────────────────────────────────────────────
# 2. Command execution
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class CommandResult:
    """Outcome of one remote command: captured streams plus exit status."""

    host: str
    command: str
    stdout: str
    stderr: str
    exit_status: int

    @property
    def ok(self) -> bool:
        """True when the remote command exited with status 0."""
        return not self.exit_status

    def __str__(self) -> str:
        """The captured stdout with surrounding whitespace removed."""
        return self.stdout.strip()
async def run_command(
    conn: asyncssh.SSHClientConnection,
    command: str,
    host: str = "",
    timeout: float | None = None,
    check: bool = False,
    env: dict[str, str] | None = None,
) -> CommandResult:
    """Run *command* over an established SSH connection.

    Args:
        conn: An open asyncssh client connection.
        command: Shell command line to execute remotely.
        host: Label stored in the result; falls back to the connection's host.
        timeout: Optional per-command timeout in seconds passed to conn.run().
        check: When True, raise RuntimeError on a non-zero exit status.
        env: Extra environment variables for the remote command.

    Returns:
        CommandResult with captured stdout/stderr and the exit status.

    Raises:
        RuntimeError: If *check* is set and the command exits non-zero.

    Example:
        result = await run_command(conn, "df -h /", check=True)
        print(result.stdout)
    """
    kwargs: dict[str, Any] = {}
    if timeout:
        kwargs["timeout"] = timeout
    if env:
        kwargs["env"] = env
    result = await conn.run(command, **kwargs)
    # asyncssh keeps the target host on a private attribute; read it
    # defensively via getattr instead of assuming it exists on every
    # connection object (the original `conn._host` raised AttributeError
    # on connections that lack it and needed a type-ignore).
    cmd_result = CommandResult(
        host=host or str(getattr(conn, "_host", "") or ""),
        command=command,
        stdout=result.stdout or "",
        stderr=result.stderr or "",
        exit_status=result.exit_status or 0,
    )
    if check and not cmd_result.ok:
        raise RuntimeError(
            f"Command failed on {cmd_result.host}: {command!r}\n"
            f"exit={cmd_result.exit_status}\nstderr={cmd_result.stderr}"
        )
    return cmd_result
async def run_script(
    conn: asyncssh.SSHClientConnection,
    script: str,
    interpreter: str = "/bin/bash",
    host: str = "",
) -> CommandResult:
    """Run a multi-line shell script by piping it to *interpreter*'s stdin.

    Args:
        conn: An open asyncssh client connection.
        script: Script text written to the interpreter's stdin.
        interpreter: Remote program that reads the script (default bash).
        host: Label recorded in the returned CommandResult.

    Returns:
        CommandResult; exit_status is the interpreter's exit code.

    Example:
        result = await run_script(conn, '''
            set -e
            cd /app
            git pull
            systemctl restart myapp
        ''')
    """
    # Use the process as an async context manager so the channel is closed
    # even if communicate() raises (e.g. the connection drops mid-script);
    # the original leaked the process in that case.
    async with conn.create_process(interpreter) as process:
        stdout, stderr = await process.communicate(script)
    return CommandResult(
        host=host,
        command=f"<script via {interpreter}>",
        stdout=stdout or "",
        stderr=stderr or "",
        exit_status=process.exit_status or 0,
    )
async def run_commands(
    conn: asyncssh.SSHClientConnection,
    commands: list[str],
    host: str = "",
    stop_on_error: bool = True,
) -> list[CommandResult]:
    """Execute *commands* one after another on the same connection.

    When *stop_on_error* is True (the default), execution stops at the
    first command that exits non-zero; the failing result is still included.

    Example:
        results = await run_commands(conn, ["mkdir -p /app", "git clone ...", "make install"])
    """
    completed: list[CommandResult] = []
    for command in commands:
        outcome = await run_command(conn, command, host=host)
        completed.append(outcome)
        if not outcome.ok and stop_on_error:
            log.error("Command failed: %s (exit %d)", command, outcome.exit_status)
            break
    return completed
# ─────────────────────────────────────────────────────────────────────────────
# 3. One-shot helpers (connect + run + disconnect)
# ─────────────────────────────────────────────────────────────────────────────
async def quick_run(
    cfg: SSHConfig,
    command: str,
    timeout: float = 30.0,
) -> CommandResult:
    """Connect, execute a single command, and tear the connection down.

    Example:
        result = await quick_run(SSHConfig("host","user"), "uptime")
        print(result.stdout)
    """
    async with ssh_connect(cfg) as conn:
        outcome = await run_command(conn, command, host=cfg.host, timeout=timeout)
    return outcome
async def parallel_run(
    configs: list[SSHConfig],
    command: str,
    timeout: float = 30.0,
    concurrency: int = 10,
) -> list[CommandResult]:
    """Run the same command across many hosts concurrently.

    At most *concurrency* connections are open at once. A per-host failure
    is reported as a CommandResult with exit_status -1 (exception text in
    stderr) rather than raised, so one bad host never cancels the batch.

    Example:
        configs = [SSHConfig(h, "ubuntu") for h in ["10.0.0.1","10.0.0.2","10.0.0.3"]]
        results = await parallel_run(configs, "uptime")
        for r in results:
            print(f"{r.host}: {r.stdout.strip()}")
    """
    gate = asyncio.Semaphore(concurrency)

    async def _bounded(cfg: SSHConfig) -> CommandResult:
        async with gate:
            try:
                return await quick_run(cfg, command, timeout=timeout)
            except Exception as exc:
                # Synthesize a failure result instead of propagating.
                return CommandResult(
                    host=cfg.host,
                    command=command,
                    stdout="",
                    stderr=str(exc),
                    exit_status=-1,
                )

    return list(await asyncio.gather(*(_bounded(cfg) for cfg in configs)))
# ─────────────────────────────────────────────────────────────────────────────
# 4. SFTP helpers
# ─────────────────────────────────────────────────────────────────────────────
async def sftp_upload(
    conn: asyncssh.SSHClientConnection,
    local_path: str | Path,
    remote_path: str,
    preserve: bool = False,
) -> None:
    """Upload a local file to the remote host over SFTP.

    Args:
        conn: An open asyncssh client connection.
        local_path: Path of the local file to send.
        remote_path: Destination path on the remote host.
        preserve: Also copy access/modify times and permissions.

    Example:
        async with ssh_connect(cfg) as conn:
            await sftp_upload(conn, "dist/app.tar.gz", "/srv/releases/app.tar.gz")
    """
    # start_sftp_client() is the asyncssh API for opening an SFTP session;
    # start_client_session() takes an SSHClientSession factory and fails
    # when handed SFTPClient.
    async with conn.start_sftp_client() as sftp:
        await sftp.put(str(local_path), remote_path, preserve=preserve)
async def sftp_download(
    conn: asyncssh.SSHClientConnection,
    remote_path: str,
    local_path: str | Path,
    preserve: bool = False,
) -> Path:
    """Download a remote file, creating local parent directories as needed.

    Args:
        conn: An open asyncssh client connection.
        remote_path: Source path on the remote host.
        local_path: Local destination path.
        preserve: Also copy access/modify times and permissions.

    Returns:
        The local destination as a Path.

    Example:
        await sftp_download(conn, "/var/log/app.log", "logs/app.log")
    """
    local = Path(local_path)
    local.parent.mkdir(parents=True, exist_ok=True)
    # start_sftp_client() is the asyncssh API for opening an SFTP session;
    # start_client_session() takes an SSHClientSession factory, not SFTPClient.
    async with conn.start_sftp_client() as sftp:
        await sftp.get(remote_path, str(local), preserve=preserve)
    return local
async def sftp_list(
    conn: asyncssh.SSHClientConnection,
    remote_dir: str,
) -> list[dict]:
    """List entries in a remote directory.

    Args:
        conn: An open asyncssh client connection.
        remote_dir: Remote directory path to read.

    Returns:
        One dict per entry with keys "name", "size", "mtime", "is_dir".

    Example:
        files = await sftp_list(conn, "/srv/releases/")
        names = [f["name"] for f in files]
    """
    # start_sftp_client() is the asyncssh API for opening an SFTP session.
    async with conn.start_sftp_client() as sftp:
        entries = await sftp.readdir(remote_dir)
        # asyncssh has no FILETYPE_DIRECTORY attribute (the original raised
        # AttributeError here); derive the directory flag from the POSIX
        # mode bits in attrs.permissions instead. A missing mode (None)
        # yields is_dir=False.
        return [
            {
                "name": entry.filename,
                "size": entry.attrs.size,
                "mtime": entry.attrs.mtime,
                "is_dir": stat.S_ISDIR(entry.attrs.permissions or 0),
            }
            for entry in entries
        ]
async def sftp_sync(
    conn: asyncssh.SSHClientConnection,
    local_dir: str | Path,
    remote_dir: str,
    pattern: str = "**/*",
) -> list[str]:
    """Upload every local file matching *pattern* into *remote_dir*.

    Args:
        conn: An open asyncssh client connection.
        local_dir: Local root directory to scan.
        remote_dir: Remote directory that mirrors *local_dir*.
        pattern: Glob pattern relative to *local_dir* (default: everything).

    Returns:
        The uploaded remote paths, in traversal order.

    Example:
        uploaded = await sftp_sync(conn, "dist/", "/var/www/html/", "**/*.html")
    """
    root = Path(local_dir)
    uploaded: list[str] = []
    # start_sftp_client() is the asyncssh API for opening an SFTP session.
    async with conn.start_sftp_client() as sftp:
        for src in root.glob(pattern):
            if not src.is_file():
                continue
            rel = src.relative_to(root).as_posix()
            remote = f"{remote_dir.rstrip('/')}/{rel}"
            # makedirs creates missing intermediate directories too; the
            # original's single mkdir() of the immediate parent failed for
            # nested relative paths like a/b/file when a/ did not yet exist.
            await sftp.makedirs(remote.rsplit("/", 1)[0], exist_ok=True)
            await sftp.put(str(src), remote)
            uploaded.append(remote)
    return uploaded
# ─────────────────────────────────────────────────────────────────────────────
# 5. Port forwarding / tunneling
# ─────────────────────────────────────────────────────────────────────────────
@asynccontextmanager
async def ssh_tunnel(
    cfg: SSHConfig,
    remote_host: str,
    remote_port: int,
    local_port: int = 0,
) -> AsyncIterator[tuple[asyncssh.SSHClientConnection, int]]:
    """Open an SSH connection plus a local port forward; yield (conn, port).

    A *local_port* of 0 lets the OS pick a free port; the port actually
    bound is always the one yielded.

    Example:
        async with ssh_tunnel(cfg, "internal-db.private", 5432) as (conn, port):
            # Connect to localhost:port — traffic tunnels through SSH to internal-db:5432
            engine = create_engine(f"postgresql://user:pass@localhost:{port}/mydb")
    """
    async with ssh_connect(cfg) as conn:
        listener = await conn.forward_local_port(
            "", local_port, remote_host, remote_port
        )
        port = listener.get_port()
        log.info(
            "SSH tunnel: localhost:%d → %s:%d via %s",
            port, remote_host, remote_port, cfg.host,
        )
        try:
            yield conn, port
        finally:
            # Shut the forward down before the connection itself closes.
            listener.close()
            await listener.wait_closed()
# ─────────────────────────────────────────────────────────────────────────────
# 6. Deploy helper
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class DeployConfig:
    """Everything needed for one artifact deployment to one host.

    Consumed by deploy(): the artifact is uploaded into remote_dir, then
    post_deploy commands run sequentially.
    """

    ssh: SSHConfig
    artifact: str | Path  # local tarball or directory
    remote_dir: str  # e.g. "/srv/releases"
    # e.g. "app-v1.2.3" — NOTE(review): not referenced by deploy() itself;
    # appears to exist for callers' labeling. Confirm before removing.
    deploy_name: str
    post_deploy: list[str] = field(default_factory=list)  # commands to run after upload
async def deploy(dcfg: DeployConfig) -> list[CommandResult]:
    """Upload an artifact and run post-deploy commands on the remote host.

    Args:
        dcfg: Deployment description (host, artifact, target dir, commands).

    Returns:
        CommandResult list from the post-deploy commands; execution stops
        at the first failure (stop_on_error=True).

    Example:
        results = await deploy(DeployConfig(
            ssh=SSHConfig("prod.example.com", "deploy"),
            artifact="dist/app-1.2.3.tar.gz",
            remote_dir="/srv/releases",
            deploy_name="app-1.2.3",
            post_deploy=[
                "cd /srv && tar -xzf releases/app-1.2.3.tar.gz",
                "ln -sfn /srv/app-1.2.3 /srv/app-current",
                "systemctl restart app",
            ],
        ))
    """
    async with ssh_connect(dcfg.ssh) as conn:
        remote_path = f"{dcfg.remote_dir.rstrip('/')}/{Path(str(dcfg.artifact)).name}"
        # Ensure the release directory exists. Quote the path so spaces or
        # shell metacharacters in remote_dir cannot break — or inject into —
        # the remote command line.
        await run_command(
            conn, f"mkdir -p {shlex.quote(dcfg.remote_dir)}", host=dcfg.ssh.host
        )
        # Upload the artifact.
        log.info("Uploading %s → %s:%s", dcfg.artifact, dcfg.ssh.host, remote_path)
        await sftp_upload(conn, dcfg.artifact, remote_path)
        # Post-deploy commands, halting on the first failure.
        return await run_commands(conn, dcfg.post_deploy, host=dcfg.ssh.host, stop_on_error=True)
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
async def _demo() -> None:
    """Localhost-only demo.

    Requires a running SSH server on localhost with key auth configured for
    the current user; a connection failure is handled and reported.
    """
    demo_cfg = SSHConfig(
        host="127.0.0.1",
        username=os.getenv("USER", "user"),
        known_hosts=None,  # demo only: skip host-key verification
        connect_timeout=5.0,
    )
    print("=== asyncssh demo (localhost) ===")
    print("NOTE: requires SSH server on localhost with key auth for current user")
    try:
        async with ssh_connect(demo_cfg) as conn:
            print("\n--- run_command ---")
            outcome = await run_command(conn, "echo hello && uname -s", host=demo_cfg.host)
            print(f" stdout: {outcome.stdout.strip()!r}")
            print(f" exit: {outcome.exit_status}")
            print("\n--- run_commands ---")
            batch = await run_commands(conn, ["pwd", "whoami"], host=demo_cfg.host)
            for item in batch:
                print(f" {item.command!r:12s}: {item.stdout.strip()!r}")
    except (asyncssh.Error, OSError) as exc:
        print(f" SSH connection failed (expected in demo): {exc}")
        print(" In production: configure host, username, and key/password.")
    print("\n=== parallel_run pattern ===")
    print(" configs = [SSHConfig(h, 'ubuntu') for h in hosts]")
    print(" results = await parallel_run(configs, 'uptime', concurrency=20)")
    print("\n=== SFTP pattern ===")
    print(" async with ssh_connect(cfg) as conn:")
    print(" await sftp_upload(conn, 'dist/app.tar.gz', '/srv/releases/app.tar.gz')")
    print(" files = await sftp_list(conn, '/srv/releases/')")
    print("\n=== tunnel pattern ===")
    print(" async with ssh_tunnel(cfg, 'internal-db', 5432) as (conn, port):")
    print(" # connect to localhost:port")


if __name__ == "__main__":
    asyncio.run(_demo())
For the paramiko alternative — paramiko is the most widely used Python SSH library, synchronous/threaded, with broad compatibility across Python versions and SSH implementations; asyncssh is async-native (asyncio), typically 3–5× faster for high-concurrency workloads (100+ parallel connections), and supports newer SSH extensions like OpenSSH agent forwarding and SFTP v6 — use paramiko when integrating with existing synchronous codebases or when async is not an option, asyncssh for async services, parallel automation scripts, and new greenfield SSH tooling. For the fabric alternative — Fabric is a higher-level deployment library built on top of paramiko that provides connection pools, task decorators (@task), role-based host targeting, and a CLI runner; asyncssh is a lower-level protocol library without the deployment-workflow abstractions — use Fabric when you want a deployment framework with task definitions and fab deploy CLI conventions, asyncssh when you need raw protocol control, custom SSH servers, port forwarding, or high-concurrency parallel execution. The Claude Skills 360 bundle includes asyncssh skill sets covering SSHConfig/ssh_connect() context manager, run_command()/run_script()/run_commands(), quick_run()/parallel_run() with semaphore concurrency, sftp_upload()/sftp_download()/sftp_list()/sftp_sync(), ssh_tunnel() port forwarding context manager, and deploy() full artifact-upload-and-restart pipeline. Start with the free tier to try async SSH automation and DevOps scripting code generation.