Build an ETL Pipeline with Python, PostgreSQL, and Airflow
ETL pipelines are the backbone of data engineering. Extract data from a source, transform it into something useful, load it into a destination. Simple concept, but the implementation details matter.
This guide walks through building a real ETL pipeline: pulling weather data from OpenWeatherMap, transforming it with pandas, and loading it into PostgreSQL. Then we add Airflow for scheduling and email notifications for monitoring.
TL;DR
- Extract weather data from OpenWeatherMap API
- Transform with pandas (cleaning, type conversion, enrichment)
- Load into PostgreSQL
- Orchestrate with Airflow on a schedule
- Email notifications on success/failure
- Everything runs in Docker
Architecture
┌─────────────────┐
│ OpenWeatherMap │
│ API │
└────────┬────────┘
│ Extract
▼
┌─────────────────┐
│ Python │
│ (pandas) │
│ Transform │
└────────┬────────┘
│ Load
▼
┌─────────────────┐
│ PostgreSQL │
│ Database │
└─────────────────┘
│
▼
┌─────────────────┐
│ Airflow │
│ (scheduling) │
└─────────────────┘
│
▼
┌─────────────────┐
│ Email │
│ Notifications │
└─────────────────┘
Stack
- Python - ETL logic
- pandas - Data transformation
- PostgreSQL - Data warehouse
- Docker - Containerization
- Airflow - Workflow orchestration
- OpenWeatherMap - Data source (free tier)
Project Structure
etl-pipeline/
├── docker-compose.yml
├── Dockerfile
├── Makefile
├── requirements.txt
├── .env
├── etl/
│ ├── __init__.py
│ ├── extract.py
│ ├── transform.py
│ ├── load.py
│ └── pipeline.py
├── dags/
│ └── weather_etl_dag.py
└── sql/
└── init.sql
The ETL Code
Extract: Fetch Weather Data
# etl/extract.py
import requests
import os
from typing import Dict, Any
def extract_weather(city: str = "London") -> Dict[str, Any]:
    """Extract current weather data from the OpenWeatherMap API.

    Args:
        city: City name understood by the OpenWeatherMap ``q`` parameter.

    Returns:
        Parsed JSON payload for the current weather.

    Raises:
        ValueError: If OPENWEATHERMAP_API_KEY is not set.
        requests.HTTPError: If the API responds with a non-2xx status.
    """
    api_key = os.getenv("OPENWEATHERMAP_API_KEY")
    if not api_key:
        raise ValueError("OPENWEATHERMAP_API_KEY environment variable not set")

    # Plain string literal: there was nothing to interpolate in the f-string.
    url = "https://api.openweathermap.org/data/2.5/weather"
    params = {
        "q": city,
        "appid": api_key,
        "units": "metric",  # Celsius
    }
    # Without a timeout a stalled connection would hang the pipeline forever.
    response = requests.get(url, params=params, timeout=10)
    response.raise_for_status()
    return response.json()
Transform: Clean and Enrich
# etl/transform.py
import pandas as pd
from datetime import datetime
from typing import Dict, Any
def transform_weather(raw_data: Dict[str, Any]) -> pd.DataFrame:
    """Transform a raw OpenWeatherMap payload into a one-row DataFrame.

    Args:
        raw_data: Parsed JSON from the /data/2.5/weather endpoint.

    Returns:
        A single-row DataFrame whose columns match the ``weather`` table.

    Raises:
        KeyError: If a required field is missing from the payload.
    """
    import json  # local import keeps this snippet self-contained

    transformed = {
        "city": raw_data["name"],
        "country": raw_data["sys"]["country"],
        "temperature": raw_data["main"]["temp"],
        "feels_like": raw_data["main"]["feels_like"],
        "humidity": raw_data["main"]["humidity"],
        "pressure": raw_data["main"]["pressure"],
        "wind_speed": raw_data["wind"]["speed"],
        "weather_main": raw_data["weather"][0]["main"],
        "weather_description": raw_data["weather"][0]["description"],
        "clouds": raw_data["clouds"]["all"],
        # `visibility` is optional in the API response.
        "visibility": raw_data.get("visibility"),
        # NOTE(review): fromtimestamp/now are naive local time; assumes the
        # container TZ is the desired zone — confirm before comparing cities.
        "sunrise": datetime.fromtimestamp(raw_data["sys"]["sunrise"]),
        "sunset": datetime.fromtimestamp(raw_data["sys"]["sunset"]),
        "timestamp": datetime.now(),
        # Store real JSON: str(dict) produces a Python repr (single quotes,
        # None/True literals) that no JSON parser can read back.
        "raw_json": json.dumps(raw_data),
    }
    df = pd.DataFrame([transformed])

    # Data quality: coerce to numeric so bad values become NaN instead of
    # crashing downstream aggregation.
    df["temperature"] = pd.to_numeric(df["temperature"], errors="coerce")
    df["humidity"] = pd.to_numeric(df["humidity"], errors="coerce")

    # Enrichment: human-readable temperature bucket.
    df["temp_category"] = df["temperature"].apply(categorize_temp)
    return df


