PyAnnote performs speaker diarization — who spoke when. pip install pyannote.audio. Requires HuggingFace token and access to gated models at hf.co/pyannote. from pyannote.audio import Pipeline. Diarization: pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1", use_auth_token=HF_TOKEN), diarization = pipeline("audio.wav"), for turn, _, speaker in diarization.itertracks(yield_label=True): print(f"{turn.start:.2f}s - {turn.end:.2f}s: {speaker}"). Speaker count: pipeline("audio.wav", num_speakers=2) or min_speakers=2, max_speakers=5. RTTM: with open("out.rttm","w") as f: diarization.write_rttm(f). Voice activity: from pyannote.audio import Pipeline as VADPipeline, vad = VADPipeline.from_pretrained("pyannote/voice-activity-detection"), speech = vad("audio.wav"), speech.get_timeline().support(). Speaker verification: from pyannote.audio import Model, model = Model.from_pretrained("pyannote/embedding"), from pyannote.audio import Inference, inference = Inference(model, window="whole"), embedding = inference("audio.wav"). Two speakers: score = 1 - scipy.spatial.distance.cdist([emb1],[emb2],"cosine")[0,0]. Overlap detection: Pipeline.from_pretrained("pyannote/overlapped-speech-detection"). Segmentation: Pipeline.from_pretrained("pyannote/speaker-segmentation-3.0"). GPU: pipeline.to(torch.device("cuda")). pyannote.core.Segment(start, end) for manual annotations. Claude Code generates PyAnnote diarization pipelines, speaker-attributed transcripts, RTTM writers, and audio segmentation scripts.
CLAUDE.md for PyAnnote
## PyAnnote Stack
- Version: pyannote.audio >= 3.1 (requires HF token + model access)
- Diarize: Pipeline.from_pretrained("pyannote/speaker-diarization-3.1", use_auth_token=HF_TOKEN)
- Run: diarization = pipeline(audio_path, num_speakers=N | min/max_speakers=N)
- Iterate: for turn, _, speaker in diarization.itertracks(yield_label=True)
- VAD: Pipeline.from_pretrained("pyannote/voice-activity-detection")
- Embed: Inference(Model.from_pretrained("pyannote/embedding"), window="whole")
- GPU: pipeline.to(torch.device("cuda"))
- RTTM: diarization.write_rttm(file)
- Integrate: combine with Whisper transcription for speaker-attributed transcript
PyAnnote Diarization Pipeline
# audio/pyannote_diarization.py — speaker diarization with PyAnnote Audio
from __future__ import annotations

import os
import tempfile
from pathlib import Path
from typing import Optional

import numpy as np
import torch
import torchaudio
from scipy.spatial.distance import cdist

from pyannote.audio import Pipeline, Model, Inference
from pyannote.core import Annotation, Segment, Timeline
HF_TOKEN = os.environ.get("HF_TOKEN", "")
# ── 1. Pipeline loading ───────────────────────────────────────────────────────
def load_diarization_pipeline(
    model_name: str = "pyannote/speaker-diarization-3.1",
    device: str = "cpu",
) -> Pipeline:
    """
    Load the pretrained speaker diarization pipeline.

    Requires HF_TOKEN to be set and license acceptance at
    https://hf.co/pyannote/speaker-diarization-3.1

    Args:
        model_name: HuggingFace model id of the diarization pipeline.
        device: "cuda" to run on GPU; silently falls back to CPU when
            no CUDA device is available.

    Returns:
        A ready-to-call pyannote Pipeline.
    """
    pipeline = Pipeline.from_pretrained(model_name, use_auth_token=HF_TOKEN)
    # Only move to GPU when one is actually present.
    if device == "cuda" and torch.cuda.is_available():
        pipeline.to(torch.device("cuda"))
        print("PyAnnote diarization on GPU")  # plain strings: no placeholders
    else:
        print("PyAnnote diarization on CPU")
    return pipeline
