Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 46 additions & 5 deletions task-sdk/src/airflow/sdk/coordinators/node/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from __future__ import annotations

import base64
import os
import pathlib
from typing import TYPE_CHECKING, Any
Expand All @@ -41,6 +42,9 @@

# File name of the bundle entry point looked up in each configured bundle directory.
BUNDLE_FILENAME = "bundle.mjs"
# File name of the metadata sidecar, read when the bundle carries no embedded metadata.
METADATA_FILENAME = "airflow-metadata.yaml"
# Byte sequence introducing the embedded metadata comment. The leading newline
# means the comment only matches when it starts a line of its own.
EMBEDDED_METADATA_MARKER = b"\n//# airflowMetadata="
# Metadata sits on the bundle's last line, so scanning the trailing 1 MiB is enough.
EMBEDDED_METADATA_TAIL_BYTES = 1 << 20


def _validate_schema_version(instance, _, value) -> str:
Expand All @@ -53,6 +57,38 @@ class _NodeBundle:
schema_version: str = attrs.field(validator=_validate_schema_version)


def _read_embedded_metadata(bundle_path: pathlib.Path) -> dict[str, Any] | None:
    """
    Load the manifest that ``airflow-ts-pack`` stores inside the bundle file.

    The manifest is expected as a ``//# airflowMetadata=<base64>`` line comment
    within the trailing ``EMBEDDED_METADATA_TAIL_BYTES`` bytes of *bundle_path*.
    When the marker occurs more than once in that window, the last occurrence
    is used. The payload is base64-decoded, read as UTF-8 and parsed as YAML.

    :param bundle_path: Path of the bundle file to inspect.
    :return: The parsed metadata mapping, or ``None`` when the scanned tail
        contains no marker.
    :raises ValueError: If the bundle cannot be read, the payload cannot be
        decoded or parsed, or the parsed document is not a mapping.
    """
    try:
        with bundle_path.open("rb") as stream:
            # Jump to the end to learn the size, then rewind by at most the tail window.
            stream.seek(0, os.SEEK_END)
            total_bytes = stream.tell()
            window_start = max(0, total_bytes - EMBEDDED_METADATA_TAIL_BYTES)
            stream.seek(window_start)
            trailing = stream.read()
    except OSError as exc:
        raise ValueError(f"cannot read {bundle_path.name}: {exc}") from exc

    # rpartition splits on the last marker; an empty separator means no marker was found.
    _, found_marker, remainder = trailing.rpartition(EMBEDDED_METADATA_MARKER)
    if not found_marker:
        return None

    # The payload is whatever follows the marker up to the end of that line.
    first_line, _, _ = remainder.partition(b"\n")
    encoded = first_line.strip()
    try:
        # validate=True rejects characters outside the base64 alphabet instead of skipping them.
        manifest_text = base64.b64decode(encoded, validate=True).decode("utf-8")
        parsed = yaml.safe_load(manifest_text)
    except (ValueError, yaml.YAMLError) as exc:
        raise ValueError(f"cannot parse embedded airflow metadata: {exc}") from exc

    if isinstance(parsed, dict):
        return parsed
    raise ValueError("embedded airflow metadata must contain a mapping")


def _read_bundle_metadata(metadata_path: pathlib.Path) -> dict[str, Any]:
if not metadata_path.is_file():
raise ValueError(f"missing {METADATA_FILENAME}")
Expand Down Expand Up @@ -85,8 +121,9 @@ def _find_bundle(bundles_root: Sequence[pathlib.Path]) -> _NodeBundle:
"""
Locate the ``.mjs`` entry point in *bundles_root*.

Scans each configured directory for ``bundle.mjs`` and reads the sibling
``airflow-metadata.yaml`` for the bundle's supervisor schema version.
Scans each configured directory for ``bundle.mjs`` and reads the bundle's
supervisor schema version from the metadata embedded in the bundle,
falling back to a sibling ``airflow-metadata.yaml`` sidecar.

This is an ordered fallback search, not Dag/task-aware multi-bundle
routing. The first bundle found wins. A future version can use the
Expand All @@ -99,7 +136,9 @@ def _find_bundle(bundles_root: Sequence[pathlib.Path]) -> _NodeBundle:
if not candidate.is_file():
continue
try:
metadata = _read_bundle_metadata(root / METADATA_FILENAME)
metadata = _read_embedded_metadata(candidate)
if metadata is None:
metadata = _read_bundle_metadata(root / METADATA_FILENAME)
log.debug("Selected TypeScript bundle", path=candidate, root=root)
return _NodeBundle(
path=candidate,
Expand Down Expand Up @@ -155,8 +194,10 @@ class NodeCoordinator(SubprocessCoordinator):
``"node"``, which relies on ``$PATH``).
:param bundles_root: Ordered list of directories scanned for a usable
TypeScript bundle. Each bundle directory must contain ``bundle.mjs``
and ``airflow-metadata.yaml``. This is a fallback search path; it does
not yet route different Dag/task pairs to different bundles.
with embedded metadata (as produced by ``airflow-ts-pack``), or
``bundle.mjs`` plus an ``airflow-metadata.yaml`` sidecar. This is a
fallback search path; it does not yet route different Dag/task pairs
to different bundles.
:param task_startup_timeout: Maximum time the coordinator waits for a task
process to start, in seconds. The default is 10 seconds.
"""
Expand Down
82 changes: 64 additions & 18 deletions task-sdk/tests/task_sdk/coordinators/node/test_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from __future__ import annotations

