Introduction

The Airflow Summit is just around the corner (September 10–12), so the timing was perfect for an important new release of Airflow. This minor release, 2.10, does not disappoint, and this short article covers the main features and fixes it introduces. If you want to explore everything that Airflow 2.10 has to offer, this article is for you.

Airflow Dataset

As a reminder, an Airflow dataset is a logical grouping of data. Upstream producer tasks can update datasets, and dataset updates contribute to scheduling downstream consumer DAGs. This release adds a few new features around datasets.

Dynamic Dataset Definition

A dataset alias can be used to emit dataset events for the datasets associated with it, and downstream DAGs can depend on the resolved datasets. This feature allows you to define complex dependencies for DAG executions based on dataset updates.

Imagine you have a task that generates output files in an S3 bucket, with the location of these files dependent on the execution date (ds). Previously, you would have to define the dataset statically, but with the new DatasetAlias, you can dynamically create the dataset path based on the execution context:

from airflow.datasets import Dataset, DatasetAlias
from airflow.datasets.metadata import Metadata
from airflow.decorators import task


@task(outlets=[DatasetAlias("my-task-outputs")])
def my_task_with_outlet_events(*, ds, outlet_events):
    outlet_events["my-task-outputs"].add(Dataset(f"s3://bucket/my-task/{ds}"))


@task(outlets=[DatasetAlias("my-task-outputs")])
def my_task_with_metadata(*, ds):
    s3_dataset = Dataset(f"s3://bucket/my-task/{ds}")
    yield Metadata(s3_dataset, extra={"k": "v"}, alias="my-task-outputs")

Datasets no longer trigger inactive DAGs

Previously, when a DAG was paused or removed, incoming dataset events would still trigger it, and the DAG would run when it was unpaused or added back in a DAG file. This has been changed; a DAG’s dataset schedule can now only be satisfied by events that occur when the DAG is active.

Consider a scenario where you have multiple DAGs consuming the same dataset, but some of these DAGs are paused for maintenance or no longer active. With this update, dataset events only trigger active, unpaused DAGs, preventing unnecessary executions and reducing the risk of errors.
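
As an illustration (a minimal sketch with hypothetical names), a consumer DAG scheduled on a dataset is now only triggered by events that arrive while it is active; events received while it is paused or removed no longer queue a run for later:

from pendulum import datetime

from airflow.datasets import Dataset
from airflow.decorators import dag, task

# Hypothetical dataset URI used only for illustration.
orders = Dataset("s3://bucket/orders")


@dag(start_date=datetime(2024, 1, 1), schedule=[orders], catchup=False)
def consume_orders():
    @task
    def process():
        # Runs only for dataset events received while the DAG is active.
        print("new orders data available")

    process()


consume_orders()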

Update on the Datasets view

The Datasets view in Airflow has been significantly revamped to provide a more intuitive and detailed overview of dataset events and their impact on workflows. This update is particularly important for users managing complex data pipelines, where understanding the flow of data and its triggering effects is crucial.

Key Enhancements:

  • Dataset Events List: The new view prominently features a list of all dataset events, allowing users to quickly see the most recent activity across all datasets. This is particularly useful for monitoring data pipelines and diagnosing issues.


Screenshot of a data management interface showing a list of datasets with timestamps, task details, and status icons, under sections titled Dataset Events, Dependency Graph, and Datasets.


  • Event-Rich Cards: Instead of a simple table, dataset events are now displayed as cards with detailed information about the event source, downstream runs, and any associated metadata. This richer display provides better context at a glance.

  • Tabbed Interface: The new interface organizes information into tabs, including dataset events, details, datasets list, and a graph view. This separation allows users to focus on the specific aspect of the datasets they are interested in, without clutter.

  • Breadcrumb Navigation: The introduction of breadcrumb navigation improves the user experience by clearly showing the selected dataset or event, making it easier to navigate back and forth between views.

  • Richer Dataset Details: The dataset details tab now includes more comprehensive information, such as extra metadata, consuming DAGs, and producing tasks. This level of detail helps users understand the full lifecycle of their datasets.

Flowchart diagram displaying the structure of a data processing workflow with elements labeled as 'producer', 'consumer', and various datasets and aliases connected by arrows indicating data flow, annotated with icons for DAG, dataset, and dataset alias.


Hybrid executors

This feature is experimental. Previously referred to as hybrid executors, multiple executor support allows Airflow to use several executors concurrently. DAGs, or even individual tasks, can be configured to use the executor that best suits their needs, and a single DAG can contain tasks that each use a different executor.

Running multiple executors allows you to make better use of the strengths of all the available executors and avoid their weaknesses. In other words, you can use a specific executor for a specific set of tasks where its particular merits and benefits make the most sense for that use case.

To specify an executor for a task, make use of the executor parameter on Airflow Operators:

from airflow.operators.bash import BashOperator

BashOperator(
    task_id="hello_world",
    executor="LocalExecutor",
    bash_command="echo 'hello world!'",
)
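
For instance, a single DAG can mix executors per task, assuming the executors referenced are configured in the deployment (in 2.10 they are declared as a comma-separated list in the [core] executor setting, with the first entry acting as the default). A minimal sketch:

from pendulum import datetime

from airflow.decorators import dag
from airflow.operators.bash import BashOperator


@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def mixed_executors():
    # Lightweight task: falls back to the default (first configured) executor.
    extract = BashOperator(task_id="extract", bash_command="echo 'extract'")

    # Heavier task: explicitly routed to another executor.
    # CeleryExecutor is assumed to be part of the configured executor list.
    transform = BashOperator(
        task_id="transform",
        bash_command="echo 'transform'",
        executor="CeleryExecutor",
    )

    extract >> transform


mixed_executors()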

Dark mode

We had to wait until 2.10 to get a dark mode in Airflow; "all things come to those who wait", as the adage says. There is not much to say about this self-explanatory feature, so here is a screenshot of the dark mode:

Dashboard interface of Apache Airflow displaying various data pipelines (DAGs) with their status indicators such as running (green), failed (red), and other statuses, along with details like owner, schedule, and recent run times on a dark theme background.

You can toggle the dark/light mode by clicking the crescent icon on the right side of the navigation bar.

New concat() method

A notable addition is the introduction of the concat() method for XComArg objects. This new feature enhances the way users can manipulate and combine data passed between tasks in a DAG, making the process more efficient and less resource-intensive.

The concat() method allows users to concatenate multiple XComArg references into a single XComArg object. This method is particularly useful when you need to perform an operation on several lists or sequences of items without creating additional tasks in your DAG. Essentially, concat() works like Python’s itertools.chain, but with the added benefit of supporting index access on the concatenated results.


Sequence diagram illustrating the interaction between a user and objects 'XComArg' and 'ConcatXComArg'. The diagram shows the user calling the 'concat()' method to combine multiple 'XComArg' instances, resulting in a new 'ConcatXComArg' object that behaves as a sequence and can be accessed via indices, treated as a list or dictionary.


When you call concat() on an XComArg, it combines it with one or more other XComArgs, creating a new ConcatXComArg object. This object behaves as a sequence that can be accessed using indices, making it easy to work with the concatenated data as if it were a single list or dictionary.

For example, if you have two or more XComArgs representing lists of files, you can now concatenate them and then map over the combined list in a downstream task without needing to manually merge the lists in a separate task. This not only simplifies the DAG but also saves resources by reducing the amount of data stored in XCom and the memory required to execute tasks.
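As a sketch (task names are hypothetical), two lists of files can be combined with concat() and mapped over directly, without an intermediate merge task:

from pendulum import datetime

from airflow.decorators import dag, task


@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def concat_example():
    @task
    def list_raw_files() -> list[str]:
        # Hypothetical producer returning a list of file paths.
        return ["raw/a.csv", "raw/b.csv"]

    @task
    def list_archived_files() -> list[str]:
        return ["archive/c.csv", "archive/d.csv"]

    @task
    def process(file: str):
        print(f"processing {file}")

    # concat() chains both XComArgs without an extra merge task;
    # process() is then mapped over the combined sequence.
    files = list_raw_files().concat(list_archived_files())
    process.expand(file=files)


concat_example()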

New DAG-level permissions

Airflow 2.10 introduces enhanced control over DAG-level permissions, providing a more granular approach to managing access within your workflows. This update extends the existing access_control feature to cover DAG Run resources, allowing administrators to define specific permissions for triggering and deleting DAGs at a per-DAG level.

In previous versions of Airflow, access_control allowed for role-based access management (RBAC) at the DAG level, but the granularity of permissions was limited. With the new updates in version 2.10, permissions can now be assigned not only for DAGs as a whole but also for specific actions within those DAGs, such as creating and deleting DAG Runs.

With this new capability, administrators can specify which roles have permission to perform certain actions on specific DAGs. For example, you might allow a user role to trigger a DAG but prevent them from deleting it. This is particularly useful in environments where certain operations should be restricted to specific users or teams.

with DAG(
    'tutorial',
    ...
    access_control={
        'User': {
            'DAG Runs': {'can_create'}
        }
    }
) as dag:

In this case, users with the “User” role are granted permission to create DAG Runs for the tutorial DAG, but not necessarily to delete the DAG itself.

with DAG(
    'tutorial2',
    ...
    access_control={
        'User': {'can_delete'}  # Old style: {'DAGs': {'can_delete'}}
    },
) as dag:

Here, users with the “User” role can delete the tutorial2 DAG, showcasing the ability to apply permissions at both the DAG level and the DAG Run level.

In the UI, the trigger and delete buttons are disabled unless the corresponding permission has been granted:

Screenshots of the Apache Airflow web interface: the DAGs list showing the tutorial and tutorial2 DAGs with their statuses, owners, run counts and schedules, and the detail views of each DAG showing its description, daily schedule and next run time.

Jinja templating with ObjectStoragePath

Jinja templating is now supported for the ObjectStoragePath when used as an operator argument. This enhancement allows users to dynamically render paths based on variables such as execution date, task IDs, or custom parameters, providing greater flexibility and adaptability in defining storage paths within their DAGs.

For example, you can now define an ObjectStoragePath like this:

path = ObjectStoragePath("s3://my-bucket/data/{{ ds }}/{{ task.task_id }}.csv")

In this example, the path will be dynamically rendered with the specific date (ds) and task ID, making it easier to organize and access files in object storage systems. This feature streamlines the integration of Airflow with cloud storage services, enhancing the overall efficiency of your workflows.
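
Since rendering happens through an operator's template_fields, a minimal sketch with a hypothetical custom operator could look like this:

from pendulum import datetime

from airflow import DAG
from airflow.io.path import ObjectStoragePath
from airflow.models.baseoperator import BaseOperator


class WriteReportOperator(BaseOperator):
    """Hypothetical operator, shown only to illustrate the templated argument."""

    # The ObjectStoragePath argument is rendered before execute() is called.
    template_fields = ("dst",)

    def __init__(self, *, dst: ObjectStoragePath, **kwargs):
        super().__init__(**kwargs)
        self.dst = dst

    def execute(self, context):
        self.log.info("Writing report to %s", self.dst)


with DAG("report_dag", start_date=datetime(2024, 1, 1), schedule="@daily", catchup=False):
    WriteReportOperator(
        task_id="write_report",
        dst=ObjectStoragePath("s3://my-bucket/data/{{ ds }}/report.csv"),
    )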

Color to log lines in UI for errors and warnings

Reading and finding errors and warnings in the logs is now much easier: errors and warnings are highlighted in different colors (red and blue, respectively) and in bold.

Screenshot of Apache Airflow’s task attempt log interface, showing error messages and stack traces from a failed task execution. The log details include timestamps, error descriptions, and the Python traceback indicating where the error occurred in the code.

New run_if and skip_if decorators

Airflow 2.10 introduces the run_if and skip_if decorators, providing a more concise and readable way to conditionally execute or skip tasks within a DAG. These decorators act as syntactic sugar, simplifying the logic for task execution based on specific conditions.

The run_if decorator allows you to define a condition under which a task should be executed. If the condition is not met, the task is automatically skipped. This is particularly useful for scenarios where task execution depends on dynamic conditions evaluated at runtime.

UML sequence diagram depicting a conditional task execution process where a user defines a task, applies a decorator, and then at runtime, a condition is evaluated to decide whether to execute or skip the task based on whether the condition is met or not.

Here’s an example of how to use the run_if decorator:

from __future__ import annotations

from pendulum import datetime

from airflow.decorators import dag, task


@dag(start_date=datetime(2022, 1, 1), schedule=None, catchup=False)
def condition_sample():
    @task.run_if(lambda context: context["task_instance"].task_id.endswith("_do_run"))
    @task.bash()
    def echo() -> str:
        return "echo 'run'"

    echo.override(task_id="echo_do_run")()
    echo.override(task_id="echo_do_not_run")()


condition_sample()

In this sample DAG, the echo task will only run if its task_id ends with _do_run. The run_if decorator evaluates the condition; if it is True, the task proceeds, otherwise it is skipped.
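
The skip_if decorator is the mirror image: the task is skipped when its condition evaluates to True. A minimal sketch of an equivalent task that could live inside the same DAG function:

@task.skip_if(lambda context: context["task_instance"].task_id.endswith("_do_not_run"))
@task.bash()
def echo_or_skip() -> str:
    return "echo 'run'"


echo_or_skip.override(task_id="echo_do_not_run")()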

Conclusion

Every 4–5 months, Airflow releases a new minor version, and it is really exciting to see how much effort goes into continuously improving this great OSS orchestration tool. The power of Airflow resides in its strong community and its willingness to keep making the product better. This release continues that trend, and companies should not wait to migrate to this new version, as the features it introduces can significantly simplify and improve your current and new DAGs as well as the overall management of Airflow.

Thank you

If you enjoyed reading this article, stay tuned as we regularly publish technical articles on Airflow and the data stack in general. Follow Astrafy on LinkedIn, Medium and YouTube to be notified of the next article.

If you are looking for support on Airflow or your Data Stack in general, feel free to reach out to us at sales@astrafy.io.