
OpenTelemetry Collector Pipelines: Transform, Filter, Route Telemetry

Observability · SRE


The OpenTelemetry Collector is the Swiss Army knife of telemetry: it receives, processes, and exports traces, metrics, and logs through a single, vendor-agnostic binary. This guide covers building production pipelines for all three signals.

TL;DR

  • Collector = vendor-agnostic telemetry pipeline
  • Receivers = ingest data (OTLP, Prometheus, etc.)
  • Processors = transform, filter, batch, sample
  • Exporters = send to backends (Prometheus, Jaeger, etc.)
  • Connectors = route between pipelines

Architecture

┌─────────────────────────────────────────────────────────────────┐
│                    OpenTelemetry Collector                       │
│                                                                  │
│  ┌──────────────┐   ┌──────────────┐   ┌──────────────────────┐ │
│  │  Receivers   │──▶│  Processors  │──▶│     Exporters        │ │
│  │              │   │              │   │                      │ │
│  │ - OTLP       │   │ - batch      │   │ - otlp (Tempo)       │ │
│  │ - prometheus │   │ - filter     │   │ - prometheus         │ │
│  │ - filelog    │   │ - transform  │   │ - loki               │ │
│  │ - jaeger     │   │ - tail_sample│   │ - datadog            │ │
│  └──────────────┘   └──────────────┘   └──────────────────────┘ │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

Basic Configuration

# otel-collector-config.yaml
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
      http:
        endpoint: 0.0.0.0:4318

processors:
  batch:
    timeout: 5s
    send_batch_size: 1000

exporters:
  otlp:
    endpoint: tempo.monitoring:4317
    tls:
      insecure: true

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch]
      exporters: [otlp]
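Before pointing a new pipeline at a real backend, it helps to dump telemetry to stdout and expose a liveness endpoint. A minimal sketch using the debug exporter (which replaced the older logging exporter in recent contrib releases) and the health_check extension:

```yaml
exporters:
  # Prints received telemetry to the collector's own stdout
  debug:
    verbosity: detailed

extensions:
  # Liveness endpoint on :13133 for readiness/liveness probes
  health_check: {}

service:
  extensions: [health_check]
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch]
      exporters: [debug]
```

Once spans show up in the logs, swap `debug` for the real exporter.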

Metrics Pipeline

Prometheus + Remote Write

receivers:
  prometheus:
    config:
      scrape_configs:
        - job_name: kubernetes-pods
          kubernetes_sd_configs:
            - role: pod
          relabel_configs:
            - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
              action: keep
              regex: true
            - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_path]
              action: replace
              target_label: __metrics_path__
              regex: (.+)
            - source_labels: [__address__, __meta_kubernetes_pod_annotation_prometheus_io_port]
              action: replace
              regex: ([^:]+)(?::\d+)?;(\d+)
              replacement: $$1:$$2
              target_label: __address__

processors:
  batch:
    timeout: 10s
  
  # Add cluster label
  resource:
    attributes:
      - key: cluster
        value: production
        action: upsert
  
  # Filter out high-cardinality metrics
  filter:
    metrics:
      exclude:
        match_type: regexp
        metric_names:
          - ".*_bucket"  # Exclude histogram buckets
          - "go_.*"      # Exclude Go runtime metrics

exporters:
  prometheusremotewrite:
    endpoint: https://prometheus.company.com/api/v1/write
    headers:
      Authorization: Bearer ${env:PROM_TOKEN}

service:
  pipelines:
    metrics:
      receivers: [prometheus]
      processors: [resource, filter, batch]  # batch last, after filtering
      exporters: [prometheusremotewrite]
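Newer collector releases also accept OTTL conditions in the filter processor, which can express per-datapoint rules that the name-regex form cannot. A sketch, assuming a contrib build with OTTL-based filtering (the `endpoint` attribute is illustrative):

```yaml
processors:
  filter/ottl:
    error_mode: ignore
    metrics:
      # Drop entire metrics whose name matches
      metric:
        - 'IsMatch(name, "go_.*")'
      # Drop individual datapoints by attribute value
      datapoint:
        - 'attributes["endpoint"] == "/healthz"'
```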

Traces Pipeline

Tail Sampling

Sample traces intelligently based on content:

receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317

processors:
  batch:
    timeout: 5s
  
  # Memory limiter to prevent OOM
  memory_limiter:
    check_interval: 1s
    limit_mib: 1000
    spike_limit_mib: 200
  
  # Tail-based sampling
  tail_sampling:
    decision_wait: 10s
    num_traces: 100000
    expected_new_traces_per_sec: 1000
    policies:
      # Always sample errors
      - name: errors
        type: status_code
        status_code:
          status_codes: [ERROR]
      
      # Always sample slow traces
      - name: slow-traces
        type: latency
        latency:
          threshold_ms: 1000
      
      # Sample 10% of everything else
      - name: probabilistic
        type: probabilistic
        probabilistic:
          sampling_percentage: 10
      
      # Always sample specific operations
      - name: important-operations
        type: string_attribute
        string_attribute:
          key: http.route
          values:
            - /api/payments
            - /api/checkout
          enabled_regex_matching: false

exporters:
  otlp:
    endpoint: tempo.monitoring:4317
    tls:
      insecure: true

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [memory_limiter, tail_sampling, batch]
      exporters: [otlp]
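Policies in the list are OR-ed: a trace is kept if any one of them matches. To require several conditions at once, the tail sampler also supports an `and` policy. A sketch combining latency with a service match (the service name is illustrative):

```yaml
processors:
  tail_sampling:
    policies:
      # Keep traces that are BOTH slow AND from checkout-service
      - name: slow-checkout
        type: and
        and:
          and_sub_policy:
            - name: slow
              type: latency
              latency:
                threshold_ms: 500
            - name: checkout
              type: string_attribute
              string_attribute:
                key: service.name
                values: [checkout-service]
```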

Logs Pipeline

File to Loki

receivers:
  filelog:
    include:
      - /var/log/pods/*/*/*.log
    include_file_path: true
    operators:
      # Parse container runtime format
      - type: regex_parser
        regex: '^(?P<time>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) (?P<log>.*)$'
        timestamp:
          parse_from: attributes.time
          layout: '%Y-%m-%dT%H:%M:%S.%LZ'
      
      # Parse JSON logs
      - type: json_parser
        parse_from: attributes.log
        if: 'attributes.log matches "^\\{"'
      
      # Extract Kubernetes metadata
      - type: regex_parser
        regex: '^/var/log/pods/(?P<namespace>[^_]+)_(?P<pod>[^_]+)_[^/]+/(?P<container>[^/]+)/'
        parse_from: attributes["log.file.path"]

processors:
  batch:
    timeout: 5s
  
  # Add resource attributes
  resource:
    attributes:
      - key: service.name
        from_attribute: container
        action: upsert
      - key: k8s.namespace.name
        from_attribute: namespace
        action: upsert
      # Hint which resource attributes become Loki labels
      - key: loki.resource.labels
        value: service.name, k8s.namespace.name
        action: insert
  
  # Filter out noisy logs
  filter:
    logs:
      exclude:
        match_type: regexp
        bodies:
          - ".*health.*check.*"
          - ".*readiness.*probe.*"

exporters:
  loki:
    endpoint: http://loki.monitoring:3100/loki/api/v1/push
    # Recent exporter versions take labels from the loki.resource.labels
    # and loki.attribute.labels hint attributes, not an inline labels block

service:
  pipelines:
    logs:
      receivers: [filelog]
      processors: [resource, filter, batch]  # batch last, after filtering
      exporters: [loki]
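Instead of parsing pod metadata out of the log file path with a regex, the contrib collector's k8sattributes processor can enrich telemetry from the Kubernetes API. A sketch, assuming the collector's service account has the required RBAC to watch pods:

```yaml
processors:
  k8sattributes:
    auth_type: serviceaccount
    extract:
      metadata:
        - k8s.namespace.name
        - k8s.pod.name
        - k8s.deployment.name
    pod_association:
      # Match telemetry to pods by the pod UID resource attribute
      - sources:
          - from: resource_attribute
            name: k8s.pod.uid
```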

Multi-Destination Routing

receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317

processors:
  batch:
    timeout: 5s
  
  # Route by attribute
  routing:
    from_attribute: tenant
    default_exporters: [otlp/default]
    table:
      - value: tenant-a
        exporters: [otlp/tenant-a]
      - value: tenant-b
        exporters: [otlp/tenant-b]

exporters:
  otlp/default:
    endpoint: tempo-default.monitoring:4317
  otlp/tenant-a:
    endpoint: tempo-a.tenant-a:4317
  otlp/tenant-b:
    endpoint: tempo-b.tenant-b:4317

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch, routing]
      exporters: [otlp/default, otlp/tenant-a, otlp/tenant-b]
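Recent collector versions deprecate the routing processor in favor of a routing connector, which routes between whole pipelines using OTTL conditions. A sketch of the equivalent tenant split; verify the exact syntax against your collector version:

```yaml
connectors:
  routing:
    default_pipelines: [traces/default]
    table:
      - statement: route() where attributes["tenant"] == "tenant-a"
        pipelines: [traces/tenant-a]
      - statement: route() where attributes["tenant"] == "tenant-b"
        pipelines: [traces/tenant-b]

service:
  pipelines:
    # Ingest pipeline feeds the connector as its exporter
    traces/in:
      receivers: [otlp]
      processors: [batch]
      exporters: [routing]
    # Per-tenant pipelines receive from the connector
    traces/default:
      receivers: [routing]
      exporters: [otlp/default]
    traces/tenant-a:
      receivers: [routing]
      exporters: [otlp/tenant-a]
    traces/tenant-b:
      receivers: [routing]
      exporters: [otlp/tenant-b]
```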

Transform Processor

processors:
  transform:
    trace_statements:
      - context: span
        statements:
          # Rename attribute
          - set(attributes["http.method"], attributes["http.request.method"])
          - delete_key(attributes, "http.request.method")
          
          # Truncate long values
          - truncate_all(attributes, 256)
          
          # Hash sensitive data
          - set(attributes["user.id"], SHA256(attributes["user.id"]))
          
          # Add derived attribute (boolean expressions are only valid in
          # where clauses, so set a literal conditionally)
          - set(attributes["is_error"], true) where status.code == STATUS_CODE_ERROR
    
    metric_statements:
      - context: datapoint
        statements:
          # Convert units
          - set(attributes["duration_seconds"], attributes["duration_ms"] / 1000.0)
    
    log_statements:
      - context: log
        statements:
          # Parse severity
          - set(severity_number, SEVERITY_NUMBER_ERROR) where IsMatch(body, "(?i)error")
          - set(severity_number, SEVERITY_NUMBER_WARN) where IsMatch(body, "(?i)warn")

Kubernetes Deployment

apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: otel-collector-agent
  namespace: monitoring
spec:
  selector:
    matchLabels:
      app: otel-collector-agent
  template:
    metadata:
      labels:
        app: otel-collector-agent
    spec:
      serviceAccountName: otel-collector
      containers:
        - name: collector
          image: otel/opentelemetry-collector-contrib:0.91.0
          args:
            - --config=/etc/otel/config.yaml
          ports:
            - containerPort: 4317
              hostPort: 4317
            - containerPort: 4318
              hostPort: 4318
          volumeMounts:
            - name: config
              mountPath: /etc/otel
            - name: varlog
              mountPath: /var/log
              readOnly: true
          resources:
            requests:
              cpu: 100m
              memory: 256Mi
            limits:
              cpu: 500m
              memory: 512Mi
      volumes:
        - name: config
          configMap:
            name: otel-collector-config
        - name: varlog
          hostPath:
            path: /var/log

Gateway Pattern

# Agent (DaemonSet) -> Gateway (Deployment) -> Backends

# Agent config
exporters:
  otlp:
    endpoint: otel-gateway.monitoring:4317

# Gateway config
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317

processors:
  batch:
    timeout: 10s
    send_batch_size: 10000
  
  tail_sampling:
    # Sampling config here

exporters:
  otlp/tempo:
    endpoint: tempo.monitoring:4317
  prometheusremotewrite:
    endpoint: https://prometheus.company.com/api/v1/write
  loki:
    endpoint: http://loki.monitoring:3100/loki/api/v1/push
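The gateway config above only declares components; wiring them into per-signal pipelines might look like this (a sketch assuming the receivers, processors, and exporters shown, with tail sampling applied only to traces):

```yaml
service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [tail_sampling, batch]
      exporters: [otlp/tempo]
    metrics:
      receivers: [otlp]
      processors: [batch]
      exporters: [prometheusremotewrite]
    logs:
      receivers: [otlp]
      processors: [batch]
      exporters: [loki]
```

Centralizing tail sampling in the gateway is the main payoff of this pattern: the sampler needs to see all spans of a trace, which a per-node agent cannot guarantee.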
