Python’s graphlib module (Python 3.9+) provides TopologicalSorter — a class for computing a topological ordering of a directed acyclic graph (DAG). from graphlib import TopologicalSorter, CycleError. Build: ts = TopologicalSorter({"C": ["A", "B"], "B": ["A"]}) — dict maps node → iterable of predecessors (nodes it depends on); lists and sets both work. Simple use: list(TopologicalSorter(graph).static_order()) → nodes in valid topological order. Parallel use: ts.prepare() — lock graph and find first ready nodes; ts.get_ready() → tuple of nodes with all dependencies satisfied; ts.done(*nodes) — mark nodes complete and unlock their dependents; ts.is_active() → bool — True while work remains. Error: CycleError (subclass of ValueError) raised by prepare() (and therefore also by static_order()) if the graph has a cycle; e.args[1] contains the cycle node list, in which the first and last entries are the same node. Extend after build: ts.add(node, *predecessors) — add more nodes before calling prepare(). Claude Code generates build systems, task schedulers, dependency resolvers, parallel pipeline executors, and import order analyzers.
CLAUDE.md for graphlib
## graphlib Stack
- Stdlib: from graphlib import TopologicalSorter, CycleError
- Simple: list(TopologicalSorter({"B":["A"],"C":["A","B"]}).static_order())
- Parallel: ts = TopologicalSorter(graph); ts.prepare()
- while ts.is_active():
- for node in ts.get_ready(): process(node); ts.done(node)
- Cycle: try: ts.prepare() except CycleError as e: print(e.args[1])
- Note: Python 3.9+; predecessors = dependencies (edges point FROM dep TO dependant)
graphlib Topological Sort Pipeline
# app/graphutil.py — sorter, cycle check, parallel runner, dep analysis, layer builder
from __future__ import annotations
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from graphlib import CycleError, TopologicalSorter
from typing import Any, Callable
# ─────────────────────────────────────────────────────────────────────────────
# 1. Core ordering helpers
# ─────────────────────────────────────────────────────────────────────────────
# Dependency graph used throughout this module: maps each node name to the
# collection (set or list) of its predecessors, i.e. the nodes it depends on.
Graph = dict[str, "set[str] | list[str]"]
def topo_sort(graph: Graph) -> list[str]:
    """
    Linearise ``graph`` so that every node comes after all of its predecessors.

    Args:
        graph: mapping of node -> predecessors (the nodes it depends on).

    Returns:
        A list of every node in one valid dependency-first order.

    Raises:
        CycleError: if the graph contains a cycle.

    Example:
        order = topo_sort({"build": ["compile"], "compile": ["fetch"], "fetch": []})
        print(order)  # ['fetch', 'compile', 'build']
    """
    sorter = TopologicalSorter(graph)
    ordering = sorter.static_order()
    return list(ordering)
def has_cycle(graph: Graph) -> bool:
    """
    Report whether ``graph`` contains at least one dependency cycle.

    Detection is delegated to TopologicalSorter.prepare(), which raises
    CycleError for cyclic graphs.

    Example:
        has_cycle({"A": ["B"], "B": ["C"], "C": ["A"]})  # True
        has_cycle({"A": [], "B": ["A"]})                 # False
    """
    sorter = TopologicalSorter(graph)
    try:
        sorter.prepare()
    except CycleError:
        return True
    return False
def find_cycle(graph: Graph) -> "list[str] | None":
    """
    Locate one dependency cycle in ``graph``.

    Returns:
        A copy of the node list carried in ``CycleError.args[1]`` when
        prepare() detects a cycle, otherwise None. graphlib reports the
        cycle with the starting node repeated at the end.

    Example:
        find_cycle({"A": ["B"], "B": ["C"], "C": ["A"]})
        # e.g. ['A', 'C', 'B', 'A'] (starting node and direction may vary)
    """
    sorter = TopologicalSorter(graph)
    try:
        sorter.prepare()
    except CycleError as exc:
        # Defensive: only read args[1] when the exception actually carries it.
        if len(exc.args) > 1:
            return list(exc.args[1])
        return None
    return None
