Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ Airflow supports multiple types of Dag Bundles, each catering to specific use ca
**airflow.providers.google.cloud.bundles.gcs.GCSDagBundle**
These bundles reference a GCS bucket containing Dag files. They do not support versioning of the bundle, meaning tasks always run using the latest code.

**airflow.providers.microsoft.azure.bundles.wasb.WasbDagBundle**
Comment thread
Nishieee marked this conversation as resolved.
These bundles reference an Azure Blob Storage container containing Dag files. They do not support versioning of the bundle, meaning tasks always run using the latest code.

Configuring Dag bundles
-----------------------

Expand Down
80 changes: 80 additions & 0 deletions providers/microsoft/azure/docs/bundles/index.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.

Bundles
#######

Dag bundles allow Airflow to load Dags from external sources. For a general overview see
:doc:`apache-airflow:administration-and-deployment/dag-bundles`.

WasbDagBundle
=============

Use the :class:`~airflow.providers.microsoft.azure.bundles.wasb.WasbDagBundle` to configure an Azure Blob
Storage bundle in your Airflow's ``[dag_processor] dag_bundle_config_list``. The bundle does not support
versioning; tasks always use the latest blobs synced to the local bundle directory.

Example of using the WasbDagBundle:

**JSON format example**:

.. code-block:: bash

export AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST='[
{
"name": "my-wasb-dags",
"classpath": "airflow.providers.microsoft.azure.bundles.wasb.WasbDagBundle",
"kwargs": {
"wasb_conn_id": "wasb_default",
"container_name": "airflow-dags",
"prefix": "dags/",
"refresh_interval": 60
}
}
]'

Authentication
--------------

The bundle uses a ``wasb`` Connection (``wasb_conn_id``). Authentication is the same as for
:class:`~airflow.providers.microsoft.azure.hooks.wasb.WasbHook` — see :doc:`../connections/wasb`. On Azure-hosted
Airflow, managed identity via ``DefaultAzureCredential`` is typical.

Permissions
-----------

The identity needs read access to list and download blobs in the target container. Assign
`Storage Blob Data Reader <https://learn.microsoft.com/en-us/azure/role-based-access-control/built-in-roles/storage#storage-blob-data-reader>`_
at the storage account or container scope.

Container and prefix
--------------------

Set ``container_name`` to the blob container that holds your Dag files. Use ``prefix`` for an optional
virtual folder inside the container.

Networking
----------

The Dag processor needs outbound HTTPS to the blob endpoint. Storage firewalls and private endpoints
must allow access from Airflow, as for any WASB client.

Reusing the Connection in Dags
------------------------------

You can use the same ``wasb`` Connection ID in ``wasb_conn_id`` for the bundle and for operators or sensors
that use ``WasbHook``.
1 change: 1 addition & 0 deletions providers/microsoft/azure/docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
:maxdepth: 1
:caption: Guides

Bundles <bundles/index>
Connection types <connections/index>
Message queues <message-queues/index>
Operators <operators/index>
Expand Down
5 changes: 5 additions & 0 deletions providers/microsoft/azure/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,11 @@ hooks:
python-modules:
- airflow.providers.microsoft.azure.hooks.powerbi

bundles:
- integration-name: Microsoft Azure Blob Storage
python-modules:
- airflow.providers.microsoft.azure.bundles.wasb

triggers:
- integration-name: Microsoft Azure Batch
python-modules:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import os
from pathlib import Path

import structlog

from airflow.dag_processing.bundles.base import BaseDagBundle
from airflow.providers.microsoft.azure.hooks.wasb import WasbHook


class WasbDagBundle(BaseDagBundle):
"""
WASB Dag bundle - exposes a directory in Azure Blob Storage as a Dag bundle.

This allows Airflow to load Dags directly from an Azure Blob Storage container.

:param wasb_conn_id: Airflow connection ID for Azure Blob Storage. Defaults to WasbHook.default_conn_name.
:param container_name: The name of the blob container containing the Dag files.
:param prefix: Optional subdirectory within the container where the Dags are stored.
If empty, Dags are assumed to be at the root of the container.
"""

supports_versioning = False

