Skip to content
Back to blog Build an ETL Pipeline with Python, PostgreSQL, and Airflow

Build an ETL Pipeline with Python, PostgreSQL, and Airflow

BackendDevOps

Build an ETL Pipeline with Python, PostgreSQL, and Airflow

ETL pipelines are the backbone of data engineering. Extract data from a source, transform it into something useful, load it into a destination. Simple concept, but the implementation details matter.

This guide walks through building a real ETL pipeline: pulling weather data from OpenWeatherMap, transforming it with pandas, and loading it into PostgreSQL. Then we add Airflow for scheduling and email notifications for monitoring.

TL;DR

  • Extract weather data from OpenWeatherMap API
  • Transform with pandas (cleaning, type conversion, enrichment)
  • Load into PostgreSQL
  • Orchestrate with Airflow on a schedule
  • Email notifications on success/failure
  • Everything runs in Docker

Architecture

┌─────────────────┐
│ OpenWeatherMap  │
│      API        │
└────────┬────────┘
         │ Extract

┌─────────────────┐
│     Python      │
│   (pandas)      │
│   Transform     │
└────────┬────────┘
         │ Load

┌─────────────────┐
│   PostgreSQL    │
│    Database     │
└─────────────────┘


┌─────────────────┐
│    Airflow      │
│  (scheduling)   │
└─────────────────┘


┌─────────────────┐
│     Email       │
│ Notifications   │
└─────────────────┘

Stack

  • Python - ETL logic
  • pandas - Data transformation
  • PostgreSQL - Data warehouse
  • Docker - Containerization
  • Airflow - Workflow orchestration
  • OpenWeatherMap - Data source (free tier)

Project Structure

etl-pipeline/
├── docker-compose.yml
├── Dockerfile
├── Makefile
├── requirements.txt
├── .env
├── etl/
│   ├── __init__.py
│   ├── extract.py
│   ├── transform.py
│   ├── load.py
│   └── pipeline.py
├── dags/
│   └── weather_etl_dag.py
└── sql/
    └── init.sql

The ETL Code

Extract: Fetch Weather Data

# etl/extract.py
import requests
import os
from typing import Dict, Any

def extract_weather(city: str = "London", timeout: float = 10.0) -> Dict[str, Any]:
    """Extract current weather data from the OpenWeatherMap API.

    Args:
        city: City name passed as the API's ``q`` query parameter.
        timeout: Seconds to wait for the HTTP response before aborting.

    Returns:
        The decoded JSON payload for the city's current weather.

    Raises:
        ValueError: If OPENWEATHERMAP_API_KEY is not set.
        requests.HTTPError: If the API returns a non-2xx status.
    """
    api_key = os.getenv("OPENWEATHERMAP_API_KEY")
    if not api_key:
        raise ValueError("OPENWEATHERMAP_API_KEY environment variable not set")

    # Plain string: the original f-string had no placeholders.
    url = "https://api.openweathermap.org/data/2.5/weather"
    params = {
        "q": city,
        "appid": api_key,
        "units": "metric",
    }

    # BUG FIX: requests has no default timeout — without one a stalled
    # connection blocks the pipeline (and any Airflow task) indefinitely.
    response = requests.get(url, params=params, timeout=timeout)
    response.raise_for_status()

    return response.json()

Transform: Clean and Enrich

# etl/transform.py
import pandas as pd
from datetime import datetime
from typing import Dict, Any

def transform_weather(raw_data: Dict[str, Any]) -> pd.DataFrame:
    """Transform raw weather data into a clean, one-row DataFrame.

    Args:
        raw_data: JSON payload from the OpenWeatherMap current-weather endpoint.

    Returns:
        A single-row DataFrame with typed columns plus a ``temp_category``
        enrichment column.
    """
    import json  # stdlib; local import keeps the snippet self-contained

    # Extract relevant fields
    transformed = {
        "city": raw_data["name"],
        "country": raw_data["sys"]["country"],
        "temperature": raw_data["main"]["temp"],
        "feels_like": raw_data["main"]["feels_like"],
        "humidity": raw_data["main"]["humidity"],
        "pressure": raw_data["main"]["pressure"],
        "wind_speed": raw_data["wind"]["speed"],
        "weather_main": raw_data["weather"][0]["main"],
        "weather_description": raw_data["weather"][0]["description"],
        "clouds": raw_data["clouds"]["all"],
        "visibility": raw_data.get("visibility"),  # optional field in the API payload
        "sunrise": datetime.fromtimestamp(raw_data["sys"]["sunrise"]),
        "sunset": datetime.fromtimestamp(raw_data["sys"]["sunset"]),
        "timestamp": datetime.now(),
        # BUG FIX: str(raw_data) produced a Python dict repr (single quotes),
        # not JSON — the raw_json column was unparseable downstream.
        "raw_json": json.dumps(raw_data),
    }

    df = pd.DataFrame([transformed])

    # Data quality checks: coerce bad values to NaN rather than raising.
    df["temperature"] = pd.to_numeric(df["temperature"], errors="coerce")
    df["humidity"] = pd.to_numeric(df["humidity"], errors="coerce")

    # Enrichment: Add temperature category
    df["temp_category"] = df["temperature"].apply(categorize_temp)

    return df


