Crafting Your First Real Airflow DAG

Oct 8, 2024

Welcome back to Day 2! Today, we’ll explore the anatomy of a DAG and get hands-on by writing a simple ETL workflow.

Writing Your First Workflow (DAG)

Here’s a complete ETL pipeline for an art gallery:

from airflow import DAG
from datetime import datetime
from airflow.operators.python import PythonOperator
import pandas as pd

with DAG(
    dag_id="art_gallery_etl_2024",
    start_date=datetime(year=2024, month=1, day=1, hour=9, minute=0),
    schedule="@daily",
    catchup=True,                        # backfill every missed run since start_date
    max_active_runs=1,                   # process backfill runs one at a time, in order
    render_template_as_native_obj=True   # render templated XComs as native Python objects
) as dag:

    def extract_art_data_callable():
        print("Extracting art piece data from gallery records")
        return {
            "date_acquired": "2022-09-15",
            "artist": "Vincent van Gogh",
            "title": "Starry Night",
            "details": {
                "type": "Painting",
                "dimensions": "73.7 cm x 92.1 cm"
            }
        }

    extract_art_data = PythonOperator(
        task_id="extract_art_data",
        python_callable=extract_art_data_callable
    )

    def transform_art_data_callable(raw_data):
        # Flatten the nested record into a single row (a list of lists)
        transformed_data = [
            [
                raw_data.get("date_acquired"),
                raw_data.get("artist"),
                raw_data.get("title"),
                raw_data.get("details").get("type"),
                raw_data.get("details").get("dimensions")
            ]
        ]
        return transformed_data

    transform_art_data = PythonOperator(
        task_id="transform_art_data",
        python_callable=transform_art_data_callable,
        op_kwargs={"raw_data": "{{ ti.xcom_pull(task_ids='extract_art_data') }}"}
    )

    def load_art_data_callable(transformed_data):
        # "Load" step: build a DataFrame and print it (a stand-in for a real sink)
        loaded_data = pd.DataFrame(
            transformed_data,
            columns=["date_acquired", "artist", "title", "art_type", "dimensions"]
        )
        print(loaded_data)

    load_art_data = PythonOperator(
        task_id="load_art_data",
        python_callable=load_art_data_callable,
        op_kwargs={"transformed_data": "{{ ti.xcom_pull(task_ids='transform_art_data') }}"}
    )

    extract_art_data >> transform_art_data >> load_art_data
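One detail worth calling out: Jinja-templated values like {{ ti.xcom_pull(...) }} render to strings by default, so the downstream .get() calls on raw_data would fail. Setting render_template_as_native_obj=True tells Airflow to render templates to native Python objects instead. A plain-Python illustration of the difference (no Airflow needed):

```python
# What extract_art_data pushes to XCom (abridged)
record = {"artist": "Vincent van Gogh", "title": "Starry Night"}

# Default template rendering: the downstream task would receive the
# *string representation* of the dict, which has no .get() method
as_string = str(record)
assert isinstance(as_string, str)

# With render_template_as_native_obj=True, the callable receives the
# dict itself, so key lookups work as expected
assert record.get("artist") == "Vincent van Gogh"
```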

Key Components

  • DAG: Defines the workflow with properties like dag_id, start_date, and schedule
  • Tasks: Each task performs a single action (extract, transform, load)
  • Operators: Define what kind of task is executed (PythonOperator for Python functions)
  • Dependencies: Using >> to define execution order
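Because each task wraps a plain Python function, the callables can be exercised on their own before you ever hand them to the scheduler. A quick sketch of the transform step, fed the same sample record the extract task returns:

```python
# Sample record matching what extract_art_data_callable returns
raw_data = {
    "date_acquired": "2022-09-15",
    "artist": "Vincent van Gogh",
    "title": "Starry Night",
    "details": {"type": "Painting", "dimensions": "73.7 cm x 92.1 cm"},
}

def transform_art_data_callable(raw_data):
    # Flatten the nested record into a single row (a list of lists)
    return [[
        raw_data.get("date_acquired"),
        raw_data.get("artist"),
        raw_data.get("title"),
        raw_data.get("details", {}).get("type"),
        raw_data.get("details", {}).get("dimensions"),
    ]]

rows = transform_art_data_callable(raw_data)
print(rows[0][1])  # Vincent van Gogh
```

Testing callables in isolation like this is much faster than triggering a full DAG run while you iterate.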

Handling Task Failures

When a task fails partway through a pipeline, Airflow makes recovery straightforward:

  • View detailed logs from the UI
  • Selectively rerun just the failed task
  • Successful tasks don’t need to be rerun
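Beyond manual reruns, tasks can also retry automatically: in Airflow 2.x, retries and retry_delay are standard operator arguments. Conceptually, a retry policy is just a bounded loop around the callable. A minimal plain-Python sketch of the idea (run_with_retries and flaky_extract are hypothetical names, not Airflow APIs):

```python
import time

def run_with_retries(callable_fn, retries=3, retry_delay=0.0):
    """Call callable_fn, retrying up to `retries` extra times on failure."""
    for attempt in range(retries + 1):
        try:
            return callable_fn()
        except Exception:
            if attempt == retries:
                raise  # out of retries: surface the failure
            time.sleep(retry_delay)

# Hypothetical flaky task: fails twice, then succeeds on the third attempt
calls = {"n": 0}
def flaky_extract():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient gallery API error")
    return "ok"

result = run_with_retries(flaky_extract, retries=3)
print(result)  # ok
```

In a real DAG you would simply pass retries=3 (and optionally retry_delay) to the operator and let the scheduler handle this loop for you.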

Stay tuned for Day 3, where we’ll dive into scheduling!

Aditya Paliwal
Data Engineer
Data Engineer with 4+ years of experience in implementing and deploying end-to-end data pipelines in production environments. Passionate about combining data engineering with cutting-edge machine learning and AI technologies to create intelligent, data-driven products.