def load_vad_pipeline(device: str = "cpu") -> Pipeline:
    """
    Load the voice activity detection pipeline.

    Args:
        device: "cuda" to run on GPU; falls back to CPU when unavailable.

    Returns:
        A ready-to-call pyannote VAD Pipeline.
    """
    pipeline = Pipeline.from_pretrained(
        "pyannote/voice-activity-detection",
        use_auth_token=HF_TOKEN,
    )
    # Guard on availability, mirroring load_diarization_pipeline: the
    # original moved to CUDA unconditionally and crashed on CPU-only hosts.
    if device == "cuda" and torch.cuda.is_available():
        pipeline.to(torch.device("cuda"))
    return pipeline
def load_embedding_model(device: str = "cpu") -> Inference:
    """Build an Inference wrapper over the pretrained speaker-embedding model.

    window="whole" produces a single embedding per input (file or segment),
    which is what enrollment/verification needs.
    """
    embedding_model = Model.from_pretrained(
        "pyannote/embedding",
        use_auth_token=HF_TOKEN,
    )
    return Inference(
        embedding_model,
        window="whole",
        device=torch.device(device),
    )
# ── 2. Speaker diarization ────────────────────────────────────────────────────
def diarize(
    pipeline: Pipeline,
    audio_path: str,
    num_speakers: Optional[int] = None,
    min_speakers: Optional[int] = None,
    max_speakers: Optional[int] = None,
) -> Annotation:
    """
    Run speaker diarization on an audio file.

    Args:
        pipeline: Loaded pyannote diarization pipeline (any callable taking
            (audio_path, **params)).
        audio_path: Path to the audio file.
        num_speakers: Exact speaker count, if known; takes precedence over
            the min/max bounds.
        min_speakers: Lower bound on speaker count (used when num_speakers
            is not given).
        max_speakers: Upper bound on speaker count (used when num_speakers
            is not given).

    Returns:
        pyannote Annotation with speaker turns.
    """
    params: dict = {}
    # Explicit `is not None` checks instead of truthiness, so the intent
    # "argument was provided" is unambiguous.
    if num_speakers is not None:
        params["num_speakers"] = num_speakers
    else:
        if min_speakers is not None:
            params["min_speakers"] = min_speakers
        if max_speakers is not None:
            params["max_speakers"] = max_speakers
    return pipeline(audio_path, **params)
def format_diarization(diarization: Annotation) -> list[dict]:
    """Flatten a diarization Annotation into start-sorted segment dicts.

    Each dict carries "speaker", "start", "end" and "duration" (seconds,
    rounded to milliseconds).
    """
    segments = [
        {
            "speaker": label,
            "start": round(turn.start, 3),
            "end": round(turn.end, 3),
            "duration": round(turn.end - turn.start, 3),
        }
        for turn, _, label in diarization.itertracks(yield_label=True)
    ]
    segments.sort(key=lambda seg: seg["start"])
    return segments
def diarize_and_print(
    pipeline: Pipeline,
    audio_path: str,
    **kwargs,
) -> list[dict]:
    """Run diarization, pretty-print every speaker turn, return the segments."""
    segments = format_diarization(diarize(pipeline, audio_path, **kwargs))
    unique_speakers = {seg["speaker"] for seg in segments}
    print(f"\nDetected {len(unique_speakers)} speakers in {audio_path}:")
    for seg in segments:
        # Bar length scales with turn duration (5 chars per second).
        duration_bar = "█" * int(seg["duration"] * 5)
        print(f" [{seg['start']:6.2f}s - {seg['end']:6.2f}s] {seg['speaker']}: {duration_bar}")
    return segments
