boto3 is the official AWS SDK for Python. pip install boto3. import boto3. Client (low-level): s3 = boto3.client("s3", region_name="us-east-1"). Resource (ORM-like): s3 = boto3.resource("s3"). Session with profile: session = boto3.Session(profile_name="prod"). S3 upload: s3.upload_file("local.csv", "my-bucket", "prefix/data.csv"). S3 download: s3.download_file("bucket", "key", "local.csv"). S3 put object: s3.put_object(Bucket="bucket", Key="key", Body=b"data", ContentType="text/plain"). S3 get object: obj = s3.get_object(Bucket="bucket", Key="key"), data = obj["Body"].read(). List S3: paginator = s3.get_paginator("list_objects_v2"), pages = paginator.paginate(Bucket="bucket", Prefix="prefix/"). Presigned URL: url = s3.generate_presigned_url("get_object", Params={"Bucket":"b","Key":"k"}, ExpiresIn=3600). DynamoDB put: table = dynamodb.Table("users"), table.put_item(Item={"pk": "user#123", "sk": "profile", "name": "Alice"}). DynamoDB get: resp = table.get_item(Key={"pk":"user#123","sk":"profile"}), item = resp["Item"]. DynamoDB query: table.query(KeyConditionExpression=Key("pk").eq("user#123")). Lambda invoke: resp = lambda_.invoke(FunctionName="my-fn", Payload=json.dumps({"key":"val"})). SQS send: sqs.send_message(QueueUrl=url, MessageBody=json.dumps(event)). SSM secret: ssm.get_parameter(Name="/prod/db/password", WithDecryption=True)["Parameter"]["Value"]. Error: except ClientError as e: code = e.response["Error"]["Code"]. Bedrock: bedrock.invoke_model(modelId="anthropic.claude-3-5-sonnet-20241022-v2:0", body=json.dumps({...})). Claude Code generates boto3 S3 pipelines, DynamoDB data access layers, Lambda triggers, and SSM secret managers.
CLAUDE.md for boto3
## boto3 Stack
- Version: boto3 >= 1.34
- Client: boto3.client("service", region_name="us-east-1") — low-level API
- Resource: boto3.resource("s3") — higher-level ORM-like interface
- Session: boto3.Session(profile_name, region_name) for multi-account
- S3: upload_file/download_file | put_object/get_object | paginator for listing
- DynamoDB: resource.Table(name) → put_item/get_item/query/scan/batch_writer
- Errors: from botocore.exceptions import ClientError — check Error.Code
- Credentials: env vars AWS_ACCESS_KEY_ID/SECRET | ~/.aws/credentials | IAM role
boto3 AWS SDK Pipeline
# cloud/boto3_pipeline.py — AWS SDK operations with boto3
from __future__ import annotations
import json
import os
from io import BytesIO
from pathlib import Path
from typing import Any, Generator, Optional
import boto3
from botocore.exceptions import ClientError
from boto3.s3.transfer import TransferConfig
# ── 0. Session factory ────────────────────────────────────────────────────────
def make_session(
profile: str = None,
region: str = "us-east-1",
role_arn: str = None,
) -> boto3.Session:
"""
Create a boto3 Session, optionally assuming an IAM role.
Uses credential chain: env vars → ~/.aws/credentials → instance role.
"""
session = boto3.Session(
profile_name=profile,
region_name=region,
)
if role_arn:
sts = session.client("sts")
resp = sts.assume_role(
RoleArn=role_arn,
RoleSessionName="boto3-pipeline-session",
)
creds = resp["Credentials"]
session = boto3.Session(
aws_access_key_id=creds["AccessKeyId"],
aws_secret_access_key=creds["SecretAccessKey"],
aws_session_token=creds["SessionToken"],
region_name=region,
)
return session
# ── 1. S3 operations ──────────────────────────────────────────────────────────
class S3Client:
"""
High-level S3 wrapper with multipart upload, pagination, and presigned URLs.
"""
def __init__(self, bucket: str, session: boto3.Session = None):
self.bucket = bucket
self._s3 = (session or boto3.Session()).client("s3")
self._xfer = TransferConfig(
multipart_threshold=8 * 1024 * 1024, # 8 MB
multipart_chunksize=8 * 1024 * 1024,
max_concurrency=10,
use_threads=True,
)
def upload(self, local_path: str, key: str, extra: dict = None) -> str:
"""Upload a local file to S3. Returns the S3 URI."""
try:
self._s3.upload_file(
local_path, self.bucket, key,
ExtraArgs=extra or {},
Config=self._xfer,
)
return f"s3://{self.bucket}/{key}"
except ClientError as e:
raise RuntimeError(f"S3 upload failed: {e.response['Error']['Message']}") from e
def upload_bytes(self, data: bytes, key: str, content_type: str = "application/octet-stream") -> str:
"""Upload bytes directly to S3 without a local file."""
self._s3.put_object(Bucket=self.bucket, Key=key,
Body=data, ContentType=content_type)
return f"s3://{self.bucket}/{key}"
def download(self, key: str, local_path: str) -> None:
"""Download S3 object to a local file."""
Path(local_path).parent.mkdir(parents=True, exist_ok=True)
self._s3.download_file(self.bucket, key, local_path, Config=self._xfer)
def download_bytes(self, key: str) -> bytes:
"""Download S3 object to bytes in memory."""
obj = self._s3.get_object(Bucket=self.bucket, Key=key)
return obj["Body"].read()
def list_keys(self, prefix: str = "", max_keys: int = None) -> list[str]:
"""List all S3 keys under prefix using pagination."""
paginator = self._s3.get_paginator("list_objects_v2")
pages = paginator.paginate(Bucket=self.bucket, Prefix=prefix)
keys = []
for page in pages:
for obj in page.get("Contents", []):
keys.append(obj["Key"])
if max_keys and len(keys) >= max_keys:
return keys
return keys
def exists(self, key: str) -> bool:
"""Check if an S3 key exists."""
try:
self._s3.head_object(Bucket=self.bucket, Key=key)
return True
except ClientError as e:
if e.response["Error"]["Code"] == "404":
return False
raise
def delete(self, key: str) -> None:
"""Delete a single S3 object."""
self._s3.delete_object(Bucket=self.bucket, Key=key)
def batch_delete(self, keys: list[str]) -> int:
"""Delete up to 1000 keys in one API call. Returns count deleted."""
if not keys:
return 0
objects = [{"Key": k} for k in keys[:1000]]
resp = self._s3.delete_objects(Bucket=self.bucket, Delete={"Objects": objects})
return len(resp.get("Deleted", []))
def presigned_url(self, key: str, expires: int = 3600, method: str = "get_object") -> str:
"""Generate a presigned URL for GET or PUT access."""
return self._s3.generate_presigned_url(
method, Params={"Bucket": self.bucket, "Key": key}, ExpiresIn=expires
)
def copy(self, src_key: str, dst_key: str, dst_bucket: str = None) -> None:
"""Copy an S3 object within or across buckets."""
self._s3.copy_object(
CopySource={"Bucket": self.bucket, "Key": src_key},
Bucket=dst_bucket or self.bucket,
Key=dst_key,
)
# ── 2. DynamoDB operations ────────────────────────────────────────────────────
class DynamoDBTable:
"""
DynamoDB Table wrapper with single-item and batch operations.
"""
def __init__(self, table_name: str, session: boto3.Session = None):
dynamo = (session or boto3.Session()).resource("dynamodb")
self.table = dynamo.Table(table_name)
def put(self, item: dict, condition: str = None) -> None:
"""Put a single item. item must include all key attributes."""
kwargs = {"Item": item}
if condition:
kwargs["ConditionExpression"] = condition
try:
self.table.put_item(**kwargs)
except ClientError as e:
if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
raise ValueError(f"Condition failed: {condition}") from e
raise
def get(self, key: dict) -> dict | None:
"""Get item by primary key. Returns None if not found."""
resp = self.table.get_item(Key=key)
return resp.get("Item")
def update(
self,
key: dict,
updates: dict, # {"field": new_value, ...}
remove: list[str] = None, # fields to remove
) -> dict:
"""
Update specific attributes of an item.
Returns the updated item attributes.
"""
from boto3.dynamodb.conditions import Attr
set_parts = []
remove_parts = []
expr_names = {}
expr_values = {}
for i, (field, value) in enumerate(updates.items()):
alias = f"#f{i}"
vkey = f":v{i}"
expr_names[alias] = field
expr_values[vkey] = value
set_parts.append(f"{alias} = {vkey}")
update_expr = "SET " + ", ".join(set_parts) if set_parts else ""
if remove:
for i, field in enumerate(remove):
alias = f"#r{i}"
expr_names[alias] = field
remove_parts.append(alias)
update_expr += (" REMOVE " if update_expr else "REMOVE ") + ", ".join(remove_parts)
resp = self.table.update_item(
Key=key,
UpdateExpression=update_expr,
ExpressionAttributeNames=expr_names,
ExpressionAttributeValues=expr_values,
ReturnValues="ALL_NEW",
)
return resp.get("Attributes", {})
def delete(self, key: dict) -> None:
"""Delete item by primary key."""
self.table.delete_item(Key=key)
def query_pk(
self,
pk_name: str,
pk_value: Any,
sk_name: str = None,
sk_prefix: str = None,
limit: int = None,
ascending: bool = True,
) -> list[dict]:
"""Query items by partition key with optional sort key prefix."""
from boto3.dynamodb.conditions import Key
key_cond = Key(pk_name).eq(pk_value)
if sk_name and sk_prefix:
key_cond = key_cond & Key(sk_name).begins_with(sk_prefix)
kwargs: dict = {"KeyConditionExpression": key_cond, "ScanIndexForward": ascending}
if limit:
kwargs["Limit"] = limit
items = []
while True:
resp = self.table.query(**kwargs)
items.extend(resp.get("Items", []))
last_key = resp.get("LastEvaluatedKey")
if not last_key or (limit and len(items) >= limit):
break
kwargs["ExclusiveStartKey"] = last_key
return items
def batch_write(self, items: list[dict]) -> int:
"""Batch write up to 25 items at a time. Returns count written."""
written = 0
with self.table.batch_writer() as bw:
for item in items:
bw.put_item(Item=item)
written += 1
return written
# ── 3. SSM Parameter Store ───────────────────────────────────────────────────
def get_secret(name: str, region: str = "us-east-1") -> str:
"""Retrieve a SecureString from SSM Parameter Store."""
ssm = boto3.client("ssm", region_name=region)
try:
return ssm.get_parameter(Name=name, WithDecryption=True)["Parameter"]["Value"]
except ClientError as e:
raise RuntimeError(f"SSM get failed [{name}]: {e.response['Error']['Code']}") from e
def get_secrets_batch(names: list[str], region: str = "us-east-1") -> dict[str, str]:
"""Retrieve multiple SSM parameters in batches of 10."""
ssm = boto3.client("ssm", region_name=region)
result = {}
for i in range(0, len(names), 10):
batch = names[i:i+10]
resp = ssm.get_parameters(Names=batch, WithDecryption=True)
for param in resp["Parameters"]:
result[param["Name"]] = param["Value"]
return result
# ── 4. SQS ────────────────────────────────────────────────────────────────────
class SQSQueue:
"""Simple SQS producer/consumer wrapper."""
def __init__(self, queue_url: str, session: boto3.Session = None):
self.url = queue_url
self._sqs = (session or boto3.Session()).client("sqs")
def send(self, payload: dict, delay: int = 0, group_id: str = None) -> str:
"""Send a message. Returns MessageId."""
kwargs: dict = {
"QueueUrl": self.url,
"MessageBody": json.dumps(payload),
"DelaySeconds": delay,
}
if group_id: # FIFO queue
kwargs["MessageGroupId"] = group_id
resp = self._sqs.send_message(**kwargs)
return resp["MessageId"]
def receive(self, max_messages: int = 10, wait_seconds: int = 20) -> list[dict]:
"""Long-poll for messages. Returns list of message dicts with Body and ReceiptHandle."""
resp = self._sqs.receive_message(
QueueUrl=self.url,
MaxNumberOfMessages=min(max_messages, 10),
WaitTimeSeconds=wait_seconds,
AttributeNames=["All"],
)
return resp.get("Messages", [])
def delete(self, receipt_handle: str) -> None:
"""Acknowledge and delete a processed message."""
self._sqs.delete_message(QueueUrl=self.url, ReceiptHandle=receipt_handle)
def process_messages(
self,
handler_fn,
max_messages: int = 10,
wait_seconds: int = 20,
) -> int:
"""
Poll, process, and delete messages in a loop.
handler_fn receives the parsed JSON body.
Returns count processed.
"""
messages = self.receive(max_messages, wait_seconds)
count = 0
for msg in messages:
try:
body = json.loads(msg["Body"])
handler_fn(body)
self.delete(msg["ReceiptHandle"])
count += 1
except Exception as e:
print(f"Message processing error: {e}")
return count
# ── 5. Lambda invoke ──────────────────────────────────────────────────────────
def invoke_lambda(
function_name: str,
payload: dict,
invocation_type: str = "RequestResponse", # "RequestResponse" | "Event"
region: str = "us-east-1",
) -> dict | None:
"""
Invoke a Lambda function synchronously or asynchronously.
Returns parsed response payload (sync only).
"""
lam = boto3.client("lambda", region_name=region)
resp = lam.invoke(
FunctionName=function_name,
InvocationType=invocation_type,
Payload=json.dumps(payload),
)
if invocation_type == "Event":
return None
raw = resp["Payload"].read()
return json.loads(raw)
# ── Demo ──────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
print("boto3 AWS SDK Demo")
print("=" * 50)
print("\nThis demo requires AWS credentials.")
print("Set AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_DEFAULT_REGION")
print("or use 'aws configure' to set up ~/.aws/credentials\n")
print("S3 example:")
print(" s3 = S3Client('my-bucket')")
print(" uri = s3.upload('data.csv', 'path/data.csv')")
print(" keys = s3.list_keys(prefix='path/')")
print(" url = s3.presigned_url('path/data.csv', expires=3600)")
print("\nDynamoDB example:")
print(" db = DynamoDBTable('users')")
print(" db.put({'pk': 'user#1', 'sk': 'profile', 'name': 'Alice'})")
print(" item = db.get({'pk': 'user#1', 'sk': 'profile'})")
print(" items = db.query_pk('pk', 'user#1', 'sk', 'profile')")
print("\nSSM secrets:")
print(" db_pass = get_secret('/prod/db/password')")
print("\nSQS queue:")
print(" q = SQSQueue('https://sqs.us-east-1.amazonaws.com/123/my-queue')")
print(" q.send({'event': 'user_signup', 'user_id': '42'})")
print(" count = q.process_messages(handler_fn=process_event)")
For the requests + direct API alternative — the AWS API requires SigV4 signature computation for every request while boto3 handles credential rotation, region endpoint discovery, request signing, and retry logic with jitter automatically, and TransferConfig multipart upload with parallel chunks uploads a 10 GB file in minutes that a single requests.put would timeout on. For the AWS CLI alternative for scripting — the AWS CLI requires subprocess calls and JSON parsing while boto3’s paginator.paginate() handles API pagination transparently, the DynamoDBTable.batch_writer() context manager buffers and retries unprocessed items automatically, and SSM.get_parameter(WithDecryption=True) retrieves encrypted secrets without shell variable exposure. The Claude Skills 360 bundle includes boto3 skill sets covering Session with assume_role, S3 upload/download with multipart, list with paginator, presigned URLs, batch delete, DynamoDB put/get/query/update/batch_write, SSM parameter batch retrieval, SQS send/receive/process loop, and Lambda synchronous and async invoke. Start with the free tier to try AWS SDK code generation.