Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions task-sdk/src/airflow/sdk/coordinators/_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,26 @@ def _socket_address(value: tuple | str) -> tuple[str, int] | None:
return None
host, port = value[:2]
host = str(host)
# Canonicalize IPv4-mapped IPv6 ("::ffff:127.0.0.1" -> "127.0.0.1") so a dual-stack
# client (e.g. the JVM, shown v4-mapped in /proc/net/tcp6) matches the AF_INET
# supervisor socket's plain-IPv4 address in the ownership check below.
# Canonicalize an IPv4 address that a dual-stack client embeds in IPv6 so it matches
# the AF_INET supervisor socket's plain-IPv4 address in the ownership check below. A
# dual-stack JVM's loopback connection is rendered in two different forms depending on
# the platform, and both must collapse to plain "127.0.0.1":
# * IPv4-mapped "::ffff:127.0.0.1" -> "127.0.0.1" (Linux, via /proc/net/tcp6)
# * IPv4-compatible "::127.0.0.1" -> "127.0.0.1" (macOS, via psutil)
# Otherwise the JVM's connection fails the check and every Java task is rejected with
# "process exited with 1 before connecting".
try:
parsed = ipaddress.ip_address(host)
except ValueError:
pass
else:
if isinstance(parsed, ipaddress.IPv6Address) and parsed.ipv4_mapped is not None:
host = str(parsed.ipv4_mapped)
if isinstance(parsed, ipaddress.IPv6Address):
if parsed.ipv4_mapped is not None:
host = str(parsed.ipv4_mapped)
elif 1 < int(parsed) <= 0xFFFFFFFF:
# IPv4-compatible IPv6: ::/96 with the IPv4 in the low 32 bits. Exclude
# "::" (unspecified) and "::1" (IPv6 loopback), which are not IPv4.
host = str(ipaddress.IPv4Address(int(parsed)))
return host, int(port)


Expand Down
35 changes: 35 additions & 0 deletions task-sdk/tests/task_sdk/coordinators/test_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,41 @@ def test_matches_dual_stack_ipv4_mapped_connection(self):
client.close()
server.close()

def test_matches_dual_stack_ipv4_compatible_connection(self):
    """Accept a dual-stack child whose loopback is reported in IPv4-compatible form.

    macOS counterpart (#68938) of
    :meth:`test_matches_dual_stack_ipv4_mapped_connection`. On macOS ``psutil``
    reports the JVM's loopback connection as the deprecated IPv4-compatible
    ``::127.0.0.1`` instead of the IPv4-mapped ``::ffff:127.0.0.1`` seen on Linux.
    Both spellings have to canonicalize to plain ``127.0.0.1``; otherwise the
    ownership check rejects the Java task.

    A real connection in that form cannot be reliably created on demand, so a
    genuine IPv4 loopback connection is opened and ``psutil``'s view of the
    child's connections is mocked to the form macOS actually reports.
    """
    # Real loopback pair: supplies the accepted socket and the two port numbers.
    listener = _start_server()
    _host, server_port = listener.getsockname()
    client = socket.socket()
    client.connect(("127.0.0.1", server_port))
    accepted, _peer = listener.accept()
    child_port = accepted.getpeername()[1]

    # Stand-in for the child process handle; only ``pid`` is set explicitly.
    child = MagicMock(spec=subprocess.Popen)
    child.pid = os.getpid()

    # The connection entry as macOS psutil reports it: IPv4-compatible addresses.
    compat_conn = MagicMock(
        laddr=("::127.0.0.1", child_port),
        raddr=("::127.0.0.1", server_port),
    )
    try:
        with patch("airflow.sdk.coordinators._subprocess.psutil.Process") as process_cls:
            fake_process = process_cls.return_value
            fake_process.children.return_value = []
            fake_process.net_connections.return_value = [compat_conn]
            assert _is_connection_from_process(accepted, child) is True
    finally:
        accepted.close()
        client.close()
        listener.close()

def test_rejects_tcp_connection_not_owned_by_child_process(self):
server = _start_server()
_, port = server.getsockname()
Expand Down
Loading