def topo_layers(graph: Graph) -> list[list[str]]:
    """
    Split ``graph`` into successive batches of nodes that can run together.

    Each batch holds the nodes whose predecessors all appear in earlier
    batches. Nodes inside a batch are sorted.

    Raises:
        CycleError: if the graph contains a cycle (raised by prepare()).

    Example:
        layers = topo_layers({"C": ["A","B"], "D": ["B"], "A": [], "B": []})
        # [['A','B'], ['C','D']]
    """
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    batches: list[list[str]] = []
    while sorter.is_active():
        batch = sorted(sorter.get_ready())
        if batch:
            batches.append(batch)
            # Completing the whole batch at once unlocks the next layer.
            sorter.done(*batch)
        else:
            break
    return batches
# ─────────────────────────────────────────────────────────────────────────────
# 2. Graph analysis
# ─────────────────────────────────────────────────────────────────────────────
def transitive_deps(
    node: str,
    graph: Graph,
    _seen: "set[str] | None" = None,
) -> set[str]:
    """
    Return all transitive predecessors (direct and indirect) of node.

    The walk is iterative, so arbitrarily deep dependency chains do not hit
    Python's recursion limit.

    Args:
        node: node whose dependencies are collected. A node that is not a
            key of ``graph`` has no dependencies.
        graph: mapping of node -> predecessors.
        _seen: optional accumulator set. Nodes already in it are treated as
            visited and are not expanded. When given (even if empty) it is
            updated in place and returned.

    Returns:
        The set of every node reachable through predecessor links. ``node``
        itself is included only if it lies on a cycle.

    Example:
        g = {"C": {"B"}, "B": {"A"}, "A": set()}
        print(transitive_deps("C", g))  # {'A', 'B'}
    """
    # `is None` rather than truthiness: an empty accumulator passed by the
    # caller must still be the set that gets filled and returned.
    seen: set[str] = set() if _seen is None else _seen
    pending = [node]
    while pending:
        current = pending.pop()
        for dep in graph.get(current, ()):
            if dep not in seen:
                seen.add(dep)
                pending.append(dep)
    return seen
def direct_dependants(node: str, graph: Graph) -> set[str]:
    """
    Collect the nodes whose predecessor collection contains ``node``.

    Only immediate dependants are returned; indirect ones are not followed.

    Example:
        g = {"B": {"A"}, "C": {"A", "B"}, "A": set()}
        print(direct_dependants("A", g))  # {'B', 'C'}
    """
    dependants: set[str] = set()
    for candidate, preds in graph.items():
        if node in preds:
            dependants.add(candidate)
    return dependants
def root_nodes(graph: Graph) -> set[str]:
    """
    Return nodes with no predecessors (i.e. nothing they depend on).

    Implicit nodes are included: a node that only appears inside another
    node's predecessor collection and is not itself a key of ``graph`` has
    no recorded dependencies, so it is a root. This matches how
    TopologicalSorter treats such nodes.

    Example:
        root_nodes({"B": {"A"}, "C": {"B"}, "A": set()})  # {'A'}
        root_nodes({"B": {"A"}, "C": {"B"}})              # {'A'}
    """
    roots: set[str] = set()
    for name, preds in graph.items():
        if not preds:
            roots.add(name)
        else:
            # Predecessors that are not keys have no dependencies of their own.
            roots.update(p for p in preds if p not in graph)
    return roots
def leaf_nodes(graph: Graph) -> set[str]:
    """
    Return the keys of ``graph`` that never appear as anyone's predecessor,
    i.e. the nodes nothing else depends on.

    Example:
        leaf_nodes({"B": {"A"}, "C": {"B"}, "A": set()})  # {'C'}
    """
    depended_on: set[str] = set()
    for preds in graph.values():
        depended_on.update(preds)
    return set(graph) - depended_on
