Haystack builds production document AI systems through composable Pipeline components. Each component — DocumentSplitter, SentenceTransformersDocumentEmbedder, InMemoryEmbeddingRetriever, PromptBuilder, OpenAIChatGenerator — does one thing and connects to others via typed sockets. Pipeline.connect() links output ports to input ports with type checking. DocumentStores abstract over Elasticsearch, Weaviate, OpenSearch, or in-memory storage. Hybrid retrieval combines BM25 keyword search with dense vector search for production accuracy. Custom components implement the @component decorator with run(). Claude Code generates Haystack indexing pipelines, retrieval-augmented generation chains, custom components, and the FastAPI deployment wrappers for document AI systems.
# CLAUDE.md for Haystack Projects
## Haystack Stack
- Version: haystack-ai >= 2.6 (Haystack 2.x — NOT 1.x, breaking API change)
- DocumentStore: InMemoryDocumentStore (dev), WeaviateDocumentStore (prod vector), ElasticsearchDocumentStore (prod BM25)
- Embeddings: SentenceTransformersDocumentEmbedder + SentenceTransformersTextEmbedder
- LLM: OpenAIChatGenerator (GPT-4o), AnthropicChatGenerator (claude-sonnet-4-6)
- Hybrid: JoinDocuments + weighting of BM25 + vector retrieval results
- Custom: @component decorator with InputSocket/OutputSocket type annotation
- Testing: unit test components in isolation, integration test full pipelines
## Document Indexing Pipeline
# pipelines/indexing.py — index documents into vector store
from haystack import Pipeline, Document
from haystack.components.converters import TextFileToDocument, PyPDFToDocument
from haystack.components.preprocessors import DocumentSplitter, DocumentCleaner
from haystack.components.embedders import SentenceTransformersDocumentEmbedder
from haystack.components.writers import DocumentWriter
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.document_stores.types import DuplicatePolicy
from pathlib import Path
def build_indexing_pipeline(document_store) -> Pipeline:
    """Build a pipeline that indexes PDF and text documents into *document_store*.

    Flow: (pdf_converter | text_converter) -> joiner -> cleaner -> splitter
    -> embedder -> writer.

    Both converters feed a DocumentJoiner before the cleaner: the previous
    wiring connected only ``pdf_converter`` downstream, so text documents were
    converted and then silently dropped. DocumentJoiner's ``documents`` input
    is variadic, so it accepts output from whichever converter(s) actually ran.

    :param document_store: any Haystack DocumentStore for the writer to fill.
    :returns: a Pipeline; run it with ``{"pdf_converter": {"sources": [...]}}``
        and/or ``{"text_converter": {"sources": [...]}}``.
    """
    # Local import keeps this snippet self-contained.
    from haystack.components.joiners import DocumentJoiner

    pipeline = Pipeline()
    # Add components
    pipeline.add_component("pdf_converter", PyPDFToDocument())
    pipeline.add_component("text_converter", TextFileToDocument())
    # Merges converter outputs; default join_mode="concatenate" is fine here.
    pipeline.add_component("joiner", DocumentJoiner())
    pipeline.add_component("cleaner", DocumentCleaner(
        remove_empty_lines=True,
        remove_extra_whitespaces=True,
        remove_repeated_substrings=False,
    ))
    pipeline.add_component("splitter", DocumentSplitter(
        split_by="sentence",
        split_length=5,
        split_overlap=1,
    ))
    pipeline.add_component("embedder", SentenceTransformersDocumentEmbedder(
        model="sentence-transformers/all-MiniLM-L6-v2",
        batch_size=64,
        progress_bar=True,
    ))
    pipeline.add_component("writer", DocumentWriter(
        document_store=document_store,
        policy=DuplicatePolicy.OVERWRITE,
    ))
    # Connect components: output_socket -> input_socket
    pipeline.connect("pdf_converter", "joiner")
    pipeline.connect("text_converter", "joiner")
    pipeline.connect("joiner", "cleaner")
    pipeline.connect("cleaner", "splitter")
    pipeline.connect("splitter", "embedder")
    pipeline.connect("embedder", "writer")
    return pipeline
