more-itertools extends Python’s itertools with 200+ recipes and new utilities. pip install more-itertools. Chunked: from more_itertools import chunked; list(chunked([1,2,3,4,5], 2)) → [[1,2],[3,4],[5]]. Batched: batched(range(7), 3) → groups of 3 (Python 3.12+ also in itertools). Sliced: sliced([1..10], 3). Windowed: windowed(seq, 3) — sliding window. Pairwise: pairwise([1,2,3]) → (1,2),(2,3). First: first([3,4,5]) → 3. Last: last([1,2,3]) → 3. One: one([42]) → 42. Only: only([42]) → 42. Flatten: more_itertools.flatten([[1,2],[3]]). Collapse: collapse([[1,[2,3]],4]). Roundrobin: roundrobin([1,2],[3],[4,5,6]). Interleave: more_itertools.interleave([1,2],[3,4]). Peekable: p = peekable([1,2,3]); p.peek(). Seekable: s = seekable(range(5)); s.seek(0). Bucket: b = bucket(range(10), key=even); list(b[True]). Groupby_transform: groupby_transform(seq, key, value). Unique_everseen: unique_everseen([1,1,2,2,3]). Unique_justseen: unique_justseen([1,1,2,3,3]) — run-length dedup. Partition: odd, even = partition(lambda x: x%2, range(10)). Divide: list(divide(3, range(9))). Zip_equal: zip_equal([1,2],[3,4]) — raises on length mismatch. Mark_ends: mark_ends([1,2,3]) → (True,False,1),(False,False,2),(False,True,3). Spy: head, it = spy(range(100), 3). Prepend: prepend(0, range(1,5)). take, tail, strip_func, before_and_after. Claude Code generates more-itertools sequence processors, sliding window analytics, and iterator pipelines.
CLAUDE.md for more-itertools
## more-itertools Stack
- Version: more-itertools >= 10.0 | pip install more-itertools
- Chunks: chunked(seq, n) | batched(seq, n) | divide(n, seq) | sliced(seq, n)
- Windows: windowed(seq, n) | pairwise(seq) | triplewise(seq)
- Dedup: unique_everseen(seq) | unique_justseen(seq) — global vs run-length
- Extract: first/last/one/only | nth(seq, n) | nth_or_last
- Advanced: peekable(seq) | seekable(seq) | bucket(seq, key) | spy(seq, n)
- Merge: roundrobin | interleave_longest | zip_equal | mark_ends | prepend
more-itertools Sequence Pipeline
# app/iter_utils.py — more-itertools chunking, windows, dedup, extract, and merge
from __future__ import annotations
from typing import Any, Callable, Iterable, Iterator, TypeVar
import more_itertools as mit
T = TypeVar("T")
# ─────────────────────────────────────────────────────────────────────────────
# 1. Chunking and splitting
# ─────────────────────────────────────────────────────────────────────────────
def chunks(seq: Iterable[T], size: int) -> list[list[T]]:
"""Split seq into lists of at most size elements. Last chunk may be shorter."""
return [list(c) for c in mit.chunked(seq, size)]
def equal_chunks(seq: Iterable[T], n: int) -> list[list[T]]:
"""
Split seq into exactly n chunks of (nearly) equal size.
Uses more_itertools.divide — all chunks have the same length or differ by 1.
"""
return [list(c) for c in mit.divide(n, seq)]
def sliced_list(seq: list[T], size: int) -> list[list[T]]:
"""Slice a concrete list into slices of size. Avoids copying."""
return [list(seq[i:i+size]) for i in range(0, len(seq), size)]
def batch_iter(seq: Iterable[T], size: int) -> Iterator[tuple[T, ...]]:
"""Yield tuples of up to size items."""
return mit.batched(seq, size)
# ─────────────────────────────────────────────────────────────────────────────
# 2. Sliding windows
# ─────────────────────────────────────────────────────────────────────────────
def windows(seq: Iterable[T], n: int, fillvalue: T | None = None) -> list[tuple]:
"""
Sliding windows of width n.
fillvalue pads shorter windows at the tail (default None pads too).
"""
return list(mit.windowed(seq, n, fillvalue=fillvalue))
def pairs(seq: Iterable[T]) -> list[tuple[T, T]]:
"""Adjacent pairs: (seq[0],seq[1]), (seq[1],seq[2]), ..."""
return list(mit.pairwise(seq))
def triples(seq: Iterable[T]) -> list[tuple[T, T, T]]:
"""Adjacent triples."""
return list(mit.triplewise(seq))
def moving_average(seq: Iterable[float], n: int) -> list[float]:
"""Compute simple moving average with window size n."""
ws = windows(seq, n)
return [sum(w) / n for w in ws if None not in w]
def compute_diffs(seq: Iterable[float]) -> list[float]:
"""Return first differences: seq[i+1] - seq[i]."""
return [b - a for a, b in pairs(seq)]
def detect_changes(seq: Iterable[T]) -> list[tuple[int, T, T]]:
"""Return list of (index, from_value, to_value) wherever seq changes."""
return [
(i + 1, a, b)
for i, (a, b) in enumerate(pairs(seq))
if a != b
]
# ─────────────────────────────────────────────────────────────────────────────
# 3. Deduplication
# ─────────────────────────────────────────────────────────────────────────────
def unique(seq: Iterable[T], key: Callable | None = None) -> list[T]:
"""Remove all duplicate values, preserving first-seen order."""
return list(mit.unique_everseen(seq, key=key))
def unique_consecutive(seq: Iterable[T], key: Callable | None = None) -> list[T]:
"""
Remove consecutive duplicate values (run-length dedup).
[1,1,2,3,3,3,2] → [1,2,3,2]
"""
return list(mit.unique_justseen(seq, key=key))
def run_lengths(seq: Iterable[T]) -> list[tuple[T, int]]:
"""
Compute (value, count) run-length encoding.
[1,1,2,3,3] → [(1,2),(2,1),(3,2)]
"""
return [(val, sum(1 for _ in grp))
for val, grp in mit.run_length.encode(seq)]
# ─────────────────────────────────────────────────────────────────────────────
# 4. Extraction helpers
# ─────────────────────────────────────────────────────────────────────────────
def first(seq: Iterable[T], default: T | None = None) -> T | None:
"""Return first item or default."""
return mit.first(seq, default=default)
def last(seq: Iterable[T], default: T | None = None) -> T | None:
"""Return last item or default."""
return mit.last(seq, default=default)
def nth(seq: Iterable[T], n: int, default: T | None = None) -> T | None:
"""Return the nth item (0-indexed) or default."""
return mit.nth(seq, n, default=default)
def one(seq: Iterable[T], too_short=None, too_long=None) -> T:
"""
Return the single item in seq.
Raises ValueError (or custom exception) if seq has != 1 item.
"""
return mit.one(seq, too_short=too_short, too_long=too_long)
def only(seq: Iterable[T], default: T | None = None) -> T | None:
"""Return the single item if seq has exactly one, else default."""
return mit.only(seq, default=default)
def head_tail(seq: Iterable[T], n: int = 1) -> tuple[list[T], Iterator[T]]:
"""
Peek at the first n items without consuming the iterator.
Returns (head_list, full_iterator_from_start).
"""
return mit.spy(seq, n)
# ─────────────────────────────────────────────────────────────────────────────
# 5. Partitioning and classification
# ─────────────────────────────────────────────────────────────────────────────
def split(seq: Iterable[T], predicate: Callable[[T], bool]) -> tuple[list[T], list[T]]:
"""
Partition seq into (matches, non-matches) based on predicate.
Returns (true_items, false_items).
"""
no_match, match = mit.partition(predicate, seq)
return list(match), list(no_match)
def classify(seq: Iterable[T], key: Callable[[T], Any]) -> dict[Any, list[T]]:
"""
Classify items into groups by key function.
Unlike itertools.groupby, does not require sorted input.
"""
b = mit.bucket(seq, key=key)
all_keys = set(key(x) for x in seq) # noqa — bucket exhausts seq
# bucket is lazy; re-iterate for keys
results: dict[Any, list[T]] = {}
for k in all_keys:
results[k] = list(b[k])
return results
def before_and_after_split(
seq: Iterable[T], predicate: Callable[[T], bool]
) -> tuple[list[T], list[T]]:
"""
Split seq into before-first-True and from-first-True.
"""
before, after = mit.before_and_after(predicate, seq)
return list(before), list(after)
def mark(seq: Iterable[T]) -> list[tuple[bool, bool, T]]:
"""
Annotate each item with (is_first, is_last).
Useful for rendering templates: add separator except after last item.
"""
return [(first, last, val) for first, last, val in mit.mark_ends(seq)]
# ─────────────────────────────────────────────────────────────────────────────
# 6. Merging iterables
# ─────────────────────────────────────────────────────────────────────────────
def roundrobin(*seqs: Iterable[T]) -> list[T]:
"""Interleave seqs round-robin until all are exhausted."""
return list(mit.roundrobin(*seqs))
def interleave(*seqs: Iterable[T], fillvalue: T | None = None) -> list[T | None]:
"""Interleave seqs, padding shorter ones with fillvalue."""
return list(mit.interleave_longest(*seqs, fillvalue=fillvalue))
def safe_zip(*seqs: Iterable[T]) -> list[tuple[T, ...]]:
"""
Zip sequences, raising ValueError if lengths differ.
Use instead of zip() when lengths should always match.
"""
return list(mit.zip_equal(*seqs))
def prepend_item(item: T, seq: Iterable[T]) -> list[T]:
"""Return list with item prepended."""
return list(mit.prepend(item, seq))
def flatten_once(seq: Iterable[Iterable[T]]) -> list[T]:
"""Flatten one level of nesting."""
return list(mit.flatten(seq))
def flatten_deep(seq: Iterable) -> list:
"""Recursively flatten all levels of nesting."""
return list(mit.collapse(seq))
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
print("=== chunks ===")
print(chunks(range(10), 3))
print("\n=== equal_chunks ===")
print(equal_chunks(range(10), 3))
print("\n=== sliding windows + moving avg ===")
prices = [10, 12, 11, 14, 13, 15, 16]
avg3 = moving_average(prices, 3)
print("prices:", prices)
print("3-avg: ", [round(v, 2) for v in avg3])
print("\n=== pairs + diffs ===")
vals = [100, 103, 98, 105, 110]
print("diffs:", compute_diffs(vals))
print("\n=== detect_changes ===")
statuses = ["ok","ok","fail","fail","ok","ok","fail"]
print("changes:", detect_changes(statuses))
print("\n=== unique + unique_consecutive ===")
data = [1, 1, 2, 3, 2, 1, 2, 2]
print("unique: ", unique(data))
print("unique_consec:", unique_consecutive(data))
print("\n=== run_lengths ===")
print(run_lengths("aaabbbccddddee"))
print("\n=== extraction ===")
items = [10, 20, 30, 40]
print("first:", first(items), "| last:", last(items))
print("nth(2):", nth(items, 2))
print("one([42]):", one([42]))
print("\n=== split + mark ===")
nums = list(range(10))
evens, odds = split(nums, lambda x: x % 2 == 0)
print("evens:", evens, " odds:", odds)
for is_first, is_last, val in mark(["a", "b", "c"]):
sep = "" if is_last else ", "
print(f" (first={is_first}, last={is_last}) {val!r}{sep}", end="")
print()
print("\n=== roundrobin + safe_zip ===")
print("roundrobin:", roundrobin([1,4], [2,5], [3,6,7]))
print("safe_zip: ", safe_zip([1,2,3], [4,5,6]))
print("\n=== flatten ===")
nested = [[1,2],[3,[4,5]],[[6,[7]]]]
print("flatten_once:", flatten_once([[1,2],[3,4],[5]]))
print("flatten_deep:", flatten_deep(nested))
print("\n=== head_tail spy ===")
gen = (x*x for x in range(1, 6))
head, full = head_tail(gen, 2)
print("peek:", head, "| full:", list(full))
For the itertools stdlib alternative — itertools has ~20 functions covering the basics (chain, islice, groupby, product); more-itertools has 200+ recipes and fills gaps like chunked, windowed, pairwise, unique_everseen, roundrobin, mark_ends, and zip_equal that you’d otherwise need to write yourself. For the toolz alternative — toolz focuses on functional programming patterns (pipe, compose, curry, merge_with) and dict/groupby operations; more-itertools focuses on iterator manipulation (windows, chunking, run-lengths, head/tail inspection) — they’re complementary; use toolz for functional composition and more-itertools for sequence manipulation. The Claude Skills 360 bundle includes more-itertools skill sets covering chunks/batch/divide/sliced, windows/pairs/triples/moving_average, compute_diffs/detect_changes, unique/unique_consecutive/run_lengths, first/last/nth/one/only/head_tail extraction, split/classify/before_and_after_split/mark, roundrobin/interleave/safe_zip/prepend_item, flatten_once/flatten_deep, and full demo with analytics examples. Start with the free tier to try iterator pipeline code generation.