Introduction: Why Apache Airflow for Your SMB ETL
Data workflow automation has become a major challenge for SMBs in 2026. An ETL pipeline (Extract, Transform, Load) extracts data from multiple sources, transforms it according to your business needs, then loads it into a centralized system for analysis.
Apache Airflow stands out from expensive proprietary solutions like Talend or Informatica through its open-source approach and total flexibility. Originally developed by Airbnb, Airflow has become the reference for orchestrating complex data workflows.
Concrete use cases in SMBs are numerous: daily consolidation of CRM and ERP data, automatic synchronization between your billing system and analytics tool, or aggregation of multi-source sales data for reporting. If you manage multiple data sources and spend time doing manual exports, this tutorial is for you.
Technical Prerequisites and Development Environment
Before starting, you’ll need:
- Docker and Docker Compose installed on your machine or server
- Basic Python knowledge to write DAGs (Directed Acyclic Graphs)
- Access to the data sources you want to integrate: SQL databases, REST APIs, CSV files
- A server with at least 4 GB RAM and 2 CPUs to run Airflow comfortably
Installing and Configuring Apache Airflow with Docker
The simplest way to get started with Airflow is using Docker Compose. Let’s download the official configuration file:
```bash
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.8.1/docker-compose.yaml'
```

Next, create the necessary folders to persist data:

```bash
mkdir -p ./dags ./logs ./plugins ./config
echo -e "AIRFLOW_UID=$(id -u)" > .env
```

The docker-compose.yaml file already contains the essential configurations. Here are the important environment variables to know:
```yaml
environment:
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
  AIRFLOW__CORE__FERNET_KEY: ''
  AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
  AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
```

Set AIRFLOW__CORE__LOAD_EXAMPLES to 'false' to avoid loading the example DAGs that clutter the interface. Initialize the database and launch the containers:
```bash
docker-compose up airflow-init
docker-compose up -d
```

Access the web interface at http://localhost:8080. The default credentials are airflow / airflow. Change them immediately in production via the CLI:
```bash
docker-compose run airflow-worker airflow users create \
    --username admin \
    --firstname Mohamed \
    --lastname Boukri \
    --role Admin \
    --email contact@kodixar.com
```

Verify that the scheduler and webserver are running correctly with docker-compose ps. You should see all services in a "healthy" state.
Creating Your First DAG: Extracting Data from an API
An Airflow DAG is a Python file that defines a workflow. Create a file dags/api_extraction.py:
```python
from datetime import datetime, timedelta

import requests
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator

default_args = {
    'owner': 'kodixar',
    'depends_on_past': False,
    'start_date': datetime(2026, 2, 14),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'sales_api_extraction',
    default_args=default_args,
    description='Daily extraction of sales data',
    schedule_interval='0 6 * * *',  # Every day at 6am
    catchup=False,
    tags=['etl', 'sales'],
)

def extract_api_data(**context):
    """Extract data from the sales API."""
    api_url = Variable.get("sales_api_url")
    api_key = Variable.get("sales_api_key")

    headers = {
        'Authorization': f'Bearer {api_key}',
        'Content-Type': 'application/json',
    }

    response = requests.get(
        f"{api_url}/sales/daily",
        headers=headers,
        params={'date': context['ds']},  # DAG execution date
    )
    response.raise_for_status()
    data = response.json()

    # Store in XCom for subsequent tasks
    context['ti'].xcom_push(key='raw_data', value=data)

    print(f"Extraction successful: {len(data)} records")
    return len(data)

extraction_task = PythonOperator(
    task_id='extract_data',
    python_callable=extract_api_data,
    dag=dag,
)
```

To manage credentials securely, use Airflow Variables. In the web interface, go to Admin > Variables and create:
- sales_api_url: your API base URL
- sales_api_key: your API key
Activate the DAG from the interface and trigger a manual run to test. Check the logs to verify that extraction works correctly.
Transforming Data with Custom Python Tasks
Let’s now add a transformation task that cleans and aggregates the extracted data. Install pandas in the Airflow container by creating a requirements.txt file:
```
pandas==2.2.0
```

Then modify docker-compose.yaml to install these dependencies (x-airflow-common section). Add this transformation function to your DAG:
```python
import pandas as pd

def transform_data(**context):
    """Transform and aggregate sales data."""
    ti = context['ti']
    raw_data = ti.xcom_pull(task_ids='extract_data', key='raw_data')

    # Convert to a pandas DataFrame
    df = pd.DataFrame(raw_data)

    # Cleaning: remove duplicates and null values
    df = df.drop_duplicates(subset=['sale_id'])
    df = df.dropna(subset=['net_amount', 'region'])

    # Transformation: compute the total amount including 20% VAT
    df['total_amount'] = df['net_amount'] * 1.20

    # Aggregation by region
    aggregation = df.groupby('region').agg({
        'total_amount': 'sum',
        'sale_id': 'count',
    }).reset_index()
    aggregation.columns = ['region', 'total_revenue', 'num_sales']

    # Store for the loading task
    transformed_data = aggregation.to_dict('records')
    ti.xcom_push(key='transformed_data', value=transformed_data)

    print(f"Transformation successful: {len(aggregation)} regions")
    return len(aggregation)

transformation_task = PythonOperator(
    task_id='transform_data',
    python_callable=transform_data,
    dag=dag,
)
```
```python
# Define task dependency
extraction_task >> transformation_task
```

The >> operator defines the execution order: the transformation starts only after a successful extraction. If extraction fails, Airflow will automatically retry twice at 5-minute intervals (as defined in default_args).
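Before wiring the transformation into Airflow, you can sanity-check its logic locally. This sketch runs the same cleaning and aggregation steps on hand-written sample records (the field names and values are hypothetical, mirroring the API response shape used in this tutorial):

```python
import pandas as pd

# Hypothetical sample of raw API records
raw_data = [
    {'sale_id': 1, 'region': 'North', 'net_amount': 100.0},
    {'sale_id': 1, 'region': 'North', 'net_amount': 100.0},  # duplicate sale_id
    {'sale_id': 2, 'region': 'North', 'net_amount': 50.0},
    {'sale_id': 3, 'region': 'South', 'net_amount': None},   # null amount, dropped
    {'sale_id': 4, 'region': 'South', 'net_amount': 200.0},
]

df = pd.DataFrame(raw_data)
df = df.drop_duplicates(subset=['sale_id'])          # keeps one row per sale
df = df.dropna(subset=['net_amount', 'region'])      # drops the null record
df['total_amount'] = df['net_amount'] * 1.20         # add 20% VAT

aggregation = df.groupby('region').agg(
    total_revenue=('total_amount', 'sum'),
    num_sales=('sale_id', 'count'),
).reset_index()

print(aggregation)
```

Running this shows one row per region (North: 2 sales, South: 1 sale after the null record is dropped), which confirms the cleaning and aggregation behave as intended before you deploy the DAG.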
Loading Transformed Data into PostgreSQL
To store the transformed data, we'll use PostgreSQL. First, configure a connection in Airflow (Admin > Connections):

- Connection Id: postgres_dwh
- Connection Type: Postgres
- Host: postgres (or your server address)
- Schema: datawarehouse
- Login: airflow
- Password: your password
- Port: 5432
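Alternatively, connections can be declared as environment variables using Airflow's `AIRFLOW_CONN_<CONN_ID>` URI convention, which keeps credentials out of the web interface and in your deployment configuration. A sketch for docker-compose.yaml (the password is a placeholder):

```yaml
# In the x-airflow-common environment section of docker-compose.yaml
AIRFLOW_CONN_POSTGRES_DWH: 'postgres://airflow:your-password@postgres:5432/datawarehouse'
```

Connections defined this way do not appear in Admin > Connections, but hooks and operators resolve them by the same connection id (postgres_dwh).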
Create the destination table with PostgresOperator:
```python
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook

table_creation_task = PostgresOperator(
    task_id='create_sales_region_table',
    postgres_conn_id='postgres_dwh',
    sql="""
        CREATE TABLE IF NOT EXISTS sales_by_region (
            extraction_date DATE,
            region VARCHAR(100),
            total_revenue DECIMAL(12,2),
            num_sales INTEGER,
            PRIMARY KEY (extraction_date, region)
        );
    """,
    dag=dag,
)
```
```python
def load_data_postgres(**context):
    """Load data into PostgreSQL."""
    ti = context['ti']
    data = ti.xcom_pull(task_ids='transform_data', key='transformed_data')
    exec_date = context['ds']

    hook = PostgresHook(postgres_conn_id='postgres_dwh')
    conn = hook.get_conn()
    cursor = conn.cursor()

    # Delete any existing data for this date (idempotence)
    cursor.execute(
        "DELETE FROM sales_by_region WHERE extraction_date = %s",
        (exec_date,)
    )

    # Insert the new rows
    for row in data:
        cursor.execute(
            """
            INSERT INTO sales_by_region
                (extraction_date, region, total_revenue, num_sales)
            VALUES (%s, %s, %s, %s)
            """,
            (exec_date, row['region'], row['total_revenue'], row['num_sales'])
        )

    conn.commit()
    cursor.close()
    conn.close()

    print(f"Loading successful: {len(data)} rows inserted")
    return len(data)

loading_task = PythonOperator(
    task_id='load_data',
    python_callable=load_data_postgres,
    dag=dag,
)
```
```python
# Complete pipeline chaining
extraction_task >> transformation_task >> table_creation_task >> loading_task
```

The delete-before-insert approach guarantees idempotence: re-running the DAG for the same date will always produce the same result. This is an essential best practice in data engineering.
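The pattern is easy to verify in isolation. This sketch reproduces the delete-before-insert logic against an in-memory SQLite database (a stand-in for PostgreSQL, purely for illustration) and shows that running the load twice for the same date leaves no duplicates:

```python
import sqlite3

conn = sqlite3.connect(':memory:')
cur = conn.cursor()
cur.execute("""
    CREATE TABLE sales_by_region (
        extraction_date TEXT,
        region TEXT,
        total_revenue REAL,
        num_sales INTEGER,
        PRIMARY KEY (extraction_date, region)
    )
""")

def load(exec_date, rows):
    """Idempotent load: wipe the partition for exec_date, then insert."""
    cur.execute("DELETE FROM sales_by_region WHERE extraction_date = ?", (exec_date,))
    cur.executemany(
        "INSERT INTO sales_by_region VALUES (?, ?, ?, ?)",
        [(exec_date, r['region'], r['total_revenue'], r['num_sales']) for r in rows],
    )
    conn.commit()

rows = [{'region': 'North', 'total_revenue': 180.0, 'num_sales': 2}]
load('2026-02-14', rows)
load('2026-02-14', rows)  # re-run for the same date: same final state

cur.execute("SELECT COUNT(*) FROM sales_by_region")
print(cur.fetchone()[0])  # 1 — no duplicate row after the second run
```

Without the DELETE, the second run would either fail on the primary key or, without a key, silently double the data, which is exactly what this pattern prevents.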
Verify data integrity by connecting to PostgreSQL:
```bash
docker-compose exec postgres psql -U airflow -d datawarehouse
```

```sql
SELECT * FROM sales_by_region ORDER BY extraction_date DESC LIMIT 10;
```

Scheduling, Monitoring and Production Best Practices
The schedule_interval defines execution frequency. Here are common formats:
| Expression | Meaning |
|---|---|
| '@daily' | Every day at midnight |
| '0 6 * * *' | Every day at 6am |
| '0 */4 * * *' | Every 4 hours |
| '0 0 * * 1' | Every Monday at midnight |
To receive email alerts on failure, configure SMTP in docker-compose.yaml:
```yaml
AIRFLOW__SMTP__SMTP_HOST: smtp.gmail.com
AIRFLOW__SMTP__SMTP_PORT: 587
AIRFLOW__SMTP__SMTP_USER: your-email@gmail.com
AIRFLOW__SMTP__SMTP_PASSWORD: your-app-password
AIRFLOW__SMTP__SMTP_MAIL_FROM: airflow@kodixar.com
```

Then add the email in default_args:
```python
default_args = {
    'email': ['contact@kodixar.com'],
    'email_on_failure': True,
}
```

Airflow logs are accessible from the web interface (click "Log" on each task) and stored in the ./logs folder. Use them to debug and audit your executions.
Best practices to follow:
- Make your tasks idempotent: re-running a DAG should produce the same result
- Use backfill to catch up on missed executions: airflow dags backfill -s 2026-02-01 -e 2026-02-10 sales_api_extraction
- Optimize parallelism by configuring max_active_runs and concurrency in the DAG
- Avoid overly long tasks: break them into subtasks
For production deployment, you can use Coolify to orchestrate your Docker containers or a dedicated server with Docker Compose. Configure an Nginx reverse proxy to secure access to the web interface.
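As a starting point, a reverse proxy block along these lines forwards HTTPS traffic to the Airflow webserver. This is a minimal sketch: the domain name and certificate paths are placeholders to adapt to your setup, and it assumes the webserver listens on port 8080 on the same host.

```nginx
server {
    listen 443 ssl;
    server_name airflow.example.com;  # placeholder domain

    # Placeholder certificate paths (e.g. from Let's Encrypt)
    ssl_certificate     /etc/letsencrypt/live/airflow.example.com/fullchain.pem;
    ssl_certificate_key /etc/letsencrypt/live/airflow.example.com/privkey.pem;

    location / {
        # Forward to the Airflow webserver container
        proxy_pass http://127.0.0.1:8080;
        proxy_set_header Host $host;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }
}
```

Combined with a firewall rule that blocks direct access to port 8080, this ensures the web interface is only reachable over TLS.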
Conclusion and Next Steps to Industrialize Your Pipelines
We’ve seen how to set up a complete ETL pipeline with Apache Airflow: extraction from an API, transformation with pandas, and loading into PostgreSQL. This stack is perfectly suited to SMB needs for automating data workflows without investing in expensive proprietary solutions.
Possible evolutions are numerous. You can integrate dbt (data build tool) to manage your SQL transformations in a versioned and testable way. Connecting to visualization tools like Metabase or Superset then lets you exploit your consolidated data.
To go further, check the official Airflow documentation and join specialized forums. In a future article, we’ll explore how to orchestrate machine learning workflows with Airflow and MLflow.
Need help industrializing your data pipelines? At Kodixar, I help SMBs implement custom data engineering solutions.