def index_documents(file_paths: list[str], document_store) -> dict:
    """Index a list of documents and return stats."""
    pipeline = build_indexing_pipeline(document_store)
    pipeline.warm_up()  # Load embedding model

    # Route each path to the converter that handles its format.
    pdf_sources: list[Path] = []
    text_sources: list[Path] = []
    for path in file_paths:
        bucket = pdf_sources if path.endswith(".pdf") else text_sources
        bucket.append(Path(path))

    results: dict = {}
    if pdf_sources:
        results["pdfs"] = pipeline.run(
            {"pdf_converter": {"sources": pdf_sources}}
        )
    if text_sources:
        results["texts"] = pipeline.run(
            {"text_converter": {"sources": text_sources}}
        )

    count = document_store.count_documents()
    print(f"Indexed documents. Total in store: {count}")
    return results
## RAG Query Pipeline
# pipelines/rag.py — retrieval-augmented generation pipeline
from haystack import Pipeline
from haystack.components.embedders import SentenceTransformersTextEmbedder
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from haystack.components.builders import PromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
import os
# Jinja2 template for the RAG prompt: `documents` arrives from the retriever,
# `question` is supplied at pipeline run time.
RAG_PROMPT = """
You are a helpful assistant that answers questions based on the provided documents.
If the documents don't contain enough information to answer confidently, say so.
Documents:
{% for doc in documents %}
---
Source: {{ doc.meta.get('file_path', 'unknown') }}
{{ doc.content }}
{% endfor %}
---
Question: {{ question }}
Answer:
"""
def build_rag_pipeline(document_store) -> Pipeline:
    """Build an end-to-end RAG pipeline: embedder -> retriever -> prompt -> LLM.

    ``OpenAIChatGenerator.messages`` expects ``List[ChatMessage]``; a plain
    ``PromptBuilder`` outputs a single ``str``, so connecting it to
    ``llm.messages`` fails type checking. ``ChatPromptBuilder`` renders
    RAG_PROMPT into a user ChatMessage instead, and it infers its
    ``documents``/``question`` inputs from the template variables.

    :param document_store: store the embedding retriever reads from.
    :returns: connected Pipeline; feed ``embedder.text`` and
        ``prompt_builder.question`` at run time.
    """
    # Local imports keep this snippet self-contained.
    from haystack.components.builders import ChatPromptBuilder
    from haystack.utils import Secret

    pipeline = Pipeline()
    pipeline.add_component("embedder", SentenceTransformersTextEmbedder(
        model="sentence-transformers/all-MiniLM-L6-v2",
    ))
    pipeline.add_component("retriever", InMemoryEmbeddingRetriever(
        document_store=document_store,
        top_k=5,
    ))
    pipeline.add_component("prompt_builder", ChatPromptBuilder(
        template=[ChatMessage.from_user(RAG_PROMPT)],
    ))
    pipeline.add_component("llm", OpenAIChatGenerator(
        model="gpt-4o",
        # Haystack expects a Secret here, not a raw string; from_env_var also
        # defers resolution and keeps the key out of serialized configs.
        api_key=Secret.from_env_var("OPENAI_API_KEY"),
        generation_kwargs={
            "temperature": 0.1,
            "max_tokens": 1024,
        },
    ))
    # Connect the pipeline
    pipeline.connect("embedder.embedding", "retriever.query_embedding")
    pipeline.connect("retriever.documents", "prompt_builder.documents")
    pipeline.connect("prompt_builder.prompt", "llm.messages")
    return pipeline
def answer_question(question: str, pipeline: Pipeline) -> dict:
    """Run a question through the RAG pipeline.

    Returns ``{"answer": str, "sources": [{"content", "source", "score"}, ...]}``.

    ``include_outputs_from={"retriever"}`` is required: by default
    ``Pipeline.run`` only returns outputs that no downstream component
    consumed, and the retriever's documents are consumed by the prompt
    builder — without it, ``result["retriever"]`` raises ``KeyError``.
    """
    result = pipeline.run(
        {
            "embedder": {"text": question},
            "prompt_builder": {"question": question},
        },
        include_outputs_from={"retriever"},
    )
    response = result["llm"]["replies"][0].content
    retrieved_docs = result["retriever"]["documents"]
    return {
        "answer": response,
        "sources": [
            {
                "content": doc.content[:200],  # short snippet for display
                "source": doc.meta.get("file_path", "unknown"),
                "score": doc.score,
            }
            for doc in retrieved_docs
        ],
    }
