How to Skip Tasks in Airflow DAGs

Author: Murphy

Recently, I was attempting to add a new task to an existing Airflow DAG that would only run on specific days of the week. However, I was surprised to find that skipping tasks in Airflow isn't as straightforward as I anticipated.

In this article, I will demonstrate how to skip tasks in Airflow DAGs, specifically focusing on the use of AirflowSkipException when working with PythonOperator or Operators that inherit from built-in operators (such as TriggerDagRunOperator).

Lastly, I will discuss the use of BranchPythonOperator and ShortCircuitOperator and how they can be used to decide when a task needs to be skipped.


Now let's assume we have an Airflow DAG consisting of three tasks:

Example DAG consisting of three PythonOperator tasks – Source: Author
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    dag_id='test_dag',
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:

    first_task = PythonOperator(task_id='task_a', python_callable=lambda: print('Hi from task_a'))
    second_task = PythonOperator(task_id='task_b', python_callable=lambda: print('Hi from task_b'))
    third_task = PythonOperator(task_id='task_c', python_callable=lambda: print('Hi from task_c'))

    first_task >> second_task >> third_task

Skipping PythonOperator tasks

The most intuitive way to skip tasks created via PythonOperator is to raise [AirflowSkipException](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/exceptions/index.html#airflow.exceptions.AirflowSkipException). This means the python_callable function that gets executed via the PythonOperator needs to implement the logic that decides when to raise the exception.

Let's return to the example DAG we previously discussed and consider a scenario where task_b must only run on Mondays.

from datetime import datetime

from airflow import DAG
from airflow.exceptions import AirflowSkipException
from airflow.operators.python import PythonOperator

def my_func(**context):
    # If the DagRun start date is not a Monday, then skip this task
    dag_run_start_date = context['dag_run'].start_date
    if dag_run_start_date.weekday() != 0:
        raise AirflowSkipException

    # Anything beyond this line will be executed only if the
    # task is not skipped, based on the condition specified above
    print('Hi from task_b')

with DAG(
    dag_id='test_dag',
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:
    first_task = PythonOperator(task_id='task_a', python_callable=lambda: print('Hi from task_a'))
    second_task = PythonOperator(task_id='task_b', python_callable=my_func)
    third_task = PythonOperator(
        task_id='task_c',
        python_callable=lambda: print('Hi from task_c'),
        trigger_rule='none_failed'
    )

    first_task >> second_task >> third_task
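
A quick note on the weekday check: Python's datetime.weekday() returns 0 for Monday through 6 for Sunday, so the condition weekday() != 0 holds on any day other than Monday. For example:

from datetime import datetime

# weekday() returns 0 for Monday, 1 for Tuesday, ..., 6 for Sunday
print(datetime(2023, 1, 2).weekday())  # 0 -> a Monday
print(datetime(2023, 1, 7).weekday())  # 5 -> a Saturday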

If the DagRun's start date is not a Monday, task_b will be skipped and appear in pink (the colour that denotes skipped tasks, as per the legend on the UI).

Skip condition is met, and trigger_rule for task_c is set to none_failed – Source: Author

It's important to pay attention to the 'none_failed' value provided to the trigger_rule keyword argument of third_task. If we omit this configuration, then task_c will also be skipped whenever task_b is skipped.

task_c is also skipped given that trigger_rule will get the default all_success value – Source: Author

By default, trigger_rule is set to all_success, which means that a task will be executed only if all of its upstream tasks have succeeded (i.e. none has failed or been skipped).
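
As a side note, instead of raw strings you can pass the TriggerRule enum from airflow.utils.trigger_rule, which guards against typos. A minimal sketch, assuming Airflow 2.x, for the task_c definition above:

from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule

# Equivalent to trigger_rule='none_failed': run as long as no upstream task
# has failed, even if some upstream tasks were skipped
third_task = PythonOperator(
    task_id='task_c',
    python_callable=lambda: print('Hi from task_c'),
    trigger_rule=TriggerRule.NONE_FAILED,
)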

If the DagRun's start date is a Monday, then task_b will be executed:

Skip condition is not met – Source: Author

Skipping built-in Operator tasks

Now let's assume we have another DAG consisting of three tasks, including a TriggerDagRunOperator that is used to trigger another DAG.

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

with DAG(
  dag_id='test_dag',
  start_date=datetime(2021, 1, 1),
  catchup=False,
  tags=['example'],
) as dag:
  first_task = PythonOperator(
    task_id='task_a',
    python_callable=lambda: print('Hi from task_a'),
  )
  trigger_task = TriggerDagRunOperator(
    task_id='trigger_other_dag',
    trigger_dag_id='example_branch_operator',
  )
  last_task = PythonOperator(
    task_id='task_c',
    python_callable=lambda: print('Hi from task_c'),
    trigger_rule='none_failed',
  )

  first_task >> trigger_task >> last_task
A DAG consisting of TriggerDagRunOperator – Source: Author

Things are a bit more complicated if you are looking to skip tasks created using built-in operators (or even custom ones that inherit from built-in operators). There are essentially a few different options to consider; in this section, I'm going to cover all of them, and it's up to you to pick the one that best suits your needs.

The first option is the BranchPythonOperator, which is used to create branching logic so that the DAG can take a certain direction based on some conditional logic. Once again, let's assume that trigger_other_dag – which essentially uses a TriggerDagRunOperator to trigger another Airflow DAG – needs to be executed only on Mondays.

We can choose when to skip a task using a BranchPythonOperator with two branches and a callable that implements the underlying branching logic.

def choose_branch(**context):
    dag_run_start_date = context['dag_run'].start_date
    if dag_run_start_date.weekday() != 0:  # not a Monday
        return 'skip'
    return 'trigger_other_dag'

Now let's go ahead and create the Airflow tasks.

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

with DAG(
  dag_id='test_dag',
  start_date=datetime(2021, 1, 1),
  catchup=False,
  tags=['example'],
) as dag:
  branch_task = BranchPythonOperator(
    task_id='branching', 
    python_callable=choose_branch,
  )

  first_task = PythonOperator(
    task_id='task_a', 
    python_callable=lambda: print('Hi from task_a'),
  )
  trigger_task = TriggerDagRunOperator(
    task_id='trigger_other_dag', 
    trigger_dag_id='example_branch_operator',
  )
  last_task = PythonOperator(
    task_id='task_c',
    python_callable=lambda: print('Hi from task_c'),
    trigger_rule='none_failed',
  )
  dummy_task = DummyOperator(task_id='skip')

  first_task >> branch_task >> [trigger_task, dummy_task] >> last_task
Using BranchPythonOperator to decide when to skip a task – Source: Author

Note that branch operators cannot have empty paths, so we had to create a dummy task using DummyOperator that corresponds to skipping the trigger task. Whenever the trigger task needs to be skipped, the skip task will be "executed" (it does nothing) instead:

Skipping the trigger_other_dag task – Source: Author
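
As a side note, on Airflow 2.3 or newer, DummyOperator has been renamed to EmptyOperator, so the placeholder task would instead be created as follows:

from airflow.operators.empty import EmptyOperator

dummy_task = EmptyOperator(task_id='skip')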

The second option is to use the ShortCircuitOperator to implement conditional logic that decides when to skip a particular task.

The ShortCircuitOperator is derived from the PythonOperator and evaluates the result of a python_callable. If the returned result is False or a falsy value, the pipeline will be short-circuited. Downstream tasks will be marked with a state of "skipped" based on the short-circuiting mode configured. If the returned result is True or a truthy value, downstream tasks proceed as normal and an XCom of the returned result is pushed.

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator, ShortCircuitOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

def is_monday(**context):
  return context['dag_run'].start_date.weekday() == 0

with DAG(
  dag_id='test_dag',
  start_date=datetime(2021, 1, 1),
  catchup=False,
  tags=['example'],
) as dag:
  is_monday_task = ShortCircuitOperator(
    task_id='is_monday',
    python_callable=is_monday,
    ignore_downstream_trigger_rules=False,
  )

  first_task = PythonOperator(
    task_id='task_a',
    python_callable=lambda: print('Hi from task_a'),
  )

  trigger_task = TriggerDagRunOperator(
    task_id='trigger_other_dag',
    trigger_dag_id='example_branch_operator',
  )

  last_task = PythonOperator(
    task_id='task_c',
    python_callable=lambda: print('Hi from task_c'),
    trigger_rule='none_failed',
  )

  first_task >> is_monday_task >> trigger_task >> last_task
Skipping Airflow tasks using ShortCircuitOperator – Source: Author

If is_monday() evaluates to False, Airflow will by default skip all downstream tasks. If we want just one task to be skipped, we need to provide ignore_downstream_trigger_rules=False when creating the ShortCircuitOperator instance. With this configuration, the trigger rules of downstream tasks are taken into account when deciding which ones should still be skipped or executed (note the trigger rule in our last task, task_c).

If the is_monday ShortCircuitOperator returns False, then only trigger_other_dag is skipped – Source: Author

If the result from python_callable is True, then downstream tasks will be executed as normal.

If is_monday ShortCircuitOperator returns True, then all downstream tasks will be executed – Source: Author
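
Before moving on to the last option, here is a short sketch contrasting the two short-circuiting modes side by side (the task ids here are illustrative, and is_monday is the callable defined above):

from airflow.operators.python import ShortCircuitOperator

# Default mode (ignore_downstream_trigger_rules=True): a falsy result skips
# ALL downstream tasks, regardless of their trigger rules
skip_all_downstream = ShortCircuitOperator(
    task_id='skip_all_downstream',
    python_callable=is_monday,
)

# With ignore_downstream_trigger_rules=False, downstream trigger rules are
# respected, so task_c (trigger_rule='none_failed') still runs
respect_trigger_rules = ShortCircuitOperator(
    task_id='respect_trigger_rules',
    python_callable=is_monday,
    ignore_downstream_trigger_rules=False,
)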

Lastly, the third option involves the implementation of a sub-class that inherits from a built-in operator, such as TriggerDagRunOperator.

The following custom operator inherits from the built-in TriggerDagRunOperator and takes an additional callable argument that will be used to decide whether an AirflowSkipException should be raised.

from airflow.exceptions import AirflowSkipException
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

from typing import Any, Dict, Callable, TypeVar

Context = TypeVar('Context', bound=Dict[Any, Any])

class ConditionalTriggerDagRunOperator(TriggerDagRunOperator):
    """
    This is a custom operator that will execute TriggerDagRunOperator only if
    the `conditional_checker_callable` result evaluates to `True`. Otherwise,
    the task will be skipped by raising an `AirflowSkipException`.
    """

    def __init__(
        self,
        conditional_checker_callable: Callable[[Context], bool],
        **kwargs: Any,
    ) -> None:
        super().__init__(**kwargs)
        self.conditional_checker_callable = conditional_checker_callable

    def execute(self, context: Context) -> None:
        if not self.conditional_checker_callable(context):
            raise AirflowSkipException

        super().execute(context)

If conditional_checker_callable returns True, then the operator will be executed; otherwise, it will be skipped. The full code for our DAG now becomes:

from datetime import datetime

from airflow import DAG
from airflow.exceptions import AirflowSkipException
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

from typing import Any, Dict, Callable, TypeVar

Context = TypeVar('Context', bound=Dict[Any, Any])

class ConditionalTriggerDagRunOperator(TriggerDagRunOperator):
    """
    This is a custom operator that will execute TriggerDagRunOperator only if
    the `conditional_checker_callable` result evaluates to `True`. Otherwise,
    the task will be skipped by raising an `AirflowSkipException`.
    """

    def __init__(
        self,
        conditional_checker_callable: Callable[[Context], bool],
        **kwargs: Any,
    ) -> None:
        super().__init__(**kwargs)
        self.conditional_checker_callable = conditional_checker_callable

    def execute(self, context: Context) -> None:
        if not self.conditional_checker_callable(context):
            raise AirflowSkipException

        super().execute(context)

def is_monday(**context):
    return context['dag_run'].start_date.weekday() == 0

with DAG(
    dag_id='test_dag',
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:
    first_task = PythonOperator(
        task_id='task_a',
        python_callable=lambda: print('Hi from task_a'),
    )

    trigger_task = ConditionalTriggerDagRunOperator(
        task_id='trigger_other_dag',
        conditional_checker_callable=is_monday,
        trigger_dag_id='example_branch_operator',
    )

    last_task = PythonOperator(
        task_id='task_c',
        python_callable=lambda: print('Hi from task_c'),
        trigger_rule='none_failed',
    )

    first_task >> trigger_task >> last_task
Creating a custom operator that is capable of being skipped. Note that for this option, no additional tasks are required – Source: Author

Now if the condition results in False, the task created using our custom operator will be skipped.

Skipping the task if the condition is False – Source: Author

Likewise, if the condition is True the task will be executed.

Executing the task if the condition is True – Source: Author

Notice that for this particular option, we don't need to create additional tasks like we did in the two previous examples with BranchPythonOperator and ShortCircuitOperator, which is something I personally like, given that it declutters our DAG.

Note that similar behaviour can be achieved by inheriting from Airflow's SkipMixin class. For more information, feel free to take a look at the documentation.
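
For illustration, here is a minimal sketch of what that could look like. The class name is hypothetical, and the skip() call assumes the Airflow 2.x signature skip(dag_run, execution_date, tasks), so double-check the documentation for the version you are running:

from airflow.models.baseoperator import BaseOperator
from airflow.models.skipmixin import SkipMixin

class SkipDownstreamUnlessMondayOperator(BaseOperator, SkipMixin):
    """Hypothetical operator (sketch only): skips all of its direct
    downstream tasks unless the DagRun started on a Monday."""

    def execute(self, context):
        dag_run = context['dag_run']
        if dag_run.start_date.weekday() != 0:
            # SkipMixin.skip marks the given tasks as skipped in the metadata DB
            self.skip(dag_run, dag_run.execution_date, context['task'].downstream_list)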

Personally, I like the last approach, as it's a bit clearer when it comes to the DAG visualisation on the Airflow UI; at the same time, by sub-classing a built-in operator to implement the skipping logic, you can re-use it in other DAGs as well. But this comes down to your specific use case and preferences, so feel free to choose the approach that best suits your needs.


Final Thoughts

Skipping tasks while authoring Airflow DAGs is a very common requirement that lets engineers orchestrate tasks in a more dynamic and sophisticated way.

In this article, we demonstrated several different options for implementing logic that requires conditional execution of certain Airflow tasks. More specifically, we showed how you can implement such functionality when using PythonOperator by raising AirflowSkipException.

Furthermore, we demonstrated a few different approaches for skipping tasks created using built-in operators other than the PythonOperator. Based on your specific use case, it's up to you to decide which approach to take.

Even though skipping tasks is a common requirement, Airflow doesn't seem to have a built-in feature for performing conditional runs within a specific DAG. I would expect a feature allowing developers to specify trigger conditions on specific tasks to be available out of the box, but I am fairly confident that sooner or later this functionality will be implemented and shipped in a future Airflow version.


Tags: Airflow Data Science Programming Python Tips And Tricks