def save_rttm(diarization: Annotation, audio_path: str, output_path: Optional[str] = None) -> str:
    """
    Save diarization in RTTM format for evaluation tools.

    Args:
        diarization: Annotation exposing write_rttm(file_object).
        audio_path: Source audio path; used to derive the default output name.
        output_path: Destination file; defaults to audio_path with a ".rttm"
            suffix. (Annotation fixed: the default is None, not a str.)

    Returns:
        Path of the written RTTM file, as a string.
    """
    if output_path is None:
        output_path = Path(audio_path).with_suffix(".rttm")
    with open(output_path, "w") as f:
        diarization.write_rttm(f)
    print(f"RTTM saved: {output_path}")
    return str(output_path)
# ── 3. Voice activity detection ───────────────────────────────────────────────
def detect_speech(
    vad_pipeline: Pipeline,
    audio_path: str,
) -> list[dict]:
    """
    Run voice activity detection on an audio file.

    Returns a list of speech regions as dicts with "start", "end" and
    "duration" in seconds (rounded to milliseconds).
    """
    vad_output = vad_pipeline(audio_path)
    regions = []
    # support() merges overlapping speech regions into a clean timeline.
    for region in vad_output.get_timeline().support():
        regions.append({
            "start": round(region.start, 3),
            "end": round(region.end, 3),
            "duration": round(region.end - region.start, 3),
        })
    total_speech = sum(r["duration"] for r in regions)
    print(f"Speech: {total_speech:.1f}s across {len(regions)} segments")
    return regions
def extract_speech_segments(
    audio_path: str,
    speech_regions: list[dict],
    output_dir: str = "./speech_segments",
    min_duration: float = 0.5,
) -> list[str]:
    """Write each speech region of `audio_path` to its own WAV file.

    Regions shorter than `min_duration` seconds are skipped. Files are named
    speech_NNNN.wav by region index and placed under `output_dir`.
    """
    Path(output_dir).mkdir(parents=True, exist_ok=True)
    waveform, sample_rate = torchaudio.load(audio_path)
    saved: list[str] = []
    for index, region in enumerate(speech_regions):
        if region["duration"] < min_duration:
            continue  # too short to be useful downstream
        first = int(region["start"] * sample_rate)
        last = int(region["end"] * sample_rate)
        out_path = os.path.join(output_dir, f"speech_{index:04d}.wav")
        torchaudio.save(out_path, waveform[:, first:last], sample_rate)
        saved.append(out_path)
    print(f"Extracted {len(saved)} speech segments → {output_dir}")
    return saved
