OpenTelemetry Collector Pipelines: Transform, Filter, Route
The OpenTelemetry Collector is the Swiss Army knife of telemetry: a single, vendor-agnostic process that receives, processes, and exports traces, metrics, and logs. This guide covers building production pipelines with it.
TL;DR
- Collector = vendor-agnostic telemetry pipeline
- Receivers = ingest data (OTLP, Prometheus, etc.)
- Processors = transform, filter, batch, sample
- Exporters = send to backends (Prometheus, Jaeger, etc.)
- Connectors = route between pipelines
Architecture
```
┌─────────────────────────────────────────────────────────────────┐
│                     OpenTelemetry Collector                     │
│                                                                 │
│  ┌──────────────┐   ┌──────────────┐   ┌──────────────────────┐ │
│  │  Receivers   │──▶│  Processors  │──▶│      Exporters       │ │
│  │              │   │              │   │                      │ │
│  │ - OTLP       │   │ - batch      │   │ - otlp (Tempo)       │ │
│  │ - prometheus │   │ - filter     │   │ - prometheus         │ │
│  │ - filelog    │   │ - transform  │   │ - loki               │ │
│  │ - jaeger     │   │ - tail_sample│   │ - datadog            │ │
│  └──────────────┘   └──────────────┘   └──────────────────────┘ │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
```
Basic Configuration
```yaml
# otel-collector-config.yaml
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
      http:
        endpoint: 0.0.0.0:4318

processors:
  batch:
    timeout: 5s
    send_batch_size: 1000

exporters:
  otlp:
    endpoint: tempo.monitoring:4317
    tls:
      insecure: true

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch]
      exporters: [otlp]
```
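Before tuning the batch processor, it helps to know exactly what it does: it buffers items and flushes when either `send_batch_size` is reached or `timeout` elapses since the first buffered item, whichever comes first. A minimal sketch of that flush logic (hypothetical class, not collector code):

```python
import time

class BatchProcessor:
    """Sketch of size-or-timeout batching, like the collector's batch processor."""

    def __init__(self, timeout_s=5.0, send_batch_size=1000, export=print):
        self.timeout_s = timeout_s
        self.send_batch_size = send_batch_size
        self.export = export          # downstream exporter callback
        self.buffer = []
        self.deadline = None

    def consume(self, item, now=None):
        now = time.monotonic() if now is None else now
        if not self.buffer:
            self.deadline = now + self.timeout_s   # timer starts on first item
        self.buffer.append(item)
        if len(self.buffer) >= self.send_batch_size or now >= self.deadline:
            self.flush()

    def flush(self):
        if self.buffer:
            self.export(self.buffer)
            self.buffer = []
            self.deadline = None
```

The trade-off is the usual one: a larger batch size means fewer, bigger export calls; a shorter timeout bounds how stale a buffered item can get.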
Metrics Pipeline
Prometheus + Remote Write
```yaml
receivers:
  prometheus:
    config:
      scrape_configs:
        - job_name: kubernetes-pods
          kubernetes_sd_configs:
            - role: pod
          relabel_configs:
            - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
              action: keep
              regex: true
            - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_path]
              action: replace
              target_label: __metrics_path__
              regex: (.+)
            - source_labels: [__address__, __meta_kubernetes_pod_annotation_prometheus_io_port]
              action: replace
              regex: ([^:]+)(?::\d+)?;(\d+)
              replacement: $$1:$$2  # $$ escapes a literal $ in collector configs
              target_label: __address__

processors:
  batch:
    timeout: 10s
  # Add cluster label
  resource:
    attributes:
      - key: cluster
        value: production
        action: upsert
  # Filter out high-cardinality metrics
  filter:
    metrics:
      exclude:
        match_type: regexp
        metric_names:
          - ".*_bucket"  # Exclude histogram buckets
          - "go_.*"      # Exclude Go runtime metrics

exporters:
  prometheusremotewrite:
    endpoint: https://prometheus.company.com/api/v1/write
    headers:
      Authorization: Bearer ${env:PROM_TOKEN}

service:
  pipelines:
    metrics:
      receivers: [prometheus]
      # Drop unwanted series before batching; batch goes last
      processors: [resource, filter, batch]
      exporters: [prometheusremotewrite]
```
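The `__address__` rewrite above is the standard Prometheus trick for honoring a `prometheus.io/port` annotation: the two source labels are joined with `;`, matched, and reassembled as `host:port`. The regex can be sanity-checked in isolation (made-up addresses below):

```python
import re

# Same regex as the relabel rule; Prometheus joins source labels with ";"
pattern = re.compile(r"([^:]+)(?::\d+)?;(\d+)")

def rewrite_address(address: str, port_annotation: str) -> str:
    """Replicate `replacement: $1:$2` applied to `__address__;port`."""
    joined = f"{address};{port_annotation}"
    return pattern.sub(r"\1:\2", joined)

# Pod IP with no port: the annotation's port is appended
print(rewrite_address("10.0.0.12", "9090"))       # 10.0.0.12:9090
# An existing port is dropped in favor of the annotation
print(rewrite_address("10.0.0.12:8080", "9090"))  # 10.0.0.12:9090
```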
Traces Pipeline
Tail Sampling
Sample traces intelligently based on content:
```yaml
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317

processors:
  batch:
    timeout: 5s
  # Memory limiter to prevent OOM
  memory_limiter:
    check_interval: 1s
    limit_mib: 1000
    spike_limit_mib: 200
  # Tail-based sampling
  tail_sampling:
    decision_wait: 10s
    num_traces: 100000
    expected_new_traces_per_sec: 1000
    policies:
      # Always sample errors
      - name: errors
        type: status_code
        status_code:
          status_codes: [ERROR]
      # Always sample slow traces
      - name: slow-traces
        type: latency
        latency:
          threshold_ms: 1000
      # Sample 10% of everything else
      - name: probabilistic
        type: probabilistic
        probabilistic:
          sampling_percentage: 10
      # Always sample specific operations
      - name: important-operations
        type: string_attribute
        string_attribute:
          key: http.route
          values:
            - /api/payments
            - /api/checkout
          enabled_regex_matching: false

exporters:
  otlp:
    endpoint: tempo.monitoring:4317
    tls:
      insecure: true

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [memory_limiter, tail_sampling, batch]
      exporters: [otlp]
```
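Policies are OR-ed: a trace is kept if any policy votes to sample it. A rough sketch of that decision for the four policies above (heavily simplified; the real processor tracks per-policy decisions and more states, and the hash-based probabilistic bucket here is an assumption for illustration):

```python
import hashlib

def keep_trace(trace_id: str, spans: list[dict], sampling_percentage: float = 10.0) -> bool:
    """Simplified OR over the four policies in the config above."""
    # errors: any span with ERROR status
    if any(s.get("status") == "ERROR" for s in spans):
        return True
    # slow-traces: any span over 1000 ms
    if any(s.get("duration_ms", 0) > 1000 for s in spans):
        return True
    # important-operations: exact match on http.route
    routes = {"/api/payments", "/api/checkout"}
    if any(s.get("attributes", {}).get("http.route") in routes for s in spans):
        return True
    # probabilistic: hash the trace ID into a [0, 100) bucket
    bucket = int(hashlib.sha256(trace_id.encode()).hexdigest(), 16) % 10000 / 100
    return bucket < sampling_percentage
```

Note why `decision_wait` matters: the sampler must hold all spans of a trace in memory until the window closes, which is what `num_traces` and the memory_limiter are guarding against.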
Logs Pipeline
File to Loki
```yaml
receivers:
  filelog:
    include:
      - /var/log/pods/*/*/*.log
    include_file_path: true
    operators:
      # Parse container runtime format
      - type: regex_parser
        regex: '^(?P<time>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) (?P<log>.*)$'
        timestamp:
          parse_from: attributes.time
          layout: '%Y-%m-%dT%H:%M:%S.%LZ'
      # Parse JSON logs
      - type: json_parser
        parse_from: attributes.log
        if: 'attributes.log matches "^\\{"'
      # Extract Kubernetes metadata from the file path
      - type: regex_parser
        regex: '^/var/log/pods/(?P<namespace>[^_]+)_(?P<pod>[^_]+)_[^/]+/(?P<container>[^/]+)/'
        parse_from: attributes["log.file.path"]
      # Promote the parsed fields to resource attributes
      # (record attributes are not visible to the resource processor)
      - type: move
        from: attributes.container
        to: resource["service.name"]
      - type: move
        from: attributes.namespace
        to: resource["k8s.namespace.name"]

processors:
  batch:
    timeout: 5s
  # Hint which attributes the Loki exporter turns into labels
  resource:
    attributes:
      - key: loki.resource.labels
        value: service.name,k8s.namespace.name
        action: upsert
      - key: loki.attribute.labels
        value: level
        action: upsert
  # Filter out noisy logs
  filter:
    logs:
      exclude:
        match_type: regexp
        bodies:
          - ".*health.*check.*"
          - ".*readiness.*probe.*"

exporters:
  loki:
    # Labels come from the loki.resource.labels / loki.attribute.labels
    # hints set above (the exporter's old `labels:` config was removed)
    endpoint: http://loki.monitoring:3100/loki/api/v1/push

service:
  pipelines:
    logs:
      receivers: [filelog]
      processors: [resource, filter, batch]
      exporters: [loki]
```
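Both regexes in the filelog receiver are easy to get subtly wrong, and they can be checked outside the collector. Here they are exercised with Python's `re` on a made-up log line and pod path:

```python
import re

# Container runtime log line format (same regex as the first filelog operator)
LINE_RE = re.compile(
    r"^(?P<time>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z) "
    r"(?P<stream>stdout|stderr) (?P<logtag>[^ ]*) (?P<log>.*)$"
)

# Kubernetes metadata from the pod log path (same regex as the last parser)
PATH_RE = re.compile(
    r"^/var/log/pods/(?P<namespace>[^_]+)_(?P<pod>[^_]+)_[^/]+/(?P<container>[^/]+)/"
)

line = '2024-01-15T10:30:00.123456Z stdout F {"level":"info","msg":"ok"}'
m = LINE_RE.match(line)
print(m.group("stream"), m.group("log"))

path = "/var/log/pods/payments_api-7d9f_abc123/api/0.log"
p = PATH_RE.match(path)
print(p.group("namespace"), p.group("pod"), p.group("container"))
```

The path regex relies on the kubelet's `namespace_podname_poduid/container/` directory layout, which is why underscores split the first segment.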
Multi-Destination Routing
```yaml
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317

processors:
  batch:
    timeout: 5s
  # Route by resource attribute
  routing:
    from_attribute: tenant
    attribute_source: resource  # default is "context" (request metadata)
    default_exporters: [otlp/default]
    table:
      - value: tenant-a
        exporters: [otlp/tenant-a]
      - value: tenant-b
        exporters: [otlp/tenant-b]

exporters:
  otlp/default:
    endpoint: tempo-default.monitoring:4317
  otlp/tenant-a:
    endpoint: tempo-a.tenant-a:4317
  otlp/tenant-b:
    endpoint: tempo-b.tenant-b:4317

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch, routing]  # routing must be the last processor
      exporters: [otlp/default, otlp/tenant-a, otlp/tenant-b]
```
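The routing processor is essentially a lookup table with a fallback: read one attribute, pick the matching exporter list, or use the default. A sketch of that decision (hypothetical function, not collector code):

```python
def route(resource_attrs: dict, table: dict, default: list) -> list:
    """Pick exporter names by the `tenant` resource attribute, like the routing processor."""
    tenant = resource_attrs.get("tenant")
    return table.get(tenant, default)

# Mirror of the routing table in the config above
TABLE = {
    "tenant-a": ["otlp/tenant-a"],
    "tenant-b": ["otlp/tenant-b"],
}

print(route({"tenant": "tenant-a"}, TABLE, ["otlp/default"]))  # ['otlp/tenant-a']
print(route({"tenant": "unknown"}, TABLE, ["otlp/default"]))   # ['otlp/default']
```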
Transform Processor
```yaml
processors:
  transform:
    trace_statements:
      - context: span
        statements:
          # Rename attribute (copy, then drop the original)
          - set(attributes["http.method"], attributes["http.request.method"])
          - delete_key(attributes, "http.request.method")
          # Truncate long values
          - truncate_all(attributes, 256)
          # Hash sensitive data
          - set(attributes["user.id"], SHA256(attributes["user.id"]))
          # Add derived attribute
          - set(attributes["is_error"], status.code == STATUS_CODE_ERROR)
    metric_statements:
      - context: datapoint
        statements:
          # Convert units
          - set(attributes["duration_seconds"], attributes["duration_ms"] / 1000.0)
    log_statements:
      - context: log
        statements:
          # Parse severity
          - set(severity_number, SEVERITY_NUMBER_ERROR) where IsMatch(body, "(?i)error")
          - set(severity_number, SEVERITY_NUMBER_WARN) where IsMatch(body, "(?i)warn")
```
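The OTTL trace statements above are declarative versions of plain attribute mutations. An equivalent sketch in Python makes the semantics and ordering concrete (this assumes, as the OTTL `SHA256` converter does, a hex-encoded digest):

```python
import hashlib

def transform_span_attributes(attrs: dict, max_len: int = 256) -> dict:
    """Mimic the trace_statements: rename, truncate, hash (in that order)."""
    out = dict(attrs)
    # set(attributes["http.method"], ...) followed by delete_key = rename
    if "http.request.method" in out:
        out["http.method"] = out.pop("http.request.method")
    # truncate_all(attributes, 256)
    out = {k: (v[:max_len] if isinstance(v, str) else v) for k, v in out.items()}
    # set(attributes["user.id"], SHA256(attributes["user.id"]))
    if "user.id" in out:
        out["user.id"] = hashlib.sha256(out["user.id"].encode()).hexdigest()
    return out

print(transform_span_attributes({"http.request.method": "GET", "user.id": "u-123"}))
```

Statement order matters in OTTL just as it does here: hashing runs after truncation, so a very long `user.id` is truncated before it is hashed.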
Kubernetes Deployment
```yaml
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: otel-collector-agent
  namespace: monitoring
spec:
  selector:
    matchLabels:
      app: otel-collector-agent
  template:
    metadata:
      labels:
        app: otel-collector-agent
    spec:
      serviceAccountName: otel-collector
      containers:
        - name: collector
          image: otel/opentelemetry-collector-contrib:0.91.0
          args:
            - --config=/etc/otel/config.yaml
          ports:
            - containerPort: 4317
              hostPort: 4317
            - containerPort: 4318
              hostPort: 4318
          volumeMounts:
            - name: config
              mountPath: /etc/otel
            - name: varlog
              mountPath: /var/log
              readOnly: true
          resources:
            requests:
              cpu: 100m
              memory: 256Mi
            limits:
              cpu: 500m
              memory: 512Mi
      volumes:
        - name: config
          configMap:
            name: otel-collector-config
        - name: varlog
          hostPath:
            path: /var/log
```
Gateway Pattern
Agent (DaemonSet) -> Gateway (Deployment) -> Backends. Agents stay thin and forward everything; the gateway does the expensive work (tail sampling needs all spans of a trace on one instance).

```yaml
# Agent config: forward everything to the gateway
exporters:
  otlp:
    endpoint: otel-gateway.monitoring:4317
```

```yaml
# Gateway config: centralized heavy processing
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317

processors:
  batch:
    timeout: 10s
    send_batch_size: 10000
  tail_sampling:
    # Sampling config here

exporters:
  otlp/tempo:
    endpoint: tempo.monitoring:4317
  prometheusremotewrite:
    endpoint: https://prometheus.company.com/api/v1/write
  loki:
    endpoint: http://loki.monitoring:3100/loki/api/v1/push
```
References
- OTel Collector Docs: https://opentelemetry.io/docs/collector
- Contrib Receivers: https://github.com/open-telemetry/opentelemetry-collector-contrib
- Configuration: https://opentelemetry.io/docs/collector/configuration