NATS JetStream: Lightweight Alternative to Kafka

Backend, K8s

Kafka is powerful but operationally complex. NATS JetStream provides comparable persistence and streaming capabilities with far less to operate: a single binary, no ZooKeeper, no JVM.

TL;DR

  • NATS = ultra-fast messaging (5M+ msg/sec)
  • JetStream = persistent streaming (Kafka-like)
  • Single binary, ~20MB memory footprint
  • Exactly-once semantics (publish deduplication plus explicit acks), replay, consumer groups
  • Runs on Kubernetes via the official Helm chart

NATS vs Kafka

FEATURE                 NATS JETSTREAM      KAFKA
=======                 ==============      =====
Latency                 <1ms                1-5ms
Throughput              5M+ msg/sec         1M+ msg/sec
Memory footprint        ~20MB               ~1GB+
Dependencies            None                ZooKeeper/KRaft
Operations              Simple              Complex
Persistence             JetStream           Kafka Logs
Exactly-once            Yes                 Yes
Consumer groups         Yes                 Yes
Learning curve          Low                 High

Install NATS

# Helm
helm repo add nats https://nats-io.github.io/k8s/helm/charts/
helm upgrade --install nats nats/nats \
  --namespace nats --create-namespace \
  --set config.jetstream.enabled=true \
  --set config.cluster.enabled=true \
  --set config.cluster.replicas=3

Values for Production

# nats-values.yaml
config:
  cluster:
    enabled: true
    replicas: 3
  
  jetstream:
    enabled: true
    fileStore:
      pvc:
        size: 50Gi
        storageClassName: gp3
    memoryStore:
      enabled: true
      maxSize: 1Gi
  
  # Monitoring
  monitor:
    enabled: true
    port: 8222

natsBox:
  enabled: true  # Debug container

# Prometheus metrics
promExporter:
  enabled: true
  podMonitor:
    enabled: true
Core Concepts

STREAMS     = Persistent message storage (like Kafka topics)
CONSUMERS   = Read position trackers (like Kafka consumer groups)
SUBJECTS    = Hierarchical names used for routing (closest to Kafka topic names; wildcards * and >)

Producer ──▶ Subject ──▶ Stream ──▶ Consumer ──▶ App
                  │
                  └──▶ Stream ──▶ Consumer ──▶ Other App
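
Which stream captures a message depends on how its subject matches the stream's subject filter. The sketch below is a simplified model of NATS wildcard matching, assuming dot-separated tokens where `*` matches exactly one token and `>` matches one or more trailing tokens. The function name `subjectMatches` is made up for illustration and is not part of any NATS library.

```go
package main

import (
    "fmt"
    "strings"
)

// subjectMatches reports whether a concrete subject matches a NATS-style
// pattern. "*" matches exactly one token; ">" matches one or more
// trailing tokens and is only valid as the last pattern token.
func subjectMatches(pattern, subject string) bool {
    p := strings.Split(pattern, ".")
    s := strings.Split(subject, ".")
    for i, tok := range p {
        if tok == ">" {
            // Needs to be last, and at least one subject token must remain.
            return i == len(p)-1 && len(s) > i
        }
        if i >= len(s) {
            return false
        }
        if tok != "*" && tok != s[i] {
            return false
        }
    }
    return len(p) == len(s)
}

func main() {
    for _, subj := range []string{"orders.created", "orders.eu.created", "orders"} {
        fmt.Printf("%-18s orders.*=%v orders.>=%v\n", subj,
            subjectMatches("orders.*", subj), subjectMatches("orders.>", subj))
    }
}
```

In this model `orders.*` captures `orders.created` but not `orders.eu.created`, while `orders.>` captures both. This is why the choice of wildcard in a stream's subject list matters.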

Create Stream

# Using NATS CLI (it prompts for any setting not given as a flag)
nats stream add ORDERS \
  --subjects "orders.>" \
  --storage file \
  --replicas 3 \
  --retention limits \
  --max-msgs 10000000 \
  --max-age 7d \
  --max-bytes 10GB

# Or from a JSON config file (stream.json, contents below)
nats stream add --config stream.json
{
  "name": "ORDERS",
  "subjects": ["orders.>"],
  "retention": "limits",
  "storage": "file",
  "max_msgs": 10000000,
  "max_bytes": 10737418240,
  "max_age": 604800000000000,
  "max_msg_size": 1048576,
  "replicas": 3,
  "discard": "old"
}
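
The raw numbers in the JSON are easy to get wrong: `max_age` is in nanoseconds and `max_bytes` in bytes. As a sanity check, here is a minimal sketch that derives the same values from readable units and prints the equivalent JSON. The `streamConfig` struct is a hypothetical mirror of only the fields used above, not the client library's type.

```go
package main

import (
    "encoding/json"
    "fmt"
    "time"
)

// streamConfig mirrors only the JSON fields used in stream.json above.
type streamConfig struct {
    Name       string   `json:"name"`
    Subjects   []string `json:"subjects"`
    Retention  string   `json:"retention"`
    Storage    string   `json:"storage"`
    MaxMsgs    int64    `json:"max_msgs"`
    MaxBytes   int64    `json:"max_bytes"`
    MaxAge     int64    `json:"max_age"` // nanoseconds
    MaxMsgSize int32    `json:"max_msg_size"`
    Replicas   int      `json:"replicas"`
    Discard    string   `json:"discard"`
}

// ordersConfig builds the ORDERS stream settings from readable units.
func ordersConfig() streamConfig {
    return streamConfig{
        Name:       "ORDERS",
        Subjects:   []string{"orders.>"},
        Retention:  "limits",
        Storage:    "file",
        MaxMsgs:    10000000,
        MaxBytes:   10 << 30,                          // 10 GiB
        MaxAge:     (7 * 24 * time.Hour).Nanoseconds(), // 7 days
        MaxMsgSize: 1 << 20,                           // 1 MiB
        Replicas:   3,
        Discard:    "old",
    }
}

func main() {
    out, err := json.MarshalIndent(ordersConfig(), "", "  ")
    if err != nil {
        panic(err)
    }
    fmt.Println(string(out))
}
```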

Create Consumer

# Durable consumer (survives restarts)
nats consumer add ORDERS order-processor \
  --ack explicit \
  --deliver all \
  --max-deliver 5 \
  --filter "orders.created" \
  --pull

Go Producer

package main

import (
    "encoding/json"
    "log"
    "time"

    "github.com/nats-io/nats.go"
)

type Order struct {
    ID        string    `json:"id"`
    Customer  string    `json:"customer"`
    Amount    float64   `json:"amount"`
    CreatedAt time.Time `json:"created_at"`
}

func main() {
    nc, err := nats.Connect("nats://nats.nats:4222")
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    js, err := nc.JetStream()
    if err != nil {
        log.Fatal(err)
    }

    order := Order{
        ID:        "ord-123",
        Customer:  "cust-456",
        Amount:    99.99,
        CreatedAt: time.Now(),
    }

    data, err := json.Marshal(order)
    if err != nil {
        log.Fatal(err)
    }

    // Publish with acknowledgment
    ack, err := js.Publish("orders.created", data)
    if err != nil {
        log.Fatal(err)
    }

    log.Printf("Published to stream %s, seq %d", ack.Stream, ack.Sequence)
}

Go Consumer

package main

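import (
    "encoding/json"
    "errors"
    "log"
    "time"

    "github.com/nats-io/nats.go"
)

type Order struct {
    ID        string  `json:"id"`
    Customer  string  `json:"customer"`
    Amount    float64 `json:"amount"`
}

func main() {
    nc, err := nats.Connect("nats://nats.nats:4222")
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    js, err := nc.JetStream()
    if err != nil {
        log.Fatal(err)
    }

    // Pull-based consumer, bound to the durable created with the CLI above
    sub, err := js.PullSubscribe("orders.created", "order-processor",
        nats.Bind("ORDERS", "order-processor"))
    if err != nil {
        log.Fatal(err)
    }

    for {
        // Batch of up to 10, waiting at most 5 seconds
        msgs, err := sub.Fetch(10, nats.MaxWait(5*time.Second))
        if err != nil {
            if errors.Is(err, nats.ErrTimeout) {
                continue // No messages available yet
            }
            log.Println("Fetch error:", err)
            time.Sleep(time.Second) // Avoid a hot loop on persistent errors
            continue
        }

        for _, msg := range msgs {
            var order Order
            if err := json.Unmarshal(msg.Data, &order); err != nil {
                log.Println("Bad payload, terminating message:", err)
                msg.Term() // Malformed messages should not be redelivered
                continue
            }

            log.Printf("Processing order: %s, amount: %.2f", order.ID, order.Amount)

            // Process order...

            // Acknowledge
            if err := msg.Ack(); err != nil {
                log.Println("Ack error:", err)
            }
        }
    }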
}

Kubernetes Deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: order-processor
spec:
  replicas: 3
  selector:
    matchLabels:
      app: order-processor
  template:
    metadata:
      labels:
        app: order-processor
    spec:
      containers:
        - name: processor
          image: order-processor:latest
          env:
            - name: NATS_URL
              value: "nats://nats.nats:4222"
            - name: NATS_STREAM
              value: "ORDERS"
            - name: NATS_CONSUMER
              value: "order-processor"
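
The Deployment passes `NATS_URL`, `NATS_STREAM`, and `NATS_CONSUMER` to the container, while the Go consumer shown earlier hardcodes those values. One possible way to connect the two is sketched below, assuming the application falls back to this article's values when a variable is unset. `natsSettings` and `loadSettings` are hypothetical names introduced here.

```go
package main

import (
    "fmt"
    "os"
)

// natsSettings holds the connection details the Deployment injects.
type natsSettings struct {
    URL      string
    Stream   string
    Consumer string
}

// loadSettings reads settings through getenv (os.Getenv in production),
// falling back to the values used in this article when a variable is unset.
func loadSettings(getenv func(string) string) natsSettings {
    get := func(key, fallback string) string {
        if v := getenv(key); v != "" {
            return v
        }
        return fallback
    }
    return natsSettings{
        URL:      get("NATS_URL", "nats://nats.nats:4222"),
        Stream:   get("NATS_STREAM", "ORDERS"),
        Consumer: get("NATS_CONSUMER", "order-processor"),
    }
}

func main() {
    s := loadSettings(os.Getenv)
    fmt.Printf("connecting to %s (stream=%s, consumer=%s)\n", s.URL, s.Stream, s.Consumer)
}
```

Taking `getenv` as a parameter keeps the function testable without touching the process environment. The resulting values could then be passed to `nats.Connect` and `nats.Bind` in the consumer.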

Key-Value Store

JetStream includes a built-in KV store:

// Create bucket
kv, err := js.CreateKeyValue(&nats.KeyValueConfig{
    Bucket:   "config",
    Replicas: 3,
    TTL:      24 * time.Hour,
})
if err != nil {
    log.Fatal(err)
}

// Put
if _, err := kv.Put("api.rate_limit", []byte("1000")); err != nil {
    log.Fatal(err)
}

// Get
entry, err := kv.Get("api.rate_limit")
if err != nil {
    log.Fatal(err)
}
log.Println(string(entry.Value()))

// Watch for changes
watcher, err := kv.Watch("api.*")
if err != nil {
    log.Fatal(err)
}
defer watcher.Stop()
for update := range watcher.Updates() {
    // A nil entry marks the end of the initial values
    if update != nil {
        log.Printf("Key %s changed to %s", update.Key(), update.Value())
    }
}

Object Store

Store large files:

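// Create bucket ("obj" avoids shadowing the standard os package)
obj, err := js.CreateObjectStore(&nats.ObjectStoreConfig{
    Bucket:   "files",
    Replicas: 3,
})
if err != nil {
    log.Fatal(err)
}

// Put file (PutFile uses the file path as the object name)
if _, err := obj.PutFile("/path/to/report.pdf"); err != nil {
    log.Fatal(err)
}

// Get file
if err := obj.GetFile("/path/to/report.pdf", "/output/report.pdf"); err != nil {
    log.Fatal(err)
}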

Monitoring

# Prometheus rules (metric names depend on the exporter version; verify against its /metrics output)
groups:
  - name: nats
    rules:
      - alert: NATSHighLatency
        expr: nats_server_route_latency_ms > 100
        labels:
          severity: warning

      - alert: NATSSlowConsumers
        expr: nats_server_slow_consumers > 0
        labels:
          severity: warning

      - alert: NATSStreamFull
        expr: nats_jetstream_stream_bytes / nats_jetstream_stream_max_bytes > 0.9
        labels:
          severity: warning
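
The NATSStreamFull expression divides used bytes by the configured limit. One edge case to keep in mind: a stream without a byte limit has `max_bytes` set to -1, so the ratio goes negative and the alert can never fire for that stream. A minimal sketch of the same check with that case made explicit is shown below. `streamFull` is an illustrative helper, not part of any NATS tooling.

```go
package main

import "fmt"

// streamFull reports whether usage exceeds threshold (for example 0.9)
// of maxBytes. A non-positive maxBytes means the stream has no byte
// limit, so it is never considered full.
func streamFull(usedBytes, maxBytes int64, threshold float64) bool {
    if maxBytes <= 0 {
        return false
    }
    return float64(usedBytes)/float64(maxBytes) > threshold
}

func main() {
    const tenGiB = int64(10) << 30
    fmt.Println(streamFull(tenGiB*95/100, tenGiB, 0.9)) // 95% of the limit
    fmt.Println(streamFull(tenGiB/2, tenGiB, 0.9))      // 50% of the limit
    fmt.Println(streamFull(tenGiB, -1, 0.9))            // no limit configured
}
```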

NATS JetStream + Kubernetes

Simple messaging. Persistent streaming.