Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 40 additions & 2 deletions providers/ssh/docs/operators/ssh_remote_job.rst
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,13 @@ Parameters

* ``remote_os`` (str, optional): Remote OS type (``"auto"``, ``"posix"``, ``"windows"``). Default: ``"auto"``
* ``skip_on_exit_code`` (int or list, optional): Exit code(s) that should cause task to skip instead of fail
* ``conn_timeout`` (int, optional): SSH connection timeout in seconds
* ``banner_timeout`` (float, optional): Seconds to wait for the SSH banner. Default: 30.0
* ``conn_retry_attempts`` (int, optional): How many times to attempt the initial SSH connection for
submission and cleanup before failing. Default: 5. Raise this for large fan-outs where the remote
``sshd`` transiently refuses connections (see :ref:`High Fan-out <howto/operator:SSHRemoteJobOperator:fanout>`)
* ``cleanup_retries`` (int, optional): How many times to retry remote directory cleanup before giving up
and leaving the directory in place. Default: 3
Comment thread
kaxil marked this conversation as resolved.

Remote OS Detection
-------------------
Expand Down Expand Up @@ -213,7 +220,9 @@ Limitations and Considerations
-------------------------------

**Network Interruptions**: While the operator is resilient to disconnections during monitoring,
the initial job submission must succeed. If submission fails, the task will fail immediately.
the initial job submission must succeed. The connection used for submission is retried
(``conn_retry_attempts``); if every attempt fails, the task fails immediately. The trigger also
reconnects automatically if the monitoring connection drops mid-job.

**Remote Process Management**: Jobs are detached using ``nohup`` (POSIX) or ``Start-Process`` (Windows).
If the remote host reboots during job execution, the job will be lost.
Expand All @@ -231,7 +240,36 @@ tasks can run on the same remote host without conflicts.

**Cleanup**: Use ``cleanup="on_success"`` or ``cleanup="always"`` to avoid accumulating
job directories on the remote host. For debugging, use ``cleanup="never"`` and manually
inspect the job directory.
inspect the job directory. Cleanup runs only when the job reaches completion, so tasks that
are killed or time out can still leave a directory behind; for those, add a server-side TTL
reaper (for example ``systemd-tmpfiles`` or a cron job) for the base directory.

.. _howto/operator:SSHRemoteJobOperator:fanout:

High Fan-out (Many Concurrent Tasks)
-------------------------------------

Many tasks targeting the same SSH server at once (a large ``.expand()`` fan-out, parallel DAG
runs, or just high concurrency) can overwhelm it. Each remote
command opens a new SSH connection, and the remote ``sshd`` throttles concurrent
*unauthenticated* connections via ``MaxStartups`` (default ``10:30:100``: start randomly
dropping at 10 concurrent, reaching 100% at 100). A dropped connection surfaces on the client
as::

paramiko ... Error reading SSH protocol banner

This is the server closing the socket before the handshake, not a slow banner, so raising
``banner_timeout`` does not help.

The operator and trigger keep the connection rate low: submission reuses a single connection
for OS detection and the submit itself, and the trigger holds **one** connection for the whole
poll loop instead of reconnecting on every status check. To push a high fan-out further:

* Raise ``MaxStartups`` (and ``MaxSessions``) on the remote ``sshd`` -- this is the direct fix.
* Increase ``conn_retry_attempts`` so transient refusals during the initial burst are retried.
* Cap how many mapped tasks run at once with ``max_active_tis_per_dag`` (or a pool) instead of
releasing the entire fan-out simultaneously. See the "Placing Limits on Mapped Tasks" section of
:doc:`apache-airflow:authoring-and-scheduling/dynamic-task-mapping` for the available limits.

Comparison with SSHOperator
----------------------------
Expand Down
25 changes: 24 additions & 1 deletion providers/ssh/src/airflow/providers/ssh/hooks/ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ class SSHHook(BaseHook):
lifetime of the transport
:param ciphers: list of ciphers to use in order of preference
:param auth_timeout: timeout (in seconds) for the attempt to authenticate with the remote_host
:param conn_retry_attempts: number of times to attempt the initial SSH connection before
Comment thread
kaxil marked this conversation as resolved.
giving up (default 3). Raising this helps when many tasks target the same SSH server at
once and some connections are transiently refused (e.g. ``sshd`` ``MaxStartups`` throttling).
"""

# List of classes to try loading private keys as, ordered (roughly) by most common to least common
Expand Down Expand Up @@ -130,9 +133,11 @@ def __init__(
ciphers: list[str] | None = None,
auth_timeout: int | None = None,
host_proxy_cmd: str | None = None,
conn_retry_attempts: int = 3,
) -> None:
super().__init__()
self.ssh_conn_id = ssh_conn_id
self.conn_retry_attempts = max(1, conn_retry_attempts)
self.remote_host = remote_host
self.username = username
self.password = password
Expand Down Expand Up @@ -344,7 +349,7 @@ def log_before_sleep(retry_state):
for attempt in Retrying(
reraise=True,
wait=wait_fixed(3) + wait_random(0, 2),
stop=stop_after_attempt(3),
stop=stop_after_attempt(self.conn_retry_attempts),
before_sleep=log_before_sleep,
):
with attempt:
Expand Down Expand Up @@ -553,6 +558,7 @@ def __init__(
key_file: str = "",
passphrase: str = "",
private_key: str = "",
keepalive_interval: int = 30,
) -> None:
super().__init__()
self.ssh_conn_id = ssh_conn_id
Expand All @@ -564,6 +570,7 @@ def __init__(
self.key_file = key_file
self.passphrase = passphrase
self.private_key = private_key
self.keepalive_interval = keepalive_interval

def _parse_extras(self, conn: Any) -> None:
"""Parse extra fields from the connection into instance fields."""
Expand Down Expand Up @@ -631,10 +638,26 @@ def _get_value(self_val, conn_val, default=None):
conn_config["client_keys"] = [_private_key]
if self.passphrase:
conn_config["passphrase"] = self.passphrase
if self.keepalive_interval:
# The trigger holds one connection for the whole job; a keepalive stops idle
# NAT/firewall timeouts from silently dropping it between long poll intervals.
conn_config["keepalive_interval"] = self.keepalive_interval

ssh_client_conn = await asyncssh.connect(**conn_config)
return ssh_client_conn

async def get_conn(self):
"""
Open an asyncssh connection that can be reused for multiple commands.

Unlike :meth:`run_command`, the returned connection is **not** closed
automatically; the caller owns its lifecycle (e.g.
``async with await hook.get_conn() as conn: ...`` or an explicit
``conn.close()``). Reusing one connection avoids a new TCP/SSH handshake
per command, which matters when many tasks poll the same SSH server.
"""
return await self._get_conn()

async def run_command(self, command: str, timeout: float | None = None) -> tuple[int, str, str]:
"""
Execute a command on the remote host asynchronously.
Expand Down
Loading
Loading