Zarr stores N-dimensional arrays as chunked, compressed data. pip install zarr. import zarr. Array: z = zarr.zeros((10000, 10000), chunks=(1000, 1000), dtype="float32"). Open: z = zarr.open("data.zarr", mode="r+"). Create: zarr.open("arr.zarr", mode="w", shape=(1000, 200), chunks=(100, 200), dtype="float32", compressor=zarr.Blosc(cname="zstd", clevel=3)). Slice: data = z[0:100, :]. Write: z[0:100, :] = np.random.rand(100, 200). Group: root = zarr.open_group("store.zarr", mode="w"), root.create_dataset("train/X", shape=..., chunks=...), root["train/X"][:] = X. Attrs: z.attrs["units"] = "K", dict(z.attrs). Copy: zarr.copy(src, dst_group, name="arr"). S3: import s3fs; store = s3fs.S3Map("s3://bucket/path.zarr"), z = zarr.open(store, mode="r"). Consolidate: zarr.consolidate_metadata("store.zarr"), z = zarr.open_consolidated("store.zarr"). Compressors: zarr.Blosc(cname="lz4", clevel=1, shuffle=zarr.Blosc.BITSHUFFLE) — fast. zarr.GZip(level=6). zarr.Zstd(level=3). Info: z.info. z.nchunks, z.nchunks_initialized. Nested: zarr.NestedDirectoryStore("path"). Zip: zarr.ZipStore("data.zip") — write-once portable archive (stored chunks cannot be rewritten in place). Fill: zarr.open_array(..., fill_value=-9999). Rechunk: use rechunker package. xarray integration: xr.open_zarr("store.zarr"). Claude Code generates Zarr chunked array stores, cloud dataset pipelines, and HDF5-alternative hierarchical data structures.
CLAUDE.md for Zarr
## Zarr Stack
- Version: zarr >= 2.18 (zarr v3 API available as zarr >= 3.0)
- Array: zarr.open/zeros/ones/empty with shape, chunks, dtype, compressor
- Chunks: tune to I/O pattern — (time, lat, lon): (100, lat, lon) for time slices
- Compressor: Blosc(cname="lz4", clevel=1) fast read | Blosc("zstd",3) small size
- Groups: zarr.open_group → group.create_dataset / group[key]
- Cloud: s3fs.S3Map | gcsfs.GCSMap → zarr.open(store)
- Consolidate: zarr.consolidate_metadata before sharing to cloud
Zarr Chunked Array Storage Pipeline
# data/zarr_pipeline.py — chunked N-D array storage with Zarr
from __future__ import annotations
from pathlib import Path
from typing import Any
import numpy as np
import zarr
from zarr.codecs import BloscCodec # zarr v3 API; fall back to zarr.Blosc for v2
# ── Compatibility shim (zarr v2 vs v3) ───────────────────────────────────────
def make_blosc(cname: str = "lz4", clevel: int = 3, shuffle: int = 1):
    """Return a Blosc compressor compatible with zarr v2 or v3.

    Parameters
    ----------
    cname : Blosc codec name ("lz4", "zstd", "blosclz", ...).
    clevel : compression level (higher = smaller output, slower).
    shuffle : numcodecs-style shuffle flag — 0=noshuffle, 1=byteshuffle,
        2=bitshuffle.
    """
    try:
        # zarr v2 re-exports numcodecs' Blosc; shuffle is an int flag there.
        return zarr.Blosc(cname=cname, clevel=clevel, shuffle=shuffle)
    except AttributeError:
        # zarr v3 removed zarr.Blosc. BloscCodec expects the shuffle mode by
        # name, not the numcodecs int — translate known flags, pass anything
        # else through untouched.
        from zarr.codecs import BloscCodec
        shuffle_mode = {0: "noshuffle", 1: "shuffle", 2: "bitshuffle"}.get(shuffle, shuffle)
        return BloscCodec(cname=cname, clevel=clevel, shuffle=shuffle_mode)