def categorize_temp(temp: float) -> str:
    """Categorize a Celsius temperature into a human-readable bucket."""
    if temp < 0:
        return "freezing"
    elif temp < 10:
        return "cold"
    elif temp < 20:
        return "mild"
    elif temp < 30:
        return "warm"
    else:
        return "hot"
Load: Insert into PostgreSQL
# etl/load.py
import pandas as pd
from sqlalchemy import create_engine
import os
def load_weather(df: pd.DataFrame) -> int:
    """Load transformed weather data into PostgreSQL.

    Args:
        df: One or more rows matching the ``weather`` table columns.

    Returns:
        Number of rows appended.
    """
    db_url = os.getenv(
        "DATABASE_URL",
        "postgresql://etluser:etlpass@postgres-db:5432/weatherdb"
    )
    engine = create_engine(db_url)
    try:
        # Append to the existing table; `method="multi"` batches the INSERTs.
        # (The previous code bound to_sql's result to an unused variable.)
        df.to_sql(
            "weather",
            engine,
            if_exists="append",
            index=False,
            method="multi",
        )
    finally:
        # Dispose the engine so each call releases its connection pool
        # instead of leaking connections to the database.
        engine.dispose()
    return len(df)
Pipeline: Tie It Together
# etl/pipeline.py
from etl.extract import extract_weather
from etl.transform import transform_weather
from etl.load import load_weather
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def run_pipeline(cities: list | None = None) -> dict:
    """Run the full ETL pipeline for the given cities.

    Args:
        cities: City names to process; defaults to a built-in list.

    Returns:
        Summary dict: ``{"success": int, "failed": int, "errors": [...]}``.
        Per-city failures are recorded rather than raised, so one bad city
        never aborts the whole run.
    """
    if cities is None:
        cities = ["London", "New York", "Tokyo", "Sydney", "Dubai"]

    results = {"success": 0, "failed": 0, "errors": []}
    for city in cities:
        try:
            logger.info("Processing %s...", city)

            # Extract
            raw_data = extract_weather(city)
            logger.info("Extracted data for %s", city)

            # Transform
            df = transform_weather(raw_data)
            logger.info("Transformed data: %d rows", len(df))

            # Load
            rows_loaded = load_weather(df)
            logger.info("Loaded %d rows for %s", rows_loaded, city)

            results["success"] += 1
        except Exception as e:
            # logger.exception records the traceback, not just the message.
            logger.exception("Failed to process %s", city)
            results["failed"] += 1
            results["errors"].append({"city": city, "error": str(e)})
    return results


if __name__ == "__main__":
    run_pipeline()
