Skip to main content

Building ETL Data Pipeline for ML Training with Airflow

In real-world machine learning workflows, preparing data and training models is rarely a one-time manual process. It's iterative, messy, and benefits immensely from automation. In this post, I’ll walk you through a complete Airflow pipeline that prepares a COCO-format dataset for training and kicks off a PyTorch-based YOLOv5 training job — all locally.

We'll build a clean ETL-style pipeline:

  1. Extract raw images and annotations.
  2. Transform them: resize images, update bounding boxes.
  3. Split into training and validation sets.
  4. Train a YOLOv5 object detection model using PyTorch.

Project Structure

Here's the expected directory layout:

/raw_data/
images/
1.jpg
2.jpg
annotations.json

After processing, images will be split and labeled like this:

/processed_data/
images/
1.jpg
2.jpg
annotations.json
train/
images/1.jpg
labels/1.txt
val/
images/
labels/
yolov5_data.yaml

Airflow DAG: ml_data_preparation_dag.py

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import os
import shutil
import json
from PIL import Image

RAW_IMAGES_DIR = "/raw_data/images"
RAW_ANNOTATIONS_PATH = "/raw_data/annotations.json"
WORKING_IMAGES_DIR = "/tmp/xxx_data/images"
WORKING_ANNOTATIONS_PATH = "/tmp/xxx_data/annotations.json"
PROCESSED_DIR = "/processed_data"
PROCESSED_IMAGES_DIR = os.path.join(PROCESSED_DIR, "images")
PROCESSED_ANNOTATIONS_PATH = os.path.join(PROCESSED_DIR, "annotations.json")
TARGET_SIZE = (416, 416)

TRAIN_DIR = os.path.join(PROCESSED_DIR, "train")
VAL_DIR = os.path.join(PROCESSED_DIR, "val")
TRAIN_IMAGES_DIR = os.path.join(TRAIN_DIR, "images")
TRAIN_LABELS_DIR = os.path.join(TRAIN_DIR, "labels")
VAL_IMAGES_DIR = os.path.join(VAL_DIR, "images")
VAL_LABELS_DIR = os.path.join(VAL_DIR, "labels")

DATA_YOLO_CONFIG = os.path.join(PROCESSED_DIR, "yolov5_data.yaml")

# Get the new data
def extract_data():
os.makedirs(WORKING_DIR, exist_ok=True)
os.makedirs(PROCESSED_IMAGES_DIR, exist_ok=True)
# Assume that I get data from local RAW_IMAGES_DIR, every time we can get new data then update it into PROCESSED_DIR
shutil.copy(RAW_IMAGES_DIR, WORKING_IMAGES_DIR)
shutil.copy(RAW_ANNOTATIONS_PATH, WORKING_ANNOTATIONS_PATH)

# Resize data
def transform_data():
# Assume: the new added image filename not conflict
with open(WORKING_ANNOTATIONS_PATH, 'r') as f:
coco = json.load(f)

image_id_map = {}
for img in coco['images']:
img_path = os.path.join(WORKING_IMAGES_DIR, img['file_name'])
new_img_path = os.path.join(PROCESSED_IMAGES_DIR, img['file_name'])

with Image.open(img_path) as im:
orig_width, orig_height = im.size
resized = im.resize(TARGET_SIZE)
resized.save(new_img_path)

scale_x = TARGET_SIZE[0] / orig_width
scale_y = TARGET_SIZE[1] / orig_height

img['width'], img['height'] = TARGET_SIZE
image_id_map[img['id']] = (scale_x, scale_y)

for ann in coco['annotations']:
scale_x, scale_y = image_id_map[ann['image_id']]
x, y, w, h = ann['bbox']
ann['bbox'] = [
round(x * scale_x, 2),
round(y * scale_y, 2),
round(w * scale_x, 2),
round(h * scale_y, 2)
]

# Update the annotation
with open(PROCESSED_ANNOTATIONS_PATH, 'a') as f:
old_coco = json.load(f)
old_coco.update(coco)
json.dump(old_coco, f)

# Add a `yolov5` compatible data
def load_data():
images_dir = PROCESSED_IMAGES_DIR
ann_path = PROCESSED_ANNOTATIONS_PATH

