Fabric automates remote SSH tasks. pip install fabric. Connect: from fabric import Connection; c = Connection("user@host"). Run: c.run("ls -la"). Sudo: c.sudo("systemctl restart nginx"). Upload: c.put("local.txt", "/remote/path/file.txt"). Download: c.get("/remote/log.txt", "local_log.txt"). Local: c.local("pytest tests/"). cd context: with c.cd("/var/www"): c.run("git pull"). Prefix: with c.prefix("source venv/bin/activate"): c.run("pip install -r requirements.txt"). Config: Connection("host", connect_kwargs={"key_filename": "~/.ssh/id_rsa"}). Password: connect_kwargs={"password": "pw"}. Port: Connection("host", port=2222). Gateway (jump host): Connection("target", gateway=Connection("bastion")). Group serial: from fabric import SerialGroup; g = SerialGroup("h1","h2"); g.run("uptime"). Group threaded: from fabric import ThreadingGroup; ThreadingGroup("h1","h2").run("df -h"). Result: r = c.run("pwd"); r.stdout.strip(). Warn only: c.run("cmd", warn=True). Hide output: c.run("cmd", hide=True). Responder (interactive): from invoke import Responder; c.sudo("cmd", watchers=[Responder(r"password:", "mypass\n")]). Context manager: with Connection("host") as c: c.run("..."). fabfile.py: define tasks as @task def deploy(c): c.run(...). Claude Code generates Fabric deploy scripts, rolling server updates, and multi-host automation.
CLAUDE.md for Fabric
## Fabric Stack
- Version: fabric >= 3.2 | pip install fabric
- Connect: c = Connection("user@host") | Connection("host", user="u", port=22)
- Run: c.run("cmd") | c.sudo("cmd") | c.local("cmd")
- Files: c.put("local", "remote") | c.get("remote", "local")
- Context: with c.cd("/path"): | with c.prefix("source venv/bin/activate"):
- Groups: SerialGroup("h1","h2").run("cmd") | ThreadingGroup for parallel
Fabric Remote Automation Pipeline
# app/deploy.py — Fabric Connection, run/sudo/put/get, deploy tasks, multi-host
from __future__ import annotations

import posixpath
import shlex
import sys
from contextlib import contextmanager
from pathlib import Path
from typing import Callable

# NOTE: fabric's parallel group class is ThreadingGroup — "ThreadedGroup" does
# not exist and raises ImportError.
from fabric import Connection, SerialGroup, ThreadingGroup
from invoke import task, Collection
# ─────────────────────────────────────────────────────────────────────────────
# 1. Connection helpers
# ─────────────────────────────────────────────────────────────────────────────
def connect(
    host: str,
    user: str | None = None,
    port: int = 22,
    key_file: str | None = None,
    password: str | None = None,
    gateway: str | None = None,
) -> Connection:
    """
    Build a Fabric ``Connection`` to *host*.

    key_file: path to a private key; ``~`` is expanded.
    gateway:  hostname of an SSH bastion/jump host to tunnel through.
    Example:
        c = connect("prod01.example.com", user="deploy", key_file="~/.ssh/deploy_key")
    """
    # Authentication options passed through to paramiko via connect_kwargs.
    auth: dict = {}
    if key_file:
        auth["key_filename"] = str(Path(key_file).expanduser())
    if password:
        auth["password"] = password

    options: dict = {"port": port}
    if user:
        options["user"] = user
    if auth:
        options["connect_kwargs"] = auth
    if gateway:
        options["gateway"] = Connection(gateway)
    return Connection(host, **options)
def run(c: Connection, cmd: str, hide: bool = True, warn: bool = False) -> str:
    """Execute *cmd* on the remote host and return its stdout, stripped."""
    return c.run(cmd, hide=hide, warn=warn).stdout.strip()
def sudo(c: Connection, cmd: str, hide: bool = True, warn: bool = False) -> str:
    """Execute *cmd* under sudo on the remote host; return stripped stdout."""
    return c.sudo(cmd, hide=hide, warn=warn).stdout.strip()
def local(c: Connection, cmd: str, hide: bool = False) -> str:
    """Execute *cmd* on the local machine via the connection; return stripped stdout."""
    return c.local(cmd, hide=hide).stdout.strip()
# ─────────────────────────────────────────────────────────────────────────────
# 2. File transfer helpers
# ─────────────────────────────────────────────────────────────────────────────
def upload(c: Connection, local_path: str | Path, remote_path: str) -> None:
    """
    Upload a local file to the remote host.

    Creates the remote parent directory first (``mkdir -p``) when the remote
    path has one. The remote path is treated as a POSIX path regardless of the
    local OS (``pathlib.Path`` would mangle it on Windows), and the mkdir
    argument is shell-quoted so spaces/metacharacters are safe.
    """
    parent = posixpath.dirname(remote_path)
    if parent:
        c.run(f"mkdir -p {shlex.quote(parent)}", hide=True)
    c.put(str(local_path), str(remote_path))
