NATS JetStream: Lightweight Alternative to Kafka
Kafka is powerful but complex. NATS JetStream provides similar persistence and streaming capabilities with 10x simpler ops. Single binary, no ZooKeeper, no JVM.
TL;DR
- NATS = ultra-fast messaging (5M+ msg/sec)
- JetStream = persistent streaming (Kafka-like)
- Single binary, ~20MB memory footprint
- Exactly-once delivery, replay, consumer groups
- Kubernetes operator included
NATS vs Kafka
FEATURE NATS JETSTREAM KAFKA
======= ============== =====
Latency <1ms 1-5ms
Throughput 5M+ msg/sec 1M+ msg/sec
Memory footprint ~20MB ~1GB+
Dependencies None ZooKeeper/KRaft
Operations Simple Complex
Persistence JetStream Kafka Logs
Exactly-once Yes Yes
Consumer groups Yes Yes
Learning curve Low High
Install NATS
# Helm
helm repo add nats https://nats-io.github.io/k8s/helm/charts/
helm upgrade --install nats nats/nats \
--namespace nats --create-namespace \
--set config.jetstream.enabled=true \
--set config.cluster.enabled=true \
--set config.cluster.replicas=3
Values for Production
# nats-values.yaml
config:
cluster:
enabled: true
replicas: 3
jetstream:
enabled: true
fileStore:
pvc:
size: 50Gi
storageClassName: gp3
memoryStore:
maxSize: 1Gi
# Monitoring
monitor:
enabled: true
port: 8222
natsbox:
enabled: true # Debug container
# Prometheus metrics
exporter:
enabled: true
serviceMonitor:
enabled: true
Core Concepts
STREAMS = Persistent message storage (like Kafka topics)
CONSUMERS = Read position trackers (like Kafka consumer groups)
SUBJECTS = Message routing (like Kafka topic partitions)
Producer ──▶ Subject ──▶ Stream ──▶ Consumer ──▶ App
│
└──▶ Stream ──▶ Consumer ──▶ Other App
Create Stream
# Using NATS CLI
nats stream add ORDERS \
--subjects "orders.*" \
--storage file \
--replicas 3 \
--retention limits \
--max-msgs-per-subject 1000000 \
--max-age 7d \
--max-bytes 10GB
# Or via YAML
nats stream add --config stream.json
{
"name": "ORDERS",
"subjects": ["orders.>"],
"retention": "limits",
"storage": "file",
"max_msgs": 10000000,
"max_bytes": 10737418240,
"max_age": 604800000000000,
"max_msg_size": 1048576,
"replicas": 3,
"discard": "old"
}
Create Consumer
# Durable consumer (survives restarts)
nats consumer add ORDERS order-processor \
--ack explicit \
--deliver all \
--max-deliver 5 \
--filter "orders.created" \
--pull
Go Producer
package main
import (
"encoding/json"
"log"
"time"
"github.com/nats-io/nats.go"
)
type Order struct {
ID string `json:"id"`
Customer string `json:"customer"`
Amount float64 `json:"amount"`
CreatedAt time.Time `json:"created_at"`
}
func main() {
nc, err := nats.Connect("nats://nats.nats:4222")
if err != nil {
log.Fatal(err)
}
defer nc.Close()
js, err := nc.JetStream()
if err != nil {
log.Fatal(err)
}
order := Order{
ID: "ord-123",
Customer: "cust-456",
Amount: 99.99,
CreatedAt: time.Now(),
}
data, _ := json.Marshal(order)
// Publish with acknowledgment
ack, err := js.Publish("orders.created", data)
if err != nil {
log.Fatal(err)
}
log.Printf("Published to stream %s, seq %d", ack.Stream, ack.Sequence)
}
Go Consumer
package main
import (
"encoding/json"
"log"
"github.com/nats-io/nats.go"
)
type Order struct {
ID string `json:"id"`
Customer string `json:"customer"`
Amount float64 `json:"amount"`
}
func main() {
nc, err := nats.Connect("nats://nats.nats:4222")
if err != nil {
log.Fatal(err)
}
defer nc.Close()
js, err := nc.JetStream()
if err != nil {
log.Fatal(err)
}
// Pull-based consumer
sub, err := js.PullSubscribe("orders.created", "order-processor")
if err != nil {
log.Fatal(err)
}
for {
msgs, err := sub.Fetch(10) // Batch of 10
if err != nil {
log.Println("Fetch error:", err)
continue
}
for _, msg := range msgs {
var order Order
json.Unmarshal(msg.Data, &order)
log.Printf("Processing order: %s, amount: %.2f", order.ID, order.Amount)
// Process order...
// Acknowledge
msg.Ack()
}
}
}
Kubernetes Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
name: order-processor
spec:
replicas: 3
template:
spec:
containers:
- name: processor
image: order-processor:latest
env:
- name: NATS_URL
value: "nats://nats.nats:4222"
- name: NATS_STREAM
value: "ORDERS"
- name: NATS_CONSUMER
value: "order-processor"
Key-Value Store
JetStream includes a built-in KV store:
// Create bucket
kv, err := js.CreateKeyValue(&nats.KeyValueConfig{
Bucket: "config",
Replicas: 3,
TTL: 24 * time.Hour,
})
// Put
_, err = kv.Put("api.rate_limit", []byte("1000"))
// Get
entry, err := kv.Get("api.rate_limit")
log.Println(string(entry.Value()))
// Watch for changes
watcher, _ := kv.Watch("api.*")
for update := range watcher.Updates() {
if update != nil {
log.Printf("Key %s changed to %s", update.Key(), update.Value())
}
}
Object Store
Store large files:
// Create bucket
os, err := js.CreateObjectStore(&nats.ObjectStoreConfig{
Bucket: "files",
Replicas: 3,
})
// Put file
os.PutFile("report.pdf", "/path/to/report.pdf")
// Get file
os.GetFile("report.pdf", "/output/report.pdf")
Monitoring
# Prometheus rules
groups:
- name: nats
rules:
- alert: NATSHighLatency
expr: nats_server_route_latency_ms > 100
labels:
severity: warning
- alert: NATSSlowConsumers
expr: nats_server_slow_consumers > 0
labels:
severity: warning
- alert: NATSStreamFull
expr: nats_jetstream_stream_bytes / nats_jetstream_stream_max_bytes > 0.9
labels:
severity: warning
References
- NATS Docs: https://docs.nats.io
- JetStream: https://docs.nats.io/nats-concepts/jetstream
- Go Client: https://github.com/nats-io/nats.go