This class is not instantiated directly. Its nested annotations drive the + * [BuilderProcessor] annotation processor, which generates a {@code *Builder} class + * for each class annotated with [Dag]. + * + *
Example: + *
{@code
+ * @Builder.Dag(id = "my_pipeline")
+ * public class MyPipeline {
+ *
+ * @Builder.Task(id = "extract")
+ * public long extract(Client client) { ... }
+ *
+ * @Builder.Task(id = "transform")
+ * public long transform(Client client,
+ * @Builder.XCom(task = "extract") long extracted) { ... }
+ * }
+ * }
+ *
+ * The processor generates {@code MyPipelineBuilder.build()}, which returns a + * fully-wired [Dag] ready to add to a [Bundle]. + */ +class Builder internal constructor() { + /** + * Annotation to automate a Dag-builder pattern. + * + * When applied on a class Foo, this generates a FooBuilder class with a static build method + * to create the Dag structure automatically. + * + * @param id Override the Dag ID. If empty or not provided, the annotated class's name is used by default. + * @param to Name of the Dag-builder class. If empty or not provided, use the annotated class name + "Builder". + */ + @Target(AnnotationTarget.CLASS) + @MustBeDocumented + annotation class Dag( + val id: String = "", + val to: String = "", + ) + + /** + * Annotation to automate task definition in a Dag-builder pattern. + * + * @param id Override the task ID. If empty or not provided, the annotated function's name is used by default. + */ + @Target(AnnotationTarget.FUNCTION) + @MustBeDocumented + annotation class Task( + val id: String = "", + ) + + /** + * Annotation to mark a task definition's method parameter as an XCom input. + * + * @param task The task ID to pull. If empty or not given, the annotated parameter's name is used by default. + * @param key The XCom key to pull. Defaults to the task's return value. + */ + @Target(AnnotationTarget.VALUE_PARAMETER) + @MustBeDocumented + annotation class XCom( + val task: String = "", + val key: String = Client.XCOM_RETURN_KEY, + ) +} + +/** + * @suppress + * + * Annotation processor for [Builder.Dag]. Registered as a standard javac processor via + * {@code META-INF/services/javax.annotation.processing.Processor}; not intended to be + * instantiated or referenced directly. + * + *
For each class annotated with [Builder.Dag], generates a {@code *Builder} class + * containing: + *
[Builder.XCom]-annotated parameters are resolved via {@code client.getXCom} in the
+ * generated {@code execute} body, with the result cast to the parameter's declared type.
+ * Non-{@code void} return values are forwarded to {@code client.setXCom}.
+ */
+@SupportedAnnotationTypes("org.apache.airflow.sdk.Builder.Dag")
+@SupportedSourceVersion(SourceVersion.RELEASE_11)
+class BuilderProcessor : AbstractProcessor() {
+ override fun process(
+ annotations: Set Build a [Bundle] by implementing [BundleBuilder], then pass it to [Server.serve]
+ * to start accepting task-execution requests.
+ *
+ * @property version Implementation version read from the JAR manifest
+ * ({@code Implementation-Version}); falls back to {@code "0"} if absent.
+ * @property dags All registered Dags keyed by [Dag.id]. Insertion order is preserved.
+ */
+class Bundle(
+ val version: String,
+ dags: Iterable Implement this interface in the class named as {@code Main-Class} in your JAR
+ * manifest. The build tooling instantiates it at compile time to record Dag and task
+ * IDs in the manifest, enabling inspection without running the full process.
+ *
+ * Called once during [build]; Dag IDs must be unique across the returned collection.
+ */
+ fun getDags(): Iterable The bundle version is taken from the JAR's {@code Implementation-Version} manifest
+ * attribute, or {@code "0"} if that attribute is absent.
+ *
+ * @throws IllegalArgumentException if any two Dags share the same ID.
+ */
+ fun build(): Bundle = Bundle(this::class.java.`package`.implementationVersion ?: "0", getDags())
+}
diff --git a/java-sdk/sdk/src/main/kotlin/org/apache/airflow/sdk/Client.kt b/java-sdk/sdk/src/main/kotlin/org/apache/airflow/sdk/Client.kt
new file mode 100644
index 0000000000000..f4dee2d2f6e1d
--- /dev/null
+++ b/java-sdk/sdk/src/main/kotlin/org/apache/airflow/sdk/Client.kt
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.airflow.sdk
+
+import org.apache.airflow.sdk.execution.Client
+import org.apache.airflow.sdk.execution.comm.StartupDetails
+
+/**
+ * A connection registered in Airflow's connection store.
+ *
+ * @property id Connection ID as configured in Airflow.
+ * @property type Connection type (e.g. {@code "http"}, {@code "postgres"}), if configured.
+ * @property host Hostname, if configured.
+ * @property schema Schema or database name, if configured.
+ * @property login Username, if configured.
+ * @property password Password, if configured.
+ * @property port Port number, if configured.
+ * @property extra JSON blob of extra connection parameters, if configured.
+ */
+data class Connection(
+ @JvmField val id: String,
+ @JvmField val type: String?,
+ @JvmField val host: String?,
+ @JvmField val schema: String?,
+ @JvmField val login: String?,
+ @JvmField val password: String?,
+ @JvmField val port: Int?,
+ @JvmField val extra: Any?,
+)
+
+/**
+ * Client for Airflow API calls scoped to the current task instance.
+ *
+ * An instance is provided when a task is being executed. All reads and writes are
+ * automatically scoped to the current Dag run and task instance unless you pass
+ * explicit IDs.
+ */
+class Client internal constructor(
+ internal val details: StartupDetails,
+ internal val impl: Client,
+) {
+ internal companion object {
+ /**
+ * Default XCom key used for a task's return value ({@value}).
+ */
+ const val XCOM_RETURN_KEY = "return_value"
+ }
+
+ /**
+ * Retrieves a connection from the Airflow connection store.
+ *
+ * @param id Connection ID as configured in Airflow.
+ * @return The connection.
+ * @throws ApiError if the connection does not exist or the API call fails.
+ */
+ fun getConnection(id: String): Connection =
+ with(impl.getConnection(id)) {
+ Connection(
+ id = connId,
+ type = connType,
+ host = host as String?,
+ schema = schema as String?,
+ login = login as String?,
+ password = password as String?,
+ port = port as Int?,
+ extra = extra,
+ )
+ }
+
+ /**
+ * Retrieves an Airflow variable.
+ *
+ * @param key Variable key.
+ * @return The variable value, or {@code null} if the variable is not set.
+ * @throws ApiError if the API call fails.
+ */
+ fun getVariable(key: String): Any? = impl.getVariable(key).value
+
+ /**
+ * Reads an XCom value pushed by another task.
+ *
+ * The current Dag run's [dagId][TaskInstance.dagId] and [runId][TaskInstance.runId]
+ * are used by default; override them only when reading across Dags or runs.
+ *
+ * @param key XCom key to read; defaults to [XCOM_RETURN_KEY].
+ * @param dagId Dag that owns the XCom; defaults to the current Dag.
+ * @param taskId Task that pushed the XCom.
+ * @param runId Run that produced the XCom; defaults to the current run.
+ * @param mapIndex Map index of the source task instance, or {@code null} for non-mapped tasks.
+ * @param includePriorDates If {@code true}, also search earlier Dag-run dates.
+ * @return The XCom value, or {@code null} if none was pushed.
+ * @throws ApiError if the API call fails.
+ */
+ @JvmOverloads fun getXCom(
+ key: String = XCOM_RETURN_KEY,
+ dagId: String = details.ti.dagId,
+ taskId: String,
+ runId: String = details.ti.runId,
+ mapIndex: Int? = null,
+ includePriorDates: Boolean = false,
+ ): Any? =
+ impl
+ .getXCom(
+ key = key,
+ dagId = dagId,
+ taskId = taskId,
+ runId = runId,
+ mapIndex = mapIndex,
+ includePriorDates = includePriorDates,
+ ).value
+
+ /**
+ * Pushes an XCom value for downstream tasks to read.
+ *
+ * The current task instance's identifiers are used automatically.
+ *
+ * @param key XCom key; defaults to [XCOM_RETURN_KEY].
+ * @param value Value to push. Must be JSON-serializable.
+ * @throws ApiError if the API call fails.
+ */
+ @JvmOverloads fun setXCom(
+ key: String = XCOM_RETURN_KEY,
+ value: Any,
+ ) = impl.setXCom(
+ key = key,
+ value = value,
+ dagId = details.ti.dagId,
+ taskId = details.ti.taskId,
+ runId = details.ti.runId,
+ mapIndex = details.ti.mapIndex ?: -1,
+ )
+}
diff --git a/java-sdk/sdk/src/main/kotlin/org/apache/airflow/sdk/Context.kt b/java-sdk/sdk/src/main/kotlin/org/apache/airflow/sdk/Context.kt
new file mode 100644
index 0000000000000..4842c393154e6
--- /dev/null
+++ b/java-sdk/sdk/src/main/kotlin/org/apache/airflow/sdk/Context.kt
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.airflow.sdk
+
+import org.apache.airflow.sdk.execution.comm.StartupDetails
+
+/**
+ * Identifies the Dag run that the current task instance belongs to.
+ *
+ * @property dagId ID of the Dag being run.
+ * @property runId Unique identifier for this Dag run.
+ */
+data class DagRun(
+ @JvmField val dagId: String,
+ @JvmField val runId: String,
+)
+
+/**
+ * Identifies the task instance that is currently executing.
+ *
+ * @property dagId ID of the parent Dag.
+ * @property runId ID of the Dag run that triggered this instance.
+ * @property taskId ID of the task within the Dag.
+ * @property mapIndex Index within a mapped task group, if this is a mapped task instance.
+ * @property tryNumber How many times this task instance has been attempted (1-based).
+ */
+data class TaskInstance(
+ @JvmField val dagId: String,
+ @JvmField val runId: String,
+ @JvmField val taskId: String,
+ @JvmField val mapIndex: Int?,
+ @JvmField val tryNumber: Int,
+)
+
+/**
+ * Runtime context passed to the task execution.
+ *
+ * Provides metadata about the current Dag run and task instance.
+ * Use [Client] to interact with Airflow at runtime (connections, variables, XComs).
+ *
+ * @property dagRun Dag run the currently executing task instance belongs to.
+ * @property ti Currently executing task instance.
+ */
+data class Context(
+ @JvmField val dagRun: DagRun,
+ @JvmField val ti: TaskInstance,
+) {
+ internal companion object {
+ fun from(request: StartupDetails): Context =
+ Context(
+ dagRun = with(request.tiContext.dagRun) { DagRun(dagId, runId) },
+ ti = with(request.ti) { TaskInstance(dagId, runId, taskId, mapIndex, tryNumber) },
+ )
+ }
+}
diff --git a/java-sdk/sdk/src/main/kotlin/org/apache/airflow/sdk/Dag.kt b/java-sdk/sdk/src/main/kotlin/org/apache/airflow/sdk/Dag.kt
new file mode 100644
index 0000000000000..c7bb2277b55c6
--- /dev/null
+++ b/java-sdk/sdk/src/main/kotlin/org/apache/airflow/sdk/Dag.kt
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.airflow.sdk
+
+import kotlin.Throws
+
+/**
+ * A collection of tasks with directional dependencies.
+ *
+ * Create a [Dag] directly and register tasks with [addTask].
+ *
+ * The [Builder.Dag] annotation should generally be preferred in user code, where the
+ * annotation processor generates the wiring for you. Only use this class directly if
+ * you need to do low-level plumbing.
+ *
+ * @param id Dag identifier. Must contain only ASCII alphanumeric characters, dashes,
+ * dots, or underscores; must be unique within a [Bundle].
+ *
+ * @see Builder.Dag
+ */
+class Dag(
+ val id: String, // TODO: charset check?
+) {
+ internal var tasks = mutableMapOf The class must have a public no-argument constructor and implement [Task].
+ * Task IDs must be unique within a Dag.
+ *
+ * @param id Task identifier, unique within this Dag.
+ * @param definition Class that implements [Task]. Must have a public no-arg constructor.
+ * @return This Dag, for chaining.
+ */
+ fun addTask(
+ id: String,
+ definition: Class Prefer using the [Builder.Task] annotation with [Builder.Dag] to have the
+ * annotation processor generate an implementation for you. Only use this interface if
+ * you need to do low-level plumbing.
+ *
+ * Implement this interface to define task logic. Airflow instantiates the class via
+ * its no-argument constructor, then calls [execute] once per task-instance run.
+ *
+ * @see Builder.Dag
+ * @see Builder.Task
+ */
+interface Task {
+ /**
+ * Executes this task.
+ *
+ * Any exception thrown marks the task instance as failed. Use [client] to read
+ * connections, variables, pull XComs, or to push an XCom for downstream tasks.
+ *
+ * @param context Runtime metadata for the current Dag run and task instance.
+ * @param client Client for Airflow API calls scoped to this task instance.
+ * @throws Exception on failure; the task instance is marked failed.
+ */
+ @Throws(Exception::class)
+ fun execute(
+ context: Context,
+ client: Client,
+ )
+}
diff --git a/java-sdk/sdk/src/main/kotlin/org/apache/airflow/sdk/Server.kt b/java-sdk/sdk/src/main/kotlin/org/apache/airflow/sdk/Server.kt
new file mode 100644
index 0000000000000..e963863533213
--- /dev/null
+++ b/java-sdk/sdk/src/main/kotlin/org/apache/airflow/sdk/Server.kt
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.airflow.sdk
+
+import com.xenomachina.argparser.ArgParser
+import io.ktor.network.selector.SelectorManager
+import io.ktor.network.sockets.InetSocketAddress
+import io.ktor.network.sockets.aSocket
+import io.ktor.network.sockets.openReadChannel
+import io.ktor.network.sockets.openWriteChannel
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+import org.apache.airflow.sdk.execution.CoordinatorComm
+import org.apache.airflow.sdk.execution.LogSender
+import org.apache.airflow.sdk.execution.Logger
+import kotlin.text.substringAfterLast
+import kotlin.text.substringBeforeLast
+
+private class Args(
+ parser: ArgParser,
+) {
+ private fun parseAddress(address: String): InetSocketAddress =
+ InetSocketAddress(
+ address.substringBeforeLast(':'),
+ address.substringAfterLast(':').toInt(),
+ )
+
+ val comm by parser.storing("--comm", help = "Address (host:port) to communicate with parent") {
+ parseAddress(this)
+ }
+ val logs by parser.storing("--logs", help = "Address (host:port) to send Airflow logs to") {
+ parseAddress(this)
+ }
+}
+
+/**
+ * Thrown when an Airflow API call returns an error response.
+ *
+ * Extends [IllegalStateException] so callers can handle it without checked-exception
+ * machinery.
+ */
+class ApiError(
+ message: String,
+) : IllegalStateException(message)
+
+/**
+ * Connects this JVM process to the Airflow coordinator and dispatches task-execution
+ * requests to the registered [Bundle].
+ *
+ * The typical entry point is:
+ * The process exits when the coordinator closes the connection (normally after
+ * one task-instance execution).
+ */
+class Server(
+ private val comm: InetSocketAddress,
+ private val logs: InetSocketAddress,
+) {
+ companion object {
+ /**
+ * Parses coordinator addresses from command-line arguments and returns a
+ * ready-to-use [Server].
+ *
+ * The arguments are supplied automatically by Airflow and are not intended
+ * to be constructed by hand:
+ * This is a convenience wrapper around [serveAsync] for use from a plain
+ * {@code main} method. Prefer [serveAsync] when calling from an existing coroutine.
+ * The call returns when the coordinator closes the connection (normally after
+ * one task-instance execution).
+ *
+ * @param bundle Bundle containing all Dags this process can execute.
+ *
+ * @see [serveAsync]
+ */
+ fun serve(bundle: Bundle) {
+ runBlocking { launch { serveAsync(bundle) } }
+ }
+
+ /**
+ * Suspending entry point: connects to the coordinator and serves task-execution
+ * requests from the given [bundle].
+ *
+ * Opens both the task-execution channel ({@code --comm}) and the log-forwarding
+ * channel ({@code --logs}) concurrently, then processes incoming requests until the
+ * coordinator closes the connection (normally after one task-instance execution).
+ * The coroutine returns once both channels have been closed.
+ *
+ * Use this variant when calling from an existing coroutine scope; use the
+ * blocking [serve] from a plain {@code main} method.
+ *
+ * @param bundle Bundle containing all Dags this process can execute.
+ *
+ * @see [serve]
+ */
+ suspend fun serveAsync(bundle: Bundle) =
+ coroutineScope {
+ launch {
+ aSocket(SelectorManager(Dispatchers.IO)).tcp().connect(comm).use { socket ->
+ logger.debug("Connected comm", mapOf("addr" to comm))
+ CoordinatorComm(
+ bundle,
+ socket.openReadChannel(),
+ socket.openWriteChannel(autoFlush = true),
+ ).startProcessing()
+ }
+ }
+ launch {
+ aSocket(SelectorManager(Dispatchers.IO)).tcp().connect(logs).use { socket ->
+ logger.debug("Connected logs", mapOf("addr" to logs))
+ LogSender.configure(socket.openWriteChannel(autoFlush = true))
+ }
+ }
+ }
+}
diff --git a/java-sdk/sdk/src/main/kotlin/org/apache/airflow/sdk/execution/Client.kt b/java-sdk/sdk/src/main/kotlin/org/apache/airflow/sdk/execution/Client.kt
new file mode 100644
index 0000000000000..306e23640df1a
--- /dev/null
+++ b/java-sdk/sdk/src/main/kotlin/org/apache/airflow/sdk/execution/Client.kt
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.airflow.sdk.execution
+
+import kotlinx.coroutines.runBlocking
+import org.apache.airflow.sdk.execution.comm.ConnectionResult
+import org.apache.airflow.sdk.execution.comm.GetConnection
+import org.apache.airflow.sdk.execution.comm.GetVariable
+import org.apache.airflow.sdk.execution.comm.GetXCom
+import org.apache.airflow.sdk.execution.comm.SetXCom
+import org.apache.airflow.sdk.execution.comm.VariableResult
+import org.apache.airflow.sdk.execution.comm.XComResult
+
+/**
+ * @suppress
+ *
+ * Transport contract between [org.apache.airflow.sdk.Client] and the coordinator.
+ *
+ * Implementations translate each SDK method call into the appropriate
+ * message and unwrap the raw response model into the value expected by the public
+ * SDK layer.
+ *
+ * Currently, the only production implementation is [CoordinatorClient]. A test
+ * double can be supplied via the internal [org.apache.airflow.sdk.Client]
+ * constructor to exercise task logic without a live coordinator.
+ */
+interface Client {
+ fun getConnection(id: String): ConnectionResult
+
+ fun getVariable(key: String): VariableResult
+
+ fun getXCom(
+ key: String,
+ dagId: String,
+ taskId: String,
+ runId: String,
+ mapIndex: Int? = null,
+ includePriorDates: Boolean = false,
+ ): XComResult
+
+ fun setXCom(
+ key: String,
+ value: Any,
+ dagId: String,
+ taskId: String,
+ runId: String,
+ mapIndex: Int,
+ )
+}
+
+/**
+ * @suppress
+ *
+ * Production [Client] implementation backed by a live comm.
+ *
+ * Each method serializes the request into the appropriate message type (e.g.
+ * [GetConnection], [GetXCom]), sends it over the comm, and returns the
+ * unwrapped response model. All calls block the calling thread because task
+ * [execute][org.apache.airflow.sdk.Task.execute] runs on a plain thread, not
+ * inside a coroutine.
+ */
+class CoordinatorClient(
+ val exec: CoordinatorComm,
+) : Client {
+ override fun getConnection(id: String) =
+ runBlocking {
+ exec.communicate{@code
+ * public class MyBundleBuilder implements BundleBuilder {
+ * @Override
+ * public Iterable
+ */
+interface BundleBuilder {
+ /**
+ * Returns all [Dag]s that belong to this bundle.
+ *
+ * {@code
+ * public static void main(String[] args) {
+ * Server.create(args).serve(new MyBundleBuilder().build());
+ * }
+ * }
+ *
+ *
+ *
+ *
+ * @param args Command-line arguments as received by {@code main}.
+ * @return A configured [Server] ready to call [serve].
+ */
+ @JvmStatic
+ fun create(args: Array