Dynamic Workflows on Apache Airflow

Matija Pilepić

2022-01-13

Introduction

Apache Airflow is a platform to programmatically author, schedule, and monitor workflows.

It is used to author workflows as directed acyclic graphs (DAGs) of tasks. The Apache Airflow scheduler executes your tasks on an array of workers while following the specified dependencies. Rich command line utilities make performing complex surgeries on DAGs a snap. The rich user interface makes it easy to visualize pipelines running in production, monitor progress, and troubleshoot issues when needed.

When workflows are defined as code, they become easier to maintain, version, test and collaborate on.

Customer Issue

Apache Airflow is an amazingly powerful open-source workflow tool that has been picking up steam in the past few years. One of the core features that makes it so powerful is that all workflows are defined with code. This gives us enormous flexibility and lets us apply Apache Airflow to a wide range of orchestration problems.

Let’s say we want to create a DAG whose contents, including the kinds of tasks to perform (Operators) and the scheduling of the workflow, depend on a trigger event. A dynamic workflow in Apache Airflow means that we have a DAG with a set of tasks inside it and, depending on some outside parameters that are unknown when the DAG starts, it will create a different number and/or different types of tasks on each run. For example, the DAG starts and its first task runs a query that returns five records from a database; if we want a distinct task to process each of those records, we can dynamically spin up five tasks to do that. The next time the DAG runs it might get only one record back, so that run will spin up only one task. The DAG does not only have to change the number of tasks it spins up; it can also create different types of tasks based on each record.

The Solution

An Example of a DAG with a Dynamic Workflow

The DAG before it is started:

[Screenshot: the DAG graph view before the run]

The DAG after it is run:

[Screenshot: the DAG graph view after the run]

As you can see, there are more tasks than when the DAG first started. In this article, I will walk through the steps to achieve this functionality.

Design overview

Apache Airflow Variables were used to achieve this functionality. These Variables are a valuable resource but are often overlooked in people’s designs. It is important to note that using them is not the only solution; AWS S3 or a database, among other options, could also be used to store these values. Apache Airflow Variables can be set programmatically using the Python library, through the command line, or from the Apache Airflow UI. Setting Variables from the UI gives us a unique capability within our DAGs: we can manipulate the DAG from the UI without changing the underlying code.
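For instance, reading and writing a Variable from Python takes only a couple of calls. The following is a minimal sketch using the same keys as the rest of this example:

from airflow.models import Variable

# Programmatic equivalent of the CLI commands shown below
Variable.set("DynamicWorkflow_Group1", 1)
Variable.set("DynamicWorkflow_Group2", 0)

# Variable.get returns the stored value as a string by default
group1 = Variable.get("DynamicWorkflow_Group1")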

For this example to work, two new Apache Airflow Variables need to be created:

airflow variables --set DynamicWorkflow_Group1 1

airflow variables --set DynamicWorkflow_Group2 0
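Note that this is the Airflow 1.x CLI syntax, which matches the imports used in the DAG below. On Airflow 2.x, the equivalent commands use a subcommand instead of the --set flag:

airflow variables set DynamicWorkflow_Group1 1

airflow variables set DynamicWorkflow_Group2 0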

Next, a new DAG needs to be created:

  
import os
import logging

import airflow
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator

main_dag_id = 'DynamicWorkflow'

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'provide_context': True
}

dag = DAG(
    main_dag_id,
    schedule_interval="@once",
    default_args=args)


def start(*args, **kwargs):
    value = Variable.get("DynamicWorkflow_Group1")
    logging.info("Current DynamicWorkflow_Group1 value is " + str(value))


def bridge1(*args, **kwargs):
    # You can set this value dynamically, e.g. from a database or a calculation
    dynamicValue = 2

    variableValue = Variable.get("DynamicWorkflow_Group2")
    logging.info("Current DynamicWorkflow_Group2 value is " + str(variableValue))

    logging.info("Setting the Airflow Variable DynamicWorkflow_Group2 to " + str(dynamicValue))
    os.system('airflow variables --set DynamicWorkflow_Group2 ' + str(dynamicValue))

    variableValue = Variable.get("DynamicWorkflow_Group2")
    logging.info("Current DynamicWorkflow_Group2 value is " + str(variableValue))


def end(*args, **kwargs):
    logging.info("Ending")


def doSomeWork(name, index, *args, **kwargs):
    # Do whatever work you need to do for the given group and index
    os.system('ls')


starting_task = PythonOperator(
    task_id='start',
    dag=dag,
    provide_context=True,
    python_callable=start,
    op_args=[])

# Used to connect the stream in the event that the range is zero
bridge1_task = PythonOperator(
    task_id='bridge1',
    dag=dag,
    provide_context=True,
    python_callable=bridge1,
    op_args=[])

ending_task = PythonOperator(
    task_id='end',
    dag=dag,
    provide_context=True,
    python_callable=end,
    op_args=[])

DynamicWorkflow_Group1 = Variable.get("DynamicWorkflow_Group1")
logging.info("The current DynamicWorkflow_Group1 value is " + str(DynamicWorkflow_Group1))

for index in range(int(DynamicWorkflow_Group1)):
    dynamicTask = PythonOperator(
        task_id='firstGroup_' + str(index),
        dag=dag,
        provide_context=True,
        python_callable=doSomeWork,
        op_args=['firstGroup', index])

    starting_task.set_downstream(dynamicTask)
    dynamicTask.set_downstream(bridge1_task)

DynamicWorkflow_Group2 = Variable.get("DynamicWorkflow_Group2")
logging.info("The current DynamicWorkflow_Group2 value is " + str(DynamicWorkflow_Group2))

for index in range(int(DynamicWorkflow_Group2)):
    # You can make this logic anything you'd like
    # I chose to use the PythonOperator for all tasks
    # except the last task, which uses the BashOperator
    if index < (int(DynamicWorkflow_Group2) - 1):
        dynamicTask = PythonOperator(
            task_id='secondGroup_' + str(index),
            dag=dag,
            provide_context=True,
            python_callable=doSomeWork,
            op_args=['secondGroup', index])
    else:
        dynamicTask = BashOperator(
            task_id='secondGroup_' + str(index),
            bash_command='ls',
            dag=dag)

    bridge1_task.set_downstream(dynamicTask)
    dynamicTask.set_downstream(ending_task)

# If you do not connect these, then in the event that either range is zero there will be
# a gap in the stream and your tasks will run simultaneously instead of in the desired order.
starting_task.set_downstream(bridge1_task)
bridge1_task.set_downstream(ending_task)

This is what we see when the DAG is started:

[Screenshot: the DAG graph view at the start of the run]

Let's trigger the DAG and go over the code at each step. The first task is called ‘start’. It is just a starting point since we don’t want the first task to be a dynamic one.

DynamicWorkflow_Group1 = Variable.get("DynamicWorkflow_Group1")
logging.info("The current DynamicWorkflow_Group1 value is " + str(DynamicWorkflow_Group1))

for index in range(int(DynamicWorkflow_Group1)):
    dynamicTask = PythonOperator(
        task_id='firstGroup_' + str(index),
        dag=dag,
        provide_context=True,
        python_callable=doSomeWork,
        op_args=['firstGroup', index])

    starting_task.set_downstream(dynamicTask)
    dynamicTask.set_downstream(bridge1_task)

In ‘firstGroup’ we grab the value of the Apache Airflow Variable DynamicWorkflow_Group1 and then simply loop over that range, creating one task per value. In this example, the code does not change the value of the Variable; it can be set manually to any value, as shown below.
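For instance, setting the Variable to three from the command line (or editing it in the UI) and letting the scheduler re-parse the DAG file is enough to make three firstGroup_* tasks appear:

airflow variables --set DynamicWorkflow_Group1 3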

def bridge1(*args, **kwargs):
    # You can set this value dynamically, e.g. from a database or a calculation
    dynamicValue = 2

    variableValue = Variable.get("DynamicWorkflow_Group2")
    logging.info("Current DynamicWorkflow_Group2 value is " + str(variableValue))

    logging.info("Setting the Airflow Variable DynamicWorkflow_Group2 to " + str(dynamicValue))
    os.system('airflow variables --set DynamicWorkflow_Group2 ' + str(dynamicValue))

    variableValue = Variable.get("DynamicWorkflow_Group2")
    logging.info("Current DynamicWorkflow_Group2 value is " + str(variableValue))

Bridge1 is responsible for changing the value of the Apache Airflow Variable DynamicWorkflow_Group2, which, in turn, controls the number of tasks spun up next. It is important to point out why an Apache Airflow Variable, S3, a database, or some other external form of storage needs to be used to achieve this: a DAG is not a regular Python file running in a single system process. Each of these tasks (Operators) runs independently of the others, and possibly even on different servers if a clustered Apache Airflow environment with multiple workers is set up, so global Python variables can’t be used to pass this information around. To keep this example simple, dynamicValue = 2 is hardcoded, but this value could come from some calculation or from a database query, as sketched below. When this task runs, the logs will show that the starting value of the Apache Airflow Variable DynamicWorkflow_Group2 was zero and the updated value is two. When the next group of tasks is created, the DAG will use this value to determine how many to create.
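As a minimal sketch of that idea (the my_postgres connection, the pending_records table, and the use of PostgresHook are assumptions added for illustration and are not part of the original example), bridge1 could derive the value from a query and write it back with Variable.set instead of shelling out to the CLI:

import logging

from airflow.models import Variable
from airflow.hooks.postgres_hook import PostgresHook  # hypothetical data source


def bridge1(*args, **kwargs):
    # Hypothetical example: count the records that still need processing
    hook = PostgresHook(postgres_conn_id='my_postgres')
    row = hook.get_first('SELECT COUNT(*) FROM pending_records')
    dynamicValue = int(row[0]) if row else 0

    logging.info("Setting the Airflow Variable DynamicWorkflow_Group2 to " + str(dynamicValue))
    # Variable.set writes to the same metadata store as the 'airflow variables --set' CLI call
    Variable.set("DynamicWorkflow_Group2", dynamicValue)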

[Screenshot: the DAG graph view after bridge1 has run, showing the two secondGroup_* tasks]

The DAG went from having no tasks shown in the secondGroup_* section to having two, and they both ran successfully. So not only can we manipulate the number of tasks created manually, as shown in firstGroup_*, but we can also set it programmatically.

for index in range(int(DynamicWorkflow_Group2)):
    # You can make this logic anything you'd like
    # I chose to use the PythonOperator for all tasks
    # except the last task, which uses the BashOperator
    if index < (int(DynamicWorkflow_Group2) - 1):
        dynamicTask = PythonOperator(
            task_id='secondGroup_' + str(index),
            dag=dag,
            provide_context=True,
            python_callable=doSomeWork,
            op_args=['secondGroup', index])
    else:
        dynamicTask = BashOperator(
            task_id='secondGroup_' + str(index),
            bash_command='ls',
            dag=dag)

Instead of always creating PythonOperator tasks, logic can be added to determine which type of task to create. In this simple example, with a value of two, the first task is a PythonOperator and the last task is a BashOperator.

Conclusion

Apache Airflow’s ability to define workflows as code makes it possible to create dynamic workflows. This example kept the dynamic aspect very simple, but it shows how far the idea can go. Instead of a single dynamic integer value, a JSON structure containing multiple pieces of information can be used to create the DAG’s tasks, along the lines of the sketch below. There are many possibilities for where you can take this example.
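As a rough sketch of that JSON idea (the dynamic_task_specs Variable and its structure are invented for illustration, and the snippet assumes it lives in the same DAG file as the example above so that dag, doSomeWork, bridge1_task, and ending_task are in scope), the loop over an integer could be replaced with a loop over a list of task specifications:

from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator

# Hypothetical Variable, e.g. created elsewhere with:
#   Variable.set("dynamic_task_specs",
#                [{"name": "a", "type": "python"}, {"name": "b", "type": "bash"}],
#                serialize_json=True)
task_specs = Variable.get("dynamic_task_specs", default_var=[], deserialize_json=True)

for spec in task_specs:
    if spec.get("type") == "bash":
        dynamicTask = BashOperator(
            task_id='jsonGroup_' + spec["name"],
            bash_command='ls',
            dag=dag)
    else:
        dynamicTask = PythonOperator(
            task_id='jsonGroup_' + spec["name"],
            provide_context=True,
            python_callable=doSomeWork,
            op_args=[spec["name"], 0],
            dag=dag)

    bridge1_task.set_downstream(dynamicTask)
    dynamicTask.set_downstream(ending_task)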

Check out our other blog posts to find more technical advice, feedback on tools, and verified walkthroughs.