def download(c: Connection, remote_path: str, local_path: str | Path) -> None:
    """Fetch *remote_path* from the remote host into *local_path* locally."""
    src = str(remote_path)
    dest = str(local_path)
    c.get(src, dest)
def upload_dir(
    c: Connection,
    local_dir: str | Path,
    remote_dir: str,
    exclude: list[str] | None = None,
) -> int:
    """
    Recursively sync a local directory to the remote host via rsync
    (rsync must be installed on both ends; the command runs locally).

    exclude: patterns passed to ``--exclude`` (e.g. ["*.pyc", "__pycache__"]).
    Returns the number of files transferred — output parsing is omitted for
    brevity, so this is currently always 0.
    """
    excludes = "".join(f" --exclude={shlex.quote(pat)}" for pat in (exclude or []))
    # Only prepend "user@" when the connection has an explicit user;
    # "@host:path" (empty user) is not a valid rsync target.
    target_host = f"{c.user}@{c.host}" if c.user else c.host
    c.local(
        f"rsync -avz --delete{excludes} {shlex.quote(str(local_dir))}/ "
        f"{target_host}:{shlex.quote(str(remote_dir))}/",
        hide=True,
    )
    return 0  # rsync output parsing omitted for brevity
# ─────────────────────────────────────────────────────────────────────────────
# 3. Deployment tasks
# ─────────────────────────────────────────────────────────────────────────────
def deploy_app(
    c: Connection,
    repo_dir: str,
    branch: str = "main",
    requirements: bool = True,
    migrate: bool = True,
    service: str | None = None,
    python: str = "python3",
) -> dict:
    """
    Standard Python web app deploy:
      1. git pull
      2. pip install -r requirements.txt (inside ./.venv)
      3. database migrate (Django-style manage.py — warn-only)
      4. restart systemd service (warn-only)
    Returns a dict mapping step name -> command stdout.
    """
    steps: dict = {}
    with c.cd(repo_dir):
        # 1. Pull latest code
        print(f" [{c.host}] git pull origin {branch}")
        steps["git_pull"] = run(c, f"git pull origin {branch}")
        # 2. Install dependencies. Create the venv ONCE, idempotently:
        # putting "python -m venv" inside prefix() would re-run venv
        # creation before every prefixed command.
        if requirements:
            print(f" [{c.host}] pip install")
            run(c, f"test -d .venv || {python} -m venv .venv")
            with c.prefix("source .venv/bin/activate"):
                steps["pip"] = run(c, "pip install -r requirements.txt -q")
        # 3. Migrate (warn=True: a failed migration is recorded, not fatal)
        if migrate:
            print(f" [{c.host}] migrate")
            with c.prefix("source .venv/bin/activate"):
                steps["migrate"] = run(c, f"{python} manage.py migrate --noinput", warn=True)
        # 4. Restart service
        if service:
            print(f" [{c.host}] restart {service}")
            steps["restart"] = sudo(c, f"systemctl restart {service}", warn=True)
    return steps
def rolling_deploy(
    hosts: list[str],
    repo_dir: str,
    branch: str = "main",
    service: str | None = None,
    user: str | None = None,
    key_file: str | None = None,
    health_check: Callable[[Connection], bool] | None = None,
    stop_on_failure: bool = True,
) -> dict[str, dict]:
    """
    Deploy to multiple hosts one at a time (rolling update).

    Runs health_check after each host; stops on failure if stop_on_failure=True.
    Each SSH connection is closed when its host finishes (Connection is a
    context manager), so a long host list cannot leak sockets.
    Returns {host: steps_dict}; a host that raised maps to {"error": message}.
    """
    results: dict[str, dict] = {}
    for host in hosts:
        print(f"\n=== Deploying to {host} ===")
        try:
            # Context manager closes the connection even if a step raises.
            with connect(host, user=user, key_file=key_file) as c:
                steps = deploy_app(c, repo_dir, branch=branch, service=service)
                if health_check and not health_check(c):
                    steps["health"] = "FAILED"
                    print(f" [{host}] health check FAILED")
                    if stop_on_failure:
                        results[host] = steps
                        break
                else:
                    steps["health"] = "ok"
            results[host] = steps
        except Exception as e:
            # Boundary handler: record the failure and optionally halt the roll.
            results[host] = {"error": str(e)}
            print(f" [{host}] ERROR: {e}")
            if stop_on_failure:
                break
    return results
