LlamaIndex orchestrates the full RAG pipeline: document ingestion, chunking, embedding, vector storage, retrieval, and response synthesis. The high-level VectorStoreIndex handles the happy path. Custom NodeParser controls chunking strategy. QueryEngine composes retrieval with response synthesis. RouterQueryEngine routes questions to the right index. RAGAS evaluates faithfulness and answer relevance objectively. Claude Code generates LlamaIndex ingestion pipelines, custom retrievers, evaluation harnesses, and the production configurations that ship reliable RAG applications.
CLAUDE.md for LlamaIndex Projects
## LlamaIndex Stack
- Version: llama-index-core >= 0.11, llama-index-llms-anthropic >= 0.4
- LLM: Claude claude-sonnet-4-6 (synthesis), claude-haiku-4-5-20251001 (classification)
- Embeddings: text-embedding-3-large (OpenAI) or voyage-3 (Voyage AI)
- Vector store: Pinecone (production), ChromaDB (local dev)
- Chunking: SentenceSplitter, chunk_size=512, overlap=50
- Retrieval: top_k=5, similarity_threshold=0.75
- Evaluation: RAGAS with faithfulness + answer_relevancy metrics
- Persist: docstore + index store to disk for fast restarts
Document Ingestion Pipeline
# ingest/pipeline.py — document ingestion with metadata
from llama_index.core import (
VectorStoreIndex,
StorageContext,
Settings,
Document,
)
from llama_index.core.ingestion import IngestionPipeline, IngestionCache
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.extractors import (
TitleExtractor,
QuestionsAnsweredExtractor,
SummaryExtractor,
)
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.anthropic import Anthropic
from llama_index.vector_stores.pinecone import PineconeVectorStore
from pinecone import Pinecone
import hashlib
# Configure global settings
# Module-level side effect: components created after this module is imported
# pick up these defaults through the global Settings singleton.
Settings.llm = Anthropic(
    model="claude-sonnet-4-6",
    max_tokens=4096,
    temperature=0.1,  # low temperature keeps synthesis grounded in retrieved context
)
Settings.embed_model = OpenAIEmbedding(
    model="text-embedding-3-large",
    dimensions=1536,  # reduced dims — must match the Pinecone index dimension
)
def build_ingestion_pipeline(vector_store: PineconeVectorStore) -> IngestionPipeline:
    """Build an ingestion pipeline with caching to skip already-processed docs.

    Args:
        vector_store: Pinecone-backed store that receives the embedded nodes.

    Returns:
        An IngestionPipeline that chunks, enriches with LLM-extracted
        metadata, embeds, and upserts into the vector store.
    """
    import os

    # Restore the transformation cache from a previous run if it was
    # persisted; otherwise start with a fresh in-memory cache.
    # Bug fixed: the original passed SimpleDocumentStore.from_persist_dir()
    # as the cache backend — SimpleDocumentStore was never imported (NameError)
    # and a docstore is not the KV cache IngestionCache expects.
    cache_path = "./cache/ingestion_cache.json"
    if os.path.exists(cache_path):
        cache = IngestionCache.from_persist_path(cache_path, collection="ingestion_cache")
    else:
        cache = IngestionCache(collection="ingestion_cache")

    return IngestionPipeline(
        transformations=[
            # 1. Split into chunks
            SentenceSplitter(
                chunk_size=512,
                chunk_overlap=50,
                paragraph_separator="\n\n",
            ),
            # 2. Extract metadata (each extractor calls the LLM — adds cost and latency)
            TitleExtractor(nodes=5),
            QuestionsAnsweredExtractor(questions=3),
            # 3. Generate embeddings
            Settings.embed_model,
        ],
        vector_store=vector_store,
        cache=cache,
    )
