construct is a declarative binary parser/builder for Python. pip install construct. Struct: from construct import Struct, Int8ub, Int16ub; Header = Struct("magic"/Int16ub, "version"/Int8ub). Parse: obj = Header.parse(b"\x00\x01\x02"). Build: Header.build({"magic": 1, "version": 2}). sizeof: Header.sizeof() → 3. Integer types: Int8ub Int8sb Int16ub Int16sb Int32ub Int32sb Int64ub Int64sb (u=unsigned, s=signed, b=big, l=little). Float: Float32b Float64b. Byte: Byte = Int8ub. Bytes: Bytes(4) — fixed length. GreedyBytes: rest of stream. String: PascalString(Int8ub, "utf8") — length-prefixed. CString: null-terminated. Array: Array(3, Int16ub) → list of 3 ints. GreedyRange: repeats until end of stream. Sequence: Sequence(Int8ub, Int16ub). Switch: Switch(this.type, {0: Format0, 1: Format1}). If: If(this.has_data, Bytes(4)). Computed: Computed(lambda ctx: ctx.length * 2). Rebuild: Rebuild(field, lambda ctx: len(ctx.data)). Enum: Enum(Byte, OK=0, ERR=1). BitStruct: BitStruct("flags"/FlagsEnum(BitsSwapped(Byte), ...)). Flag: Flag — boolean (one bit inside a BitStruct, one byte otherwise). Checksum: from construct import Checksum. Pass: Pass — no-op placeholder (parses to None, builds nothing). Padding: Padding(2). Default: Default(Int8ub, 0). FocusedSeq: FocusedSeq("key", ...). Container: dict-like result object. this: this.field — access sibling. LazyBound: recursive structures. Claude Code generates construct binary protocol parsers, file format handlers, and network packet decoders.
CLAUDE.md for construct
## construct Stack
- Version: construct >= 2.10 | pip install construct
- Struct: Struct("field"/Type, "field2"/Type2)
- Types: Int8ub|Int16ul|Int32sb|Int64ub | Float32b | Bytes(n) | GreedyBytes
- String: PascalString(Int8ub, "utf8") | CString("utf8") | GreedyString("utf8")
- Repeat: Array(n, Type) | PrefixedArray(Int8ub, Type) | GreedyRange(Type)
- Conditional: Switch(this.type, {0: T0, 1: T1}) | If(this.flag, Type)
construct Binary Parsing Pipeline
# app/binary.py — construct Struct definitions, parsing, building, protocols, file formats
from __future__ import annotations
import io
from pathlib import Path
from typing import Any
from construct import (
Array,
Byte,
Bytes,
ByteSwapped,
Computed,
Const,
CString,
Default,
Enum,
Flag,
GreedyBytes,
GreedyRange,
GreedyString,
If,
IfThenElse,
Int8sb,
Int8ub,
Int16sb,
Int16ub,
Int16ul,
Int32sb,
Int32ub,
Int32ul,
Int64ub,
Int64ul,
Optional,
Padding,
Pass,
PascalString,
Prefixed,
PrefixedArray,
Rebuild,
RepeatUntil,
Select,
Sequence,
Struct,
Switch,
this,
Const,
Float32b,
Float64b,
BitStruct,
BitsSwapped,
FlagsEnum,
NullTerminated,
ChecksumError,
RawCopy,
Tunnel,
Compressed,
Checksum,
Container,
ListContainer,
)
# ─────────────────────────────────────────────────────────────────────────────
# 1. Basic type helpers
# ─────────────────────────────────────────────────────────────────────────────
def parse_bytes(definition, data: bytes) -> Container:
    """
    Decode raw bytes with the given construct definition.

    Delegates to ``definition.parse`` and returns its result unchanged.

    Example:
        result = parse_bytes(MyStruct, raw_bytes)
        print(result.field_name)
    """
    parsed = definition.parse(data)
    return parsed
def build_bytes(definition, obj: dict | Container) -> bytes:
    """
    Encode a dict/Container into bytes with the given construct definition.

    Delegates to ``definition.build`` and returns its result unchanged.

    Example:
        data = build_bytes(MyStruct, {"magic": 0x1234, "version": 1})
    """
    encoded = definition.build(obj)
    return encoded
def parse_file(definition, path: str | Path) -> Container:
    """
    Parse the binary file at ``path`` with the given construct definition.

    The path is converted to ``str`` before it is handed to
    ``definition.parse_file``, so both ``str`` and ``Path`` are accepted.
    """
    filename = str(path)
    return definition.parse_file(filename)
