Introduction to Data Pipelines with Apache Airflow

Oct 7, 2024 · 2 min read

Welcome to the first post of my 15-day series on learning Apache Airflow! Every day, I’ll break down one chapter from the book “Data Pipelines with Apache Airflow” by Bas Harenslak and Julian de Ruiter.

What is Apache Airflow?

Apache Airflow is an open-source platform for programmatically authoring, scheduling, and monitoring complex data workflows. It was originally developed at Airbnb to automate the scheduling and monitoring of the tasks that make up data pipelines.

Key Concepts

Workflows & DAGs (Directed Acyclic Graphs)

  • Airflow manages workflows by representing them as DAGs
  • A DAG is a collection of tasks whose dependencies form a directed graph with no cycles; the graph fixes the order of execution
  • Example: a data pipeline where raw data is extracted, transformed, and loaded into a database, as sketched below
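
As a minimal sketch of that ETL shape (the DAG and task names here are illustrative, and EmptyOperator stands in for real work):

from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG('etl_example', start_date=datetime(2024, 10, 1), schedule_interval='@daily') as dag:
    extract = EmptyOperator(task_id='extract')      # pull raw data from the source
    transform = EmptyOperator(task_id='transform')  # clean and reshape it
    load = EmptyOperator(task_id='load')            # write it to the database

    # >> declares the dependency direction: extract, then transform, then load
    extract >> transform >> load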

Tasks in Airflow

  • Each node in a DAG represents a task
  • Tasks can be anything from running a Python script to querying a database
  • Tasks run in parallel or sequentially depending on how their dependencies are declared, as shown in the sketch after this list
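
Here is a short sketch of how the dependency shape controls parallelism (all names are made up, and the echo commands stand in for real work):

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator

with DAG('parallel_example', start_date=datetime(2024, 10, 1), schedule_interval=None) as dag:
    start = EmptyOperator(task_id='start')
    fetch_api = BashOperator(task_id='fetch_api', bash_command='echo "calling an API"')
    query_db = BashOperator(task_id='query_db', bash_command='echo "querying a database"')
    done = EmptyOperator(task_id='done')

    # fetch_api and query_db do not depend on each other, so Airflow can run them in parallel
    start >> [fetch_api, query_db] >> done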

Schedulers

  • Airflow’s scheduler monitors DAGs and triggers each task once its schedule and dependencies are satisfied
  • You can schedule DAGs at specific intervals (daily, hourly, etc.) using presets or standard cron expressions, as sketched below
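
As a quick sketch, a schedule can be a preset like '@daily' or a cron expression (the DAG name here is made up):

from datetime import datetime

from airflow import DAG

# schedule_interval accepts presets like '@daily' or cron expressions;
# '0 2 * * *' means 02:00 every day
with DAG('nightly_job', start_date=datetime(2024, 10, 1), schedule_interval='0 2 * * *') as dag:
    ...  # tasks would be defined here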

Setting Up Apache Airflow

1. Set Up a Python Environment

# create and activate an isolated Python environment for Airflow
python3 -m venv airflow_venv
source airflow_venv/bin/activate

2. Install Apache Airflow

# the official constraints file pins dependency versions tested with this release
export AIRFLOW_VERSION=2.6.3
export PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
export CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

3. Initialize the Database

# creates Airflow's metadata database (SQLite by default, under ~/airflow)
airflow db init

4. Create a User

# you will be prompted to set a password for the new user
airflow users create \
   --username admin \
   --firstname FIRST_NAME \
   --lastname LAST_NAME \
   --role Admin \
   --email admin@example.com

5. Start Airflow

# run these in two separate terminals, then open http://localhost:8080
airflow webserver --port 8080
airflow scheduler

My First DAG

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def my_first_task():
    print("Hello, this is my first Airflow task!")


# the DAG groups tasks and defines when they run; '@daily' means once per day
with DAG('my_first_dag', start_date=datetime(2024, 10, 1), schedule_interval='@daily') as dag:
    # PythonOperator wraps a plain Python callable as a task in this DAG
    task = PythonOperator(
        task_id='print_hello',
        python_callable=my_first_task,
    )
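
To try it out, save the file in your DAG folder (~/airflow/dags by default; you may need to create it). You can then run the whole DAG once, without the scheduler, using the built-in test command:

airflow dags test my_first_dag 2024-10-01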

Stay tuned for Day 2, where I’ll cover Airflow’s architecture in detail!

Aditya Paliwal
Data Engineer
Data Engineer with 4+ years of experience in implementing and deploying end-to-end data pipelines in production environments. Passionate about combining data engineering with cutting-edge machine learning and AI technologies to create intelligent, data-driven products.