def categorize_temp(temp: float) -> str:
    """Categorize a Celsius temperature into human-readable buckets."""
    # BUG FIX: NaN (possible after errors="coerce" above) compares False
    # against every bound and used to fall through to "hot".
    if pd.isna(temp):
        return "unknown"
    if temp < 0:
        return "freezing"
    if temp < 10:
        return "cold"
    if temp < 20:
        return "mild"
    if temp < 30:
        return "warm"
    return "hot"

Load: Insert into PostgreSQL

# etl/load.py
import pandas as pd
from sqlalchemy import create_engine
import os

def load_weather(df: pd.DataFrame) -> int:
    """Load transformed weather data into PostgreSQL.

    Args:
        df: Transformed rows matching the ``weather`` table schema.

    Returns:
        Number of rows appended (0 for an empty frame).
    """
    # Robustness: skip the connection entirely when there is nothing to load.
    if df.empty:
        return 0

    db_url = os.getenv(
        "DATABASE_URL",
        "postgresql://etluser:etlpass@postgres-db:5432/weatherdb"
    )

    engine = create_engine(db_url)
    try:
        # Append to existing table (the unused `rows =` binding is gone:
        # to_sql's return value was never what this function reported anyway).
        df.to_sql(
            "weather",
            engine,
            if_exists="append",
            index=False,
            method="multi",
        )
    finally:
        # A fresh engine is created on every call; dispose its connection
        # pool instead of leaking sockets across pipeline runs.
        engine.dispose()

    return len(df)

Pipeline: Tie It Together

# etl/pipeline.py
from etl.extract import extract_weather
from etl.transform import transform_weather
from etl.load import load_weather
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def run_pipeline(cities: list[str] | None = None) -> dict:
    """Run the full ETL pipeline for the given cities.

    Args:
        cities: City names to process; defaults to a fixed demo list.

    Returns:
        Summary dict: ``{"success": int, "failed": int, "errors": [...]}``
        where each error entry is ``{"city": ..., "error": ...}``.
    """
    if cities is None:
        cities = ["London", "New York", "Tokyo", "Sydney", "Dubai"]

    results = {"success": 0, "failed": 0, "errors": []}

    for city in cities:
        try:
            # Lazy %-style args: the message is only formatted if it is
            # actually emitted (stdlib logging convention).
            logger.info("Processing %s...", city)

            # Extract
            raw_data = extract_weather(city)
            logger.info("Extracted data for %s", city)

            # Transform
            df = transform_weather(raw_data)
            logger.info("Transformed data: %d rows", len(df))

            # Load
            rows_loaded = load_weather(df)
            logger.info("Loaded %d rows for %s", rows_loaded, city)

            results["success"] += 1

        except Exception as e:
            # One failing city must not abort the rest of the batch;
            # the failure is surfaced in the returned summary instead.
            logger.error("Failed to process %s: %s", city, e)
            results["failed"] += 1
            results["errors"].append({"city": city, "error": str(e)})

    return results


if __name__ == "__main__":
    run_pipeline()

Database Schema

-- sql/init.sql
-- One row per extraction of current weather for a single city.
CREATE TABLE IF NOT EXISTS weather (
    id SERIAL PRIMARY KEY,
    city VARCHAR(100) NOT NULL,
    country VARCHAR(10),              -- country code from the API's sys.country field
    temperature DECIMAL(5,2),         -- degrees Celsius (API is called with units=metric)
    feels_like DECIMAL(5,2),
    humidity INTEGER,                 -- percent (validated as 0-100 in etl/validate.py)
    pressure INTEGER,                 -- presumably hPa per the API — confirm before reporting
    wind_speed DECIMAL(5,2),
    weather_main VARCHAR(50),
    weather_description VARCHAR(200),
    clouds INTEGER,                   -- value of the API's clouds.all field
    visibility INTEGER,               -- nullable: the extractor treats it as optional
    sunrise TIMESTAMP,
    sunset TIMESTAMP,
    temp_category VARCHAR(20),        -- enrichment bucket computed in transform
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,  -- extraction time set by the pipeline
    raw_json TEXT,                    -- full API payload kept for reprocessing/debugging
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP  -- row insertion time
);

