From 973f316e7a97bda12dcc4592ddbccfcccb00c5ad Mon Sep 17 00:00:00 2001 From: Ning Kang Date: Tue, 4 Aug 2020 11:55:22 -0700 Subject: [PATCH 1/2] [BEAM-10637] fix: test stream service start/stop Fixed the start/stop logic so that a controller: 1. can only be started/stopped once; 2. can only be stopped when is started while never stopped before; 3. does not hang indefinitely but noop for redundant or invalid start/stop calls. --- .../testing/test_stream_service.py | 12 ++- .../testing/test_stream_service_test.py | 81 +++++++++++++++++++ 2 files changed, 92 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/testing/test_stream_service.py b/sdks/python/apache_beam/testing/test_stream_service.py index d69465f697e1..798156b52864 100644 --- a/sdks/python/apache_beam/testing/test_stream_service.py +++ b/sdks/python/apache_beam/testing/test_stream_service.py @@ -34,6 +34,8 @@ class TestStreamServiceController(TestStreamServiceServicer): """ def __init__(self, reader, endpoint=None, exception_handler=None): self._server = grpc.server(ThreadPoolExecutor(max_workers=10)) + self._server_started = False + self._server_stopped = False if endpoint: self.endpoint = endpoint @@ -50,9 +52,17 @@ def __init__(self, reader, endpoint=None, exception_handler=None): self._exception_handler = lambda _: False def start(self): - self._server.start() + # A server can only be started if never started and never stopped before. + if not self._server_started and not self._server_stopped: + self._server_started = True + self._server.start() def stop(self): + # A server can only be stopped if already started and never stopped before. + if not self._server_started or self._server_stopped: + return + self._server_started = False + self._server_stopped = True self._server.stop(0) # This was introduced in grpcio 1.24 and might be gone in the future. Keep # this check in case the runtime is on a older, current or future grpcio. diff --git a/sdks/python/apache_beam/testing/test_stream_service_test.py b/sdks/python/apache_beam/testing/test_stream_service_test.py index 01b16a1f7b91..7a5b403d36d7 100644 --- a/sdks/python/apache_beam/testing/test_stream_service_test.py +++ b/sdks/python/apache_beam/testing/test_stream_service_test.py @@ -19,6 +19,7 @@ from __future__ import absolute_import +import sys import unittest import grpc @@ -30,6 +31,13 @@ from apache_beam.portability.api.beam_runner_api_pb2 import TestStreamPayload from apache_beam.testing.test_stream_service import TestStreamServiceController +# TODO(BEAM-8288): clean up the work-around of nose tests using Python2 without +# unittest.mock module. +try: + from unittest.mock import patch +except ImportError: + from mock import patch # type: ignore[misc] + # Nose automatically detects tests if they match a regex. Here, it mistakens # these protos as tests. For more info see the Nose docs at: # https://nose.readthedocs.io/en/latest/writing_tests.html @@ -116,5 +124,78 @@ def test_multiple_sessions(self): self.assertEqual(events_b, expected_events) +@unittest.skipIf( + sys.version_info < (3, 6), 'The tests require at least Python 3.6 to work.') +class TestStreamServiceStartStopTest(unittest.TestCase): + + # Weak internal use needs to be explicitly imported. + from grpc import _server + + def setUp(self): + self.controller = TestStreamServiceController( + EventsReader(expected_key=[('full', EXPECTED_KEY)])) + self.assertFalse(self.controller._server_started) + self.assertFalse(self.controller._server_stopped) + + def tearDown(self): + self.controller.stop() + + def test_start_when_never_started(self): + with patch.object(self._server._Server, + 'start', + wraps=self.controller._server.start) as mock_start: + self.controller.start() + mock_start.assert_called_once() + self.assertTrue(self.controller._server_started) + self.assertFalse(self.controller._server_stopped) + + def test_start_noop_when_already_started(self): + with patch.object(self._server._Server, + 'start', + wraps=self.controller._server.start) as mock_start: + self.controller.start() + mock_start.assert_called_once() + self.controller.start() + mock_start.assert_called_once() + + def test_start_noop_when_already_stopped(self): + with patch.object(self._server._Server, + 'start', + wraps=self.controller._server.start) as mock_start: + self.controller.start() + self.controller.stop() + mock_start.assert_called_once() + self.controller.start() + mock_start.assert_called_once() + + def test_stop_noop_when_not_started(self): + with patch.object(self._server._Server, + 'stop', + wraps=self.controller._server.stop) as mock_stop: + self.controller.stop() + mock_stop.assert_not_called() + + def test_stop_when_already_started(self): + with patch.object(self._server._Server, + 'stop', + wraps=self.controller._server.stop) as mock_stop: + self.controller.start() + mock_stop.assert_not_called() + self.controller.stop() + mock_stop.assert_called_once() + self.assertFalse(self.controller._server_started) + self.assertTrue(self.controller._server_stopped) + + def test_stop_noop_when_already_stopped(self): + with patch.object(self._server._Server, + 'stop', + wraps=self.controller._server.stop) as mock_stop: + self.controller.start() + self.controller.stop() + mock_stop.assert_called_once() + self.controller.stop() + mock_stop.assert_called_once() + + if __name__ == '__main__': unittest.main() From 12684dbc222b8bcc1303703cc6d8f0e74856ef55 Mon Sep 17 00:00:00 2001 From: Ning Kang Date: Tue, 4 Aug 2020 14:56:18 -0700 Subject: [PATCH 2/2] Normalize the if statements in a consistent way. --- sdks/python/apache_beam/testing/test_stream_service.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/testing/test_stream_service.py b/sdks/python/apache_beam/testing/test_stream_service.py index 798156b52864..618e208fbb98 100644 --- a/sdks/python/apache_beam/testing/test_stream_service.py +++ b/sdks/python/apache_beam/testing/test_stream_service.py @@ -53,9 +53,10 @@ def __init__(self, reader, endpoint=None, exception_handler=None): def start(self): # A server can only be started if never started and never stopped before. - if not self._server_started and not self._server_stopped: - self._server_started = True - self._server.start() + if self._server_started or self._server_stopped: + return + self._server_started = True + self._server.start() def stop(self): # A server can only be stopped if already started and never stopped before.