Skip to main content
Data Analysis February 14, 2026 10 min read

Tutorial: Building an ETL Pipeline with Apache Airflow for SMBs

Learn how to create a robust ETL pipeline with Apache Airflow to automate your data workflows in SMBs without breaking your tech budget.

M
Mohamed Boukri

Introduction: Why Apache Airflow for Your SMB ETL

Data workflow automation has become a major challenge for SMBs in 2026. An ETL pipeline (Extract, Transform, Load) extracts data from multiple sources, transforms it according to your business needs, then loads it into a centralized system for analysis.

Apache Airflow stands out from expensive proprietary solutions like Talend or Informatica through its open-source approach and total flexibility. Originally developed by Airbnb, Airflow has become the reference for orchestrating complex data workflows.

Concrete use cases in SMBs are numerous: daily consolidation of CRM and ERP data, automatic synchronization between your billing system and analytics tool, or aggregation of multi-source sales data for reporting. If you manage multiple data sources and spend time doing manual exports, this tutorial is for you.

Key Takeaway
Apache Airflow lets you automate your data pipelines with simple Python code, without expensive licenses or vendor lock-in.

Technical Prerequisites and Development Environment

Before starting, you’ll need:

  • Docker and Docker Compose installed on your machine or server
  • Basic Python knowledge to write DAGs (Directed Acyclic Graphs)
  • Access to the data sources you want to integrate: SQL databases, REST APIs, CSV files
  • A server with at least 4 GB RAM and 2 CPUs to run Airflow comfortably
If you’re new to Docker, check the official documentation to install it on your operating system.

Installing and Configuring Apache Airflow with Docker

The simplest way to get started with Airflow is using Docker Compose. Let’s download the official configuration file:

Terminal window
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.8.1/docker-compose.yaml'

Next, create the necessary folders to persist data:

Terminal window
mkdir -p ./dags ./logs ./plugins ./config
echo -e "AIRFLOW_UID=$(id -u)" > .env

The docker-compose.yaml file already contains the essential configurations. Here are the important environment variables to know:

environment:
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
Set AIRFLOW__CORE__LOAD_EXAMPLES to false to avoid loading example DAGs that clutter the interface.

Initialize the database and launch the containers:

Terminal window
docker-compose up airflow-init
docker-compose up -d

Access the web interface at http://localhost:8080. The default credentials are airflow / airflow. Change them immediately in production via the CLI:

Terminal window
docker-compose run airflow-worker airflow users create \
--username admin \
--firstname Mohamed \
--lastname Boukri \
--role Admin \
--email contact@kodixar.com

Verify that the scheduler and webserver are running correctly with docker-compose ps. You should see all services in “healthy” state.

Creating Your First DAG: Extracting Data from an API

An Airflow DAG is a Python file that defines a workflow. Create a file dags/api_extraction.py:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
import requests
import json
default_args = {
'owner': 'kodixar',
'depends_on_past': False,
'start_date': datetime(2026, 2, 14),
'email_on_failure': True,
'email_on_retry': False,
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'sales_api_extraction',
default_args=default_args,
description='Daily extraction of sales data',
schedule_interval='0 6 * * *', # Every day at 6am
catchup=False,
tags=['etl', 'sales'],
)
def extract_api_data(**context):
"""Extract data from sales API"""
api_url = Variable.get("sales_api_url")
api_key = Variable.get("sales_api_key")
headers = {
'Authorization': f'Bearer {api_key}',
'Content-Type': 'application/json'
}
response = requests.get(
f"{api_url}/sales/daily",
headers=headers,
params={'date': context['ds']} # DAG execution date
)
response.raise_for_status()
data = response.json()
# Store in XCom for subsequent tasks
context['ti'].xcom_push(key='raw_data', value=data)
print(f"Extraction successful: {len(data)} records")
return len(data)
extraction_task = PythonOperator(
task_id='extract_data',
python_callable=extract_api_data,
dag=dag,
)

To manage credentials securely, use Airflow Variables. In the web interface, go to Admin > Variables and create:

  • sales_api_url: your API base URL
  • sales_api_key: your API key
Never hardcode API keys in source code. Always use Airflow Variables or Connections.

Activate the DAG from the interface and trigger a manual run to test. Check the logs to verify that extraction works correctly.

Transforming Data with Custom Python Tasks

Let’s now add a transformation task that cleans and aggregates the extracted data. Install pandas in the Airflow container by creating a requirements.txt file:

pandas==2.2.0

Then modify docker-compose.yaml to install these dependencies (x-airflow-common section). Add this transformation function to your DAG:

import pandas as pd
def transform_data(**context):
"""Transform and aggregate sales data"""
ti = context['ti']
raw_data = ti.xcom_pull(task_ids='extract_data', key='raw_data')
# Convert to pandas DataFrame
df = pd.DataFrame(raw_data)
# Cleaning: remove duplicates and null values
df = df.drop_duplicates(subset=['sale_id'])
df = df.dropna(subset=['amount', 'region'])
# Transformation: calculate total amount with VAT
df['total_amount'] = df['net_amount'] * 1.20
# Aggregation by region
aggregation = df.groupby('region').agg({
'total_amount': 'sum',
'sale_id': 'count'
}).reset_index()
aggregation.columns = ['region', 'total_revenue', 'num_sales']
# Store for loading task
transformed_data = aggregation.to_dict('records')
ti.xcom_push(key='transformed_data', value=transformed_data)
print(f"Transformation successful: {len(aggregation)} regions")
return len(aggregation)
transformation_task = PythonOperator(
task_id='transform_data',
python_callable=transform_data,
dag=dag,
)
# Define task dependency
extraction_task >> transformation_task