-- Support the per-city and latest-readings queries used for verification.
CREATE INDEX idx_weather_city ON weather(city);
CREATE INDEX idx_weather_timestamp ON weather(timestamp);

Docker Configuration

Docker Compose

# docker-compose.yml
version: '3.8'

services:
  # Single Postgres instance holding the weather data AND (per the Airflow
  # SQL_ALCHEMY_CONN below) the Airflow metadata tables.
  # NOTE(review): sharing one database between Airflow internals and the
  # warehouse keeps the demo small; use a separate DB for production.
  postgres-db:
    image: postgres:15-alpine
    container_name: etl-postgres
    environment:
      POSTGRES_USER: etluser
      POSTGRES_PASSWORD: etlpass
      POSTGRES_DB: weatherdb
    volumes:
      - postgres_data:/var/lib/postgresql/data
      # Schema is created on first container start only (initdb behavior).
      - ./sql/init.sql:/docker-entrypoint-initdb.d/init.sql
    ports:
      - "5432:5432"
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U etluser -d weatherdb"]
      interval: 10s
      retries: 5

  # One-shot container that runs the pipeline once and exits.
  etl:
    build: .
    container_name: etl-pipeline
    environment:
      - OPENWEATHERMAP_API_KEY=${OPENWEATHERMAP_API_KEY}
      - DATABASE_URL=postgresql://etluser:etlpass@postgres-db:5432/weatherdb
    depends_on:
      postgres-db:
        condition: service_healthy   # wait for pg_isready before loading
    command: python -m etl.pipeline

  # Airflow services
  airflow-webserver:
    image: apache/airflow:2.7.3
    container_name: airflow-webserver
    environment:
      - AIRFLOW__CORE__EXECUTOR=LocalExecutor
      - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://etluser:etlpass@postgres-db:5432/weatherdb
      - AIRFLOW__CORE__FERNET_KEY=${FERNET_KEY}
      - AIRFLOW__WEBSERVER__SECRET_KEY=${SECRET_KEY}
      # SMTP settings used by EmailOperator and email_on_failure in the DAG.
      - AIRFLOW__SMTP__SMTP_HOST=smtp.gmail.com
      - AIRFLOW__SMTP__SMTP_PORT=587
      - AIRFLOW__SMTP__SMTP_USER=${SMTP_USER}
      - AIRFLOW__SMTP__SMTP_PASSWORD=${SMTP_PASSWORD}
      - AIRFLOW__SMTP__SMTP_MAIL_FROM=${SMTP_USER}
      - OPENWEATHERMAP_API_KEY=${OPENWEATHERMAP_API_KEY}
    volumes:
      - ./dags:/opt/airflow/dags
      # The DAG adds /opt/airflow to sys.path so it can import this package.
      - ./etl:/opt/airflow/etl
      - airflow_logs:/opt/airflow/logs
    ports:
      - "8080:8080"
    depends_on:
      postgres-db:
        condition: service_healthy
    command: webserver

  airflow-scheduler:
    image: apache/airflow:2.7.3
    container_name: airflow-scheduler
    environment:
      - AIRFLOW__CORE__EXECUTOR=LocalExecutor
      - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://etluser:etlpass@postgres-db:5432/weatherdb
      - AIRFLOW__CORE__FERNET_KEY=${FERNET_KEY}
      - OPENWEATHERMAP_API_KEY=${OPENWEATHERMAP_API_KEY}
    volumes:
      - ./dags:/opt/airflow/dags
      - ./etl:/opt/airflow/etl
      - airflow_logs:/opt/airflow/logs
    depends_on:
      - airflow-webserver
    command: scheduler

volumes:
  postgres_data:
  airflow_logs:

Dockerfile

# Image for the one-shot ETL job; Airflow runs from the stock apache/airflow image.
FROM python:3.11-slim

WORKDIR /app

# Copy requirements first so the pip layer is cached across code-only changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

# Default command: run the pipeline once (overridden in docker-compose too).
CMD ["python", "-m", "etl.pipeline"]

Requirements

# requirements.txt
pandas==2.1.0
requests==2.31.0
sqlalchemy==2.0.21
psycopg2-binary==2.9.9
python-dotenv==1.0.0

Airflow DAG

# dags/weather_etl_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
import sys
sys.path.insert(0, '/opt/airflow')

from etl.pipeline import run_pipeline

