diff --git a/task-sdk/src/airflow/sdk/coordinators/_subprocess.py b/task-sdk/src/airflow/sdk/coordinators/_subprocess.py index 9550fdd0bc438..a0c3f518fb068 100644 --- a/task-sdk/src/airflow/sdk/coordinators/_subprocess.py +++ b/task-sdk/src/airflow/sdk/coordinators/_subprocess.py @@ -72,16 +72,26 @@ def _socket_address(value: tuple | str) -> tuple[str, int] | None: return None host, port = value[:2] host = str(host) - # Canonicalize IPv4-mapped IPv6 ("::ffff:127.0.0.1" -> "127.0.0.1") so a dual-stack - # client (e.g. the JVM, shown v4-mapped in /proc/net/tcp6) matches the AF_INET - # supervisor socket's plain-IPv4 address in the ownership check below. + # Canonicalize an IPv4 address that a dual-stack client embeds in IPv6 so it matches + # the AF_INET supervisor socket's plain-IPv4 address in the ownership check below. A + # dual-stack JVM's loopback connection is rendered in two different forms depending on + # the platform, and both must collapse to plain "127.0.0.1": + # * IPv4-mapped "::ffff:127.0.0.1" -> "127.0.0.1" (Linux, via /proc/net/tcp6) + # * IPv4-compatible "::127.0.0.1" -> "127.0.0.1" (macOS, via psutil) + # Otherwise the JVM's connection fails the check and every Java task is rejected with + # "process exited with 1 before connecting". try: parsed = ipaddress.ip_address(host) except ValueError: pass else: - if isinstance(parsed, ipaddress.IPv6Address) and parsed.ipv4_mapped is not None: - host = str(parsed.ipv4_mapped) + if isinstance(parsed, ipaddress.IPv6Address): + if parsed.ipv4_mapped is not None: + host = str(parsed.ipv4_mapped) + elif 1 < int(parsed) <= 0xFFFFFFFF: + # IPv4-compatible IPv6: ::/96 with the IPv4 in the low 32 bits. Exclude + # "::" (unspecified) and "::1" (IPv6 loopback), which are not IPv4. 
+ host = str(ipaddress.IPv4Address(int(parsed))) return host, int(port) diff --git a/task-sdk/tests/task_sdk/coordinators/test_subprocess.py b/task-sdk/tests/task_sdk/coordinators/test_subprocess.py index 5a89c73e780d4..62b7fbcf39c17 100644 --- a/task-sdk/tests/task_sdk/coordinators/test_subprocess.py +++ b/task-sdk/tests/task_sdk/coordinators/test_subprocess.py @@ -372,6 +372,41 @@ def test_matches_dual_stack_ipv4_mapped_connection(self): client.close() server.close() + def test_matches_dual_stack_ipv4_compatible_connection(self): + """A dual-stack child whose loopback is rendered in IPv4-compatible form is accepted. + + Companion to :meth:`test_matches_dual_stack_ipv4_mapped_connection` for macOS + (#68938): there the JVM's loopback connection is reported by ``psutil`` as the + deprecated IPv4-compatible ``::127.0.0.1`` rather than the IPv4-mapped + ``::ffff:127.0.0.1`` seen on Linux. Both forms must canonicalize to plain + ``127.0.0.1`` or the ownership check rejects the Java task. The OS will not + reliably establish a routable ``::127.0.0.1`` connection on demand, so ``psutil``'s view of + the child's connections is mocked to the form macOS actually reports. + """ + server = _start_server() + _, server_port = server.getsockname() + client = socket.socket() + client.connect(("127.0.0.1", server_port)) + conn, _ = server.accept() + child_port = conn.getpeername()[1] + mock_proc = MagicMock(spec=subprocess.Popen) + mock_proc.pid = os.getpid() + + # On macOS psutil reports the child's dual-stack loopback in IPv4-compatible form.
+ compat_conn = MagicMock( + laddr=("::127.0.0.1", child_port), + raddr=("::127.0.0.1", server_port), + ) + try: + with patch("airflow.sdk.coordinators._subprocess.psutil.Process") as mock_process: + mock_process.return_value.children.return_value = [] + mock_process.return_value.net_connections.return_value = [compat_conn] + assert _is_connection_from_process(conn, mock_proc) is True + finally: + conn.close() + client.close() + server.close() + def test_rejects_tcp_connection_not_owned_by_child_process(self): server = _start_server() _, port = server.getsockname()