import base64
import pathlib

import pytest
Expand Down Expand Up @@ -45,27 +46,36 @@ def _make_ti(dag_id: str = "test_dag", queue: str = "ts") -> TaskInstance:
)


def _metadata_yaml(schema_version: str) -> str:
return "\n".join(
[
'airflow_bundle_metadata_version: "1.0"',
"sdk:",
" language: typescript",
' version: "0.1.0"',
f' supervisor_schema_version: "{schema_version}"',
"source: src/airflow.ts",
"dags:",
" test_dag:",
" tasks:",
" - test_task",
"",
]
)


def write_bundle(root: pathlib.Path, schema_version: str = SCHEMA_VERSION) -> pathlib.Path:
    """
    Create a ``bundle.mjs`` and an ``airflow-metadata.yaml`` sidecar in *root*.

    :param root: Directory that receives both files.
    :param schema_version: Supervisor schema version recorded in the sidecar.
    :return: Path of the written ``bundle.mjs``.
    """
    bundle = root / "bundle.mjs"
    bundle.write_text("export {};\n", encoding="utf-8")
    # The sidecar is written once, from the shared helper, instead of also
    # writing an inline copy of the same YAML first.
    (root / "airflow-metadata.yaml").write_text(_metadata_yaml(schema_version), encoding="utf-8")
    return bundle


def write_embedded_bundle(root: pathlib.Path, payload: str | None = None) -> pathlib.Path:
if payload is None:
payload = base64.b64encode(_metadata_yaml(SCHEMA_VERSION).encode("utf-8")).decode("ascii")
bundle = root / "bundle.mjs"
bundle.write_text(f"export {{}};\n//# airflowMetadata={payload}\n", encoding="utf-8")
return bundle


Expand Down Expand Up @@ -105,6 +115,42 @@ def test_find_bundle_returns_bundle_mjs(self, tmp_path):
assert found.path == bundle
assert found.schema_version == SCHEMA_VERSION

def test_find_bundle_reads_embedded_metadata_without_sidecar(self, tmp_path):
    """A bundle with embedded metadata is selected even when no sidecar file exists."""
    expected_path = write_embedded_bundle(tmp_path)

    selected = _find_bundle([tmp_path])

    assert (selected.path, selected.schema_version) == (expected_path, SCHEMA_VERSION)

def test_find_bundle_prefers_embedded_metadata_over_sidecar(self, tmp_path):
    """Embedded metadata is used even when a sidecar that is not a mapping sits next to the bundle."""
    expected_path = write_embedded_bundle(tmp_path)
    sidecar = tmp_path / "airflow-metadata.yaml"
    # A non-mapping sidecar: the lookup can only succeed if this file is not used.
    sidecar.write_text("[not-a-mapping]\n", encoding="utf-8")

    selected = _find_bundle([tmp_path])

    assert (selected.path, selected.schema_version) == (expected_path, SCHEMA_VERSION)

@pytest.mark.parametrize(
    ("payload", "message"),
    [
        # Not valid base64 at all.
        ("not-base64!", "cannot parse embedded airflow metadata"),
        # Valid base64 whose YAML content is a list instead of a mapping.
        (base64.b64encode(b"[not-a-mapping]").decode("ascii"), "must contain a mapping"),
    ],
)
def test_find_bundle_rejects_invalid_embedded_metadata(self, tmp_path, payload, message):
    """A bundle whose embedded payload is unusable makes the lookup fail with *message*."""
    write_embedded_bundle(tmp_path, payload=payload)
    search_roots = [tmp_path]

    with pytest.raises(FileNotFoundError, match=message):
        _find_bundle(search_roots)

def test_find_bundle_rejects_empty_marker_at_end_of_file(self, tmp_path):
    """A marker with nothing after it, ending the file, is reported as a non-mapping."""
    bundle_file = tmp_path / "bundle.mjs"
    # No payload and no trailing newline after the marker.
    bundle_file.write_text("export {};\n//# airflowMetadata=", encoding="utf-8")
    search_roots = [tmp_path]

    with pytest.raises(FileNotFoundError, match="must contain a mapping"):
        _find_bundle(search_roots)