# Shared task defaults; individual tasks can override any of these.
default_args = {
    'owner': 'data-team',
    'depends_on_past': False,  # each run is independent of the previous one
    'email': ['alerts@example.com'],
    'email_on_failure': True,  # Airflow's built-in failure mail, in addition to the EmailOperator below
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'weather_etl',
    default_args=default_args,
    description='ETL pipeline for weather data',
    schedule_interval='0 */6 * * *',  # Every 6 hours
    start_date=datetime(2024, 1, 1),
    catchup=False,  # do not backfill runs between start_date and now
    tags=['etl', 'weather'],
) as dag:
    
    def run_etl(**context):
        """Execute the ETL pipeline; raise if any city failed so the task fails."""
        cities = ["London", "New York", "Tokyo", "Sydney", "Dubai", "Paris"]
        results = run_pipeline(cities)
        
        # run_pipeline swallows per-city errors into its summary; raising here
        # is what marks the Airflow task failed and triggers the failure email.
        if results["failed"] > 0:
            raise Exception(f"Pipeline failed for {results['failed']} cities: {results['errors']}")
        
        return results
    
    etl_task = PythonOperator(
        task_id='run_weather_etl',
        python_callable=run_etl,
        # NOTE(review): provide_context is a pre-2.0 parameter; in Airflow 2.x
        # the context is always passed — likely droppable, confirm version.
        provide_context=True,
    )
    
    # Sent only when the ETL task succeeds.
    success_email = EmailOperator(
        task_id='send_success_email',
        to='alerts@example.com',
        subject='Weather ETL Pipeline - Success',
        html_content="""
        <h3>Weather ETL Pipeline Completed Successfully</h3>
        <p>The weather data ETL pipeline has completed successfully.</p>
        <p>Execution Time: {{ execution_date }}</p>
        """,
        trigger_rule=TriggerRule.ALL_SUCCESS,
    )
    
    # Sent only when the ETL task fails (after its retries are exhausted).
    failure_email = EmailOperator(
        task_id='send_failure_email',
        to='alerts@example.com',
        subject='Weather ETL Pipeline - FAILED',
        html_content="""
        <h3>Weather ETL Pipeline Failed</h3>
        <p>The weather data ETL pipeline has failed.</p>
        <p>Execution Time: {{ execution_date }}</p>
        <p>Please check the Airflow logs for details.</p>
        """,
        trigger_rule=TriggerRule.ONE_FAILED,
    )
    
    # Both emails are downstream of the ETL task; trigger rules pick which fires.
    etl_task >> [success_email, failure_email]

Setup and Running

1. Get an API Key

Sign up at OpenWeatherMap and get a free API key.

2. Configure Environment

# .env
OPENWEATHERMAP_API_KEY=your_api_key_here
FERNET_KEY=your_fernet_key
SECRET_KEY=your_secret_key
SMTP_USER=your_email@gmail.com
SMTP_PASSWORD=your_app_password

Generate security keys:

# Fernet key for Airflow
python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"

# Secret key
python -c "import secrets; print(secrets.token_urlsafe(32))"

3. Start the Stack

# Build and start
docker-compose up --build

# Run in background
docker-compose up -d

4. Initialize Airflow

# Initialize the database
docker-compose run --rm airflow-webserver airflow db init

# Create admin user
docker-compose exec airflow-webserver airflow users create \
  --username admin \
  --firstname Admin \
  --lastname User \
  --role Admin \
  --email admin@example.com \
  --password admin

5. Access UIs

Airflow web UI: http://localhost:8080 — log in with the admin user created in the previous step. PostgreSQL is exposed on localhost:5432 for any SQL client.

Verify the Data

# Connect to PostgreSQL
docker exec -it etl-postgres psql -U etluser -d weatherdb

# Or use make
make psql
-- Check the data
SELECT * FROM weather;

-- Count by city
SELECT city, COUNT(*) as records 
FROM weather 
GROUP BY city;

-- Latest readings
SELECT city, temperature, humidity, timestamp 
FROM weather 
ORDER BY timestamp DESC 
LIMIT 10;

-- Temperature trends
SELECT 
  city,
  DATE(timestamp) as date,
  AVG(temperature) as avg_temp,
  MIN(temperature) as min_temp,
  MAX(temperature) as max_temp
FROM weather
GROUP BY city, DATE(timestamp)
ORDER BY date DESC;

Email Notifications Setup

For Gmail SMTP, create an App Password:

  1. Go to Google App Passwords
  2. Generate a new app password for “Mail”
  3. Use this password in SMTP_PASSWORD

Note: Don’t use your regular Gmail password. App passwords are required for SMTP access.


Makefile

# Makefile
.PHONY: up down logs psql test

