Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,18 @@ repos:
additional_dependencies: ['pnpm@10.28.1']
pass_filenames: false
require_serial: true
- id: check-ts-sdk-supervisor-schema
name: Check TypeScript SDK supervisor schema is up to date
entry: ./ts-sdk/scripts/ci/prek/check_supervisor_schema.py
language: node
files: >
(?x)
^task-sdk/src/airflow/sdk/execution_time/schema/schema\.json$|
^ts-sdk/src/generated/supervisor\.ts$|
^ts-sdk/scripts/generate-supervisor\.mjs$
additional_dependencies: ['pnpm@10.28.1']
pass_filenames: false
require_serial: true
- id: check-ci-workflows-in-sync
name: Check ci-arm.yml and ci-amd.yml stay in sync
entry: ./scripts/ci/prek/check_ci_workflows_in_sync.py
Expand Down
12 changes: 12 additions & 0 deletions task-sdk/src/airflow/sdk/execution_time/schema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -4515,6 +4515,18 @@
"format": "date-time",
"title": "Timestamp",
"type": "string"
},
"partition_key": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"title": "Partition Key"
}
},
"required": [
Expand Down
37 changes: 27 additions & 10 deletions ts-sdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@ Public TypeScript interfaces for writing Apache Airflow task handlers.

**Status:** alpha · API will change · Node 22+ · ESM-only

This package defines the user-facing task handler contract: task registration,
runtime context types, and the `TaskClient` interface used for Airflow
Variables, Connections, and XCom. Runtime transports implement this interface
separately.
This package defines the user-facing task handler contract and the coordinator
runtime used to execute registered TypeScript handlers from Airflow.

## Install

Expand All @@ -50,7 +48,7 @@ registerTask({ dagId: "example_dag", taskId: "say_hello" }, sayHello);
Non-`undefined` return values are pushed to XCom under the `"return_value"`
key by the active runtime, matching Python `@task` behavior.

## Intended Coordinator Usage
## Coordinator Usage

Airflow runs TypeScript task bundles through the Python-side
`airflow.sdk.coordinators.node.NodeCoordinator`. Declaring Airflow Dags in
Expand Down Expand Up @@ -95,10 +93,10 @@ queue_to_coordinator = {"typescript": "ts"}
Each configured bundle directory must contain `bundle.mjs` and
`airflow-metadata.yaml`.

TypeScript handlers:
TypeScript entrypoint:

```ts
import { registerTask, type TaskHandlerArgs } from "@apache-airflow/ts-sdk";
import { registerTask, startCoordinator, type TaskHandlerArgs } from "@apache-airflow/ts-sdk";

export async function extract({ client }: TaskHandlerArgs) {
const connection = await client.getConnection("sales_db");
Expand All @@ -123,15 +121,34 @@ export async function transform({ client }: TaskHandlerArgs) {

registerTask({ dagId: "sales_pipeline", taskId: "extract" }, extract);
registerTask({ dagId: "sales_pipeline", taskId: "transform" }, transform);

await startCoordinator();
```

The Python stub defines the Dag dependency graph. The TypeScript handler does
the work and uses `TaskClient` for task-time Airflow data access. Register each
handler with the Python Dag's `dag_id` and the stub task's `task_id`. The
handler function is the reusable task implementation; `registerTask` binds that
handler to a Python stub Dag/task identity for coordinator mode. A future
TypeScript Dag authoring API can attach the same handlers without changing the
handler code.
handler to a Python stub Dag/task identity for coordinator mode.

For larger projects, keep one Airflow entrypoint that imports every module that
registers tasks, then starts the coordinator:

```ts
import "./sales/tasks";
import "./billing/tasks";
import { startCoordinator } from "@apache-airflow/ts-sdk";

await startCoordinator();
```

Airflow launches the bundled entrypoint with `--comm=host:port` and
`--logs=host:port`. `startCoordinator()` connects to those sockets, receives
the task startup message, finds the registered handler for the Dag/task pair,
and reports the terminal task state back to Airflow.

See [`example/`](example/) for a coordinator-runtime example that builds a
`bundle.mjs` with `esbuild` and uses a Python stub Dag.

## TaskClient

Expand Down
2 changes: 1 addition & 1 deletion ts-sdk/eslint.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import tseslint from "typescript-eslint";

export default tseslint.config(
{
ignores: ["dist/**", "node_modules/**", "coverage/**"],
ignores: ["dist/**", "node_modules/**", "coverage/**", "src/generated/**"],
},
js.configs.recommended,
...tseslint.configs.recommended,
Expand Down
3 changes: 3 additions & 0 deletions ts-sdk/example/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
.pnpm-store/
dist/
node_modules/
101 changes: 101 additions & 0 deletions ts-sdk/example/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# TypeScript Coordinator Runtime Example

This example shows the coordinator-mode shape for TypeScript task handlers:

- `dags/typescript_example.py` declares the Airflow Dag and stub tasks.
- `src/main.ts` registers TypeScript handlers for the same Dag/task IDs and
starts the coordinator runtime.
- `dist/bundle.mjs` is the generated Node.js bundle that Airflow launches.

The TypeScript SDK does not include a packer yet, so this example builds the
bundle with `esbuild` and writes the Airflow metadata file manually.

## Build

Build the SDK first so the example can import the local package:

```bash
cd ts-sdk
pnpm install
pnpm run build
```

Build the example bundle:

```bash
cd ts-sdk/example
pnpm install
pnpm run build
```

Create the metadata file next to the generated bundle:

```bash
node --input-type=module > dist/airflow-metadata.yaml <<'EOF'
import { SUPERVISOR_API_VERSION } from "@apache-airflow/ts-sdk";

console.log(`sdk:
supervisor_schema_version: "${SUPERVISOR_API_VERSION}"`);
EOF
```

The coordinator expects this layout:

```text
ts-sdk/example/dist/
bundle.mjs
airflow-metadata.yaml
```

## Airflow Configuration

Configure Airflow to route the `typescript` queue to the Node coordinator and
point it at the example bundle directory:

```bash
export AIRFLOW__SDK__COORDINATORS='{
"node": {
"classpath": "airflow.sdk.coordinators.node.NodeCoordinator",
"kwargs": {"bundles_root": ["/absolute/path/to/airflow/ts-sdk/example/dist"]}
}
}'
export AIRFLOW__SDK__QUEUE_TO_COORDINATOR='{"typescript": "node"}'
```

Copy `dags/typescript_example.py` into your Airflow Dags folder.

The example also uses one Variable and one Connection:

```bash
airflow variables set typescript_example_greeting "hello from Airflow"
airflow connections add typescript_example_http \
--conn-type http \
--conn-host example.com \
--conn-login user \
--conn-password pass
```

Then start Airflow and trigger the Dag:

```bash
airflow dags trigger typescript_example
```
45 changes: 45 additions & 0 deletions ts-sdk/example/dags/typescript_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from airflow.sdk import dag, task


@task
def python_start():
return "hello from Python"


@task.stub(queue="typescript")
def build_message(): ...


@task.stub(queue="typescript")
def read_connection(): ...


@dag(dag_id="typescript_example", schedule=None, catchup=False, tags=["typescript", "example"])
def typescript_example():
start = python_start()
message = build_message()
read_connection()

start >> message


typescript_example()
19 changes: 19 additions & 0 deletions ts-sdk/example/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"name": "@apache-airflow/ts-sdk-example",
"private": true,
"version": "0.0.0",
"type": "module",
"license": "Apache-2.0",
"scripts": {
"build": "esbuild src/main.ts --bundle --platform=node --format=esm --target=node22 --outfile=dist/bundle.mjs",
"typecheck": "tsc --noEmit"
},
"dependencies": {
"@apache-airflow/ts-sdk": "file:.."
},
"devDependencies": {
"@types/node": "^22.19.17",
"esbuild": "^0.28.1",
"typescript": "^6.0.2"
}
}
55 changes: 55 additions & 0 deletions ts-sdk/example/src/main.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*!
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import { registerTask, startCoordinator, type TaskHandlerArgs } from "@apache-airflow/ts-sdk";

const DAG_ID = "typescript_example";

export async function buildMessage({ client }: TaskHandlerArgs) {
const upstream = await client.getXCom<string>({
key: "return_value",
taskId: "python_start",
});
const greeting = await client.getVariable("typescript_example_greeting");
const message = `${greeting ?? "hello from TypeScript"}; upstream=${upstream ?? "missing"}`;

await client.setXCom({ key: "typescript_message", value: message });

return {
message,
upstream,
};
}

export async function readConnection({ client }: TaskHandlerArgs) {
const connection = await client.getConnection("typescript_example_http");

return {
id: connection?.id ?? null,
type: connection?.type ?? null,
host: connection?.host ?? null,
login: connection?.login ?? null,
hasPassword: connection?.password != null,
};
}

registerTask({ dagId: DAG_ID, taskId: "build_message" }, buildMessage);
registerTask({ dagId: DAG_ID, taskId: "read_connection" }, readConnection);

await startCoordinator();
8 changes: 8 additions & 0 deletions ts-sdk/example/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"extends": "../tsconfig.json",
"compilerOptions": {
"noEmit": true,
"rootDir": "."
},
"include": ["src/**/*.ts"]
}
Loading