# ── 0. Array creation helpers ────────────────────────────────────────────────
def create_array(
    store_path: str,
    shape: tuple,
    chunks: tuple,
    dtype: str = "float32",
    compressor = None,
    fill_value: float | int = 0,
    overwrite: bool = False,
    attrs: dict | None = None,
) -> zarr.Array:
    """
    Create a Zarr array with sensible defaults.

    chunks should match the access pattern — use time-chunked layout for
    time-series queries.

    Parameters
    ----------
    store_path : directory (or store URL) the array is created in.
    shape, chunks : full array extents and per-chunk extents.
    dtype : NumPy dtype string for stored values.
    compressor : codec instance; None selects a fast LZ4 Blosc default.
    fill_value : value reported for chunks that were never written.
    overwrite : if True open with "w" (clobber existing); otherwise "w-"
        raises if the store already exists.
    attrs : optional user attributes written to the array after creation.

    Returns the open zarr.Array handle.
    """
    # Explicit None check: only substitute the default when no compressor
    # was supplied (a `compressor or ...` truthiness test is too loose).
    if compressor is None:
        compressor = make_blosc("lz4", clevel=1)
    z = zarr.open_array(
        store_path,
        mode="w" if overwrite else "w-",  # "w-" fails fast instead of clobbering
        shape=shape,
        chunks=chunks,
        dtype=dtype,
        compressor=compressor,
        fill_value=fill_value,
    )
    if attrs:
        z.attrs.update(attrs)
    return z
def create_group_store(
    store_path: str,
    overwrite: bool = False,
) -> zarr.Group:
    """Open (or create) a Zarr group backed by a directory store.

    overwrite=True replaces any existing store ("w"); otherwise the group
    is created if missing and opened read/write ("a").
    """
    mode = "w" if overwrite else "a"
    return zarr.open_group(store_path, mode=mode)
# ── 1. Climate dataset builder ────────────────────────────────────────────────
def build_climate_store(
    store_path: str,
    n_times: int,
    n_lat: int,
    n_lon: int,
    chunk_time: int = 100,
    overwrite: bool = True,
) -> zarr.Group:
    """
    Create a hierarchical Zarr store for climate data.

    layout:
      /coords/time          (n_times,)
      /coords/lat           (n_lat,)
      /coords/lon           (n_lon,)
      /vars/temperature     (n_times, n_lat, n_lon)
      /vars/precipitation   (n_times, n_lat, n_lon)

    Variables are chunked (chunk_time, n_lat, n_lon) so a time-range query
    touches the minimum number of chunks.
    """
    root = zarr.open_group(store_path, mode="w" if overwrite else "a")
    coords = root.require_group("coords")
    # NOTE: create_dataset does not accept an `attrs` keyword (zarr v2
    # forwards kwargs to zarr.creation.create, which has no such parameter;
    # zarr v3 spells it `attributes`) — set .attrs on the returned arrays.
    time_ds = coords.create_dataset("time", shape=(n_times,), chunks=(n_times,), dtype="int64")
    time_ds.attrs.update({"units": "days since 1900-01-01", "calendar": "proleptic_gregorian"})
    lat_ds = coords.create_dataset("lat", shape=(n_lat,), chunks=(n_lat,), dtype="float64")
    lat_ds.attrs["units"] = "degrees_north"
    lon_ds = coords.create_dataset("lon", shape=(n_lon,), chunks=(n_lon,), dtype="float64")
    lon_ds.attrs["units"] = "degrees_east"
    fast_c = make_blosc("lz4", clevel=1, shuffle=1)    # fast decode for hot reads
    small_c = make_blosc("zstd", clevel=3, shuffle=1)  # better ratio for colder data
    chunk_3d = (chunk_time, n_lat, n_lon)
    variables = root.require_group("vars")
    temp_ds = variables.create_dataset(
        "temperature",
        shape=(n_times, n_lat, n_lon),
        chunks=chunk_3d,
        dtype="float32",
        compressor=fast_c,
        fill_value=np.nan,
    )
    temp_ds.attrs.update({"units": "K", "long_name": "Air temperature at 2m", "_FillValue": "NaN"})
    precip_ds = variables.create_dataset(
        "precipitation",
        shape=(n_times, n_lat, n_lon),
        chunks=chunk_3d,
        dtype="float32",
        compressor=small_c,
        fill_value=np.nan,
    )
    precip_ds.attrs.update({"units": "mm/day", "long_name": "Daily precipitation"})
    root.attrs["created_by"] = "zarr_pipeline.py"
    root.attrs["Conventions"] = "CF-1.8"
    return root
def write_climate_data(
    root: zarr.Group,
    times: np.ndarray,
    lats: np.ndarray,
    lons: np.ndarray,
    temp: np.ndarray,
    precip: np.ndarray,
    chunk_size: int = 100,
) -> None:
    """Populate the climate store, streaming the 3-D variables in time slices."""
    # Coordinate vectors are small — write each in a single assignment.
    for key, values in (("coords/time", times),
                        ("coords/lat", lats),
                        ("coords/lon", lons)):
        root[key][:] = values
    # Stream temperature/precipitation one time-window at a time so peak
    # memory per write stays bounded by chunk_size leading slices.
    total = len(times)
    start = 0
    while start < total:
        stop = min(start + chunk_size, total)
        root["vars/temperature"][start:stop] = temp[start:stop]
        root["vars/precipitation"][start:stop] = precip[start:stop]
        start = stop