def ensure_all_nodes(graph: Graph) -> Graph:
    """
    Build a normalised copy of ``graph`` in which every predecessor is a key.

    Each predecessor collection is copied into a new set, and any node that
    is referenced only as a predecessor gains an entry with an empty set.
    The input mapping is left untouched.

    Example:
        g = ensure_all_nodes({"B": {"A"}, "C": {"B"}})
        # {"B": {"A"}, "C": {"B"}, "A": set()}
    """
    completed: Graph = {}
    for name, preds in graph.items():
        completed[name] = set(preds)
    # Iterate over a snapshot because missing nodes are inserted as we go.
    for preds in tuple(completed.values()):
        for pred in preds:
            completed.setdefault(pred, set())
    return completed
# ─────────────────────────────────────────────────────────────────────────────
# 3. Parallel task runner
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class TaskResult:
    """Outcome record for a single node handled by the parallel runner."""

    # Name of the graph node this result belongs to.
    node: str
    # True when the task finished without raising; False otherwise.
    ok: bool
    # Whatever the task callable returned (stays None when not ok).
    value: Any = None
    # Human-readable failure reason; empty string on success.
    error: str = ""
    # Elapsed run time in seconds; 0.0 when the task was never started.
    duration: float = 0.0
def run_parallel(
    graph: Graph,
    task_fn: Callable[[str], Any],
    max_workers: int = 4,
    stop_on_error: bool = True,
) -> dict[str, TaskResult]:
    """
    Run task_fn(node) for every node, respecting the dependency order.

    A node is submitted to the thread pool as soon as all of its
    predecessors have finished, so independent nodes run concurrently.

    Args:
        graph: mapping of node -> predecessors; implicit nodes are added.
        task_fn: callable invoked with the node name.
        max_workers: size of the thread pool.
        stop_on_error: when True, the first failing task stops any further
            submissions. Tasks already running are allowed to finish, and
            every node not yet started is recorded with error "aborted".
            When False, only nodes downstream of a failure are skipped,
            with error "dependency failed".

    Returns:
        {node: TaskResult} with an entry for every node in the graph,
        including nodes that were skipped.

    Raises:
        CycleError: if the graph contains a cycle (raised before any task runs).

    Example:
        def build(name: str):
            time.sleep(0.05)
            return f"built-{name}"
        results = run_parallel(
            {"link": {"compile"}, "compile": {"fetch"}, "fetch": set()},
            build,
        )
        for name, r in results.items():
            print(f" {name}: ok={r.ok} value={r.value!r}")
    """
    graph = ensure_all_nodes(graph)
    ts = TopologicalSorter(graph)
    ts.prepare()
    results: dict[str, TaskResult] = {}
    failed: set[str] = set()
    aborted = False
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures: dict[Any, str] = {}

        def submit_ready() -> None:
            # Skipped nodes are marked done right away, which can make their
            # dependants ready. Keep draining get_ready() until it is empty so
            # that no node is left without a result once the pool goes idle.
            while True:
                ready = ts.get_ready()
                if not ready:
                    return
                for node in ready:
                    if aborted:
                        ts.done(node)
                        results[node] = TaskResult(node=node, ok=False, error="aborted")
                        continue
                    # Skip if any dependency failed (propagate failure)
                    if any(d in failed for d in graph.get(node, [])):
                        ts.done(node)
                        results[node] = TaskResult(
                            node=node, ok=False, error="dependency failed"
                        )
                        failed.add(node)
                        continue
                    fut = executor.submit(_run_one, task_fn, node)
                    futures[fut] = node

        submit_ready()
        while futures:
            # Take whichever in-flight task finishes first.
            done_fut = next(as_completed(futures))
            node = futures.pop(done_fut)
            result = done_fut.result()
            results[node] = result
            if not result.ok:
                failed.add(node)
                if stop_on_error:
                    aborted = True
            ts.done(node)
            submit_ready()
    return results