os.makedirs(TRAIN_IMAGES_DIR, exist_ok=True)
os.makedirs(TRAIN_LABELS_DIR, exist_ok=True)
os.makedirs(VAL_IMAGES_DIR, exist_ok=True)
os.makedirs(VAL_LABELS_DIR, exist_ok=True)

with open(ann_path) as f:
coco = json.load(f)

# Step 1: Build image ID to filename map
img_id_to_filename = {img['id']: img['file_name'] for img in coco['images']}

# Step 2: Extract classes dynamically
categories = {cat['id']: cat['name'] for cat in coco['categories']}
class_name_to_id = {v: i for i, v in enumerate(sorted(set(categories.values())))}
class_id_map = {cat_id: class_name_to_id[name] for cat_id, name in categories.items()}
class_names = list(class_name_to_id.keys())

# Step 3: Build label files in YOLO format
labels = {img_id: [] for img_id in img_id_to_filename}
for ann in coco['annotations']:
image_id = ann['image_id']
category_id = ann['category_id']
x, y, w, h = ann['bbox']
img = next(i for i in coco['images'] if i['id'] == image_id)
img_w, img_h = img['width'], img['height']
x_center = (x + w / 2) / img_w
y_center = (y + h / 2) / img_h
w /= img_w
h /= img_h
class_id = class_id_map[category_id]
labels[image_id].append(f"{class_id} {x_center:.6f} {y_center:.6f} {w:.6f} {h:.6f}")

# Step 4: Split train/val
image_ids = list(img_id_to_filename.keys())
random.shuffle(image_ids)
split = int(0.8 * len(image_ids))
train_ids, val_ids = image_ids[:split], image_ids[split:]

for subset, subset_ids in [('train', train_ids), ('val', val_ids)]:
for img_id in subset_ids:
filename = img_id_to_filename[img_id]
shutil.copy(os.path.join(PROCESSED_IMAGES_DIR, filename), os.path.join(PROCESSED_DIR, subset, "images", filename))
label_path = os.path.join(PROCESSED_DIR, subset, "labels", f"{Path(filename).stem}.txt")
with open(label_path, 'w') as f:
f.write('\n'.join(labels[img_id]))

# Step 5: Write data.yaml
data_yaml = {
'train': TRAIN_IMAGES_DIR,
'val': VAL_IMAGES_DIR,
'nc': len(class_names),
'names': class_names
}
with open(DATA_YOLO_CONFIG, 'w') as f:
yaml.dump(data_yaml, f)

YOLOv5 Training with Python API

After transformation, we split the dataset and kick off training with YOLOv5 (using the official PyTorch implementation). This step:

  • Extracts class names from COCO.
  • Generates YOLO .txt label files.
  • Splits data 80/20.
  • Trains a model using YOLOv5’s Python API.
PROCESSED_DIR = "/processed_data"
DATA_YOLO_CONFIG = os.path.join(PROCESSED_DIR, "yolov5_data.yaml")
TARGET_SIZE = (416, 416)

def train_model():
import os
import json
import shutil
import random
import yaml
from yolov5 import train

train.run(
imgsz=416,
batch_size=8,
epochs=5,
data=DATA_YOLO_CONFIG,
weights='yolov5s.pt',
name='custom_yolov5_model',
project=os.path.join(PROCESSED_DIR, "runs"),
exist_ok=True
)

Airflow DAG Pipeline

Plug all three steps into Airflow:

with DAG(
dag_id="ml_data_preparation_dag",
schedule_interval=None,
start_date=datetime(2024, 1, 1),
catchup=False,
tags=["ml", "data-prep", "coco"]
) as dag:

extract_task = PythonOperator(
task_id="extract_data",
python_callable=extract_data
)

transform_task = PythonOperator(
task_id="transform_data",
python_callable=transform_data
)

load_task = PythonOperator(
task_id="transform_data",
python_callable=load_data
)

train_task = PythonOperator(
task_id="train_model",
python_callable=train_model
)

extract_task >> transform_task >> load_task >> train_task

Results

After running the DAG:

  • Your images and labels are properly resized and split.
  • Your YOLOv5 model is trained with updated bounding boxes.
  • All steps are repeatable and traceable via Airflow.

What’s Next?

  • Add evaluation and mAP tracking.
  • Schedule retraining weekly.
  • push trained weights to S3 or serve them with FastAPI.