# ─────────────────────────────────────────────────────────────────────────────
# 4. Multi-host operations
# ─────────────────────────────────────────────────────────────────────────────
def run_on_all(
    hosts: list[str],
    cmd: str,
    parallel: bool = False,
    user: str | None = None,
) -> dict[str, str]:
    """
    Run a shell command on multiple hosts.

    parallel=True uses fabric.ThreadingGroup (concurrent execution);
    otherwise SerialGroup runs the hosts one after another.
    Returns {host: stripped stdout}. On a group error, logs to stderr and
    returns whatever was collected (possibly empty).
    """
    targets = [f"{user}@{h}" if user else h for h in hosts]
    # NB: fabric's parallel group class is ThreadingGroup — there is no
    # "ThreadedGroup"; referencing it raises ImportError.
    group_cls = ThreadingGroup if parallel else SerialGroup
    group = group_cls(*targets)
    results: dict[str, str] = {}
    try:
        for conn, result in group.run(cmd, hide=True).items():
            results[conn.host] = result.stdout.strip()
    except Exception as e:
        print(f"Group run error: {e}", file=sys.stderr)
    return results
def tail_logs(
    c: Connection,
    log_path: str = "/var/log/app/app.log",
    lines: int = 50,
) -> str:
    """Return the last *lines* lines of a remote log file (warn=True, so a
    failing tail — e.g. missing file — does not raise)."""
    cmd = f"tail -n {lines} {log_path}"
    return run(c, cmd, warn=True)
def check_disk(c: Connection, path: str = "/") -> dict[str, str]:
    """Return ``df -h`` usage fields for *path* on the remote host.

    Falls back to {"raw": line} when the output has fewer than 5 columns.
    """
    line = run(c, f"df -h {path} | tail -1")
    fields = line.split()
    if len(fields) < 5:
        return {"raw": line}
    keys = ("filesystem", "size", "used", "avail", "use%")
    return dict(zip(keys, fields))
# ─────────────────────────────────────────────────────────────────────────────
# 5. Invoke task integration
# ─────────────────────────────────────────────────────────────────────────────
# These tasks can be run with: invoke deploy --host prod01
# or bundled in a fabfile.py:
# from deploy import ns → fab deploy --host prod01
@task
def deploy_task(ctx, host, branch="main", service="myapp", user="deploy"):
    """Deploy app to a single host."""
    conn = connect(host, user=user)
    step_results = deploy_app(conn, "/var/www/myapp", branch=branch, service=service)
    # Print each step with its output truncated to 80 chars.
    for name, output in step_results.items():
        print(f" {name}: {str(output)[:80]}")
@task
def status(ctx, host, user="deploy"):
    """Check service and disk status on a host."""
    conn = connect(host, user=user)
    print("Uptime:", run(conn, "uptime"))
    print("Disk: ", check_disk(conn))
# Invoke namespace bundling the tasks under a collection named "app";
# presumably imported as `ns` from a fabfile.py to expose them to the
# `fab`/`invoke` CLI — TODO confirm intended invocation names.
ns = Collection("app")
# Register deploy_task under the shorter CLI name "deploy".
ns.add_task(deploy_task, "deploy")
ns.add_task(status)
# ─────────────────────────────────────────────────────────────────────────────
# Demo (local dry-run — connects to localhost if SSH is set up)
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Demonstrate local command execution (no SSH needed)
    from invoke import Context
    ctx = Context()
    print("=== Local commands via invoke ===")
    # Context.run executes on the local machine; hide=True captures output
    # instead of echoing it.
    result = ctx.run("echo 'Hello from local'", hide=True)
    print(f" local run: {result.stdout.strip()!r}")
    result = ctx.run("uname -s", hide=True)
    print(f" OS: {result.stdout.strip()}")
    # The remaining prints are usage documentation only — nothing below
    # opens an SSH connection.
    print("\n=== SSH usage examples ===")
    print(" c = Connection('user@host')")
    print(" with c: c.run('uptime')")
    print(" c.put('dist.tar.gz', '/tmp/dist.tar.gz')")
    print(" with c.cd('/var/www'): c.sudo('systemctl restart app')")
For the paramiko alternative — paramiko provides the low-level SSH2 protocol implementation (SFTP client, channel management, host key verification); Fabric builds on paramiko (and invoke) to provide a higher-level task-running API with cd(), prefix(), put(), and group execution — use paramiko when you need fine-grained protocol control, Fabric when you need readable deployment scripts with context managers and multi-host support. For the ansible alternative — Ansible uses YAML playbooks and an agentless push model for idempotent configuration management at scale; Fabric is a Python library that you call from code — it is better for dynamic deployment logic, conditional branching, and integration with Python CI pipelines where you want to programmatically construct which hosts to update and in what order. The Claude Skills 360 bundle includes Fabric skill sets covering connect() with key_file/gateway/password, run/sudo/local helpers, upload()/download()/upload_dir() rsync, deploy_app() git-pull/pip/migrate/restart, rolling_deploy() with health check, run_on_all() serial/parallel group, tail_logs()/check_disk() monitoring helpers, and invoke @task integration for fabfile.py. Start with the free tier to try SSH deployment automation code generation.