cloudpickle serializes Python functions, lambdas, and closures for distributed computing (pip install cloudpickle). Serialize with data = cloudpickle.dumps(lambda x: x*2) and load with fn = cloudpickle.loads(data). Lambdas such as cloudpickle.dumps(lambda x: x.strip().lower()) round-trip where stdlib pickle fails; closures are serialized together with the outer variables they capture; dynamically-defined classes and their instances work too. cloudpickle.register_pickle_by_value(module) sends a module's functions by value rather than by import reference, which is needed when workers don't have the module installed. Pass a pickle protocol with cloudpickle.dumps(fn, protocol=5). For multiprocessing, patch the pool to use cloudpickle for task serialization; with concurrent.futures, serialize the task with cloudpickle, send the bytes, and deserialize in the worker. Dask uses cloudpickle internally (import dask; dask.delayed(lambda x: x**2)(5).compute()), as do Ray @ray.remote functions and joblib (from joblib import Parallel, delayed; Parallel(n_jobs=4)(delayed(fn)(x) for x in data)). cloudpickle.CloudPickler is a Pickler subclass you can extend; failures raise pickle.PicklingError; inspect.getclosurevars(fn) helps debug captured closure variables. Claude Code generates cloudpickle distributed task serializers, Ray/Dask function pipelines, and parallel computing helpers.
# CLAUDE.md for cloudpickle
## cloudpickle Stack
- Version: cloudpickle >= 3.0 | pip install cloudpickle
- Serialize: cloudpickle.dumps(fn) / cloudpickle.loads(data)
- Lambda/closure: handled where stdlib pickle raises PicklingError (or AttributeError for nested/local functions)
- Register: cloudpickle.register_pickle_by_value(module) for local modules
- Distributed: Dask/Ray use cloudpickle internally; use directly for custom serialization
## cloudpickle Distributed Serialization Pipeline
# app/distribute.py — cloudpickle serialize, distribute, Dask/Ray/futures patterns
from __future__ import annotations
import hashlib
import inspect
import logging
import pickle
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import Any, Callable
import cloudpickle
log = logging.getLogger(__name__)
# ─────────────────────────────────────────────────────────────────────────────
# 1. Core serialize / deserialize
# ─────────────────────────────────────────────────────────────────────────────
def serialize(
obj: Any,
protocol: int | None = None,
) -> bytes:
"""
Serialize any Python callable, lambda, or closure to bytes.
cloudpickle handles objects that stdlib pickle cannot:
- lambda functions
- closures capturing outer variables
- dynamically-created functions and classes
- partial functions with non-serializable defaults
Example:
transform = lambda s: s.strip().lower().replace(" ", "_")
data = serialize(transform)
fn = deserialize(data)
print(fn(" Hello World ")) # "hello_world"
"""
if protocol is not None:
return cloudpickle.dumps(obj, protocol=protocol)
return cloudpickle.dumps(obj)
def deserialize(data: bytes) -> Any:
"""
Deserialize bytes produced by cloudpickle.dumps() or serialize().
    Uses standard pickle.loads (cloudpickle output is loadable by stdlib pickle).
"""
return pickle.loads(data)
def can_serialize(obj: Any) -> bool:
"""
Check whether cloudpickle can serialize an object.
Returns True if serialization succeeds, False otherwise.
Example:
fns = [lambda x: x*2, my_function, my_class_instance]
safe_fns = [f for f in fns if can_serialize(f)]
"""
try:
cloudpickle.dumps(obj)
return True
except Exception:
return False
def serialization_size(obj: Any) -> int | None:
"""Return the byte size of the serialized object, or None if not serializable."""
try:
return len(cloudpickle.dumps(obj))
except Exception:
return None
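# Illustrative addition (not part of cloudpickle's public API): inspect which
# outer variables a function captures — these are exactly what cloudpickle
# must serialize alongside the code object.
def closure_info(fn: Callable) -> dict[str, Any]:
    """
    Debug helper wrapping inspect.getclosurevars().
    Returns the nonlocal (closure-cell) and global names a function references,
    useful for understanding why a serialized payload is larger than expected.
    """
    cv = inspect.getclosurevars(fn)
    return {"nonlocals": dict(cv.nonlocals), "globals": dict(cv.globals)}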
# ─────────────────────────────────────────────────────────────────────────────
# 2. Task packaging helpers
# ─────────────────────────────────────────────────────────────────────────────
def pack_task(fn: Callable, *args: Any, **kwargs: Any) -> bytes:
"""
Package a function plus its arguments as a single serialized bytes object.
Suitable for sending to a remote worker that deserializes and calls it.
Example:
task_bytes = pack_task(process_file, "/data/input.csv", encoding="utf-8")
result_bytes = worker_execute(task_bytes) # on the remote worker
result = deserialize(result_bytes)
"""
payload = {"fn": fn, "args": args, "kwargs": kwargs}
return cloudpickle.dumps(payload)
def execute_task(task_bytes: bytes) -> Any:
"""
Execute a packed task created by pack_task().
Call this on the worker side.
Example:
def worker(task_bytes):
return execute_task(task_bytes)
"""
payload = pickle.loads(task_bytes)
return payload["fn"](*payload["args"], **payload["kwargs"])
def pack_result(result: Any) -> bytes:
"""Serialize a task result for transmission back to the caller."""
return cloudpickle.dumps(result)
def unpack_result(result_bytes: bytes) -> Any:
"""Deserialize a result packed by pack_result()."""
return pickle.loads(result_bytes)
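# Illustrative addition (hypothetical helper, not a cloudpickle API): content-
# address packed tasks so callers can cache or deduplicate repeated dispatches.
def task_fingerprint(task_bytes: bytes) -> str:
    """
    Return a SHA-256 hex digest identifying a packed task.
    Caveat: cloudpickle output is not guaranteed byte-stable across Python or
    cloudpickle versions, so only compare fingerprints within one environment.
    """
    return hashlib.sha256(task_bytes).hexdigest()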
# ─────────────────────────────────────────────────────────────────────────────
# 3. concurrent.futures integration
# ─────────────────────────────────────────────────────────────────────────────
def _run_packed(task_bytes: bytes) -> bytes:
"""Subprocess worker: deserialize task, execute, return serialized result."""
try:
result = execute_task(task_bytes)
return pack_result(result)
except Exception as e:
return pack_result({"__error__": True, "message": str(e), "type": type(e).__name__})
def parallel_map(
fn: Callable,
items: list,
max_workers: int = 4,
timeout: float | None = None,
ordered: bool = True,
) -> list:
"""
Run fn over items in parallel using ProcessPoolExecutor with cloudpickle.
Supports lambdas and closures that stdlib Pool.map() cannot handle.
ordered=True: preserve input order in output.
ordered=False: yield results as they complete (faster for unequal workloads).
Example:
results = parallel_map(lambda x: x**2, range(1000), max_workers=8)
# With closure:
multiplier = 7
results = parallel_map(lambda x: x * multiplier, range(10), max_workers=4)
"""
task_bytes_list = [pack_task(fn, item) for item in items]
with ProcessPoolExecutor(max_workers=max_workers) as executor:
if ordered:
futures = [executor.submit(_run_packed, tb) for tb in task_bytes_list]
results = []
for fut in futures:
raw = fut.result(timeout=timeout)
result = unpack_result(raw)
if isinstance(result, dict) and result.get("__error__"):
raise RuntimeError(f"Worker error: {result['type']}: {result['message']}")
results.append(result)
return results
else:
futures = {executor.submit(_run_packed, tb): i
for i, tb in enumerate(task_bytes_list)}
results = [None] * len(items)
for fut in as_completed(futures):
idx = futures[fut]
raw = fut.result(timeout=timeout)
result = unpack_result(raw)
if isinstance(result, dict) and result.get("__error__"):
raise RuntimeError(f"Worker error: {result['type']}: {result['message']}")
results[idx] = result
return results
def parallel_starmap(
fn: Callable,
args_list: list[tuple],
max_workers: int = 4,
) -> list:
"""
Like parallel_map() but each item is a tuple of args unpacked into fn.
Example:
results = parallel_starmap(lambda a, b: a + b, [(1,2), (3,4), (5,6)])
# [3, 7, 11]
"""
task_bytes_list = [pack_task(fn, *args) for args in args_list]
with ProcessPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(_run_packed, tb) for tb in task_bytes_list]
results = []
for fut in futures:
raw = fut.result()
result = unpack_result(raw)
if isinstance(result, dict) and result.get("__error__"):
raise RuntimeError(f"Worker error: {result['type']}: {result['message']}")
results.append(result)
return results
# ─────────────────────────────────────────────────────────────────────────────
# 4. Function registry (serialize-once, reuse)
# ─────────────────────────────────────────────────────────────────────────────
class FunctionRegistry:
"""
Registry that serializes functions once and reuses bytes for many tasks.
Useful when sending the same function to many workers.
Usage:
registry = FunctionRegistry()
registry.register("transform", lambda s: s.strip().lower())
bytes_payload = registry.task_bytes("transform", " Hello ")
result = execute_task(bytes_payload) # on worker
"""
def __init__(self) -> None:
self._fns: dict[str, bytes] = {}
def register(self, name: str, fn: Callable) -> None:
"""Serialize and register a function by name."""
self._fns[name] = cloudpickle.dumps(fn)
def task_bytes(self, name: str, *args: Any, **kwargs: Any) -> bytes:
"""Return packed task bytes for a registered function."""
if name not in self._fns:
raise KeyError(f"Function '{name}' not registered")
fn = pickle.loads(self._fns[name])
return pack_task(fn, *args, **kwargs)
def names(self) -> list[str]:
return list(self._fns.keys())
def size(self, name: str) -> int:
return len(self._fns[name])
# ─────────────────────────────────────────────────────────────────────────────
# 5. Dask / Ray patterns (code examples)
# ─────────────────────────────────────────────────────────────────────────────
DASK_EXAMPLE = '''
# Dask uses cloudpickle internally — lambdas and closures work naturally
import dask
from dask.distributed import Client
client = Client() # local cluster
# Lambda in delayed:
results = dask.compute(*[dask.delayed(lambda x: x**2)(i) for i in range(10)])
# Closure in delayed:
threshold = 50
filter_fn = lambda x: x if x > threshold else None
futures = client.map(filter_fn, range(100))
filtered = [r for r in client.gather(futures) if r is not None]
# register_pickle_by_value for local modules (when workers can't import them):
import cloudpickle, mymodule
cloudpickle.register_pickle_by_value(mymodule)
# Now mymodule functions are serialized by value, not import reference
'''
RAY_EXAMPLE = '''
# Ray uses cloudpickle for @remote functions — lambdas work in tasks
import ray
ray.init()
@ray.remote
def process(data):
return sorted(data, key=lambda x: -x)
# Closures in tasks:
multiplier = 3.5
@ray.remote
def scale(values):
return [v * multiplier for v in values]
refs = [scale.remote([1, 2, 3]), scale.remote([4, 5, 6])]
print(ray.get(refs)) # [[3.5, 7.0, 10.5], [14.0, 17.5, 21.0]]
'''
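# Hedged sketch in the same pattern as the Dask/Ray examples: PySpark also uses
# cloudpickle as its closure serializer, so lambdas in transformations ship the
# same way (illustrative only; assumes a local pyspark installation).
SPARK_EXAMPLE = '''
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
factor = 10  # captured by the closure below and serialized via cloudpickle
rdd = spark.sparkContext.parallelize(range(5))
print(rdd.map(lambda x: x * factor).collect())  # [0, 10, 20, 30, 40]
'''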
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
print("=== Lambda serialization ===")
fn = lambda x: x ** 2 + 1
data = serialize(fn)
fn2 = deserialize(data)
print(f" fn(5) = {fn(5)}, deserialized fn(5) = {fn2(5)}")
print(f" Serialized size: {len(data)} bytes")
print("\n=== Closure serialization ===")
base_rate = 1.08
apply_tax = lambda price: round(price * base_rate, 2)
data2 = serialize(apply_tax)
fn3 = deserialize(data2)
print(f" apply_tax(100) = {apply_tax(100)}, deserialized = {fn3(100)}")
print("\n=== pack_task / execute_task ===")
task_bytes = pack_task(lambda a, b: a * b + 1, 6, 7)
result = execute_task(task_bytes)
print(f" 6 * 7 + 1 = {result}")
print("\n=== FunctionRegistry ===")
reg = FunctionRegistry()
reg.register("normalize", lambda s: s.strip().lower().replace(" ", "_"))
reg.register("square", lambda x: x ** 2)
for name in reg.names():
print(f" '{name}': {reg.size(name)} bytes")
result2 = execute_task(reg.task_bytes("normalize", " Hello World "))
print(f" normalize(' Hello World ') = '{result2}'")
print("\n=== parallel_map (lambda) ===")
start = time.perf_counter()
results = parallel_map(lambda x: x ** 2, list(range(20)), max_workers=4)
elapsed_ms = (time.perf_counter() - start) * 1000
print(f" Mapped 20 items in {elapsed_ms:.1f}ms: {results[:5]}...")
print("\n=== can_serialize checks ===")
items = [
lambda x: x,
lambda: None,
[1, 2, 3],
{"key": "val"},
sum, # builtin
]
for item in items:
name = getattr(item, "__name__", repr(item)[:25])
print(f" {name:25s}: {can_serialize(item)}")
For the dill alternative: dill and cloudpickle both serialize lambdas and closures. dill's scope is broader (session dumps, generators, stack frames, tracebacks), while cloudpickle focuses specifically on functions and classes for distributed execution. cloudpickle has tighter integration with distributed frameworks and is the default serializer in Dask distributed, Ray, and Apache Spark's PySpark.
For the stdlib pickle alternative: pickle serializes instances of imported classes and module-level functions by reference, recording only the module and qualified name rather than the code itself. cloudpickle serializes the function's bytecode, defaults, and closure variables so it can be reconstructed on a worker that may not share the same codebase. This is essential for dynamic functions, interactive notebooks, and task queues that ship logic alongside data.
The Claude Skills 360 bundle includes cloudpickle skill sets covering serialize()/deserialize() with protocol control, can_serialize()/serialization_size() safety checks, pack_task()/execute_task() for worker dispatch, pack_result()/unpack_result() for round-trips, the lambda-safe parallel_map() ProcessPoolExecutor wrapper, parallel_starmap() for multi-argument tasks, the serialize-once FunctionRegistry pattern, and Dask/Ray cloudpickle integration examples. Start with the free tier to try distributed function serialization code generation.
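The by-reference vs by-value contrast can be demonstrated in a few lines (a standalone sketch; requires only cloudpickle):

```python
import pickle

import cloudpickle

double = lambda x: x * 2  # no importable module-level name for pickle to record

# stdlib pickle stores functions by reference (module + qualified name);
# a lambda's qualname is "<lambda>", so the lookup fails and dumps() raises.
try:
    pickle.dumps(double)
    print("pickle succeeded (unexpected)")
except Exception as exc:
    print(f"stdlib pickle failed: {type(exc).__name__}")

# cloudpickle serializes the code object, defaults, and closure cells instead,
# and its output loads with plain pickle.loads on the receiving side.
restored = pickle.loads(cloudpickle.dumps(double))
print(restored(21))  # 42
```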