Database Schema
-- sql/init.sql
-- Schema for raw weather readings: one row per city per pipeline run.
CREATE TABLE IF NOT EXISTS weather (
    id SERIAL PRIMARY KEY,
    city VARCHAR(100) NOT NULL,
    country VARCHAR(10),
    temperature DECIMAL(5,2),   -- Celsius (API called with units=metric)
    feels_like DECIMAL(5,2),
    humidity INTEGER,           -- percent
    pressure INTEGER,
    wind_speed DECIMAL(5,2),
    weather_main VARCHAR(50),
    weather_description VARCHAR(200),
    clouds INTEGER,
    visibility INTEGER,         -- nullable: optional field in the API payload
    sunrise TIMESTAMP,
    sunset TIMESTAMP,
    temp_category VARCHAR(20),
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    raw_json TEXT,              -- full source payload for reprocessing
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- IF NOT EXISTS keeps this script idempotent: the table guard alone is not
-- enough, because re-running CREATE INDEX on an existing index is an error.
CREATE INDEX IF NOT EXISTS idx_weather_city ON weather(city);
CREATE INDEX IF NOT EXISTS idx_weather_timestamp ON weather(timestamp);
Docker Configuration
Docker Compose
# docker-compose.yml
version: '3.8'
services:
  # PostgreSQL: data warehouse for weather rows.
  # NOTE(review): the same database also serves as Airflow's metadata store
  # below — fine for a demo, but confirm before production use.
  postgres-db:
    image: postgres:15-alpine
    container_name: etl-postgres
    environment:
      POSTGRES_USER: etluser
      POSTGRES_PASSWORD: etlpass
      POSTGRES_DB: weatherdb
    volumes:
      - postgres_data:/var/lib/postgresql/data
      # Runs once on first container start to create the weather table.
      - ./sql/init.sql:/docker-entrypoint-initdb.d/init.sql
    ports:
      - "5432:5432"
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U etluser -d weatherdb"]
      interval: 10s
      retries: 5

  # One-shot ETL container: runs the pipeline once and exits.
  etl:
    build: .
    container_name: etl-pipeline
    environment:
      - OPENWEATHERMAP_API_KEY=${OPENWEATHERMAP_API_KEY}
      - DATABASE_URL=postgresql://etluser:etlpass@postgres-db:5432/weatherdb
    depends_on:
      # Wait for the healthcheck above, not just container start.
      postgres-db:
        condition: service_healthy
    command: python -m etl.pipeline

  # Airflow services
  airflow-webserver:
    image: apache/airflow:2.7.3
    container_name: airflow-webserver
    environment:
      - AIRFLOW__CORE__EXECUTOR=LocalExecutor
      - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://etluser:etlpass@postgres-db:5432/weatherdb
      - AIRFLOW__CORE__FERNET_KEY=${FERNET_KEY}
      - AIRFLOW__WEBSERVER__SECRET_KEY=${SECRET_KEY}
      # SMTP settings for the Email operators / failure alerts.
      - AIRFLOW__SMTP__SMTP_HOST=smtp.gmail.com
      - AIRFLOW__SMTP__SMTP_PORT=587
      - AIRFLOW__SMTP__SMTP_USER=${SMTP_USER}
      - AIRFLOW__SMTP__SMTP_PASSWORD=${SMTP_PASSWORD}
      - AIRFLOW__SMTP__SMTP_MAIL_FROM=${SMTP_USER}
      - OPENWEATHERMAP_API_KEY=${OPENWEATHERMAP_API_KEY}
    volumes:
      # DAGs and the etl package are bind-mounted so edits apply without rebuild.
      - ./dags:/opt/airflow/dags
      - ./etl:/opt/airflow/etl
      - airflow_logs:/opt/airflow/logs
    ports:
      - "8080:8080"
    depends_on:
      postgres-db:
        condition: service_healthy
    command: webserver

  airflow-scheduler:
    image: apache/airflow:2.7.3
    container_name: airflow-scheduler
    environment:
      - AIRFLOW__CORE__EXECUTOR=LocalExecutor
      - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://etluser:etlpass@postgres-db:5432/weatherdb
      - AIRFLOW__CORE__FERNET_KEY=${FERNET_KEY}
      - OPENWEATHERMAP_API_KEY=${OPENWEATHERMAP_API_KEY}
    volumes:
      - ./dags:/opt/airflow/dags
      - ./etl:/opt/airflow/etl
      - airflow_logs:/opt/airflow/logs
    depends_on:
      - airflow-webserver
    command: scheduler

volumes:
  postgres_data:
  airflow_logs:
Dockerfile
# Minimal image for the ETL job.
FROM python:3.11-slim

WORKDIR /app

# Copy and install requirements first so the dependency layer is cached
# across code-only changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

# Default command: run the pipeline once (docker-compose overrides as needed).
CMD ["python", "-m", "etl.pipeline"]
Requirements
# requirements.txt
pandas==2.1.0
requests==2.31.0
sqlalchemy==2.0.21
psycopg2-binary==2.9.9
python-dotenv==1.0.0
Airflow DAG
# dags/weather_etl_dag.py
"""Airflow DAG: run the weather ETL every 6 hours and email the outcome."""
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

import sys

# Make the bind-mounted /opt/airflow/etl package importable in the workers.
sys.path.insert(0, '/opt/airflow')

from etl.pipeline import run_pipeline

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email': ['alerts@example.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'weather_etl',
    default_args=default_args,
    description='ETL pipeline for weather data',
    schedule_interval='0 */6 * * *',  # Every 6 hours
    start_date=datetime(2024, 1, 1),
    catchup=False,  # no backfill for runs between start_date and now
    tags=['etl', 'weather'],
) as dag:

    def run_etl(**context):
        """Execute the ETL pipeline; fail the task if any city failed.

        run_pipeline swallows per-city errors into its summary dict, so we
        re-raise here to make partial failures visible to Airflow (retries,
        failure email).
        """
        cities = ["London", "New York", "Tokyo", "Sydney", "Dubai", "Paris"]
        results = run_pipeline(cities)
        if results["failed"] > 0:
            raise Exception(f"Pipeline failed for {results['failed']} cities: {results['errors']}")
        return results  # pushed to XCom, visible in the Airflow UI

    etl_task = PythonOperator(
        task_id='run_weather_etl',
        python_callable=run_etl,
        # provide_context=True removed: deprecated in Airflow 2.x, where the
        # context kwargs are always passed to the callable.
    )

    success_email = EmailOperator(
        task_id='send_success_email',
        to='alerts@example.com',
        subject='Weather ETL Pipeline - Success',
        html_content="""
        <h3>Weather ETL Pipeline Completed Successfully</h3>
        <p>The weather data ETL pipeline has completed successfully.</p>
        <p>Execution Time: {{ execution_date }}</p>
        """,
        trigger_rule=TriggerRule.ALL_SUCCESS,
    )

    failure_email = EmailOperator(
        task_id='send_failure_email',
        to='alerts@example.com',
        subject='Weather ETL Pipeline - FAILED',
        html_content="""
        <h3>Weather ETL Pipeline Failed</h3>
        <p>The weather data ETL pipeline has failed.</p>
        <p>Execution Time: {{ execution_date }}</p>
        <p>Please check the Airflow logs for details.</p>
        """,
        trigger_rule=TriggerRule.ONE_FAILED,
    )

    # Exactly one of the two emails fires, selected by the trigger rules.
    etl_task >> [success_email, failure_email]