def _run_one(task_fn: Callable[[str], Any], node: str) -> TaskResult:
    """
    Execute ``task_fn(node)`` and wrap the outcome in a TaskResult.

    Any Exception raised by the task is captured rather than propagated:
    the result then has ok=False and ``error`` set to ``str(exception)``.
    ``duration`` is measured with time.monotonic() in both cases.
    """
    started = time.monotonic()
    try:
        outcome = task_fn(node)
    except Exception as exc:
        elapsed = time.monotonic() - started
        return TaskResult(node=node, ok=False, error=str(exc), duration=elapsed)
    elapsed = time.monotonic() - started
    return TaskResult(node=node, ok=True, value=outcome, duration=elapsed)
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Walk through every helper in this module using a small build pipeline.
    print("=== graphlib demo ===")

    # Sample pipeline: each step maps to the set of steps it depends on.
    g: Graph = dict(
        install_deps=set(),
        fetch_source=set(),
        compile={"fetch_source", "install_deps"},
        test={"compile"},
        package={"test"},
        deploy={"package"},
    )

    # ── topo_sort ────────────────────────────────────────────────────────────
    print("\n--- topo_sort ---")
    order = topo_sort(g)
    print(f" order: {order}")

    # ── topo_layers ──────────────────────────────────────────────────────────
    print("\n--- topo_layers ---")
    layers = topo_layers(g)
    for i, layer in enumerate(layers):
        print(f" layer {i}: {layer}")

    # ── has_cycle / find_cycle ───────────────────────────────────────────────
    print("\n--- cycle detection ---")
    cyclic: Graph = {"A": {"B"}, "B": {"C"}, "C": {"A"}}
    print(f" has_cycle(acyclic): {has_cycle(g)}")
    print(f" has_cycle(cyclic): {has_cycle(cyclic)}")
    print(f" find_cycle(cyclic): {find_cycle(cyclic)}")

    # ── transitive_deps / dependants / roots / leaves ────────────────────────
    print("\n--- graph analysis ---")
    print(f" transitive_deps('deploy'): {sorted(transitive_deps('deploy', g))}")
    print(f" direct_dependants('compile'): {sorted(direct_dependants('compile', g))}")
    print(f" root_nodes: {sorted(root_nodes(g))}")
    print(f" leaf_nodes: {sorted(leaf_nodes(g))}")

    # ── run_parallel ─────────────────────────────────────────────────────────
    print("\n--- run_parallel ---")

    def mock_task(name: str) -> str:
        # Only the compile step pretends to take a noticeable amount of time.
        if name == "compile":
            time.sleep(0.05)
        return f"done-{name}"

    results = run_parallel(g, mock_task, max_workers=4)
    # Report in dependency order rather than completion order.
    for node in order:
        r = results[node]
        if r.ok:
            mark = "✓"
        else:
            mark = "✗"
        print(f" {mark} {node:20s} {r.value!r} ({r.duration*1000:.1f}ms)")

    print("\n=== done ===")
For the networkx (PyPI) alternative — nx.DiGraph provides full graph algorithms (shortest path, connected components, centrality, drawing) alongside topological sort via nx.topological_sort(G) — use networkx when you need graph-theoretic analysis beyond ordering (pathfinding, cycle enumeration, visualization); use graphlib when you only need topological ordering with zero dependencies and the parallel-ready batch API. For the manual Kahn’s algorithm alternative — collections.deque + in-degree counting is the classic textbook approach for topological sort — use graphlib.TopologicalSorter instead of hand-rolling Kahn’s algorithm; the stdlib implementation handles edge cases, provides the prepare()/get_ready()/done() parallel API, and raises CycleError with the offending cycle nodes. The Claude Skills 360 bundle includes graphlib skill sets covering topo_sort()/topo_layers() ordering helpers, has_cycle()/find_cycle() cycle detection, transitive_deps()/direct_dependants()/root_nodes()/leaf_nodes()/ensure_all_nodes() graph analysis, and TaskResult/run_parallel() thread-pool parallel task runner with dependency propagation. Start with the free tier to try topological sorting patterns and graphlib pipeline code generation.