# ── 2. ML dataset store ───────────────────────────────────────────────────────
def build_ml_dataset_store(
    store_path: str,
    n_train: int,
    n_val: int,
    n_test: int,
    feature_dim: int,
    overwrite: bool = True,
) -> zarr.Group:
    """
    Build a Zarr store for a large ML dataset (features, labels, splits).

    Suited to out-of-core dataloaders: samples are read per-chunk rather
    than loading the whole split into RAM.
    """
    mode = "w" if overwrite else "a"
    root = zarr.open_group(store_path, mode=mode)
    compressor = make_blosc("lz4", clevel=1)
    split_sizes = {"train": n_train, "val": n_val, "test": n_test}
    for split, n in split_sizes.items():
        grp = root.require_group(split)
        # 512-row chunks line up with typical dataloader batch sizes.
        grp.create_dataset("X", shape=(n, feature_dim), chunks=(512, feature_dim),
                           dtype="float32", compressor=compressor)
        grp.create_dataset("y", shape=(n,), chunks=(512,), dtype="int32",
                           compressor=make_blosc("lz4", clevel=1))
        grp.attrs["n_samples"] = n
    root.attrs["feature_dim"] = feature_dim
    root.attrs["splits"] = ["train", "val", "test"]
    return root
# ── 3. Cloud-compatible store ─────────────────────────────────────────────────
def open_s3_store(
    bucket: str,
    path: str,
    mode: str = "r",
    anon: bool = False,
    region: str = "us-east-1",
) -> zarr.Group:
    """
    Open a Zarr store on S3.
    Requires: pip install s3fs
    Returns a consolidated group for fast metadata reads.

    Parameters
    ----------
    bucket : S3 bucket name (without the "s3://" prefix).
    path : key prefix of the .zarr store inside the bucket.
    mode : zarr open mode ("r", "r+", "a", "w", ...).
    anon : True for public buckets accessed without credentials.
    region : AWS region forwarded to the boto client.
    """
    # Imported lazily so the rest of the module works without s3fs installed.
    import s3fs
    fs = s3fs.S3FileSystem(anon=anon, client_kwargs={"region_name": region})
    # check is disabled for "w" since the prefix may not exist yet.
    # NOTE(review): confirm S3Map's `check` semantics against the installed
    # fsspec/s3fs version — presumably it validates that the root exists.
    store = s3fs.S3Map(root=f"s3://{bucket}/{path}", s3=fs, check=mode != "w")
    if mode == "r":
        # Prefer consolidated metadata (one .zmetadata read) and fall back to
        # per-array metadata when the store was never consolidated.
        try:
            return zarr.open_consolidated(store, mode="r")
        except KeyError:
            return zarr.open_group(store, mode="r")
    return zarr.open_group(store, mode=mode)
def consolidate_store(store_path: str) -> None:
    """
    Merge every array's and group's metadata into a single .zmetadata file.

    Run this before uploading to cloud — cuts metadata reads from O(N)
    per-array requests down to O(1).
    """
    zarr.consolidate_metadata(store_path)
    message = f"Consolidated metadata: {store_path}/.zmetadata"
    print(message)
# ── 4. Copy and rechunk ───────────────────────────────────────────────────────
def rechunk_array(
    source_path: str,
    dest_path: str,
    new_chunks: dict,  # {"time": 365, "lat": 36, "lon": 72}
    overwrite: bool = False,
) -> None:
    """
    Copy the top-level arrays of a Zarr store with a different chunk layout.

    Dimension names are looked up in each array's "_dims" attribute when
    present, falling back to positional names "dim0", "dim1", ...; any
    dimension absent from new_chunks keeps its full extent as the chunk size.
    Sub-groups are not descended into.

    For large arrays, use the `rechunker` package instead:
        from rechunker import rechunk
        plan = rechunk(source, target_chunks, max_mem="4GB", target_store=dest)
        plan.execute()
    """
    src = zarr.open_group(source_path, mode="r")
    dst = zarr.open_group(dest_path, mode="w" if overwrite else "w-")
    # Iterate arrays only: a bare `for name in src` also yields sub-groups,
    # which have no .ndim/.shape and would break the chunk computation.
    for name, arr in src.arrays():
        dims = arr.attrs.get("_dims", [f"dim{i}" for i in range(arr.ndim)])
        chunks = tuple(new_chunks.get(d, s) for d, s in zip(dims, arr.shape))
        zarr.copy(arr, dst, name=name, chunks=chunks, if_exists="replace")
    print(f"Rechunked {source_path} → {dest_path}")
