Airflow (Apache Python Platform for Automation Workflows)

Introduction

Apache Airflow is an open-source workflow management platform for data engineering pipelines. It started at Airbnb in October 2014 as a solution to manage the company's increasingly complex workflows. Creating Airflow allowed Airbnb to programmatically author and schedule their workflows and monitor them via the built-in Airflow user interface. From the beginning, the project was made open source, becoming an Apache Incubator project in March 2016 and a top-level Apache Software Foundation project in January 2019. Airflow is written in Python, and workflows are created via Python scripts. Airflow is designed under the principle of "configuration as code". While other "configuration as code" workflow platforms exist using markup languages like XML, using Python allows developers to import libraries and classes to help them create their workflows.

--(Wikipedia 2023)

TODO: Include the module notes in module 22.

TODO: There should be a data engineering topics index. The PCDE course outline is a good place to pull from. Create one with some conceptual outlines and new documents with core concepts and include a lot of the links relevant from the PCDE outline therein. Note that this document mentions it and should be linked as such.

Why Airflow?

Airflow is a batch workflow orchestration platform. The Airflow framework contains operators to connect with many technologies and is easily extensible to connect with new ones. If your workflows have a clear start and end, and run at regular intervals, they can be programmed as an Airflow DAG.

If you prefer coding over clicking, Airflow is the tool for you: workflows are defined as Python code.

Rich scheduling and execution semantics enable you to easily define complex pipelines, running at regular intervals. Backfilling allows you to run (or re-run) pipelines on historical data after making changes to the logic. Also, the ability to rerun partial pipelines after resolving an error helps maximize efficiency.

The user interface provides both in-depth views of pipelines and individual tasks, and an overview of pipelines over time. From the interface, you can inspect logs and manage tasks, for example retrying a task in case of failure.

Its open-source community ensures that its components are developed, tested, and deployed by many other organizations around the world. It also means there are plenty of resources for getting started with the platform.

Why NOT Airflow?

Airflow was designed for finite batch workflows. While the CLI and REST API allow triggering workflows, Airflow is not a streaming solution. However, a streaming system such as Apache Spark or Kafka can be used in tandem to add real-time streaming capabilities. There are even operators for both Spark and Kafka in Airflow.

Basics

Workflows as Code

The main characteristic of Airflow workflows is that all workflows are defined in Python code. "Workflows as code" serves several purposes:

Dynamic: pipelines are configured as Python code, allowing for dynamic pipeline generation.

Extensible: the Airflow framework contains operators to connect with numerous technologies, and all Airflow components are extensible to adjust to your environment.

Flexible: workflow parameterization is built in, leveraging the Jinja templating engine.

Directed Acyclic Graphs (DAG)

A DAG (Directed Acyclic Graph) is the core concept of Airflow, collecting tasks together, organized with dependencies and relationships that say how they should run. (Apache Airflow 2023)

Here's a basic example of a DAG, note there is no looping back to any task:

graph LR
A(TaskA)
A --> B(TaskB)
A --> C(TaskC)
B --> D
C --> D
D(TaskD)

To demonstrate a DAG, let's examine the code below.

from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator

# A DAG represents a workflow, a collection of tasks
with DAG(dag_id="demo", start_date=datetime(2022, 1, 1), schedule="0 0 * * *") as dag:

# Tasks are represented as operators
hello = BashOperator(task_id="hello", bash_command="echo hello")

@task()
def airflow():
print("airflow")

# Set dependencies between tasks
hello >> airflow()

DAGs

What are DAGs?

DAGs, or Directed Acyclic Graphs, are, as the name implies, directed graphs. Mathematically, a directed graph is any graph whose edges have a direction associated with them, so following the edges generally leads you to the same place. In the example below, no matter which vertex you start at, the edges will guide you to one vertex, D.

graph LR
A-->B
A-->C
B-->C
C-->D
D-->D

There is a variation of the directed graph, the directed acyclic graph, that adds an important property: there are no cycles, so following the edges never loops back to a previously visited vertex. The D-->D edge in the example above breaks this rule, so that graph isn't a DAG. The example below is both directed and acyclic.

graph LR
A-->B
B-->C
B-->D
C-->E
D-->E
E-->F

Note that this example also always ends up at one vertex, F, no matter where you start, and it never loops back to an earlier node. These properties make the DAG an interesting data structure, and they are exactly what Airflow relies on for task scheduling.
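As an aside on the data-structure angle, scheduling tasks in a DAG is essentially a topological-ordering problem. The sketch below is plain Python (not Airflow) and uses the standard library's graphlib to compute one valid execution order for the acyclic example above; feeding it a cyclic graph like the earlier one would raise a CycleError instead.

from graphlib import TopologicalSorter  # standard library, Python 3.9+

# The acyclic example above, written as node -> set of predecessor nodes.
graph = {
    "B": {"A"},
    "C": {"B"},
    "D": {"B"},
    "E": {"C", "D"},
    "F": {"E"},
}

print(list(TopologicalSorter(graph).static_order()))
# One valid order, e.g.: ['A', 'B', 'C', 'D', 'E', 'F']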

Defining DAGs

Python scripts define Airflow DAGs through the use of Operators. Some of the most commonly used are the BashOperator, the PythonOperator, and the EmailOperator.

An operator is essentially a predefined template for a task: a unit of work with its own logic that runs as part of a sequence and may have dependencies on other tasks.

Note: To see a list of all the default and included hooks and operators, see the Operators and Hooks Reference

A task or operator can declare dependencies on any other task in a DAG. Defining these dependencies is what defines the DAG structure, i.e. the edges of the directed acyclic graph.

There are two main ways to declare individual task dependencies. The preferred approach uses the >> and << (bitshift) operators; the other is the more explicit set_upstream and set_downstream methods. For example, with tasks first_task and second_task, the following code makes second_task dependent on first_task.

first_task >> second_task

If you want a fourth_task and fifth_task to run after a third_task, you can use a list literal ([a, b]) with the >> or << operator.

third_task >> [fourth_task, fifth_task]
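Putting both forms together, here is a minimal sketch of the TaskA/TaskB/TaskC/TaskD diamond from the first diagram; the dag_id, task ids, and bash commands are made up for illustration.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(dag_id="diamond_demo", start_date=datetime(2022, 1, 1), schedule="0 0 * * *") as dag:
    task_a = BashOperator(task_id="task_a", bash_command="echo A")
    task_b = BashOperator(task_id="task_b", bash_command="echo B")
    task_c = BashOperator(task_id="task_c", bash_command="echo C")
    task_d = BashOperator(task_id="task_d", bash_command="echo D")

    # Fan out: B and C both depend on A.
    task_a >> [task_b, task_c]
    # Fan in: D depends on both B and C.
    [task_b, task_c] >> task_d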

Example DAG

Example DAG: Import Libraries

This DAG will use a Python function to produce a string that appears in the task's log. First, import the necessary libraries.

from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

Example DAG: DAG Arguments

DAGs take arguments that specify metadata about how they will be run: who owns them, whether runs depend on past runs, retry behavior, etc. A common pattern is to collect these in a dictionary of expected configuration keys, conventionally named default_args, whose values are applied to every task in the DAG unless a task overrides them. Here are a few.

default_args = {
    'owner': 'me',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': 'me@example.com',
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
}
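Below is a minimal sketch of how those defaults propagate to tasks and how a task can override one; the dag_id and task ids are made up for illustration.

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

with DAG(dag_id='defaults_demo', default_args={'owner': 'me', 'retries': 1, 'start_date': days_ago(2)}) as dag:
    # Inherits retries=1 and owner='me' from default_args.
    inherits_defaults = PythonOperator(task_id='inherits_defaults', python_callable=lambda: None)
    # An explicit per-task value wins over the default.
    overrides_defaults = PythonOperator(task_id='overrides_defaults', python_callable=lambda: None, retries=3)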

Example DAG: First Python Operator

Next, define a Python function that takes an argument and returns a string. This function will later be called by the PythonOperator, which logs the returned value.

def my_function(x):
    return x + ' is a must have tool for Data Engineers.'
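As a quick sanity check outside of Airflow, calling the function directly shows the string the task will eventually write to its log:

print(my_function('Airflow'))
# Airflow is a must have tool for Data Engineers.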

Example DAG: Define the DAG

Now a DAG object needs to be constructed. Give it a dag_id (print_dag below is an arbitrary name), pass the arguments defined previously through the default_args parameter (or supply a dictionary literal directly), and set the schedule on the DAG itself; the cron expression 0 12 * * * runs it at noon every day.

with DAG(dag_id='print_dag', default_args=default_args, schedule_interval='0 12 * * *') as dag:
    python_task = PythonOperator(
        task_id='my_function', python_callable=my_function, op_args=['Airflow'],
    )
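Assembled into a single file, the whole example might look like the sketch below; the dag_id (print_dag) and the op_args value ('Airflow') are choices made for this walkthrough rather than anything Airflow prescribes.

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'me',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': 'me@example.com',
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
}


def my_function(x):
    return x + ' is a must have tool for Data Engineers.'


with DAG(dag_id='print_dag', default_args=default_args, schedule_interval='0 12 * * *') as dag:
    # Pass 'Airflow' to my_function; the returned string appears in the task log.
    python_task = PythonOperator(
        task_id='my_function',
        python_callable=my_function,
        op_args=['Airflow'],
    )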

Example DAG: Trigger the DAG

With a Python file holding this code in the dags directory, the DAG should now show up in the DAGs tab of the UI. First it needs to be enabled using the toggle next to the DAG in the list. Without a manual trigger, the schedule will start the DAG at noon every day. To trigger it manually, use the DAG's actions menu (the ..., or three-dots, menu) and select the Trigger DAG action.

With it running, the logs section of the DAG's task should show a log containing the expected "Airflow is a must have tool for Data Engineers."

Deploy in Containers

One of the easiest ways to deploy Airflow for development, testing, and even production is via containers. In this example, Docker is used with a Docker Compose file.

Create a Workspace Directory

First, create a workspace directory where the containers' DAGs, logs, and configuration will be persisted, and change into it.

mkdir airflow-container
cd airflow-container

Download the Official Docker Compose File

Fortunately, Apache maintains an excellent, well-documented and commented docker-compose file in the Airflow documentation. It can be downloaded from the command line with a utility like cURL using the URL below, editing the version portion of the URL to match the desired Airflow release.

curl \
'https://airflow.apache.org/docs/apache-airflow/2.1.1/docker-compose.yaml' \
-o docker-compose.yaml

Edit the Compose File

TODO: When the bin service is ready, get a copy of a recent airflow docker compose file, put it there, and link to it here.

A lot of configuration has been broken out into environment variables and the airflow-common section, which simplifies configuring the containers. Notable settings to consider changing from the defaults include AIRFLOW__CORE__LOAD_EXAMPLES (set to 'false' to hide the bundled example DAGs), AIRFLOW_UID (set to the host user's UID so the mounted dags/, logs/, and plugins/ directories get usable permissions), and _AIRFLOW_WWW_USER_USERNAME / _AIRFLOW_WWW_USER_PASSWORD (the default web login credentials).

Docker Compose Up

Then, from within the workspace directory, use docker-compose up to bring the containers up. The -d or --detach flag runs the containers in the background in a detached state.

# This should be done from within the airflow container workspace directory
docker-compose up -d

Inspect the Workspace Directory

Give the containers some time to initialize. If they haven't been given enough resources, initialization can sometimes take over five minutes. Once the containers are initialized, there should be three new directories alongside the downloaded docker-compose file in the workspace: dags, logs, and plugins. An ls should look something like this:

ls
# output: dags/ logs/ plugins/ docker-compose.yaml

Login to Airflow Web Server

Once everything is up and running, browse to the web server (localhost:8080 by default) and log in with airflow as both the username and password. The first view should be the DAGs overview, where all the currently available DAGs are listed. From here it's possible to toggle whether a DAG is active.
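The same default credentials also work against Airflow's stable REST API, assuming the basic-auth API backend is enabled (the official compose file configures this). Below is a minimal sketch using the requests library, with the web server assumed to be reachable on localhost:8080.

import requests

# List the DAGs the deployment knows about, authenticating with the default credentials.
response = requests.get(
    'http://localhost:8080/api/v1/dags',
    auth=('airflow', 'airflow'),
)
print(response.status_code)
print([d['dag_id'] for d in response.json()['dags']])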

References

Web Links

Apache Airflow (Wikipedia 2023): https://en.wikipedia.org/wiki/Apache_Airflow

Apache Airflow Documentation (Apache Airflow 2023): https://airflow.apache.org/docs/

Note Links