def size_of(definition) -> int | None:
    """
    Report the fixed byte size of a definition.

    Returns whatever ``definition.sizeof()`` yields, or None when that call
    raises (e.g. because the definition is variable-length).
    """
    try:
        fixed_size = definition.sizeof()
    except Exception:
        # Any failure to compute a size is reported as "no fixed size".
        fixed_size = None
    return fixed_size
# ─────────────────────────────────────────────────────────────────────────────
# 2. Common protocol primitives
# ─────────────────────────────────────────────────────────────────────────────
# IPv4 packet header (simplified): only the fixed 20-byte portion is described;
# there is no field for options. All multi-byte integers are big-endian.
IPv4Header = Struct(
    "version_ihl" / Byte,  # version (4 bits) + IHL (4 bits), left packed in one byte
    "tos" / Byte,  # type of service
    "total_length" / Int16ub,
    "id" / Int16ub,
    "flags_offset" / Int16ub,  # flags (3 bits) + fragment offset (13 bits), left packed
    "ttl" / Byte,
    "protocol" / Byte,  # 6=TCP, 17=UDP, 1=ICMP
    "checksum" / Int16ub,
    "src_ip" / Bytes(4),  # raw 4-byte address, not formatted as dotted-quad
    "dst_ip" / Bytes(4),  # raw 4-byte address, not formatted as dotted-quad
)
# DNS question record.
# qname is consumed one byte at a time until a zero byte is read, so it parses
# to a list of ints (length bytes and label characters); labels are not decoded.
# NOTE(review): a name that ends in a compression pointer has no zero
# terminator — presumably out of scope for this simplified struct; confirm.
DNSQuestion = Struct(
    "qname" / RepeatUntil(lambda x, lst, ctx: x == 0, Byte),
    "qtype" / Int16ub,
    "qclass" / Int16ub,
)
# TLV (Type-Length-Value) — common in binary protocols.
# Layout: 1-byte type, 2-byte big-endian length, then exactly `length` bytes.
TLVRecord = Struct(
    "type" / Int8ub,
    "length" / Int16ub,
    "value" / Bytes(this.length),  # size is taken from the sibling "length" field
)
def parse_tlv_stream(data: bytes) -> list[Container]:
    """
    Decode consecutive TLV records from ``data``.

    Records are read back-to-back until the input is exhausted. Parsing is
    best-effort: the first record that fails to parse ends the loop and the
    records decoded so far are returned.
    """
    total = len(data)
    buffer = io.BytesIO(data)
    parsed = []
    while buffer.tell() < total:
        try:
            parsed.append(TLVRecord.parse_stream(buffer))
        except Exception:
            # Stop at the first undecodable (e.g. truncated) record.
            break
    return parsed
# ─────────────────────────────────────────────────────────────────────────────
# 3. Message framing formats
# ─────────────────────────────────────────────────────────────────────────────
# Version-tagged message envelope.
# One-byte message type tag mapped to symbolic names.
MessageTypeEnum = Enum(Byte, PING=0, PONG=1, DATA=2, ACK=3, ERROR=4)
# PING and PONG payloads both carry only a 32-bit big-endian sequence number.
PingPayload = Struct("seq" / Int32ub)
PongPayload = Struct("seq" / Int32ub)
# DATA payload: sequence number followed by a length-prefixed body.
DataPayload = Struct(
    "seq" / Int32ub,
    # When building, "length" is recomputed from the body (0 if no body is
    # present in the context), so callers do not have to supply it.
    "length" / Rebuild(Int16ub, lambda ctx: len(ctx.get("body", b""))),
    "body" / Bytes(this.length),
)
# ERROR payload: 16-bit code plus a UTF-8 message prefixed by a one-byte length.
ErrorPayload = Struct(
    "code" / Int16ub,
    "message" / PascalString(Int8ub, "utf8"),
)
# Framed message: 2-byte magic, version byte, type tag, then a payload whose
# layout is selected by the type tag.
Message = Struct(
    "magic" / Const(b"\xCA\xFE"),  # fixed marker
    "version" / Default(Byte, 1),  # 1 is used when the caller omits "version"
    "type" / MessageTypeEnum,
    # Cases are keyed by the symbolic names declared in MessageTypeEnum.
    # No default case is supplied here.
    "payload" / Switch(this.type, {
        "PING": PingPayload,
        "PONG": PongPayload,
        "DATA": DataPayload,
        "ACK": Struct("seq" / Int32ub),
        "ERROR": ErrorPayload,
    }),
)
def encode_message(msg_type: str, **payload_fields) -> bytes:
    """
    Serialize a framed Message of the given type.

    The keyword arguments become the payload fields; the envelope version is
    always written as 1.

    Example:
        raw = encode_message("DATA", seq=1, body=b"hello world")
        raw = encode_message("PING", seq=42)
    """
    envelope = {"version": 1, "type": msg_type, "payload": payload_fields}
    return Message.build(envelope)