Setup and Running
1. Get an API Key
Sign up at OpenWeatherMap and get a free API key.
2. Configure Environment
# .env
OPENWEATHERMAP_API_KEY=your_api_key_here
FERNET_KEY=your_fernet_key
SECRET_KEY=your_secret_key
SMTP_USER=your_email@gmail.com
SMTP_PASSWORD=your_app_password
Generate security keys:
# Fernet key for Airflow
python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"
# Secret key
python -c "import secrets; print(secrets.token_urlsafe(32))"
3. Start the Stack
# Build and start
docker-compose up --build
# Run in background
docker-compose up -d
4. Initialize Airflow
# Initialize the database
docker-compose run --rm airflow-webserver airflow db init
# Create admin user
docker-compose exec airflow-webserver airflow users create \
--username admin \
--firstname Admin \
--lastname User \
--role Admin \
--email admin@example.com \
--password admin
5. Access UIs
- Airflow: http://localhost:8080 (admin/admin)
- PostgreSQL: localhost:5432
Verify the Data
# Connect to PostgreSQL
docker exec -it etl-postgres psql -U etluser -d weatherdb
# Or use make
make psql
-- Check the data
SELECT * FROM weather;
-- Count by city
SELECT city, COUNT(*) as records
FROM weather
GROUP BY city;
-- Latest readings
SELECT city, temperature, humidity, timestamp
FROM weather
ORDER BY timestamp DESC
LIMIT 10;
-- Temperature trends
SELECT
city,
DATE(timestamp) as date,
AVG(temperature) as avg_temp,
MIN(temperature) as min_temp,
MAX(temperature) as max_temp
FROM weather
GROUP BY city, DATE(timestamp)
ORDER BY date DESC;
Email Notifications Setup
For Gmail SMTP, create an App Password:
- Go to Google App Passwords
- Generate a new app password for “Mail”
- Use this password as the value of `SMTP_PASSWORD` in your `.env` file
Note: Don’t use your regular Gmail password. App passwords are required for SMTP access.
Makefile
# Makefile
# NOTE: make requires recipe lines to be indented with a literal TAB.
.PHONY: up down logs psql test restart-airflow