The >> operator defines execution order: transformation only starts after successful extraction. If extraction fails, Airflow will automatically retry twice with 5-minute intervals (defined in default_args).

Use pandas to manipulate your data in memory. For very large volumes (>1 GB), prefer PySpark or direct SQL transformations.

Loading Transformed Data into PostgreSQL

To store the transformed data, we’ll use PostgreSQL. First configure a connection in Airflow (Admin > Connections):

  • Connection Id: postgres_dwh
  • Connection Type: Postgres
  • Host: postgres (or your server address)
  • Schema: datawarehouse
  • Login: airflow
  • Password: your password
  • Port: 5432

Create the destination table with PostgresOperator:

from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
table_creation_task = PostgresOperator(
task_id='create_sales_region_table',
postgres_conn_id='postgres_dwh',
sql="""
CREATE TABLE IF NOT EXISTS sales_by_region (
extraction_date DATE,
region VARCHAR(100),
total_revenue DECIMAL(12,2),
num_sales INTEGER,
PRIMARY KEY (extraction_date, region)
);
""",
dag=dag,
)
def load_data_postgres(**context):
"""Load data into PostgreSQL"""
ti = context['ti']
data = ti.xcom_pull(task_ids='transform_data', key='transformed_data')
exec_date = context['ds']
hook = PostgresHook(postgres_conn_id='postgres_dwh')
conn = hook.get_conn()
cursor = conn.cursor()
# Delete existing data for this date (idempotence)
cursor.execute(
"DELETE FROM sales_by_region WHERE extraction_date = %s",
(exec_date,)
)
# Batch insert new data
for row in data:
cursor.execute(
"""
INSERT INTO sales_by_region (extraction_date, region, total_revenue, num_sales)
VALUES (%s, %s, %s, %s)
""",
(exec_date, row['region'], row['total_revenue'], row['num_sales'])
)
conn.commit()
cursor.close()
print(f"Loading successful: {len(data)} rows inserted")
return len(data)
loading_task = PythonOperator(
task_id='load_data',
python_callable=load_data_postgres,
dag=dag,
)
# Complete pipeline chaining
extraction_task >> transformation_task >> table_creation_task >> loading_task

The delete-before-insert approach guarantees idempotence: re-running the DAG for the same date will always produce the same result. This is an essential best practice in data engineering.

Verify data integrity by connecting to PostgreSQL:

Terminal window
docker-compose exec postgres psql -U airflow -d datawarehouse
SELECT * FROM sales_by_region ORDER BY extraction_date DESC LIMIT 10;

Scheduling, Monitoring and Production Best Practices

The schedule_interval defines execution frequency. Here are common formats:

ExpressionMeaning
'@daily'Every day at midnight
'0 6 * * *'Every day at 6am
'0 */4 * * *'Every 4 hours
'0 0 * * 1'Every Monday at midnight

To receive email alerts on failure, configure SMTP in docker-compose.yaml:

AIRFLOW__SMTP__SMTP_HOST: smtp.gmail.com
AIRFLOW__SMTP__SMTP_PORT: 587
AIRFLOW__SMTP__SMTP_USER: your-email@gmail.com
AIRFLOW__SMTP__SMTP_PASSWORD: your-app-password
AIRFLOW__SMTP__SMTP_MAIL_FROM: airflow@kodixar.com

Then add the email in default_args:

default_args = {
'email': ['contact@kodixar.com'],
'email_on_failure': True,
}

Airflow logs are accessible from the web interface (click “Log” on each task) and stored in the ./logs folder. Use them to debug and audit your executions.

In production, enable RBAC authentication and change all default passwords.

Best practices to follow:

  • Make your tasks idempotent: re-running a DAG should produce the same result
  • Use backfill to catch up on missed executions: airflow dags backfill -s 2026-02-01 -e 2026-02-10 sales_api_extraction
  • Optimize parallelism by configuring max_active_runs and concurrency in the DAG
  • Avoid overly long tasks: break them into subtasks

For production deployment, you can use Coolify to orchestrate your Docker containers or a dedicated server with Docker Compose. Configure an Nginx reverse proxy to secure access to the web interface.

Conclusion and Next Steps to Industrialize Your Pipelines

We’ve seen how to set up a complete ETL pipeline with Apache Airflow: extraction from an API, transformation with pandas, and loading into PostgreSQL. This stack is perfectly suited to SMB needs for automating data workflows without investing in expensive proprietary solutions.

Possible evolutions are numerous. You can integrate dbt (data build tool) to manage your SQL transformations in a versioned and testable way. Connecting to visualization tools like Metabase or Superset then lets you exploit your consolidated data.

To go further, check the official Airflow documentation and join specialized forums. In a future article, we’ll explore how to orchestrate machine learning workflows with Airflow and MLflow.

Need help industrializing your data pipelines? At Kodixar, I help SMBs implement custom data engineering solutions.

Available for new projects

Need help with this topic?

Contact us to discuss your project and see how we can help.

Free quote
No commitment
24h response