# Build images and start the whole stack in the background.
up:
	docker-compose up --build -d

# Stop and remove the stack's containers.
down:
	docker-compose down

# Follow logs from all services.
logs:
	docker-compose logs -f

# Open a psql shell against the weather database.
psql:
	docker exec -it etl-postgres psql -U etluser -d weatherdb

# Run the test suite inside the ETL container.
test:
	docker-compose run --rm etl python -m pytest tests/

# Restart both Airflow services (e.g. after changing a DAG's top level).
restart-airflow:
	docker-compose restart airflow-webserver airflow-scheduler

Extending the Pipeline

Add More Data Sources

# etl/extract.py

def extract_forecast(city: str, days: int = 5, timeout: float = 10.0) -> Dict[str, Any]:
    """Extract weather forecast data (the API returns 8 three-hour slots per day).

    Args:
        city: City name passed as the API's ``q`` query parameter.
        days: Number of forecast days to request (``cnt = days * 8``).
        timeout: Seconds to wait for the HTTP response before aborting.

    Returns:
        The decoded JSON forecast payload.

    Raises:
        ValueError: If OPENWEATHERMAP_API_KEY is not set.
        requests.HTTPError: If the API returns a non-2xx status.
    """
    api_key = os.getenv("OPENWEATHERMAP_API_KEY")
    # Consistency with extract_weather: fail fast with a clear message
    # instead of sending appid=None and getting an opaque 401.
    if not api_key:
        raise ValueError("OPENWEATHERMAP_API_KEY environment variable not set")

    url = "https://api.openweathermap.org/data/2.5/forecast"
    params = {"q": city, "appid": api_key, "units": "metric", "cnt": days * 8}

    # Consistency/robustness: never issue an HTTP request without a timeout.
    response = requests.get(url, params=params, timeout=timeout)
    response.raise_for_status()
    return response.json()

Add Data Quality Checks

# etl/validate.py

def validate_weather_data(df: pd.DataFrame) -> bool:
    """Validate transformed weather data.

    Passes only when the frame is non-empty, every temperature lies in
    [-50, 60], every humidity lies in [0, 100], and no city value is null.
    """
    # Evaluate each rule eagerly (matching the original's behavior on
    # missing columns), then combine.
    temp_in_range = df["temperature"].between(-50, 60).all()
    humidity_in_range = df["humidity"].between(0, 100).all()
    city_present = df["city"].notna().all()
    non_empty = len(df) > 0

    return all((temp_in_range, humidity_in_range, city_present, non_empty))

Add Incremental Loading

# etl/load.py

def load_weather_incremental(df: pd.DataFrame) -> int:
    """Load only records for cities with no row in the last hour.

    NOTE(review): despite the "upsert" label in the article, this is
    insert-only de-duplication — duplicate rows are skipped, never updated,
    and the match is on city alone, not (city, timestamp).
    """
    # No default URL here, unlike load_weather: create_engine(None) will fail
    # if DATABASE_URL is unset — presumably intentional for this variant.
    engine = create_engine(os.getenv("DATABASE_URL"))
    
    # Cities that already have a row newer than one hour ago.
    existing = pd.read_sql(
        "SELECT city, timestamp FROM weather WHERE timestamp > NOW() - INTERVAL '1 hour'",
        engine
    )
    
    # Keep only rows whose city has no recent record.
    df_new = df[~df["city"].isin(existing["city"])]
    
    if len(df_new) > 0:
        df_new.to_sql("weather", engine, if_exists="append", index=False)
    
    return len(df_new)

Best Practices

  1. Idempotency - Design the pipeline so repeated runs don't duplicate data; note that the basic load_weather appends unconditionally, so use the incremental loader (or a database unique constraint) to actually achieve this
  2. Logging - Log every step for debugging
  3. Error Handling - Graceful failures with meaningful messages
  4. Monitoring - Email alerts on failures, Airflow task monitoring
  5. Testing - Unit tests for transform functions
  6. Secrets Management - Never commit API keys or passwords

Troubleshooting

Airflow Database Issues

# Reset the database
docker-compose run --rm airflow-webserver airflow db reset

# Re-initialize
docker-compose run --rm airflow-webserver airflow db init

Connection Refused

# Check if PostgreSQL is running
docker-compose ps
docker-compose logs postgres-db

Email Not Sending

  • Verify App Password is correct
  • Check spam folder
  • Review Airflow logs: docker-compose logs airflow-webserver

Resources


Repository

Full source code: github.com/moabukar/etl-pipeline


Data pipelines don’t have to be complicated. Extract, transform, load, schedule, alert. That’s it. Happy engineering.

Found this helpful?

Comments