## Hybrid Retrieval (BM25 + Vector)
# pipelines/hybrid_rag.py — combine keyword and semantic search
from haystack import Pipeline
from haystack.components.retrievers.in_memory import (
InMemoryEmbeddingRetriever,
InMemoryBM25Retriever,
)
from haystack.components.joiners import DocumentJoiner
from haystack.components.rankers import MetaFieldRanker
from haystack.components.embedders import SentenceTransformersTextEmbedder
from haystack.components.builders import PromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
def build_hybrid_rag_pipeline(document_store) -> Pipeline:
    """Hybrid RAG: combine BM25 keyword + dense vector retrieval.

    Two fixes over the naive wiring:
    * ``RAG_PROMPT`` is defined in ``pipelines.rag`` and was never imported
      into this module — referencing it raised ``NameError``.
    * ``OpenAIChatGenerator.messages`` takes ``List[ChatMessage]``; a plain
      ``PromptBuilder`` emits ``str``, so ``ChatPromptBuilder`` is used.

    :param document_store: store shared by both retrievers.
    :returns: connected Pipeline; feed ``embedder.text``,
        ``bm25_retriever.query`` and ``prompt_builder.question`` at run time.
    """
    # Local imports keep this snippet self-contained.
    from haystack.components.builders import ChatPromptBuilder
    from haystack.dataclasses import ChatMessage
    from pipelines.rag import RAG_PROMPT

    pipeline = Pipeline()
    # Embedder for vector retrieval
    pipeline.add_component("embedder", SentenceTransformersTextEmbedder(
        model="sentence-transformers/all-MiniLM-L6-v2"
    ))
    # BM25 retriever: good for exact keyword matches
    pipeline.add_component("bm25_retriever", InMemoryBM25Retriever(
        document_store=document_store,
        top_k=10,
    ))
    # Vector retriever: good for semantic similarity
    pipeline.add_component("vector_retriever", InMemoryEmbeddingRetriever(
        document_store=document_store,
        top_k=10,
    ))
    # Join and deduplicate results — reciprocal rank fusion
    pipeline.add_component("joiner", DocumentJoiner(
        join_mode="reciprocal_rank_fusion",
        top_k=7,
    ))
    pipeline.add_component("prompt_builder", ChatPromptBuilder(
        template=[ChatMessage.from_user(RAG_PROMPT)],
    ))
    # No explicit api_key: the generator's default reads OPENAI_API_KEY
    # from the environment.
    pipeline.add_component("llm", OpenAIChatGenerator(model="gpt-4o"))
    # BM25 path (doesn't need an embedding)
    pipeline.connect("bm25_retriever.documents", "joiner.documents")
    # Vector path
    pipeline.connect("embedder.embedding", "vector_retriever.query_embedding")
    pipeline.connect("vector_retriever.documents", "joiner.documents")
    # Joined → prompt → LLM
    pipeline.connect("joiner.documents", "prompt_builder.documents")
    pipeline.connect("prompt_builder.prompt", "llm.messages")
    return pipeline
def run_hybrid_query(question: str, pipeline: Pipeline) -> dict:
    """Run *question* through the hybrid RAG pipeline.

    Returns ``{"answer", "n_docs", "sources"}``. The ``sources`` key is new
    (backward-compatible addition): the ``/ask`` endpoint's response model
    reads ``result.get("sources", [])``, which was always empty before.

    ``include_outputs_from={"joiner"}`` is required: the joiner's documents
    are consumed by the prompt builder, so ``Pipeline.run`` omits them from
    the result unless explicitly requested — ``result["joiner"]`` would
    otherwise raise ``KeyError``.
    """
    result = pipeline.run(
        {
            "embedder": {"text": question},
            "bm25_retriever": {"query": question},
            "prompt_builder": {"question": question},
        },
        include_outputs_from={"joiner"},
    )
    docs = result["joiner"]["documents"]
    return {
        "answer": result["llm"]["replies"][0].content,
        "n_docs": len(docs),
        "sources": [
            {
                "content": doc.content[:200],  # short snippet for display
                "source": doc.meta.get("file_path", "unknown"),
                "score": doc.score,
            }
            for doc in docs
        ],
    }
