From 1ae5c234599a023a576071ff7f2cd4054881f8e1 Mon Sep 17 00:00:00 2001 From: Ning Kang Date: Thu, 16 Jul 2020 10:44:39 -0700 Subject: [PATCH 1/2] Interactive: clean up when pipeline is out of scope 1. Completed the cleanup routine for all internal states held by the current interactive environment. 2. Utilized the environment inspector to determine whether a pipeline is out of scope: not assigned to variable and has no inspectable PCollections. 3. Invoked the cleanup every time the user defined pipelines in watched scope are refreshed. Change-Id: Ia0791b865def88e81e7b1595b8430d3a9df9516e --- .../interactive/background_caching_job.py | 2 +- .../interactive/interactive_environment.py | 78 +++++++++++++++---- .../interactive_environment_test.py | 27 ++++++- .../interactive_environment_inspector.py | 17 ++++ .../interactive/options/capture_control.py | 11 +-- 5 files changed, 105 insertions(+), 30 deletions(-) diff --git a/sdks/python/apache_beam/runners/interactive/background_caching_job.py b/sdks/python/apache_beam/runners/interactive/background_caching_job.py index 117cd598509b..1b05285dd222 100644 --- a/sdks/python/apache_beam/runners/interactive/background_caching_job.py +++ b/sdks/python/apache_beam/runners/interactive/background_caching_job.py @@ -291,7 +291,7 @@ def sizeof_fmt(num, suffix='B'): 'data to start at the same time, all captured data has been ' 'cleared and a new segment of data will be recorded.') - ie.current_env().cleanup() + ie.current_env().cleanup(user_pipeline) ie.current_env().set_cached_source_signature( user_pipeline, current_signature) return is_changed diff --git a/sdks/python/apache_beam/runners/interactive/interactive_environment.py b/sdks/python/apache_beam/runners/interactive/interactive_environment.py index 1d28517451a8..4363d177498b 100644 --- a/sdks/python/apache_beam/runners/interactive/interactive_environment.py +++ b/sdks/python/apache_beam/runners/interactive/interactive_environment.py @@ -251,18 +251,32 @@ def inspector(self): 
return self._inspector def cleanup(self, pipeline=None): - """Cleans up cached states for the given pipeline. Cleans up - for all pipelines if no specific pipeline is given.""" + """Cleans up cached states for the given pipeline. Noop if the given + pipeline is absent from the environment. Cleans up for all pipelines + if no pipeline is specified.""" if pipeline: + from apache_beam.runners.interactive import background_caching_job as bcj + bcj.attempt_to_cancel_background_caching_job(pipeline) + bcj.attempt_to_stop_test_stream_service(pipeline) cache_manager = self.get_cache_manager(pipeline) if cache_manager: cache_manager.cleanup() else: + for _, job in self._background_caching_jobs.items(): + if job: + job.cancel() + for _, controller in self._test_stream_service_controllers.items(): + if controller: + controller.stop() for _, cache_manager in self._cache_managers.items(): - cache_manager.cleanup() + if cache_manager: + cache_manager.cleanup() + self.evict_background_caching_job(pipeline) + self.evict_test_stream_service_controller(pipeline) self.evict_computed_pcollections(pipeline) self.evict_cached_source_signature(pipeline) + self.evict_pipeline_result(pipeline) def watch(self, watchable): """Watches a watchable. @@ -343,9 +357,13 @@ def set_pipeline_result(self, pipeline, result): 'apache_beam.runners.runner.PipelineResult or its subclass') self._main_pipeline_results[str(id(pipeline))] = result - def evict_pipeline_result(self, pipeline): - """Evicts the tracking of given pipeline run. Noop if absent.""" - return self._main_pipeline_results.pop(str(id(pipeline)), None) + def evict_pipeline_result(self, pipeline=None): + """Evicts the last run result of the given pipeline. Noop if the pipeline + is absent from the environment. 
If no pipeline is specified, evicts for all + pipelines.""" + if pipeline: + return self._main_pipeline_results.pop(str(id(pipeline)), None) + self._main_pipeline_results.clear() def pipeline_result(self, pipeline): """Gets the pipeline run result. None if absent.""" @@ -364,16 +382,24 @@ def get_background_caching_job(self, pipeline): """Gets the background caching job started from the given pipeline.""" return self._background_caching_jobs.get(str(id(pipeline)), None) + def evict_background_caching_job(self, pipeline=None): + """Evicts the background caching job started from the given pipeline. Noop + if the given pipeline is absent from the environment. If no pipeline is + specified, evicts for all pipelines.""" + if pipeline: + return self._background_caching_jobs.pop(str(id(pipeline)), None) + self._background_caching_jobs.clear() + def set_test_stream_service_controller(self, pipeline, controller): """Sets the test stream service controller that has started a gRPC server - serving the test stream for any job started from the given user-defined + serving the test stream for any job started from the given user defined pipeline. """ self._test_stream_service_controllers[str(id(pipeline))] = controller def get_test_stream_service_controller(self, pipeline): """Gets the test stream service controller that has started a gRPC server - serving the test stream for any job started from the given user-defined + serving the test stream for any job started from the given user defined pipeline. """ return self._test_stream_service_controllers.get(str(id(pipeline)), None) @@ -381,9 +407,12 @@ def get_test_stream_service_controller(self, pipeline): def evict_test_stream_service_controller(self, pipeline): """Evicts and pops the test stream service controller that has started a gRPC server serving the test stream for any job started from the given - user-defined pipeline. + user defined pipeline. Noop if the given pipeline is absent from the + environment. 
If no pipeline is specified, evicts for all pipelines. """ - return self._test_stream_service_controllers.pop(str(id(pipeline)), None) + if pipeline: + return self._test_stream_service_controllers.pop(str(id(pipeline)), None) + self._test_stream_service_controllers.clear() def is_terminated(self, pipeline): """Queries if the most recent job (by executing the given pipeline) state @@ -400,13 +429,15 @@ def get_cached_source_signature(self, pipeline): return self._cached_source_signature.get(str(id(pipeline)), set()) def evict_cached_source_signature(self, pipeline=None): + """Evicts the signature generated for each recorded source of the given + pipeline. Noop if the given pipeline is absent from the environment. If no + pipeline is specified, evicts for all pipelines.""" if pipeline: - self._cached_source_signature.pop(str(id(pipeline)), None) - else: - self._cached_source_signature.clear() + return self._cached_source_signature.pop(str(id(pipeline)), None) + self._cached_source_signature.clear() def track_user_pipelines(self): - """Record references to all user-defined pipeline instances watched in + """Record references to all user defined pipeline instances watched in current environment. Current static global singleton interactive environment holds references to @@ -416,11 +447,17 @@ def track_user_pipelines(self): then handle them differently. This is invoked every time a PTransform is to be applied if the current - code execution is under ipython due to the possibility that any user-defined + code execution is under ipython due to the possibility that any user defined pipeline can be re-evaluated through notebook cell re-execution at any time. Each time this is invoked, it will check if there is a cache manager already created for each user defined pipeline. If not, create one for it. 
+ + If a pipeline is no longer watched due to re-execution while its + PCollections are still in watched scope, the pipeline becomes anonymous but + still accessible indirectly through references to its PCollections. This + function also clears up internal states for those anonymous pipelines once + all their PCollections are anonymous. """ self._tracked_user_pipelines = set() for watching in self.watching(): @@ -428,6 +465,17 @@ def track_user_pipelines(self): if isinstance(val, beam.pipeline.Pipeline): self._tracked_user_pipelines.add(val) _ = self.get_cache_manager(val, create_if_absent=True) + all_tracked_pipeline_ids = set(self._background_caching_jobs.keys()).union( + set(self._test_stream_service_controllers.keys()), + set(self._cache_managers.keys()), + {str(id(pcoll.pipeline)) + for pcoll in self._computed_pcolls}, + set(self._cached_source_signature.keys()), + set(self._main_pipeline_results.keys())) + inspectable_pipelines = self._inspector.inspectable_pipelines + for pipeline in all_tracked_pipeline_ids: + if pipeline not in inspectable_pipelines: + self.cleanup(pipeline) @property def tracked_user_pipelines(self): diff --git a/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py b/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py index 6f44dac7f2df..6650c63e60b5 100644 --- a/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py +++ b/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py @@ -236,14 +236,33 @@ def test_get_cache_manager_creates_cache_manager_if_absent(self): @patch( 'apache_beam.runners.interactive.interactive_environment' '.InteractiveEnvironment.cleanup') - def test_cleanup_invoked_when_cache_manager_is_evicted(self, mocked_cleanup): + def test_track_user_pipeline_cleanup_non_inspectable_pipeline( + self, mocked_cleanup): ie._interactive_beam_env = None ie.new_env() - dummy_pipeline = 'dummy' + dummy_pipeline_1 = beam.Pipeline() + 
dummy_pipeline_2 = beam.Pipeline() + dummy_pipeline_3 = beam.Pipeline() + dummy_pipeline_4 = beam.Pipeline() + dummy_pcoll = dummy_pipeline_4 | beam.Create([1]) + dummy_pipeline_5 = beam.Pipeline() + dummy_non_inspectable_pipeline = 'dummy' + ie.current_env().watch(locals()) + from apache_beam.runners.interactive.background_caching_job import BackgroundCachingJob + ie.current_env().set_background_caching_job( + dummy_pipeline_1, + BackgroundCachingJob( + runner.PipelineResult(runner.PipelineState.DONE), limiters=[])) + ie.current_env().set_test_stream_service_controller(dummy_pipeline_2, None) ie.current_env().set_cache_manager( - cache.FileBasedCacheManager(), dummy_pipeline) + cache.FileBasedCacheManager(), dummy_pipeline_3) + ie.current_env().mark_pcollection_computed([dummy_pcoll]) + ie.current_env().set_cached_source_signature( + dummy_non_inspectable_pipeline, None) + ie.current_env().set_pipeline_result( + dummy_pipeline_5, runner.PipelineResult(runner.PipelineState.RUNNING)) mocked_cleanup.assert_not_called() - ie.current_env().evict_cache_manager(dummy_pipeline) + ie.current_env().track_user_pipelines() mocked_cleanup.assert_called_once() diff --git a/sdks/python/apache_beam/runners/interactive/messaging/interactive_environment_inspector.py b/sdks/python/apache_beam/runners/interactive/messaging/interactive_environment_inspector.py index 3bce182ba1c7..a4a9f02c01f4 100644 --- a/sdks/python/apache_beam/runners/interactive/messaging/interactive_environment_inspector.py +++ b/sdks/python/apache_beam/runners/interactive/messaging/interactive_environment_inspector.py @@ -41,6 +41,7 @@ class InteractiveEnvironmentInspector(object): def __init__(self): self._inspectables = {} self._anonymous = {} + self._inspectable_pipelines = set() @property def inspectables(self): @@ -49,6 +50,20 @@ def inspectables(self): self._inspectables = inspect() return self._inspectables + @property + def inspectable_pipelines(self): + """Returns a dictionary of all inspectable 
pipelines. The keys are + stringified id of pipeline instances. + + This includes user defined pipeline assigned to variables and anonymous + pipelines with inspectable PCollections. + If a user defined pipeline is not within the returned dict, it can be + considered out of scope, and all resources and memory states related to it + should be released. + """ + _ = self.list_inspectables() + return self._inspectable_pipelines + @as_json def list_inspectables(self): """Lists inspectables in JSON format. @@ -89,6 +104,8 @@ def list_inspectables(self): pipeline_identifier = obfuscate(meta(pipelines[pipeline], pipeline)) listing[pipeline_identifier]['pcolls'][identifier] = inspectable[ 'metadata'] + self._inspectable_pipelines = dict( + (str(id(pipeline)), pipeline) for pipeline in pipelines) return listing def get_val(self, identifier): diff --git a/sdks/python/apache_beam/runners/interactive/options/capture_control.py b/sdks/python/apache_beam/runners/interactive/options/capture_control.py index 12e901f46920..ab877b5c8e3f 100644 --- a/sdks/python/apache_beam/runners/interactive/options/capture_control.py +++ b/sdks/python/apache_beam/runners/interactive/options/capture_control.py @@ -29,7 +29,6 @@ from datetime import timedelta from apache_beam.io.gcp.pubsub import ReadFromPubSub -from apache_beam.runners.interactive import background_caching_job as bcj from apache_beam.runners.interactive import interactive_environment as ie from apache_beam.runners.interactive.options import capture_limiters @@ -71,15 +70,7 @@ def evict_captured_data(pipeline=None): runs, Interactive Beam will capture fresh data.""" if ie.current_env().options.enable_capture_replay: _LOGGER.info( - 'You have requested Interactive Beam to evict all captured ' + 'You have requested Interactive Beam to evict all recorded ' 'data that could be deterministically replayed among multiple ' 'pipeline runs.') - ie.current_env().track_user_pipelines() - if pipeline:
bcj.attempt_to_cancel_background_caching_job(pipeline) - bcj.attempt_to_stop_test_stream_service(pipeline) - else: - for user_pipeline in ie.current_env().tracked_user_pipelines: - bcj.attempt_to_cancel_background_caching_job(user_pipeline) - bcj.attempt_to_stop_test_stream_service(user_pipeline) ie.current_env().cleanup(pipeline) From f8390afa46c8575ec88c0585215c0b81efa43937 Mon Sep 17 00:00:00 2001 From: Ning Kang Date: Fri, 24 Jul 2020 15:15:14 -0700 Subject: [PATCH 2/2] Fixed a test that didn't start a test stream server. With the new cleaning up routine, all test stream servers held by current interactive environment will be stopped in the test. If the grpc server has never been started (happens in tests), the stop operation will hang for a long time. Change-Id: I2ae7ecf5e3ac11f32888887d82cd885fc64cc82f --- .../runners/interactive/background_caching_job_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/runners/interactive/background_caching_job_test.py b/sdks/python/apache_beam/runners/interactive/background_caching_job_test.py index 45c65dd1abdc..803f6ceb8b07 100644 --- a/sdks/python/apache_beam/runners/interactive/background_caching_job_test.py +++ b/sdks/python/apache_beam/runners/interactive/background_caching_job_test.py @@ -91,8 +91,6 @@ def _setup_test_streaming_cache(pipeline): sys.version_info < (3, 6), 'The tests require at least Python 3.6 to work.') class BackgroundCachingJobTest(unittest.TestCase): def tearDown(self): - for _, job in ie.current_env()._background_caching_jobs.items(): - job.cancel() ie.new_env() # TODO(BEAM-8335): remove the patches when there are appropriate test sources @@ -302,9 +300,11 @@ def visit_transform(self, transform_node): def test_determine_a_test_stream_service_running(self): pipeline = _build_an_empty_stream_pipeline() test_stream_service = TestStreamServiceController(reader=None) + test_stream_service.start() ie.current_env().set_test_stream_service_controller( 
pipeline, test_stream_service) self.assertTrue(bcj.is_a_test_stream_service_running(pipeline)) + # the test_stream_service will be cleaned up on teardown. def test_stop_a_running_test_stream_service(self): pipeline = _build_an_empty_stream_pipeline()