# ── 5. Out-of-core ML loader ──────────────────────────────────────────────────
class ZarrDataset:
    """
    Minimal PyTorch-compatible Zarr dataset for out-of-core ML.

    Keeps open handles to the split's feature/label arrays and reads one
    sample at a time from disk — compatible with DataLoader(num_workers>0).
    """
    def __init__(
        self,
        store_path: str,
        split: str = "train",
        transform = None,
    ):
        split_group = zarr.open_group(store_path, mode="r")[split]
        self.X = split_group["X"]
        self.y = split_group["y"]
        self.tfm = transform

    def __len__(self) -> int:
        n_samples, *_ = self.X.shape
        return n_samples

    def __getitem__(self, idx: int):
        # Pulls only the chunk containing row idx from storage.
        raw = self.X[idx]
        label = int(self.y[idx])
        sample = np.array(raw, dtype=np.float32)
        if self.tfm:
            sample = self.tfm(sample)
        return sample, label
# ── Demo ──────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    import tempfile, os
    print("Zarr Chunked Array Storage Demo")
    print("=" * 50)
    with tempfile.TemporaryDirectory() as workdir:
        # ── Climate dataset store: build, populate, inspect
        climate_path = os.path.join(workdir, "climate.zarr")
        n_t, n_lat, n_lon = 365, 18, 36
        climate = build_climate_store(climate_path, n_t, n_lat, n_lon, chunk_time=100)
        time_axis = np.arange(n_t, dtype=np.int64)
        lat_axis = np.linspace(-85, 85, n_lat)
        lon_axis = np.linspace(-175, 175, n_lon)
        temp_cube = (283.15 + np.random.randn(n_t, n_lat, n_lon)).astype("float32")
        precip_cube = np.abs(np.random.randn(n_t, n_lat, n_lon)).astype("float32")
        write_climate_data(climate, time_axis, lat_axis, lon_axis, temp_cube, precip_cube)
        temp_arr = climate["vars/temperature"]
        print(f"\nClimate store: {climate.tree()}")
        print(f"temperature shape: {temp_arr.shape}")
        print(f"chunks: {temp_arr.chunks}")
        print(f"compressor: {temp_arr.compressor}")
        # Compression ratio = raw bytes / bytes on disk (guard divide-by-zero).
        ratio = temp_arr.nbytes / max(temp_arr.nbytes_stored, 1)
        print(f"compression ratio: {ratio:.1f}x")
        # Partial read — only the chunks intersecting the slice are loaded.
        window = temp_arr[0:30, :, :]
        print(f"\nPartial read [0:30, :, :] → {window.shape}")
        # Single-file metadata for fast cloud opens.
        consolidate_store(climate_path)
        # ── ML dataset: build, fill the train split, sample through the loader
        ml_path = os.path.join(workdir, "ml_dataset.zarr")
        ml_root = build_ml_dataset_store(ml_path, n_train=1000, n_val=200,
                                         n_test=200, feature_dim=128)
        ml_root["train/X"][:] = np.random.randn(1000, 128).astype("float32")
        ml_root["train/y"][:] = np.random.randint(0, 10, 1000).astype("int32")
        dataset = ZarrDataset(ml_path, split="train")
        sample, label = dataset[0]
        print(f"\nZarrDataset[0]: x.shape={sample.shape}, y={label}, len={len(dataset)}")
For the NumPy memmap alternative — np.memmap maps a file as a single uncompressed buffer of one dtype with no chunking or compression while Zarr handles arbitrary N-D arrays with per-array chunk tuning, mixed dtypes within a group, and transparent compression ratios of 5–20× for scientific float32 data, and zarr.open_consolidated(s3_store) reads all metadata from a single file rather than thousands of per-array .zarray HTTP requests that can make cold-opening a large store on S3 take minutes. For the HDF5 / h5py alternative — HDF5 files require a single writer process (SWMR for concurrent reads) and can corrupt on crash while Zarr’s chunk-level atomic writes allow multiple processes to write non-overlapping chunks concurrently, fsspec and s3fs mount S3/GCS/Azure directly as Zarr stores without downloading files, and zarr.consolidate_metadata reduces cloud metadata overhead from one HTTP request per array’s metadata file to a single .zmetadata read, making remote dataset opening 100× faster. The Claude Skills 360 bundle includes Zarr skill sets covering array creation with chunks/dtype/compressor, Blosc LZ4/Zstd compression, open_group hierarchical stores, chunked write loops, S3Map cloud store, consolidate_metadata, zarr.copy rechunking, out-of-core ZarrDataset pattern, and xarray interop. Start with the free tier to try chunked array storage code generation.