def ingest_documents(docs: list[Document], pipeline: IngestionPipeline) -> list:
"""Ingest documents, deduplicating by content hash."""
# Add content hash for deduplication
for doc in docs:
content_hash = hashlib.md5(doc.text.encode()).hexdigest()
doc.metadata["content_hash"] = content_hash
doc.doc_id = content_hash # Use hash as stable ID
nodes = pipeline.run(documents=docs, show_progress=True)
print(f"Ingested {len(nodes)} nodes from {len(docs)} documents")
return nodes
# Usage
def ingest_knowledge_base(file_paths: list[str]):
    """Load files, tag them with source metadata, and ingest into Pinecone.

    Handles .pdf, .docx, and .md explicitly; any other extension falls back
    to SimpleDirectoryReader's extension-based dispatch.

    Bugs fixed: `os`, `ServerlessSpec`, and `SimpleDirectoryReader` were used
    but never imported (NameError at runtime), and DocxReader was imported
    but .docx files silently fell through to the generic reader.
    """
    import os

    from llama_index.core import SimpleDirectoryReader
    from llama_index.readers.file import DocxReader, MarkdownReader, PDFReader
    from pinecone import ServerlessSpec

    pc = Pinecone(api_key=os.environ["PINECONE_API_KEY"])
    index_name = "knowledge-base"
    # Create the serverless index on first run only.
    if index_name not in [i.name for i in pc.list_indexes()]:
        pc.create_index(
            name=index_name,
            dimension=1536,  # must match Settings.embed_model dimensions
            metric="cosine",
            spec=ServerlessSpec(cloud="aws", region="us-east-1"),
        )
    vector_store = PineconeVectorStore(pinecone_index=pc.Index(index_name))
    pipeline = build_ingestion_pipeline(vector_store)

    documents = []
    for path in file_paths:
        if path.endswith(".pdf"):
            docs = PDFReader().load_data(path)
        elif path.endswith(".docx"):
            docs = DocxReader().load_data(path)
        elif path.endswith(".md"):
            docs = MarkdownReader().load_data(path)
        else:
            docs = SimpleDirectoryReader(input_files=[path]).load_data()
        # Add source metadata so answers can cite their origin file.
        for doc in docs:
            doc.metadata.update({
                "source": path,
                "file_type": path.split(".")[-1],
            })
        documents.extend(docs)
    return ingest_documents(documents, pipeline)
Query Engine Setup
# query/engine.py — query engine with custom retrieval
from llama_index.core import VectorStoreIndex, QueryBundle
from llama_index.core.retrievers import VectorIndexRetriever
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.core.postprocessor import (
SimilarityPostprocessor,
KeywordNodePostprocessor,
LLMRerank,
)
from llama_index.core.response_synthesizers import get_response_synthesizer
def build_query_engine(
    index: VectorStoreIndex,
    top_k: int = 5,
    rerank: bool = True,
) -> RetrieverQueryEngine:
    """Assemble retrieval, optional LLM reranking, and tree summarization."""
    # Over-fetch candidates when reranking so the reranker has room to prune.
    fetch_k = top_k * 2 if rerank else top_k
    retriever = VectorIndexRetriever(index=index, similarity_top_k=fetch_k)

    # Drop weakly related nodes before synthesis.
    node_postprocessors = [SimilarityPostprocessor(similarity_cutoff=0.7)]
    if rerank:
        # LLM-based reranking trades latency for precision.
        node_postprocessors.append(LLMRerank(choice_batch_size=5, top_n=top_k))

    # tree_summarize works best when combining many retrieved chunks.
    synthesizer = get_response_synthesizer(
        response_mode="tree_summarize",
        use_async=True,
        verbose=False,
    )

    return RetrieverQueryEngine(
        retriever=retriever,
        response_synthesizer=synthesizer,
        node_postprocessors=node_postprocessors,
    )