up:
	docker-compose up --build -d

down:
	docker-compose down

logs:
	docker-compose logs -f

psql:
	docker exec -it etl-postgres psql -U etluser -d weatherdb

test:
	docker-compose run --rm etl python -m pytest tests/

# restart-airflow added to .PHONY above so make never skips it if a file
# with that name ever exists.
restart-airflow:
	docker-compose restart airflow-webserver airflow-scheduler
Extending the Pipeline
Add More Data Sources
# etl/extract.py
def extract_forecast(city: str, days: int = 5) -> Dict[str, Any]:
    """Extract weather forecast data (3-hourly entries, 8 per day).

    Args:
        city: City name for the ``q`` query parameter.
        days: Number of days of forecast to request.

    Returns:
        Parsed JSON forecast payload.

    Raises:
        ValueError: If OPENWEATHERMAP_API_KEY is not set.
        requests.HTTPError: On a non-2xx API response.
    """
    api_key = os.getenv("OPENWEATHERMAP_API_KEY")
    if not api_key:
        # Fail fast with a clear message, consistent with extract_weather.
        raise ValueError("OPENWEATHERMAP_API_KEY environment variable not set")

    url = "https://api.openweathermap.org/data/2.5/forecast"
    # The forecast endpoint returns 3-hour steps, so 8 entries cover a day.
    params = {"q": city, "appid": api_key, "units": "metric", "cnt": days * 8}
    response = requests.get(url, params=params, timeout=10)
    response.raise_for_status()
    return response.json()
Add Data Quality Checks
# etl/validate.py
def validate_weather_data(df: pd.DataFrame) -> bool:
    """Return True when the transformed weather frame passes sanity checks.

    Checks: non-empty frame, temperatures within [-50, 60] C, humidity
    within [0, 100] %, and no missing city names.
    """
    temp_ok = df["temperature"].between(-50, 60).all()
    humidity_ok = df["humidity"].between(0, 100).all()
    city_ok = df["city"].notna().all()
    non_empty = len(df) > 0
    return bool(temp_ok) and bool(humidity_ok) and bool(city_ok) and non_empty
Add Incremental Loading
# etl/load.py
def load_weather_incremental(df: pd.DataFrame) -> int:
    """Append only rows for cities with no reading in the last hour.

    Returns the number of rows actually written.
    """
    engine = create_engine(os.getenv("DATABASE_URL"))

    # Cities that already have a record from the past hour.
    recent = pd.read_sql(
        "SELECT city, timestamp FROM weather WHERE timestamp > NOW() - INTERVAL '1 hour'",
        engine
    )
    already_loaded = recent["city"]

    # Drop rows whose city was loaded recently; keep the rest.
    fresh_mask = ~df["city"].isin(already_loaded)
    fresh = df[fresh_mask]
    if not fresh.empty:
        fresh.to_sql("weather", engine, if_exists="append", index=False)
    return len(fresh)
Best Practices
- Idempotency - Pipeline can run multiple times without duplicating data
- Logging - Log every step for debugging
- Error Handling - Graceful failures with meaningful messages
- Monitoring - Email alerts on failures, Airflow task monitoring
- Testing - Unit tests for transform functions
- Secrets Management - Never commit API keys or passwords
Troubleshooting
Airflow Database Issues
# Reset the database
docker-compose run --rm airflow-webserver airflow db reset
# Re-initialize
docker-compose run --rm airflow-webserver airflow db init
Connection Refused
# Check if PostgreSQL is running
docker-compose ps
docker-compose logs postgres-db
Email Not Sending
- Verify App Password is correct
- Check spam folder
- Review Airflow logs:
docker-compose logs airflow-webserver
Resources
Repository
Full source code: github.com/moabukar/etl-pipeline
Data pipelines don’t have to be complicated. Extract, transform, load, schedule, alert. That’s it. Happy engineering.