## Custom Component
# components/query_expander.py — custom Haystack component
from haystack import component, default_from_dict, default_to_dict
from haystack.dataclasses import ChatMessage
from haystack.components.generators.chat import OpenAIChatGenerator
from typing import List
import os
@component
class QueryExpander:
    """Expand a query into multiple rephrased variants for better recall."""

    def __init__(self, n_variants: int = 3):
        """:param n_variants: number of rephrasings to request from the LLM."""
        # Local import keeps this snippet self-contained.
        from haystack.utils import Secret

        self.n_variants = n_variants
        # Haystack generators take a Secret, not a raw string; from_env_var
        # also defers resolution and keeps the key out of serialized configs.
        self.generator = OpenAIChatGenerator(
            model="gpt-4o-mini",
            api_key=Secret.from_env_var("OPENAI_API_KEY"),
        )

    @component.output_types(queries=List[str])
    def run(self, query: str) -> dict:
        """Expand *query* into up to ``n_variants`` rephrased variants.

        Returns ``{"queries": [...]}`` with the original query first,
        followed by the LLM-generated variants.
        """
        messages = [ChatMessage.from_user(
            f"""Generate {self.n_variants} different phrasings of this question.
Return only the rephrased questions, one per line, no numbering.
Original: {query}"""
        )]
        result = self.generator.run(messages=messages)
        response_text = result["replies"][0].content
        # One variant per non-empty line; drop any echo of the original query
        # so it isn't duplicated in the output.
        variants = [
            line.strip()
            for line in response_text.strip().split("\n")
            if line.strip() and line.strip() != query
        ]
        return {"queries": [query] + variants[: self.n_variants]}

    def to_dict(self) -> dict:
        """Serialize init parameters (the generator is rebuilt on load)."""
        return default_to_dict(self, n_variants=self.n_variants)

    @classmethod
    def from_dict(cls, data: dict):
        """Reconstruct the component from :meth:`to_dict` output."""
        return default_from_dict(cls, data)
## FastAPI Deployment
# api/main.py — serve Haystack pipeline via FastAPI
import os
from pathlib import Path

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

from haystack.document_stores.in_memory import InMemoryDocumentStore

from pipelines.hybrid_rag import build_hybrid_rag_pipeline, run_hybrid_query
from pipelines.indexing import index_documents
app = FastAPI(title="Document QA API")
# Shared state: one in-memory store for the process, plus a pipeline handle
# that startup() fills in (None until then; /ask answers 503 before that).
document_store = InMemoryDocumentStore()
rag_pipeline = None
@app.on_event("startup")
async def startup():
    """Index any files found under DOCS_DIR, then build the RAG pipeline."""
    global rag_pipeline

    docs_dir = os.environ.get("DOCS_DIR", "./documents")
    if os.path.exists(docs_dir):
        candidates = [str(p) for p in Path(docs_dir).glob("**/*") if p.is_file()]
        if candidates:
            index_documents(candidates, document_store)

    rag_pipeline = build_hybrid_rag_pipeline(document_store)
    rag_pipeline.warm_up()
class QuestionRequest(BaseModel):
    """Request body for POST /ask."""
    question: str
    # NOTE(review): not read by the /ask handler — retriever top_k is fixed
    # at pipeline build time; kept for API forward compatibility.
    top_k: int = 5
class AnswerResponse(BaseModel):
    """Response body for POST /ask."""
    answer: str
    # Free-form per-source dicts taken from the pipeline result's "sources".
    sources: list[dict]
@app.post("/ask", response_model=AnswerResponse)
async def ask_question(request: QuestionRequest):
    """Answer a question with the hybrid RAG pipeline; 503 until startup ran."""
    if not rag_pipeline:
        raise HTTPException(503, "Pipeline not initialized")

    outcome = run_hybrid_query(request.question, rag_pipeline)
    answer = outcome["answer"]
    sources = outcome.get("sources", [])
    return AnswerResponse(answer=answer, sources=sources)
@app.post("/index")
async def index_new_documents(file_paths: list[str]):
    """Index new documents at runtime."""
    global rag_pipeline

    index_documents(file_paths, document_store)
    # Rebuild and re-warm so the retrievers see the refreshed store contents.
    rag_pipeline = build_hybrid_rag_pipeline(document_store)
    rag_pipeline.warm_up()

    total = document_store.count_documents()
    return {"indexed": len(file_paths), "total": total}
For the LlamaIndex alternative that focuses on knowledge graph construction and multi-index routing rather than Haystack’s component pipeline model, see the LlamaIndex guide for ingestion pipelines and query engines. For the DSPy optimizer that treats RAG as a compiled program and auto-optimizes prompts and retrieval, the DSPy guide covers declarative LLM programs. The Claude Skills 360 bundle includes Haystack skill sets covering indexing pipelines, hybrid retrieval, and custom components. Start with the free tier to try Haystack pipeline generation.