# Streaming response
async def query_streaming(engine: RetrieverQueryEngine, question: str):
    """Yield the response token by token, then print the source nodes.

    The engine must have been built with a streaming response synthesizer
    (``get_response_synthesizer(streaming=True)``); otherwise the response
    object has no ``response_gen`` to iterate.

    Bug fixed: the original called ``engine.as_query_engine(streaming=True)``,
    but ``as_query_engine`` is a method on indexes/retrievers, not on
    ``RetrieverQueryEngine`` — it raised AttributeError at runtime.
    """
    response = engine.query(question)
    for token in response.response_gen:
        yield token
    # After streaming completes, surface provenance for the answer.
    print("\n\nSources:")
    for node in response.source_nodes:
        print(f"  [{node.score:.2f}] {node.metadata.get('source', 'Unknown')}")
        print(f"    {node.text[:200]}...")
Router Query Engine for Multi-Index
# query/router.py — route questions to the right index
from llama_index.core.query_engine import RouterQueryEngine
from llama_index.core.selectors import LLMSingleSelector
from llama_index.core.tools import QueryEngineTool
def build_router_engine(
    product_engine: RetrieverQueryEngine,
    policy_engine: RetrieverQueryEngine,
    technical_engine: RetrieverQueryEngine,
) -> RouterQueryEngine:
    """Route each query to the most relevant of the three sub-engines."""
    # (engine, routing description) pairs — the selector LLM chooses an
    # engine by matching the query against these descriptions.
    routes = [
        (
            product_engine,
            "Useful for answering questions about product features, "
            "pricing, availability, and specifications.",
        ),
        (
            policy_engine,
            "Useful for answering questions about company policies, "
            "return policies, shipping, warranties, and legal terms.",
        ),
        (
            technical_engine,
            "Useful for answering technical questions, API documentation, "
            "integration guides, and developer resources.",
        ),
    ]
    tools = [
        QueryEngineTool.from_defaults(query_engine=engine, description=description)
        for engine, description in routes
    ]
    return RouterQueryEngine(
        selector=LLMSingleSelector.from_defaults(),
        query_engine_tools=tools,
    )
Hybrid Search
# query/hybrid.py — combine dense and sparse (BM25) search
from llama_index.core.retrievers import QueryFusionRetriever
from llama_index.retrievers.bm25 import BM25Retriever
from llama_index.core.node_parser import SentenceSplitter
def build_hybrid_retriever(
    index: VectorStoreIndex,
    nodes: list,
    top_k: int = 5,
) -> QueryFusionRetriever:
    """Combine semantic (vector) and lexical (BM25) search with RRF fusion.

    Args:
        index: Vector index used for dense retrieval.
        nodes: Node list the BM25 retriever indexes in memory.
        top_k: Final number of fused results to return.
    """
    # Bug fixed: VectorIndexRetriever was used here but never imported in
    # this module, raising NameError at runtime.
    from llama_index.core.retrievers import VectorIndexRetriever

    vector_retriever = VectorIndexRetriever(
        index=index,
        similarity_top_k=top_k,
    )
    bm25_retriever = BM25Retriever.from_defaults(
        nodes=nodes,
        similarity_top_k=top_k,
    )
    return QueryFusionRetriever(
        retrievers=[vector_retriever, bm25_retriever],
        similarity_top_k=top_k,
        num_queries=1,  # no query expansion (set >1 to generate query variants)
        mode="reciprocal_rerank",  # reciprocal rank fusion across retrievers
        use_async=True,
        verbose=False,
    )