def test_find_bundle_checks_multiple_roots(self, tmp_path):
first = tmp_path / "first"
second = tmp_path / "second"
Expand Down
31 changes: 27 additions & 4 deletions ts-sdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,10 @@ coordinators = {
queue_to_coordinator = {"typescript": "ts"}
```

Each configured bundle directory must contain `bundle.mjs` and
`airflow-metadata.yaml`.
Each configured bundle directory must contain a `bundle.mjs` built with
`airflow-ts-pack` (see [Packing bundles](#packing-bundles)), which embeds the
Airflow metadata in the bundle itself. A `bundle.mjs` without embedded
metadata is also accepted alongside an `airflow-metadata.yaml` sidecar.

TypeScript entrypoint:

Expand Down Expand Up @@ -147,8 +149,29 @@ Airflow launches the bundled entrypoint with `--comm=host:port` and
the task startup message, finds the registered handler for the Dag/task pair,
and reports the terminal task state back to Airflow.

See [`example/`](example/) for a coordinator-runtime example that builds a
`bundle.mjs` with `esbuild` and uses a Python stub Dag.
See [`example/`](example/) for a coordinator-runtime example that packs a
bundle with `airflow-ts-pack` and uses a Python stub Dag.

## Packing bundles

`airflow-ts-pack` produces everything `NodeCoordinator` needs in one command:

```bash
airflow-ts-pack src/main.ts --outdir dist
```

It bundles the entrypoint into `dist/bundle.mjs` with esbuild, runs the
bundle with `--airflow-metadata` so the bundle reports its own registered
Dag/task pairs and supervisor schema version, and embeds that manifest in the
bundle as a trailing `//# airflowMetadata=<base64>` comment. The result is a
single deployable file whose metadata cannot drift from its code; no
hand-written sidecar is needed.

Options:

- `--outdir <dir>` — output directory (default `dist`)
- `--source <name>` — display name of the primary source file shown in the
Airflow UI (default: entry basename)

## TaskClient

Expand Down
19 changes: 4 additions & 15 deletions ts-sdk/example/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ This example shows the coordinator-mode shape for TypeScript task handlers:
starts the coordinator runtime.
- `dist/bundle.mjs` is the generated Node.js bundle that Airflow launches.

The TypeScript SDK does not include a packer yet, so this example builds the
bundle with `esbuild` and writes the Airflow metadata file manually.
The build uses the SDK's `airflow-ts-pack` tool, which bundles the entrypoint
with esbuild and embeds the Airflow metadata generated from the bundle's
registered tasks, producing a single deployable file.

## Build

Expand All @@ -39,31 +40,19 @@ pnpm install
pnpm run build
```

Build the example bundle:
Build the example bundle and its metadata:

```bash
cd ts-sdk/example
pnpm install
pnpm run build
```

Create the metadata file next to the generated bundle:

```bash
node --input-type=module > dist/airflow-metadata.yaml <<'EOF'
import { SUPERVISOR_API_VERSION } from "@apache-airflow/ts-sdk";

console.log(`sdk:
supervisor_schema_version: "${SUPERVISOR_API_VERSION}"`);
EOF
```

The coordinator expects this layout:

```text
ts-sdk/example/dist/
bundle.mjs
airflow-metadata.yaml
```

## Airflow Configuration
Expand Down
3 changes: 1 addition & 2 deletions ts-sdk/example/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@
"type": "module",
"license": "Apache-2.0",
"scripts": {
"build": "esbuild src/main.ts --bundle --platform=node --format=esm --target=node22 --outfile=dist/bundle.mjs",
"build": "airflow-ts-pack src/main.ts --outdir dist",
"typecheck": "tsc --noEmit"
},
"dependencies": {
"@apache-airflow/ts-sdk": "file:.."
},
"devDependencies": {
"@types/node": "^22.19.17",
"esbuild": "^0.28.1",
"typescript": "^6.0.2"
}
}
6 changes: 5 additions & 1 deletion ts-sdk/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
"type": "module",
"main": "./dist/index.js",
"types": "./dist/index.d.ts",
"bin": {
"airflow-ts-pack": "./dist/cli/main.js"
},
"homepage": "https://airflow.apache.org",
"repository": {
"type": "git",
Expand Down Expand Up @@ -57,7 +60,8 @@
"node": ">=22"
},
"dependencies": {
"@msgpack/msgpack": "^3.1.2"
"@msgpack/msgpack": "^3.1.2",
"esbuild": "^0.28.1"
},
"devDependencies": {
"@eslint/js": "^10.0.1",
Expand Down
Loading