diff --git a/distributed/comm/core.py b/distributed/comm/core.py index 319e5580874..82f2965d8ec 100644 --- a/distributed/comm/core.py +++ b/distributed/comm/core.py @@ -21,6 +21,10 @@ class CommClosedError(IOError): pass +class FatalCommClosedError(CommClosedError): + pass + + class Comm(with_metaclass(ABCMeta)): """ A message-oriented communication object, representing an established @@ -184,6 +188,8 @@ def _raise(error): comm = yield gen.with_timeout(timedelta(seconds=deadline - time()), future, quiet_exceptions=EnvironmentError) + except FatalCommClosedError: + raise except EnvironmentError as e: error = str(e) if time() < deadline: diff --git a/distributed/comm/tcp.py b/distributed/comm/tcp.py index 4db9ca2ad41..a838b08f360 100644 --- a/distributed/comm/tcp.py +++ b/distributed/comm/tcp.py @@ -24,7 +24,7 @@ from .registry import Backend, backends from .addressing import parse_host_port, unparse_host_port -from .core import Comm, Connector, Listener, CommClosedError +from .core import Comm, Connector, Listener, CommClosedError, FatalCommClosedError from .utils import (to_frames, from_frames, get_tcp_server_address, ensure_concrete_host) @@ -121,6 +121,9 @@ def convert_stream_closed_error(obj, exc): if exc.real_error is not None: # The stream was closed because of an underlying OS error exc = exc.real_error + if ssl and isinstance(exc, ssl.SSLError): + if 'UNKNOWN_CA' in exc.reason: + raise FatalCommClosedError("in %s: %s: %s" % (obj, exc.__class__.__name__, exc)) raise CommClosedError("in %s: %s: %s" % (obj, exc.__class__.__name__, exc)) else: raise CommClosedError("in %s: %s" % (obj, exc)) @@ -329,6 +332,13 @@ def connect(self, address, deserialize=True, **connection_args): stream = yield client.connect(ip, port, max_buffer_size=MAX_BUFFER_SIZE, **kwargs) + # Under certain circumstances tornado will have a closed connnection with an error and not raise + # a StreamClosedError. + # + # This occurs with tornado 5.x and openssl 1.1+ + if stream.closed() and stream.error: + raise StreamClosedError(stream.error) + except StreamClosedError as e: # The socket connect() call failed convert_stream_closed_error(self, e) diff --git a/distributed/tests/test_security.py b/distributed/tests/test_security.py index 76003f3c73b..7ebd414ca24 100644 --- a/distributed/tests/test_security.py +++ b/distributed/tests/test_security.py @@ -25,6 +25,14 @@ # Note this cipher uses RSA auth as this matches our test certs FORCED_CIPHER = 'ECDHE-RSA-AES128-GCM-SHA256' +TLS_13_CIPHERS = [ + 'TLS_AES_128_GCM_SHA256', + 'TLS_AES_256_GCM_SHA384', + 'TLS_CHACHA20_POLY1305_SHA256', + 'TLS_AES_128_CCM_SHA256', + 'TLS_AES_128_CCM_8_SHA256', +] + def test_defaults(): with new_config({}): @@ -201,7 +209,12 @@ def many_ciphers(ctx): ctx = d['ssl_context'] basic_checks(ctx) if sys.version_info >= (3, 6): - assert len(ctx.get_ciphers()) == 1 + supported_ciphers = ctx.get_ciphers() + tls_12_ciphers = [c for c in supported_ciphers if c['protocol'] == 'TLSv1.2'] + assert len(tls_12_ciphers) == 1 + tls_13_ciphers = [c for c in supported_ciphers if c['protocol'] == 'TLSv1.3'] + if len(tls_13_ciphers): + assert len(tls_13_ciphers) == 3 def test_listen_args(): @@ -255,7 +268,12 @@ def many_ciphers(ctx): ctx = d['ssl_context'] basic_checks(ctx) if sys.version_info >= (3, 6): - assert len(ctx.get_ciphers()) == 1 + supported_ciphers = ctx.get_ciphers() + tls_12_ciphers = [c for c in supported_ciphers if c['protocol'] == 'TLSv1.2'] + assert len(tls_12_ciphers) == 1 + tls_13_ciphers = [c for c in supported_ciphers if c['protocol'] == 'TLSv1.3'] + if len(tls_13_ciphers): + assert len(tls_13_ciphers) == 3 @gen_test() @@ -306,7 +324,7 @@ def handle_comm(comm): comm = yield connect(listener.contact_address, connection_args=forced_cipher_sec.get_connection_args('worker')) cipher, _, _, = comm.extra_info['cipher'] - assert cipher == FORCED_CIPHER + assert cipher in [FORCED_CIPHER] + TLS_13_CIPHERS comm.abort()