RAG Evaluation with RAGAS
# eval/evaluate.py — evaluate RAG quality with RAGAS
from ragas import evaluate
from ragas.metrics import (
faithfulness,
answer_relevancy,
context_precision,
context_recall,
)
from ragas.integrations.llama_index import evaluate as ragas_evaluate
from datasets import Dataset
def evaluate_rag_pipeline(
    query_engine: RetrieverQueryEngine,
    test_questions: list[str],
    ground_truth_answers: list[str],
) -> dict:
    """Score the pipeline on a test set with the four core RAGAS metrics."""
    # Run every question through the engine, capturing answer and contexts.
    rows = []
    for question, ground_truth in zip(test_questions, ground_truth_answers):
        response = query_engine.query(question)
        rows.append(
            {
                "question": question,
                "answer": str(response),
                "contexts": [node.text for node in response.source_nodes],
                "ground_truth": ground_truth,
            }
        )

    # RAGAS consumes a HuggingFace Dataset built from the collected rows.
    scores = evaluate(
        dataset=Dataset.from_list(rows),
        metrics=[
            faithfulness,        # is the answer grounded in retrieved context?
            answer_relevancy,    # is the answer relevant to the question?
            context_precision,   # is retrieved context precise?
            context_recall,      # is all necessary info retrieved?
        ],
    )

    # e.g. "answer_relevancy" -> "Answer Relevancy: 0.912"
    for metric_name in ("faithfulness", "answer_relevancy", "context_precision", "context_recall"):
        label = metric_name.replace("_", " ").title()
        print(f"{label}: {scores[metric_name]:.3f}")
    return scores.to_pandas().to_dict()
# Run evaluation
# NOTE(review): `query_engine` is assumed to be built elsewhere (e.g. via
# build_query_engine) before this module-level call runs — confirm; otherwise
# this raises NameError at import time.
TEST_QUESTIONS = [
    "What is your return policy for electronics?",
    "How do I integrate with the REST API?",
    "What payment methods do you accept?",
]
# Reference answers, paired index-by-index with TEST_QUESTIONS.
GROUND_TRUTH = [
    "Electronics can be returned within 30 days with original packaging.",
    "The REST API uses OAuth 2.0 and returns JSON responses.",
    "We accept Visa, Mastercard, Amex, and PayPal.",
]
results = evaluate_rag_pipeline(query_engine, TEST_QUESTIONS, GROUND_TRUTH)
FastAPI Production Integration
# api/main.py — FastAPI service exposing RAG query
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import asyncio
# Module-level ASGI app; the query engine is attached to app.state at startup.
app = FastAPI(title="RAG API")
class QueryRequest(BaseModel):
    """Request body for the /query and /query/stream endpoints."""
    question: str  # natural-language question to answer over the index
    stream: bool = False  # NOTE(review): not read by either handler — confirm intent
    top_k: int = 5  # NOTE(review): not wired into retrieval (fixed at engine build) — confirm intent
class QueryResponse(BaseModel):
    """Response body for /query."""
    answer: str  # synthesized answer text
    sources: list[dict]  # per-source dicts: text snippet, source path, similarity score
    latency_ms: float  # wall-clock handler latency in milliseconds
# Initialize query engine at startup
@app.on_event("startup")  # NOTE(review): on_event is deprecated in FastAPI — consider lifespan handlers
async def load_index():
    # Build the engine once and cache it on app.state so every request
    # reuses it instead of rebuilding per call.
    # NOTE(review): build_query_engine_async is not defined in this file —
    # presumably provided by another module; confirm.
    app.state.engine = await build_query_engine_async()
@app.post("/query", response_model=QueryResponse)
async def query(req: QueryRequest):
import time
start = time.time()
response = await app.state.engine.aquery(req.question)
return QueryResponse(
answer=str(response),
sources=[
{
"text": node.text[:500],
"source": node.metadata.get("source"),
"score": node.score,
}
for node in response.source_nodes
],
latency_ms=(time.time() - start) * 1000,
)
@app.post("/query/stream")
async def query_stream(req: QueryRequest):
async def generate():
async for token in query_streaming(app.state.engine, req.question):
yield f"data: {token}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")
For the AWS Bedrock alternative for enterprise RAG with Claude using managed Knowledge Bases, see the AWS Bedrock guide for fully managed RAG infrastructure. For the vector database backends that LlamaIndex integrates with, the vector databases guide covers Pinecone, Weaviate, and pgvector. The Claude Skills 360 bundle includes LlamaIndex skill sets covering ingestion pipelines, hybrid search, and RAGAS evaluation. Start with the free tier to try RAG pipeline generation.