Claude Code for cloudpickle: Serialize Python Functions for Distributed Computing — Claude Skills 360 Blog

Published: May 22, 2028
Read time: 5 min
By: Claude Skills 360

cloudpickle serializes Python functions, lambdas, and closures for distributed computing. Install with pip install cloudpickle. Quick reference:

- Serialize: import cloudpickle; data = cloudpickle.dumps(lambda x: x*2); load with fn = cloudpickle.loads(data)
- Lambdas: cloudpickle.dumps(lambda x: x.strip().lower()) works where stdlib pickle fails
- Closures: functions capturing outer variables are serialized together with those variables
- Classes: dynamically-defined classes and their instances are supported
- cloudpickle.register_pickle_by_value(module) sends a module's functions by value rather than by import reference (needed when workers don't have the module installed)
- Protocol: cloudpickle.dumps(fn, protocol=5)
- multiprocessing: patch the pool to use cloudpickle for task serialization
- concurrent.futures: serialize the task with cloudpickle, send the bytes, deserialize in the worker
- Dask: dask.delayed(lambda x: x**2)(5).compute() uses cloudpickle internally
- Ray: @ray.remote functions are serialized with cloudpickle
- cloudpickle.CloudPickler is a Pickler subclass you can extend
- joblib: Parallel(n_jobs=4)(delayed(fn)(x) for x in data) uses cloudpickle
- Failures raise pickle.PicklingError
- inspect.getclosurevars(fn) helps debug which variables a closure captures

Claude Code generates cloudpickle distributed task serializers, Ray/Dask function pipelines, and parallel computing helpers.
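The dumps/loads round-trip above takes only a few lines; the tax-rate closure here is illustrative:

```python
# Serialize a closure: cloudpickle captures the bytecode and the variables it references.
import cloudpickle

rate = 1.08                                      # captured by the lambda below
apply_tax = lambda price: round(price * rate, 2)

data = cloudpickle.dumps(apply_tax, protocol=5)  # bytes, safe to send to a worker
fn = cloudpickle.loads(data)                     # rebuilt function, rate included
print(fn(100))                                   # 108.0
```

The serialized bytes carry everything needed to rebuild the function, so the receiving process does not need the defining code on its import path.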

CLAUDE.md for cloudpickle

## cloudpickle Stack
- Version: cloudpickle >= 3.0 | pip install cloudpickle
- Serialize: cloudpickle.dumps(fn) / cloudpickle.loads(data)
- Lambda/closure: handles cases where stdlib pickle raises PicklingError or AttributeError
- Register: cloudpickle.register_pickle_by_value(module) for local modules
- Distributed: Dask/Ray use cloudpickle internally; use directly for custom serialization
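The lambda/closure point can be checked directly: stdlib pickle refuses a lambda, while cloudpickle round-trips it. A small sketch, assuming cloudpickle is installed:

```python
import pickle
import cloudpickle

double = lambda x: x * 2   # no importable qualified name, so pickle cannot reference it

try:
    pickle.dumps(double)   # stdlib pickles functions by module + name only
    stdlib_ok = True
except (pickle.PicklingError, AttributeError):
    stdlib_ok = False

data = cloudpickle.dumps(double)   # cloudpickle serializes the bytecode itself
restored = pickle.loads(data)      # a plain pickle.loads can read the stream
print(stdlib_ok, restored(21))     # False 42
```

Note that deserialization works with stdlib pickle.loads, since cloudpickle emits an ordinary pickle stream; cloudpickle only needs to be importable on the loading side.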

cloudpickle Distributed Serialization Pipeline

# app/distribute.py — cloudpickle serialize, distribute, Dask/Ray/futures patterns
from __future__ import annotations

import logging
import pickle
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import Any, Callable

import cloudpickle


log = logging.getLogger(__name__)


# ─────────────────────────────────────────────────────────────────────────────
# 1. Core serialize / deserialize
# ─────────────────────────────────────────────────────────────────────────────

def serialize(
    obj: Any,
    protocol: int | None = None,
) -> bytes:
    """
    Serialize any Python callable, lambda, or closure to bytes.
    cloudpickle handles objects that stdlib pickle cannot:
    - lambda functions
    - closures capturing outer variables
    - dynamically-created functions and classes
    - functools.partial objects wrapping lambdas or closures

    Example:
        transform = lambda s: s.strip().lower().replace(" ", "_")
        data = serialize(transform)
        fn   = deserialize(data)
        print(fn("  Hello World  "))  # "hello_world"
    """
    if protocol is not None:
        return cloudpickle.dumps(obj, protocol=protocol)
    return cloudpickle.dumps(obj)


def deserialize(data: bytes) -> Any:
    """
    Deserialize bytes produced by cloudpickle.dumps() or serialize().
    Uses standard pickle.loads; cloudpickle emits an ordinary pickle stream,
    so stdlib pickle can load it (cloudpickle must still be importable on the
    loading side).
    """
    return pickle.loads(data)


def can_serialize(obj: Any) -> bool:
    """
    Check whether cloudpickle can serialize an object.
    Returns True if serialization succeeds, False otherwise.

    Example:
        fns = [lambda x: x*2, my_function, my_class_instance]
        safe_fns = [f for f in fns if can_serialize(f)]
    """
    try:
        cloudpickle.dumps(obj)
        return True
    except Exception:
        return False


def serialization_size(obj: Any) -> int | None:
    """Return the byte size of the serialized object, or None if not serializable."""
    try:
        return len(cloudpickle.dumps(obj))
    except Exception:
        return None


# ─────────────────────────────────────────────────────────────────────────────
# 2. Task packaging helpers
# ─────────────────────────────────────────────────────────────────────────────

def pack_task(fn: Callable, *args: Any, **kwargs: Any) -> bytes:
    """
    Package a function plus its arguments as a single serialized bytes object.
    Suitable for sending to a remote worker that deserializes and calls it.

    Example:
        task_bytes = pack_task(process_file, "/data/input.csv", encoding="utf-8")
        result_bytes = worker_execute(task_bytes)  # on the remote worker
        result = deserialize(result_bytes)
    """
    payload = {"fn": fn, "args": args, "kwargs": kwargs}
    return cloudpickle.dumps(payload)


def execute_task(task_bytes: bytes) -> Any:
    """
    Execute a packed task created by pack_task().
    Call this on the worker side.

    Example:
        def worker(task_bytes):
            return execute_task(task_bytes)
    """
    payload = pickle.loads(task_bytes)
    return payload["fn"](*payload["args"], **payload["kwargs"])


def pack_result(result: Any) -> bytes:
    """Serialize a task result for transmission back to the caller."""
    return cloudpickle.dumps(result)


def unpack_result(result_bytes: bytes) -> Any:
    """Deserialize a result packed by pack_result()."""
    return pickle.loads(result_bytes)


# ─────────────────────────────────────────────────────────────────────────────
# 3. concurrent.futures integration
# ─────────────────────────────────────────────────────────────────────────────

def _run_packed(task_bytes: bytes) -> bytes:
    """Subprocess worker: deserialize task, execute, return serialized result."""
    try:
        result = execute_task(task_bytes)
        return pack_result(result)
    except Exception as e:
        return pack_result({"__error__": True, "message": str(e), "type": type(e).__name__})


def parallel_map(
    fn: Callable,
    items: list,
    max_workers: int = 4,
    timeout: float | None = None,
    ordered: bool = True,
) -> list:
    """
    Run fn over items in parallel using ProcessPoolExecutor with cloudpickle.
    Supports lambdas and closures that stdlib Pool.map() cannot handle.

    ordered=True: preserve input order in output.
    ordered=False: yield results as they complete (faster for unequal workloads).

    Example:
        results = parallel_map(lambda x: x**2, range(1000), max_workers=8)

        # With closure:
        multiplier = 7
        results = parallel_map(lambda x: x * multiplier, range(10), max_workers=4)
    """
    task_bytes_list = [pack_task(fn, item) for item in items]

    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        if ordered:
            futures = [executor.submit(_run_packed, tb) for tb in task_bytes_list]
            results = []
            for fut in futures:
                raw = fut.result(timeout=timeout)
                result = unpack_result(raw)
                if isinstance(result, dict) and result.get("__error__"):
                    raise RuntimeError(f"Worker error: {result['type']}: {result['message']}")
                results.append(result)
            return results
        else:
            futures = {executor.submit(_run_packed, tb): i
                       for i, tb in enumerate(task_bytes_list)}
            results = [None] * len(items)
            for fut in as_completed(futures):
                idx = futures[fut]
                raw = fut.result(timeout=timeout)
                result = unpack_result(raw)
                if isinstance(result, dict) and result.get("__error__"):
                    raise RuntimeError(f"Worker error: {result['type']}: {result['message']}")
                results[idx] = result
            return results


def parallel_starmap(
    fn: Callable,
    args_list: list[tuple],
    max_workers: int = 4,
) -> list:
    """
    Like parallel_map() but each item is a tuple of args unpacked into fn.

    Example:
        results = parallel_starmap(lambda a, b: a + b, [(1,2), (3,4), (5,6)])
        # [3, 7, 11]
    """
    task_bytes_list = [pack_task(fn, *args) for args in args_list]
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(_run_packed, tb) for tb in task_bytes_list]
        results = []
        for fut in futures:
            raw = fut.result()
            result = unpack_result(raw)
            if isinstance(result, dict) and result.get("__error__"):
                raise RuntimeError(f"Worker error: {result['type']}: {result['message']}")
            results.append(result)
    return results


# ─────────────────────────────────────────────────────────────────────────────
# 4. Function registry (serialize-once, reuse)
# ─────────────────────────────────────────────────────────────────────────────

class FunctionRegistry:
    """
    Registry that serializes functions once and reuses bytes for many tasks.
    Useful when sending the same function to many workers.

    Usage:
        registry = FunctionRegistry()
        registry.register("transform", lambda s: s.strip().lower())
        bytes_payload = registry.task_bytes("transform", "  Hello  ")
        result = execute_task(bytes_payload)   # on worker
    """

    def __init__(self) -> None:
        self._fns: dict[str, bytes] = {}

    def register(self, name: str, fn: Callable) -> None:
        """Serialize and register a function by name."""
        self._fns[name] = cloudpickle.dumps(fn)

    def task_bytes(self, name: str, *args: Any, **kwargs: Any) -> bytes:
        """Return packed task bytes for a registered function."""
        if name not in self._fns:
            raise KeyError(f"Function '{name}' not registered")
        fn = pickle.loads(self._fns[name])
        return pack_task(fn, *args, **kwargs)

    def names(self) -> list[str]:
        return list(self._fns.keys())

    def size(self, name: str) -> int:
        return len(self._fns[name])


# ─────────────────────────────────────────────────────────────────────────────
# 5. Dask / Ray patterns (code examples)
# ─────────────────────────────────────────────────────────────────────────────

DASK_EXAMPLE = '''
# Dask uses cloudpickle internally — lambdas and closures work naturally
import dask
from dask.distributed import Client

client = Client()  # local cluster

# Lambda in delayed:
results = dask.compute(*[dask.delayed(lambda x: x**2)(i) for i in range(10)])

# Closure in delayed:
threshold = 50
filter_fn = lambda x: x if x > threshold else None
futures   = client.map(filter_fn, range(100))
filtered  = [r for r in client.gather(futures) if r is not None]

# register_pickle_by_value for local modules (when workers can't import them):
import cloudpickle, mymodule
cloudpickle.register_pickle_by_value(mymodule)
# Now mymodule functions are serialized by value, not import reference
'''

RAY_EXAMPLE = '''
# Ray uses cloudpickle for @remote functions — lambdas work in tasks
import ray
ray.init()

@ray.remote
def process(data):
    return sorted(data, key=lambda x: -x)

# Closures in tasks:
multiplier = 3.5
@ray.remote
def scale(values):
    return [v * multiplier for v in values]

refs = [scale.remote([1, 2, 3]), scale.remote([4, 5, 6])]
print(ray.get(refs))  # [[3.5, 7.0, 10.5], [14.0, 17.5, 21.0]]
'''


# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────

if __name__ == "__main__":
    print("=== Lambda serialization ===")
    fn = lambda x: x ** 2 + 1
    data = serialize(fn)
    fn2  = deserialize(data)
    print(f"  fn(5) = {fn(5)}, deserialized fn(5) = {fn2(5)}")
    print(f"  Serialized size: {len(data)} bytes")

    print("\n=== Closure serialization ===")
    base_rate = 1.08
    apply_tax = lambda price: round(price * base_rate, 2)
    data2 = serialize(apply_tax)
    fn3   = deserialize(data2)
    print(f"  apply_tax(100) = {apply_tax(100)}, deserialized = {fn3(100)}")

    print("\n=== pack_task / execute_task ===")
    task_bytes = pack_task(lambda a, b: a * b + 1, 6, 7)
    result = execute_task(task_bytes)
    print(f"  6 * 7 + 1 = {result}")

    print("\n=== FunctionRegistry ===")
    reg = FunctionRegistry()
    reg.register("normalize", lambda s: s.strip().lower().replace(" ", "_"))
    reg.register("square",    lambda x: x ** 2)
    for name in reg.names():
        print(f"  '{name}': {reg.size(name)} bytes")
    result2 = execute_task(reg.task_bytes("normalize", "  Hello World  "))
    print(f"  normalize('  Hello World  ') = '{result2}'")

    print("\n=== parallel_map (lambda) ===")
    start = time.perf_counter()
    results = parallel_map(lambda x: x ** 2, list(range(20)), max_workers=4)
    elapsed_ms = (time.perf_counter() - start) * 1000
    print(f"  Mapped 20 items in {elapsed_ms:.1f}ms: {results[:5]}...")

    print("\n=== can_serialize checks ===")
    items = [
        lambda x: x,
        lambda: None,
        [1, 2, 3],
        {"key": "val"},
        sum,  # builtin
    ]
    for item in items:
        name = getattr(item, "__name__", repr(item)[:25])
        print(f"  {name:25s}: {can_serialize(item)}")

For the dill alternative: dill and cloudpickle both serialize lambdas and closures. dill's scope is broader (session dumps, generators, stack frames, tracebacks), while cloudpickle focuses on functions and classes for distributed execution. cloudpickle has tighter integration with distributed frameworks and is the default serializer in Dask's distributed scheduler, Ray, and PySpark.

For the stdlib pickle alternative: stdlib pickle serializes instances of imported classes and module-level functions by reference, recording only the module and qualified name rather than the code. cloudpickle serializes the function's bytecode, defaults, and closure variables, so it can be reconstructed on a worker that may not share the same codebase. That makes it essential for dynamic functions, interactive notebooks, and task queues that send logic alongside data.

The Claude Skills 360 bundle includes cloudpickle skill sets covering serialize()/deserialize() with protocol control, can_serialize()/serialization_size() safety checks, pack_task()/execute_task() for worker dispatch, pack_result()/unpack_result() for round-trips, the lambda-safe parallel_map() over ProcessPoolExecutor, parallel_starmap() for multi-arg tasks, the FunctionRegistry serialize-once pattern, and Dask/Ray cloudpickle integration examples. Start with the free tier to try distributed function serialization code generation.
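The by-reference vs by-value distinction is easy to observe. A sketch using the pure-Python stdlib string module as a stand-in for a local module (the module choice is purely illustrative):

```python
import cloudpickle
import string

by_ref = cloudpickle.dumps(string.capwords)     # default: module + name only
cloudpickle.register_pickle_by_value(string)    # now ship the code itself
by_val = cloudpickle.dumps(string.capwords)
cloudpickle.unregister_pickle_by_value(string)  # restore default behaviour

print(len(by_ref) < len(by_val))                # True: by-value carries bytecode
```

The by-value payload is substantially larger because it embeds the function's code object, but it loads on workers that cannot import the module.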

