diff --git a/sdks/python/apache_beam/runners/interactive/background_caching_job.py b/sdks/python/apache_beam/runners/interactive/background_caching_job.py index 117cd598509b..1b05285dd222 100644 --- a/sdks/python/apache_beam/runners/interactive/background_caching_job.py +++ b/sdks/python/apache_beam/runners/interactive/background_caching_job.py @@ -291,7 +291,7 @@ def sizeof_fmt(num, suffix='B'): 'data to start at the same time, all captured data has been ' 'cleared and a new segment of data will be recorded.') - ie.current_env().cleanup() + ie.current_env().cleanup(user_pipeline) ie.current_env().set_cached_source_signature( user_pipeline, current_signature) return is_changed diff --git a/sdks/python/apache_beam/runners/interactive/background_caching_job_test.py b/sdks/python/apache_beam/runners/interactive/background_caching_job_test.py index 45c65dd1abdc..803f6ceb8b07 100644 --- a/sdks/python/apache_beam/runners/interactive/background_caching_job_test.py +++ b/sdks/python/apache_beam/runners/interactive/background_caching_job_test.py @@ -91,8 +91,6 @@ def _setup_test_streaming_cache(pipeline): sys.version_info < (3, 6), 'The tests require at least Python 3.6 to work.') class BackgroundCachingJobTest(unittest.TestCase): def tearDown(self): - for _, job in ie.current_env()._background_caching_jobs.items(): - job.cancel() ie.new_env() # TODO(BEAM-8335): remove the patches when there are appropriate test sources @@ -302,9 +300,11 @@ def visit_transform(self, transform_node): def test_determine_a_test_stream_service_running(self): pipeline = _build_an_empty_stream_pipeline() test_stream_service = TestStreamServiceController(reader=None) + test_stream_service.start() ie.current_env().set_test_stream_service_controller( pipeline, test_stream_service) self.assertTrue(bcj.is_a_test_stream_service_running(pipeline)) + # the test_stream_service will be cleaned up on teardown. def test_stop_a_running_test_stream_service(self): pipeline = _build_an_empty_stream_pipeline() diff --git a/sdks/python/apache_beam/runners/interactive/interactive_environment.py b/sdks/python/apache_beam/runners/interactive/interactive_environment.py index 1d28517451a8..4363d177498b 100644 --- a/sdks/python/apache_beam/runners/interactive/interactive_environment.py +++ b/sdks/python/apache_beam/runners/interactive/interactive_environment.py @@ -251,18 +251,32 @@ def inspector(self): return self._inspector def cleanup(self, pipeline=None): - """Cleans up cached states for the given pipeline. Cleans up - for all pipelines if no specific pipeline is given.""" + """Cleans up cached states for the given pipeline. Noop if the given + pipeline is absent from the environment. Cleans up for all pipelines + if no pipeline is specified.""" if pipeline: + from apache_beam.runners.interactive import background_caching_job as bcj + bcj.attempt_to_cancel_background_caching_job(pipeline) + bcj.attempt_to_stop_test_stream_service(pipeline) cache_manager = self.get_cache_manager(pipeline) if cache_manager: cache_manager.cleanup() else: + for _, job in self._background_caching_jobs.items(): + if job: + job.cancel() + for _, controller in self._test_stream_service_controllers.items(): + if controller: + controller.stop() for _, cache_manager in self._cache_managers.items(): - cache_manager.cleanup() + if cache_manager: + cache_manager.cleanup() + self.evict_background_caching_job(pipeline) + self.evict_test_stream_service_controller(pipeline) self.evict_computed_pcollections(pipeline) self.evict_cached_source_signature(pipeline) + self.evict_pipeline_result(pipeline) def watch(self, watchable): """Watches a watchable. @@ -343,9 +357,13 @@ def set_pipeline_result(self, pipeline, result): 'apache_beam.runners.runner.PipelineResult or its subclass') self._main_pipeline_results[str(id(pipeline))] = result - def evict_pipeline_result(self, pipeline): - """Evicts the tracking of given pipeline run. Noop if absent.""" - return self._main_pipeline_results.pop(str(id(pipeline)), None) + def evict_pipeline_result(self, pipeline=None): + """Evicts the last run result of the given pipeline. Noop if the pipeline + is absent from the environment. If no pipeline is specified, evicts for all + pipelines.""" + if pipeline: + return self._main_pipeline_results.pop(str(id(pipeline)), None) + self._main_pipeline_results.clear() def pipeline_result(self, pipeline): """Gets the pipeline run result. None if absent.""" @@ -364,16 +382,24 @@ def get_background_caching_job(self, pipeline): """Gets the background caching job started from the given pipeline.""" return self._background_caching_jobs.get(str(id(pipeline)), None) + def evict_background_caching_job(self, pipeline=None): + """Evicts the background caching job started from the given pipeline. Noop + if the given pipeline is absent from the environment. If no pipeline is + specified, evicts for all pipelines.""" + if pipeline: + return self._background_caching_jobs.pop(str(id(pipeline)), None) + self._background_caching_jobs.clear() + def set_test_stream_service_controller(self, pipeline, controller): """Sets the test stream service controller that has started a gRPC server - serving the test stream for any job started from the given user-defined + serving the test stream for any job started from the given user defined pipeline. """ self._test_stream_service_controllers[str(id(pipeline))] = controller def get_test_stream_service_controller(self, pipeline): """Gets the test stream service controller that has started a gRPC server - serving the test stream for any job started from the given user-defined + serving the test stream for any job started from the given user defined pipeline. """ return self._test_stream_service_controllers.get(str(id(pipeline)), None) @@ -381,9 +407,12 @@ def get_test_stream_service_controller(self, pipeline): def evict_test_stream_service_controller(self, pipeline): """Evicts and pops the test stream service controller that has started a gRPC server serving the test stream for any job started from the given - user-defined pipeline. + user defined pipeline. Noop if the given pipeline is absent from the + environment. If no pipeline is specified, evicts for all pipelines. """ - return self._test_stream_service_controllers.pop(str(id(pipeline)), None) + if pipeline: + return self._test_stream_service_controllers.pop(str(id(pipeline)), None) + self._test_stream_service_controllers.clear() def is_terminated(self, pipeline): """Queries if the most recent job (by executing the given pipeline) state @@ -400,13 +429,15 @@ def get_cached_source_signature(self, pipeline): return self._cached_source_signature.get(str(id(pipeline)), set()) def evict_cached_source_signature(self, pipeline=None): + """Evicts the signature generated for each recorded source of the given + pipeline. Noop if the given pipeline is absent from the environment. If no + pipeline is specified, evicts for all pipelines.""" if pipeline: - self._cached_source_signature.pop(str(id(pipeline)), None) - else: - self._cached_source_signature.clear() + return self._cached_source_signature.pop(str(id(pipeline)), None) + self._cached_source_signature.clear() def track_user_pipelines(self): - """Record references to all user-defined pipeline instances watched in + """Record references to all user defined pipeline instances watched in current environment. Current static global singleton interactive environment holds references to @@ -416,11 +447,17 @@ def track_user_pipelines(self): then handle them differently. This is invoked every time a PTransform is to be applied if the current - code execution is under ipython due to the possibility that any user-defined + code execution is under ipython due to the possibility that any user defined pipeline can be re-evaluated through notebook cell re-execution at any time. Each time this is invoked, it will check if there is a cache manager already created for each user defined pipeline. If not, create one for it. + + If a pipeline is no longer watched due to re-execution while its + PCollections are still in watched scope, the pipeline becomes anonymous but + still accessible indirectly through references to its PCollections. This + function also clears up internal states for those anonymous pipelines once + all their PCollections are anonymous. """ self._tracked_user_pipelines = set() for watching in self.watching(): @@ -428,6 +465,17 @@ def track_user_pipelines(self): if isinstance(val, beam.pipeline.Pipeline): self._tracked_user_pipelines.add(val) _ = self.get_cache_manager(val, create_if_absent=True) + all_tracked_pipeline_ids = set(self._background_caching_jobs.keys()).union( + set(self._test_stream_service_controllers.keys()), + set(self._cache_managers.keys()), + {str(id(pcoll.pipeline)) + for pcoll in self._computed_pcolls}, + set(self._cached_source_signature.keys()), + set(self._main_pipeline_results.keys())) + inspectable_pipelines = self._inspector.inspectable_pipelines + for pipeline in all_tracked_pipeline_ids: + if pipeline not in inspectable_pipelines: + self.cleanup(pipeline) @property def tracked_user_pipelines(self): diff --git a/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py b/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py index 6f44dac7f2df..6650c63e60b5 100644 --- a/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py +++ b/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py @@ -236,14 +236,33 @@ def test_get_cache_manager_creates_cache_manager_if_absent(self): @patch( 'apache_beam.runners.interactive.interactive_environment' '.InteractiveEnvironment.cleanup') - def test_cleanup_invoked_when_cache_manager_is_evicted(self, mocked_cleanup): + def test_track_user_pipeline_cleanup_non_inspectable_pipeline( + self, mocked_cleanup): ie._interactive_beam_env = None ie.new_env() - dummy_pipeline = 'dummy' + dummy_pipeline_1 = beam.Pipeline() + dummy_pipeline_2 = beam.Pipeline() + dummy_pipeline_3 = beam.Pipeline() + dummy_pipeline_4 = beam.Pipeline() + dummy_pcoll = dummy_pipeline_4 | beam.Create([1]) + dummy_pipeline_5 = beam.Pipeline() + dummy_non_inspectable_pipeline = 'dummy' + ie.current_env().watch(locals()) + from apache_beam.runners.interactive.background_caching_job import BackgroundCachingJob + ie.current_env().set_background_caching_job( + dummy_pipeline_1, + BackgroundCachingJob( + runner.PipelineResult(runner.PipelineState.DONE), limiters=[])) + ie.current_env().set_test_stream_service_controller(dummy_pipeline_2, None) ie.current_env().set_cache_manager( - cache.FileBasedCacheManager(), dummy_pipeline) + cache.FileBasedCacheManager(), dummy_pipeline_3) + ie.current_env().mark_pcollection_computed([dummy_pcoll]) + ie.current_env().set_cached_source_signature( + dummy_non_inspectable_pipeline, None) + ie.current_env().set_pipeline_result( + dummy_pipeline_5, runner.PipelineResult(runner.PipelineState.RUNNING)) mocked_cleanup.assert_not_called() - ie.current_env().evict_cache_manager(dummy_pipeline) + ie.current_env().track_user_pipelines() mocked_cleanup.assert_called_once() diff --git a/sdks/python/apache_beam/runners/interactive/messaging/interactive_environment_inspector.py b/sdks/python/apache_beam/runners/interactive/messaging/interactive_environment_inspector.py index 3bce182ba1c7..a4a9f02c01f4 100644 --- a/sdks/python/apache_beam/runners/interactive/messaging/interactive_environment_inspector.py +++ b/sdks/python/apache_beam/runners/interactive/messaging/interactive_environment_inspector.py @@ -41,6 +41,7 @@ class InteractiveEnvironmentInspector(object): def __init__(self): self._inspectables = {} self._anonymous = {} + self._inspectable_pipelines = set() @property def inspectables(self): @@ -49,6 +50,20 @@ def inspectables(self): self._inspectables = inspect() return self._inspectables + @property + def inspectable_pipelines(self): + """Returns a dictionary of all inspectable pipelines. The keys are + stringified id of pipeline instances. + + This includes user defined pipeline assigned to variables and anonymous + pipelines with inspectable PCollections. + If a user defined pipeline is not within the returned dict, it can be + considered out of scope, and all resources and memory states related to it + should be released. + """ + _ = self.list_inspectables() + return self._inspectable_pipelines + @as_json def list_inspectables(self): """Lists inspectables in JSON format. @@ -89,6 +104,8 @@ def list_inspectables(self): pipeline_identifier = obfuscate(meta(pipelines[pipeline], pipeline)) listing[pipeline_identifier]['pcolls'][identifier] = inspectable[ 'metadata'] + self._inspectable_pipelines = dict( + (str(id(pipeline)), pipeline) for pipeline in pipelines) return listing def get_val(self, identifier): diff --git a/sdks/python/apache_beam/runners/interactive/options/capture_control.py b/sdks/python/apache_beam/runners/interactive/options/capture_control.py index 12e901f46920..ab877b5c8e3f 100644 --- a/sdks/python/apache_beam/runners/interactive/options/capture_control.py +++ b/sdks/python/apache_beam/runners/interactive/options/capture_control.py @@ -29,7 +29,6 @@ from datetime import timedelta from apache_beam.io.gcp.pubsub import ReadFromPubSub -from apache_beam.runners.interactive import background_caching_job as bcj from apache_beam.runners.interactive import interactive_environment as ie from apache_beam.runners.interactive.options import capture_limiters @@ -71,15 +70,7 @@ def evict_captured_data(pipeline=None): runs, Interactive Beam will capture fresh data.""" if ie.current_env().options.enable_capture_replay: _LOGGER.info( - 'You have requested Interactive Beam to evict all captured ' + 'You have requested Interactive Beam to evict all recorded' 'data that could be deterministically replayed among multiple ' 'pipeline runs.') - ie.current_env().track_user_pipelines() - if pipeline: - bcj.attempt_to_cancel_background_caching_job(pipeline) - bcj.attempt_to_stop_test_stream_service(pipeline) - else: - for user_pipeline in ie.current_env().tracked_user_pipelines: - bcj.attempt_to_cancel_background_caching_job(user_pipeline) - bcj.attempt_to_stop_test_stream_service(user_pipeline) ie.current_env().cleanup(pipeline)