def decode_message(data: bytes) -> Container:
    """
    Decode raw bytes into a framed Message.

    Example:
        msg = decode_message(raw_bytes)
        if msg.type == "DATA":
            process(msg.payload.body)
    """
    message = Message.parse(data)
    return message
# ─────────────────────────────────────────────────────────────────────────────
# 4. File format: simple binary file
# ─────────────────────────────────────────────────────────────────────────────
# Generic binary file with a header, directory table, and data blobs.
# Header layout (12 bytes): 4-byte magic, version, entry count, flags, 3 pad bytes.
FileHeaderStruct = Struct(
    "magic" / Const(b"BDAT"),
    "version" / Int16ub,
    "entry_count" / Int16ub,  # number of directory entries that follow the header
    "flags" / Default(Int8ub, 0),  # 0 is used when the caller omits "flags"
    "reserved" / Padding(3),
)
# One directory entry: a named blob located by offset and length.
FileEntryStruct = Struct(
    "name" / PascalString(Int8ub, "utf8"),  # UTF-8 name prefixed by a one-byte length
    "offset" / Int32ub,
    "length" / Int32ub,
    "crc" / Int32ub,
)
# Whole container: header, then `header.entry_count` directory entries, then
# everything that remains in the stream as one data section.
BinaryFile = Struct(
    "header" / FileHeaderStruct,
    "entries" / Array(this.header.entry_count, FileEntryStruct),
    "data" / GreedyBytes,
)
def create_binary_file(entries: dict[str, bytes]) -> bytes:
    """
    Pack named blobs into a single binary container.

    Blobs are stored back-to-back in dict iteration order. Each directory
    entry records the blob's name, its offset within the data section, its
    length, and its CRC-32.

    Example:
        container = create_binary_file({
            "config": b'{"debug": true}',
            "data": raw_data_bytes,
        })
    """
    import zlib

    directory = []
    cursor = 0  # offset of the next blob inside the data section
    for name, blob in entries.items():
        size = len(blob)
        directory.append({
            "name": name,
            "offset": cursor,
            "length": size,
            "crc": zlib.crc32(blob) & 0xFFFFFFFF,
        })
        cursor += size

    header = {"version": 1, "entry_count": len(directory)}
    payload = b"".join(entries.values())
    return BinaryFile.build({
        "header": header,
        "entries": directory,
        "data": payload,
    })
def read_binary_file(data: bytes, *, verify_crc: bool = False) -> dict[str, bytes]:
    """
    Parse a binary container and return {name: bytes} dict.

    Args:
        data: Complete container bytes (header, directory, data section).
        verify_crc: When True, check every blob against its directory entry:
            the slice must have the recorded length and its CRC-32 must match
            the stored ``crc``. Defaults to False, which returns the slices
            without any integrity check.

    Returns:
        Mapping of entry name to blob bytes. If two entries share a name,
        the later one wins.

    Raises:
        ValueError: Only when ``verify_crc`` is True and an entry runs past
            the end of the data section or its CRC-32 does not match.

    Example:
        blobs = read_binary_file(container_bytes)
        config = json.loads(blobs["config"])
    """
    import zlib  # function-scope import; only needed for the optional check

    container = BinaryFile.parse(data)
    raw = container.data
    result = {}
    for entry in container.entries:
        blob = raw[entry.offset: entry.offset + entry.length]
        if verify_crc:
            # Slicing never fails, so a short slice is the only sign that the
            # entry points outside the data section.
            if len(blob) != entry.length:
                raise ValueError(
                    f"entry {entry.name!r} extends past the end of the data section"
                )
            if zlib.crc32(blob) & 0xFFFFFFFF != entry.crc:
                raise ValueError(f"CRC mismatch for entry {entry.name!r}")
        result[entry.name] = blob
    return result
# ─────────────────────────────────────────────────────────────────────────────
# 5. Bit-level parsing
# ─────────────────────────────────────────────────────────────────────────────
# TCP flags byte (simplified).
# Eight one-bit flags, listed from the most significant bit (cwr, 0x80) down
# to the least significant bit (fin, 0x01); e.g. 0x12 sets ack and syn.
TCPFlags = BitStruct(
    "cwr" / Flag,
    "ece" / Flag,
    "urg" / Flag,
    "ack" / Flag,
    "psh" / Flag,
    "rst" / Flag,
    "syn" / Flag,
    "fin" / Flag,
)
def parse_tcp_flags(flags_byte: int) -> Container:
    """
    Decode the TCP control bits held in one byte value.

    Example:
        flags = parse_tcp_flags(0x12) # SYN+ACK
        assert flags.syn and flags.ack
    """
    single_byte = bytes((flags_byte,))
    return TCPFlags.parse(single_byte)