# ── 4. Speaker embeddings and verification ────────────────────────────────────
class SpeakerVerifier:
    """Speaker verification and identification on top of PyAnnote embeddings.

    Keeps a gallery of enrolled speakers (L2-normalized mean embeddings) and
    scores test audio against it with cosine similarity.
    """

    def __init__(self, inference: Inference, threshold: float = 0.6):
        """
        Args:
            inference: Callable mapping an audio path to an embedding vector
                (e.g. a pyannote Inference with window="whole").
            threshold: Minimum cosine similarity to accept a match.
        """
        self.inference = inference
        self.threshold = threshold
        # speaker_id -> L2-normalized mean embedding
        self._gallery: dict[str, np.ndarray] = {}

    def embed_file(self, audio_path: str) -> np.ndarray:
        """Extract a speaker embedding for the whole audio file."""
        return self.inference(audio_path)

    def embed_segment(self, audio_path: str, start: float, end: float) -> np.ndarray:
        """Extract an embedding for the [start, end] portion of the file."""
        # Segment is imported at module level; the original re-imported it here.
        return self.inference.crop(audio_path, Segment(start, end))

    def enroll(self, speaker_id: str, audio_paths: list[str]) -> np.ndarray:
        """Enroll a speaker from one or more recordings (mean embedding).

        The mean embedding is L2-normalized before storage so cosine scoring
        against it is well-conditioned.
        """
        embeddings = np.array([self.embed_file(p) for p in audio_paths])
        mean_emb = embeddings.mean(axis=0)
        mean_emb = mean_emb / np.linalg.norm(mean_emb)  # L2 normalize
        self._gallery[speaker_id] = mean_emb
        print(f"Enrolled: {speaker_id} ({len(audio_paths)} recordings)")
        return mean_emb

    def verify(self, audio_path: str, claimed_id: str) -> dict:
        """Verify whether audio matches the claimed speaker identity.

        Raises:
            KeyError: If `claimed_id` was never enrolled.
        """
        if claimed_id not in self._gallery:
            raise KeyError(f"Speaker '{claimed_id}' not enrolled")
        test_emb = self.embed_file(audio_path)
        test_emb = test_emb / np.linalg.norm(test_emb)
        ref_emb = self._gallery[claimed_id]
        # Cosine similarity = 1 - cosine distance.
        score = 1 - float(cdist([test_emb], [ref_emb], metric="cosine")[0, 0])
        accepted = score >= self.threshold
        return {
            "claimed_id": claimed_id,
            "score": round(score, 4),
            "accepted": accepted,
            "threshold": self.threshold,
        }

    def identify(self, audio_path: str) -> tuple[str, float]:
        """Identify the best-matching enrolled speaker, or 'unknown'.

        Raises:
            RuntimeError: If the gallery is empty.
        """
        if not self._gallery:
            raise RuntimeError("No speakers enrolled")
        test_emb = self.embed_file(audio_path)
        test_emb = test_emb / np.linalg.norm(test_emb)
        ref_embs = np.array(list(self._gallery.values()))
        ref_ids = list(self._gallery.keys())
        scores = 1 - cdist([test_emb], ref_embs, metric="cosine")[0]
        best_idx = int(np.argmax(scores))
        best_score = float(scores[best_idx])
        # Below-threshold best match is reported as unknown, with its score.
        if best_score < self.threshold:
            return "unknown", best_score
        return ref_ids[best_idx], best_score
# ── 5. Speaker-attributed transcription (Whisper + PyAnnote) ─────────────────
def transcribe_with_speakers(
    audio_path: str,
    dia_pipeline: Pipeline,
    whisper_model = None,
    language: Optional[str] = None,
    num_speakers: Optional[int] = None,
) -> list[dict]:
    """
    Combine Whisper transcription with PyAnnote diarization.

    Each diarized speaker turn is sliced out of the audio, transcribed
    independently, and emitted as {"speaker", "start", "end", "text"}.

    Args:
        audio_path: Input audio file.
        dia_pipeline: Loaded PyAnnote diarization pipeline.
        whisper_model: A faster-whisper WhisperModel (transcribe() returns
            (segments, info)) or a model whose transcribe() returns a dict.
            Loaded lazily (faster-whisper "base", CPU/int8) when None.
        language: Optional language hint forwarded to Whisper.
        num_speakers: Optional exact speaker count for diarization.

    Returns:
        List of speaker-attributed transcript segments.

    Raises:
        ImportError: If no model is supplied and faster-whisper is missing.
    """
    # 1. Diarize to get speaker turns.
    diarization = diarize(dia_pipeline, audio_path, num_speakers=num_speakers)
    dia_segs = format_diarization(diarization)
    # 2. Load audio and resample to Whisper's expected 16 kHz.
    signal, sr = torchaudio.load(audio_path)
    if sr != 16000:
        signal = torchaudio.functional.resample(signal, sr, 16000)
    if whisper_model is None:
        try:
            from faster_whisper import WhisperModel
        except ImportError:
            raise ImportError("Install faster-whisper: pip install faster-whisper")
        whisper_model = WhisperModel("base", device="cpu", compute_type="int8")
    # 3. Transcribe each speaker turn.
    audio_np = signal.squeeze().numpy()
    results = []
    for seg in dia_segs:
        if seg["duration"] < 0.3:
            continue  # too short to transcribe reliably
        start_sample = int(seg["start"] * 16000)
        end_sample = int(seg["end"] * 16000)
        chunk_np = audio_np[start_sample:end_sample]
        # Write the chunk to a unique temp file. (The original hard-coded a
        # /tmp path, which is collision-prone and non-portable.)
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
            chunk_path = tmp.name
        torchaudio.save(chunk_path, torch.from_numpy(chunk_np).unsqueeze(0), 16000)
        try:
            if hasattr(whisper_model, "transcribe"):
                # faster-whisper API: transcribe() -> (segment generator, info)
                segs, _ = whisper_model.transcribe(
                    chunk_path,
                    language=language,
                    beam_size=3,
                )
                text = " ".join(s.text.strip() for s in segs)
            else:
                # dict-returning transcribe() API
                result = whisper_model.transcribe(chunk_path, language=language)
                text = result["text"].strip()
        except Exception:
            # Best-effort: a failed chunk yields an empty transcript rather
            # than aborting the whole file.
            text = ""
        finally:
            Path(chunk_path).unlink(missing_ok=True)
        if text:
            results.append({
                "speaker": seg["speaker"],
                "start": seg["start"],
                "end": seg["end"],
                "text": text,
            })
    return results
