Building an ETL Data Pipeline for ML Training with Airflow
In real-world machine learning workflows, preparing data and training models is rarely a one-time manual process. It's iterative, messy, and benefits immensely from automation. In this post, I’ll walk you through a complete Airflow pipeline that prepares a COCO-format dataset for training and kicks off a PyTorch-based YOLOv5 training job — all locally.
We'll build a clean ETL-style pipeline:
- Extract raw images and annotations.
- Transform them: resize images, update bounding boxes.
- Split into training and validation sets.
- Train a YOLOv5 object detection model using PyTorch.
Project Structure
Here's the expected directory layout:
/raw_data/
images/
1.jpg
2.jpg
annotations.json
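The annotations.json file is standard COCO. For reference, a minimal example containing only the fields this pipeline actually reads might look like this (values are illustrative):

{
  "images": [
    {"id": 1, "file_name": "1.jpg", "width": 1280, "height": 720},
    {"id": 2, "file_name": "2.jpg", "width": 1280, "height": 720}
  ],
  "annotations": [
    {"id": 1, "image_id": 1, "category_id": 1, "bbox": [100, 50, 200, 100]}
  ],
  "categories": [
    {"id": 1, "name": "person"}
  ]
}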
After processing, images will be split and labeled like this:
/processed_data/
images/
1.jpg
2.jpg
annotations.json
train/
images/1.jpg
labels/1.txt
val/
    images/2.jpg
    labels/2.txt
yolov5_data.yaml
Airflow DAG: ml_data_preparation_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
from pathlib import Path
import os
import shutil
import json
import random
import yaml
from PIL import Image
RAW_IMAGES_DIR = "/raw_data/images"
RAW_ANNOTATIONS_PATH = "/raw_data/annotations.json"
WORKING_DIR = "/tmp/xxx_data"
WORKING_IMAGES_DIR = os.path.join(WORKING_DIR, "images")
WORKING_ANNOTATIONS_PATH = os.path.join(WORKING_DIR, "annotations.json")
PROCESSED_DIR = "/processed_data"
PROCESSED_IMAGES_DIR = os.path.join(PROCESSED_DIR, "images")
PROCESSED_ANNOTATIONS_PATH = os.path.join(PROCESSED_DIR, "annotations.json")
TARGET_SIZE = (416, 416)
TRAIN_DIR = os.path.join(PROCESSED_DIR, "train")
VAL_DIR = os.path.join(PROCESSED_DIR, "val")
TRAIN_IMAGES_DIR = os.path.join(TRAIN_DIR, "images")
TRAIN_LABELS_DIR = os.path.join(TRAIN_DIR, "labels")
VAL_IMAGES_DIR = os.path.join(VAL_DIR, "images")
VAL_LABELS_DIR = os.path.join(VAL_DIR, "labels")
DATA_YOLO_CONFIG = os.path.join(PROCESSED_DIR, "yolov5_data.yaml")
# Get the new data
def extract_data():
    os.makedirs(WORKING_DIR, exist_ok=True)
    os.makedirs(PROCESSED_IMAGES_DIR, exist_ok=True)
    # Assumption: new data lands in the local RAW_IMAGES_DIR; each run copies it
    # into the working area, and later tasks merge it into PROCESSED_DIR
    shutil.copytree(RAW_IMAGES_DIR, WORKING_IMAGES_DIR, dirs_exist_ok=True)
    shutil.copy(RAW_ANNOTATIONS_PATH, WORKING_ANNOTATIONS_PATH)
# Resize data
def transform_data():
    # Assumption: newly added image filenames do not conflict with existing ones
with open(WORKING_ANNOTATIONS_PATH, 'r') as f:
coco = json.load(f)
image_id_map = {}
for img in coco['images']:
img_path = os.path.join(WORKING_IMAGES_DIR, img['file_name'])
new_img_path = os.path.join(PROCESSED_IMAGES_DIR, img['file_name'])
with Image.open(img_path) as im:
orig_width, orig_height = im.size
resized = im.resize(TARGET_SIZE)
resized.save(new_img_path)
scale_x = TARGET_SIZE[0] / orig_width
scale_y = TARGET_SIZE[1] / orig_height
img['width'], img['height'] = TARGET_SIZE
image_id_map[img['id']] = (scale_x, scale_y)
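    # COCO bboxes are [x, y, width, height] in absolute pixels, so each value
    # scales linearly with the resize. Example: a 1280x720 image resized to
    # 416x416 gives scale_x = 0.325 and scale_y ~= 0.578, turning bbox
    # [100, 50, 200, 100] into [32.5, 28.89, 65.0, 57.78].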
for ann in coco['annotations']:
scale_x, scale_y = image_id_map[ann['image_id']]
x, y, w, h = ann['bbox']
ann['bbox'] = [
round(x * scale_x, 2),
round(y * scale_y, 2),
round(w * scale_x, 2),
round(h * scale_y, 2)
]
    # Merge with previously processed annotations (if any); categories are assumed stable
    if os.path.exists(PROCESSED_ANNOTATIONS_PATH):
        with open(PROCESSED_ANNOTATIONS_PATH, 'r') as f:
            old_coco = json.load(f)
        old_coco['images'].extend(coco['images'])
        old_coco['annotations'].extend(coco['annotations'])
        coco = old_coco
    with open(PROCESSED_ANNOTATIONS_PATH, 'w') as f:
        json.dump(coco, f)
# Build a YOLOv5-compatible dataset: labels, train/val split, and data.yaml
def load_data():
    ann_path = PROCESSED_ANNOTATIONS_PATH
os.makedirs(TRAIN_IMAGES_DIR, exist_ok=True)
os.makedirs(TRAIN_LABELS_DIR, exist_ok=True)
os.makedirs(VAL_IMAGES_DIR, exist_ok=True)
os.makedirs(VAL_LABELS_DIR, exist_ok=True)
with open(ann_path) as f:
coco = json.load(f)
# Step 1: Build image ID to filename map
img_id_to_filename = {img['id']: img['file_name'] for img in coco['images']}
# Step 2: Extract classes dynamically
categories = {cat['id']: cat['name'] for cat in coco['categories']}
class_name_to_id = {v: i for i, v in enumerate(sorted(set(categories.values())))}
class_id_map = {cat_id: class_name_to_id[name] for cat_id, name in categories.items()}
class_names = list(class_name_to_id.keys())
    # Step 3: Build label files in YOLO format (class x_center y_center w h, all normalized)
    img_id_to_size = {img['id']: (img['width'], img['height']) for img in coco['images']}
    labels = {img_id: [] for img_id in img_id_to_filename}
    for ann in coco['annotations']:
        image_id = ann['image_id']
        category_id = ann['category_id']
        x, y, w, h = ann['bbox']
        img_w, img_h = img_id_to_size[image_id]
x_center = (x + w / 2) / img_w
y_center = (y + h / 2) / img_h
w /= img_w
h /= img_h
class_id = class_id_map[category_id]
labels[image_id].append(f"{class_id} {x_center:.6f} {y_center:.6f} {w:.6f} {h:.6f}")
# Step 4: Split train/val
image_ids = list(img_id_to_filename.keys())
random.shuffle(image_ids)
split = int(0.8 * len(image_ids))
train_ids, val_ids = image_ids[:split], image_ids[split:]
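    # Seed the RNG (e.g. random.seed(42)) beforehand if the split must be
    # reproducible across DAG runs.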
for subset, subset_ids in [('train', train_ids), ('val', val_ids)]:
for img_id in subset_ids:
filename = img_id_to_filename[img_id]
shutil.copy(os.path.join(PROCESSED_IMAGES_DIR, filename), os.path.join(PROCESSED_DIR, subset, "images", filename))
label_path = os.path.join(PROCESSED_DIR, subset, "labels", f"{Path(filename).stem}.txt")
with open(label_path, 'w') as f:
f.write('\n'.join(labels[img_id]))
# Step 5: Write data.yaml
data_yaml = {
'train': TRAIN_IMAGES_DIR,
'val': VAL_IMAGES_DIR,
'nc': len(class_names),
'names': class_names
}
with open(DATA_YOLO_CONFIG, 'w') as f:
yaml.dump(data_yaml, f)
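To make the output concrete: each YOLO label file holds one line per box in class x_center y_center width height format, with all coordinates normalized to [0, 1]. Using the example bbox from the transform step (now [32.5, 28.89, 65.0, 57.78] in a 416x416 image), train/labels/1.txt would contain:

0 0.156250 0.138894 0.156250 0.138894

And the generated yolov5_data.yaml (yaml.dump sorts keys alphabetically) would read:

names:
- person
nc: 1
train: /processed_data/train/images
val: /processed_data/val/images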
YOLOv5 Training with Python API
After transformation, we split the dataset and kick off training with YOLOv5 (using the official PyTorch implementation). The load step above:
- Extracts class names from COCO.
- Generates YOLO .txt label files.
- Splits the data 80/20.
The training task then fine-tunes a pretrained checkpoint through YOLOv5's Python API, via the pip-installable yolov5 package.
def train_model():
    # Imported inside the task so the DAG file still parses on workers
    # where the yolov5 package (pip install yolov5) isn't available
    from yolov5 import train

    train.run(
        imgsz=TARGET_SIZE[0],
        batch_size=8,
        epochs=5,
        data=DATA_YOLO_CONFIG,
        weights='yolov5s.pt',  # fine-tune from the pretrained small checkpoint
        name='custom_yolov5_model',
        project=os.path.join(PROCESSED_DIR, "runs"),
        exist_ok=True
    )
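YOLOv5 writes checkpoints under {project}/{name}/weights/, so the best weights from this run should land at /processed_data/runs/custom_yolov5_model/weights/best.pt. As a quick smoke test (a sketch assuming torch is installed and the hub call can reach GitHub), you can load them for inference:

import torch

# Load the fine-tuned weights through the ultralytics/yolov5 hub entry point
model = torch.hub.load('ultralytics/yolov5', 'custom',
                       path='/processed_data/runs/custom_yolov5_model/weights/best.pt')
results = model('/processed_data/val/images/2.jpg')  # hypothetical val image; the split is random
results.print()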
Airflow DAG Pipeline
Plug all four tasks into Airflow:
with DAG(
dag_id="ml_data_preparation_dag",
schedule_interval=None,
start_date=datetime(2024, 1, 1),
catchup=False,
tags=["ml", "data-prep", "coco"]
) as dag:
extract_task = PythonOperator(
task_id="extract_data",
python_callable=extract_data
)
transform_task = PythonOperator(
task_id="transform_data",
python_callable=transform_data
)
    load_task = PythonOperator(
        task_id="load_data",
        python_callable=load_data
    )
train_task = PythonOperator(
task_id="train_model",
python_callable=train_model
)
extract_task >> transform_task >> load_task >> train_task
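Because schedule_interval=None, the DAG only runs when triggered explicitly. Assuming a standard Airflow 2.x install with this file in your dags/ folder, you can exercise it from the CLI:

# Run all four tasks in-process, without the scheduler (handy for local debugging)
airflow dags test ml_data_preparation_dag 2024-01-01

# Or queue a real run for the scheduler to pick up
airflow dags trigger ml_data_preparation_dag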
Results
After running the DAG:
- Your images are resized and split, with YOLO labels generated to match.
- Your YOLOv5 model is trained with updated bounding boxes.
- All steps are repeatable and traceable via Airflow.
What’s Next?
- Add evaluation and mAP tracking (see the sketch below).
- Schedule retraining weekly, e.g. with schedule_interval="@weekly".
- Push trained weights to S3 or serve them with FastAPI.
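For the first item, the yolov5 package also exposes a validation entry point that mirrors the repo's val.py. A minimal sketch, assuming the weights path from the training run above:

from yolov5 import val

# Compute mAP and other detection metrics on the val split from yolov5_data.yaml
val.run(
    data='/processed_data/yolov5_data.yaml',
    weights='/processed_data/runs/custom_yolov5_model/weights/best.pt',
    imgsz=416
)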