def build_tcp_flags(**flags: bool) -> int:
    """
    Build a TCP flags byte from named flags.

    Args:
        **flags: Any of ``cwr``, ``ece``, ``urg``, ``ack``, ``psh``, ``rst``,
            ``syn``, ``fin``. Flags that are not given default to False.

    Returns:
        The encoded flags as an int in the range 0-255.

    Raises:
        TypeError: If a keyword is not one of the eight flag names. Without
            this check a misspelled flag would be silently dropped and the
            wrong byte returned.

    Example:
        byte = build_tcp_flags(syn=True, ack=True) # 0x12
    """
    known = ("cwr", "ece", "urg", "ack", "psh", "rst", "syn", "fin")
    unknown = sorted(set(flags) - set(known))
    if unknown:
        raise TypeError(
            f"build_tcp_flags() got unexpected flag name(s): {', '.join(unknown)}"
        )
    all_flags = {name: flags.get(name, False) for name in known}
    encoded = TCPFlags.build(all_flags)
    return encoded[0]
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
# Demo: exercises each section of the module and prints the results.
if __name__ == "__main__":
    print("=== TLV parsing ===")
    # Build two TLV records by hand, concatenate them, and parse them back.
    r1 = TLVRecord.build({"type": 1, "length": 4, "value": b"test"})
    r2 = TLVRecord.build({"type": 2, "length": 3, "value": b"abc"})
    records = parse_tlv_stream(r1 + r2)
    for r in records:
        print(f" type={r.type} len={r.length} value={r.value!r}")
    print("\n=== Message framing ===")
    # Round-trip a PING message through encode/decode.
    raw_ping = encode_message("PING", seq=1)
    print(f" PING bytes ({len(raw_ping)}): {raw_ping.hex()}")
    decoded = decode_message(raw_ping)
    print(f" Decoded: type={decoded.type} seq={decoded.payload.seq}")
    # Round-trip a DATA message; only seq and body are supplied.
    raw_data = encode_message("DATA", seq=7, body=b"hello world")
    msg = decode_message(raw_data)
    print(f" DATA: seq={msg.payload.seq} body={msg.payload.body!r}")
    print("\n=== Binary file container ===")
    # Pack two blobs into a container, then read them back by name.
    container = create_binary_file({
        "config": b'{"debug": true}',
        "model": b"\x00" * 16,
    })
    print(f" Container size: {len(container)} bytes")
    blobs = read_binary_file(container)
    for name, blob in blobs.items():
        print(f" {name!r}: {len(blob)} bytes")
    print("\n=== TCP flags ===")
    # Decode 0x12, then build the same flag combination from keywords.
    syn_ack = parse_tcp_flags(0x12)
    print(f" 0x12: syn={syn_ack.syn} ack={syn_ack.ack} fin={syn_ack.fin}")
    built = build_tcp_flags(syn=True, ack=True)
    print(f" syn+ack built: 0x{built:02x}")
    print("\n=== IPv4 header size ===")
    print(f" IPv4Header.sizeof(): {size_of(IPv4Header)}")
Compared with the stdlib struct module: Python’s built-in struct.pack/struct.unpack is fast and zero-dependency but requires manual format-string management and positional tuple access, whereas construct defines formats declaratively with named fields, conditional parsing (Switch, If), enums, and build(). Use struct for trivial fixed-format parsing or performance-critical hot paths, and construct when you’re implementing a complete protocol parser with conditional and variable-length fields. Compared with bitstruct / bitstring: those libraries focus specifically on bit-level manipulation and stream reading, whereas construct provides integrated bit-level parsing via BitStruct, Flag, and BitsSwapped alongside the full byte-level Struct system. Use construct for protocols that mix byte-level and bit-level fields in the same definition. The Claude Skills 360 bundle includes construct skill sets covering parse_bytes()/build_bytes()/parse_file()/size_of(), TLVRecord stream parsing, IPv4Header/DNSQuestion primitives, MessageTypeEnum/Message framing with Switch payload dispatch, encode_message()/decode_message(), FileHeaderStruct/BinaryFile container create_binary_file()/read_binary_file(), and TCPFlags BitStruct parse_tcp_flags()/build_tcp_flags(). Start with the free tier to try binary protocol parsing and serialization code generation.