Building ETL Pipelines: A Practical Guide
ETL Pipeline Fundamentals
ETL (Extract, Transform, Load) pipelines extract data from source systems, transform it into an analysis-ready shape, and load it into a data warehouse.
Batch vs Streaming
Batch Processing
Batch pipelines process data at scheduled intervals. They are simpler, cheaper, and easier to test than streaming:
import pandas as pd

def extract_orders():
    # conn: an open connection or SQLAlchemy engine for the source database
    return pd.read_sql("SELECT * FROM orders WHERE date = CURRENT_DATE", conn)

def transform_orders(df):
    # Derive the line total from quantity and unit price
    df['total'] = df['quantity'] * df['price']
    return df

def load_orders(df):
    # warehouse_conn: a SQLAlchemy engine for the warehouse; append today's rows
    df.to_sql('daily_orders', warehouse_conn, if_exists='append', index=False)
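As written, re-running load_orders for the same day appends duplicate rows. A delete-then-append pattern keeps the load idempotent; the sketch below is one way to do it, assuming warehouse_conn is a SQLAlchemy engine and that daily_orders stores the order date in a date column (both assumptions, not part of the original example).

from sqlalchemy import text

def load_orders_idempotent(df):
    # Clear any rows already loaded for today, then append the fresh batch,
    # so reruns of the same day's job do not duplicate data.
    with warehouse_conn.begin() as wh:
        wh.execute(text("DELETE FROM daily_orders WHERE date = CURRENT_DATE"))
    df.to_sql('daily_orders', warehouse_conn, if_exists='append', index=False)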
Stream Processing
Stream processing handles records as they arrive, with latency measured in milliseconds to seconds rather than hours. Use it for real-time dashboards, fraud detection, and event-driven architectures.
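A minimal sketch of the streaming side, assuming an 'orders' Kafka topic and the kafka-python client (neither is specified in this guide); each event is transformed as it arrives rather than on a schedule:

import json
from kafka import KafkaConsumer

# Subscribe to the (assumed) orders topic; each message is one order event
consumer = KafkaConsumer(
    'orders',
    bootstrap_servers='localhost:9092',
    value_deserializer=lambda v: json.loads(v.decode('utf-8')),
)

for message in consumer:
    order = message.value
    order['total'] = order['quantity'] * order['price']  # same transform as the batch job
    # In a real pipeline this record would go to a sink (warehouse, cache, alerting system)
    print(order)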
Orchestration with Airflow
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG('orders_etl', schedule_interval='0 6 * * *', start_date=datetime(2024, 1, 1), catchup=False):
    extract = PythonOperator(task_id='extract', python_callable=extract_orders)
    transform = PythonOperator(task_id='transform', python_callable=transform_orders)
    load = PythonOperator(task_id='load', python_callable=load_orders)
    extract >> transform >> load
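One caveat: these PythonOperator tasks do not pass the DataFrame from one callable to the next on their own, and Airflow's XCom mechanism is meant for small metadata rather than full datasets. A common pattern is to stage the data between tasks; a minimal sketch, assuming a shared filesystem and hypothetical /tmp/... paths:

import pandas as pd

def extract_to_stage():
    # Persist the extracted data where the next task can read it
    extract_orders().to_parquet('/tmp/orders_raw.parquet')

def transform_from_stage():
    df = pd.read_parquet('/tmp/orders_raw.parquet')
    transform_orders(df).to_parquet('/tmp/orders_transformed.parquet')

def load_from_stage():
    load_orders(pd.read_parquet('/tmp/orders_transformed.parquet'))

These staged callables would then replace extract_orders, transform_orders, and load_orders as the python_callable arguments in the DAG above.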
Transformation with dbt
-- models/staging/stg_orders.sql
with source as (
    select * from {{ source('ecommerce', 'orders') }}
)

select
    id as order_id,
    customer_id,
    amount
from source
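If dbt handles the transformation layer, the model run is typically triggered as its own task in the orchestration DAG. A sketch using Airflow's BashOperator, assuming dbt is installed on the worker and the project lives at a hypothetical /opt/dbt path:

from airflow.operators.bash import BashOperator

# Inside the `with DAG('orders_etl', ...)` block defined above:
dbt_run = BashOperator(
    task_id='dbt_run_staging',
    bash_command='dbt run --project-dir /opt/dbt --select staging',
)
load >> dbt_run  # run dbt models only after the load task succeeds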
Data Quality
data_quality_checks:
  - check: row_count > 1000
    severity: critical
  - check: max_timestamp >= yesterday
    severity: warning
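These checks can also be enforced in code at the end of the load step. The sketch below is one way to do it, assuming warehouse_conn is a SQLAlchemy engine, a hypothetical loaded_at timestamp column on daily_orders, and treating only critical failures as fatal:

import pandas as pd

def run_quality_checks():
    # Row-count check (critical): fail the pipeline if today's load looks truncated
    row_count = pd.read_sql("SELECT COUNT(*) AS n FROM daily_orders", warehouse_conn)['n'].iloc[0]
    if row_count <= 1000:
        raise ValueError(f"Critical check failed: row_count {row_count} <= 1000")

    # Freshness check (warning): report but do not fail if data looks stale
    max_ts = pd.read_sql("SELECT MAX(loaded_at) AS ts FROM daily_orders", warehouse_conn)['ts'].iloc[0]
    yesterday = pd.Timestamp.now().normalize() - pd.Timedelta(days=1)
    if max_ts is None or pd.Timestamp(max_ts) < yesterday:
        print(f"Warning: max_timestamp {max_ts} is older than yesterday")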
Conclusion
Choose batch for simplicity and streaming for real-time needs. Use Airflow for orchestration and dbt for transformations. Implement data quality checks at every stage. Design for idempotency so reruns do not duplicate data, and use incremental loads so each run processes only new or changed records.