def __init__(
self,
*,
wasb_conn_id: str = WasbHook.default_conn_name,
container_name: str,
prefix: str = "",
**kwargs,
) -> None:
super().__init__(**kwargs)
self.wasb_conn_id = wasb_conn_id
self.container_name = container_name
self.prefix = prefix
self.wasb_dags_dir: Path = self.base_dir

log = structlog.get_logger(__name__)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would try extending WasbDagBundle with LogginMixin instead of instantiating log yourself. If in the future we would for example say we use some other kind of logger, the WasbDagBundle would then be also updated without any intervention needed.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm I see others do it like that as well, maybe something to refactor in another PR? Maybe BaseDagBundle should extend the LogginMixin.

self._log = log.bind(
bundle_name=self.name,
version=self.version,
container_name=self.container_name,
prefix=self.prefix,
wasb_conn_id=self.wasb_conn_id,
)
self._wasb_hook: WasbHook | None = None

def _initialize(self):
with self.lock():
if not self.wasb_dags_dir.exists():
self._log.info("Creating local Dags directory: %s", self.wasb_dags_dir)
os.makedirs(self.wasb_dags_dir)

if not self.wasb_dags_dir.is_dir():
raise NotADirectoryError(f"Local Dags path: {self.wasb_dags_dir} is not a directory.")

if not self.wasb_hook.check_for_container(container_name=self.container_name):
raise ValueError(f"WASB container '{self.container_name}' does not exist.")

if self.prefix:
if not self.wasb_hook.check_for_prefix(
container_name=self.container_name, prefix=self.prefix, delimiter="/"
):
raise ValueError(
f"WASB prefix 'wasb://{self.container_name}/{self.prefix}' does not exist."
)
self.refresh()

def initialize(self) -> None:
self._initialize()
super().initialize()

@property
def wasb_hook(self):

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could simply become a cached_property I think, no need to keep self._wasb_hook.

if self._wasb_hook is None:
self._wasb_hook = WasbHook(wasb_conn_id=self.wasb_conn_id)
return self._wasb_hook

def __repr__(self):
return (
f"<WasbDagBundle("
f"name={self.name!r}, "
f"container_name={self.container_name!r}, "
f"prefix={self.prefix!r}, "
f"version={self.version!r}"
f")>"
)

def get_current_version(self) -> str | None:
"""Return the current version of the Dag bundle. Currently not supported."""
return None

@property
def path(self) -> Path:
"""Return the local path to the Dag files."""
return self.wasb_dags_dir

def refresh(self) -> None:
"""Refresh the Dag bundle by re-downloading the Dags from Azure Blob Storage."""
if self.version:
raise ValueError("Refreshing a specific version is not supported")

with self.lock():
self._log.debug(
"Downloading Dags from wasb://%s/%s to %s",
self.container_name,
self.prefix,
self.wasb_dags_dir,
)
self.wasb_hook.sync_to_local_dir(
container_name=self.container_name,
prefix=self.prefix,
local_dir=self.wasb_dags_dir,
delete_stale=True,
)

def view_url(self, version: str | None = None) -> str | None:
Comment thread
Nishieee marked this conversation as resolved.
"""
Return a URL for viewing the Dags in Azure Blob Storage. Currently, versioning is not supported.

This method is deprecated and will be removed when the minimum supported Airflow version is 3.1.
Use `view_url_template` instead.
"""
return self.view_url_template()

def view_url_template(self) -> str | None:
"""Return a URL for viewing the Dags in Azure Blob Storage. Currently, versioning is not supported."""
if self.version:
raise ValueError("WASB url with version is not supported")
if hasattr(self, "_view_url_template") and self._view_url_template:
return self._view_url_template
account_url = self.wasb_hook.blob_service_client.url
url = f"{account_url.rstrip('/')}/{self.container_name}"
if self.prefix:
url += f"/{self.prefix}"

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return f"{url}/{self.prefix}"

return url
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,12 @@ def get_provider_info():
"python-modules": ["airflow.providers.microsoft.azure.hooks.powerbi"],
},
],
"bundles": [
{
"integration-name": "Microsoft Azure Blob Storage",
"python-modules": ["airflow.providers.microsoft.azure.bundles.wasb"],
}
],
"triggers": [
{
"integration-name": "Microsoft Azure Batch",
Expand Down
Loading
Loading