def save_attributed_transcript(segments: list[dict], output_path: str = "transcript.txt"):
    """Write one "[start - end] speaker: text" line per segment to a UTF-8 file."""
    formatted = [
        f"[{seg['start']:>6.2f} - {seg['end']:>6.2f}] {seg['speaker']:>12}: {seg['text']}"
        for seg in segments
    ]
    Path(output_path).write_text("\n".join(formatted), encoding="utf-8")
    print(f"Transcript saved: {output_path}")
# ── Demo ──────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Pretrained pyannote pipelines are gated: a HuggingFace token and
    # per-model license acceptance are both required.
    if not HF_TOKEN:
        print("Set HF_TOKEN environment variable to use PyAnnote pretrained models")
        print("Accept model access at: https://hf.co/pyannote/speaker-diarization-3.1")
        # raise SystemExit rather than exit(): the exit() helper is injected
        # by the site module and is not guaranteed in all run contexts.
        raise SystemExit(0)
    # Load pipeline (CPU here; pass device="cuda" for GPU).
    pipeline = load_diarization_pipeline(device="cpu")
    # Diarize an audio file and print speaker turns.
    test_audio = "meeting.wav"  # Replace with your audio file
    segments = diarize_and_print(pipeline, test_audio, min_speakers=1, max_speakers=6)
    # Save RTTM for evaluation tooling.
    diarization = diarize(pipeline, test_audio)
    save_rttm(diarization, test_audio)
    # Speaker-attributed transcript via faster-whisper.
    from faster_whisper import WhisperModel
    whisper = WhisperModel("base", device="cpu", compute_type="int8")
    transcript_segs = transcribe_with_speakers(test_audio, pipeline, whisper, num_speakers=2)
    save_attributed_transcript(transcript_segs)
    for seg in transcript_segs[:5]:
        print(f"[{seg['start']:.2f}s] {seg['speaker']}: {seg['text'][:70]}")
Alternative — WhisperX: when you want Whisper-native diarization, WhisperX bundles word-level alignment and pyannote diarization into a single package with a simplified one-command pipeline. Using PyAnnote directly, by contrast, gives full control over each pipeline stage (VAD → segmentation → embedding → clustering), the ability to tune thresholds, and the option to use the verifier separately for applications like speaker access control that need verification scores without transcription. Alternative — the AssemblyAI speaker diarization API: when you need cloud-hosted diarization without GPU infrastructure, AssemblyAI handles the infrastructure and includes speaker labels in its transcription JSON automatically. PyAnnote's completely offline processing, however, means no audio leaves your servers — making it mandatory for healthcare, legal, and enterprise applications with data-privacy requirements where audio files cannot be sent to third-party APIs. The Claude Skills 360 bundle includes PyAnnote skill sets covering diarization pipeline setup, RTTM output, voice activity detection, speaker embedding extraction, speaker enrollment and verification, and speaker-attributed Whisper transcription. Start with the free tier to try speaker diarization pipeline generation.