Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ def sizeof_fmt(num, suffix='B'):
'data to start at the same time, all captured data has been '
'cleared and a new segment of data will be recorded.')

ie.current_env().cleanup()
ie.current_env().cleanup(user_pipeline)
ie.current_env().set_cached_source_signature(
user_pipeline, current_signature)
return is_changed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,6 @@ def _setup_test_streaming_cache(pipeline):
sys.version_info < (3, 6), 'The tests require at least Python 3.6 to work.')
class BackgroundCachingJobTest(unittest.TestCase):
def tearDown(self):
for _, job in ie.current_env()._background_caching_jobs.items():
job.cancel()
ie.new_env()

# TODO(BEAM-8335): remove the patches when there are appropriate test sources
Expand Down Expand Up @@ -302,9 +300,11 @@ def visit_transform(self, transform_node):
def test_determine_a_test_stream_service_running(self):
  """A started controller registered for a pipeline is reported as running."""
  pipeline = _build_an_empty_stream_pipeline()
  controller = TestStreamServiceController(reader=None)
  controller.start()
  ie.current_env().set_test_stream_service_controller(pipeline, controller)
  self.assertTrue(bcj.is_a_test_stream_service_running(pipeline))
  # The controller is intentionally left running; teardown cleans it up.

def test_stop_a_running_test_stream_service(self):
pipeline = _build_an_empty_stream_pipeline()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,18 +251,32 @@ def inspector(self):
return self._inspector

def cleanup(self, pipeline=None):
  """Cleans up cached states for the given pipeline.

  Noop if the given pipeline is absent from the environment. Cleans up for
  all pipelines if no pipeline is specified.

  Args:
    pipeline: the pipeline whose background caching job, test stream service
      and cache manager should be released, or None to release them for
      every pipeline known to this environment.
  """
  if pipeline:
    # NOTE(review): imported inside the function, presumably to avoid a
    # circular import between the two modules -- confirm.
    from apache_beam.runners.interactive import background_caching_job as bcj
    bcj.attempt_to_cancel_background_caching_job(pipeline)
    bcj.attempt_to_stop_test_stream_service(pipeline)
    cache_manager = self.get_cache_manager(pipeline)
    if cache_manager:
      cache_manager.cleanup()
  else:
    # A registered value can be None, so each one is checked before use.
    for job in self._background_caching_jobs.values():
      if job:
        job.cancel()
    for controller in self._test_stream_service_controllers.values():
      if controller:
        controller.stop()
    for cache_manager in self._cache_managers.values():
      if cache_manager:
        cache_manager.cleanup()

  # Resources are stopped above; the evict_* calls below only drop the
  # bookkeeping entries and receive the same pipeline argument (or None).
  self.evict_background_caching_job(pipeline)
  self.evict_test_stream_service_controller(pipeline)
  self.evict_computed_pcollections(pipeline)
  self.evict_cached_source_signature(pipeline)
  self.evict_pipeline_result(pipeline)

def watch(self, watchable):
"""Watches a watchable.
Expand Down Expand Up @@ -343,9 +357,13 @@ def set_pipeline_result(self, pipeline, result):
'apache_beam.runners.runner.PipelineResult or its subclass')
self._main_pipeline_results[str(id(pipeline))] = result

def evict_pipeline_result(self, pipeline=None):
  """Evicts the last run result of the given pipeline.

  Noop if the pipeline is absent from the environment. If no pipeline is
  specified, evicts for all pipelines.

  Args:
    pipeline: the pipeline whose tracked result should be dropped, or None to
      drop the tracked results of all pipelines.

  Returns:
    The evicted result when a pipeline is given and a result was tracked for
    it; None otherwise.
  """
  if pipeline:
    # Results are keyed by the stringified id of the pipeline object.
    return self._main_pipeline_results.pop(str(id(pipeline)), None)
  self._main_pipeline_results.clear()
  return None

def pipeline_result(self, pipeline):
"""Gets the pipeline run result. None if absent."""
Expand All @@ -364,26 +382,37 @@ def get_background_caching_job(self, pipeline):
"""Gets the background caching job started from the given pipeline."""
return self._background_caching_jobs.get(str(id(pipeline)), None)

def evict_background_caching_job(self, pipeline=None):
"""Evicts the background caching job started from the given pipeline. Noop
if the given pipeline is absent from the environment. If no pipeline is
specified, evicts for all pipelines."""
if pipeline:
return self._background_caching_jobs.pop(str(id(pipeline)), None)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we check that it's running / stopped before evicting?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That should not be necessary.

The idea is to let `cleanup` clear all state, while the `evict_*` functions only perform eviction for their respective fields.
If we stop a test stream, we will most likely also have to stop the background caching job and clear any other related in-memory state. It is rare for one of the fields to be evicted without the other fields being cleared, except in some unit tests, for example:

# In tests:
evict_xxx()

# Then in teardown, cleanup.

This also makes sure cleanup code doesn't redundantly show up in multiple places or cause an infinite loop when eviction and cleanup are called by different modules.

self._background_caching_jobs.clear()

def set_test_stream_service_controller(self, pipeline, controller):
  """Sets the test stream service controller that has started a gRPC server
  serving the test stream for any job started from the given user-defined
  pipeline.

  The controller is stored under the stringified id of the pipeline object
  and replaces any controller previously stored under that key.

  Args:
    pipeline: the user-defined pipeline the controller belongs to.
    controller: the controller to associate with the pipeline.
  """
  self._test_stream_service_controllers[str(id(pipeline))] = controller

def get_test_stream_service_controller(self, pipeline):
  """Gets the test stream service controller that has started a gRPC server
  serving the test stream for any job started from the given user-defined
  pipeline.

  Args:
    pipeline: the user-defined pipeline to look up.

  Returns:
    The controller stored for the pipeline, or None if there is none.
  """
  # Controllers are keyed by the stringified id of the pipeline object.
  return self._test_stream_service_controllers.get(str(id(pipeline)))

def evict_test_stream_service_controller(self, pipeline):
"""Evicts and pops the test stream service controller that has started a
gRPC server serving the test stream for any job started from the given
user-defined pipeline.
user defined pipeline. Noop if the given pipeline is absent from the
environment. If no pipeline is specified, evicts for all pipelines.
"""
return self._test_stream_service_controllers.pop(str(id(pipeline)), None)
if pipeline:
return self._test_stream_service_controllers.pop(str(id(pipeline)), None)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we check that it's running / stopped before evicting?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto. The `evict_*` functions only handle popping entries out of the internal dictionaries. Any other cleanup routine should be handled by `cleanup`.

self._test_stream_service_controllers.clear()

def is_terminated(self, pipeline):
"""Queries if the most recent job (by executing the given pipeline) state
Expand All @@ -400,13 +429,15 @@ def get_cached_source_signature(self, pipeline):
return self._cached_source_signature.get(str(id(pipeline)), set())

def evict_cached_source_signature(self, pipeline=None):
  """Evicts the signature generated for each recorded source of the given
  pipeline.

  Noop if the given pipeline is absent from the environment. If no pipeline
  is specified, evicts for all pipelines.

  Args:
    pipeline: the pipeline whose cached source signature should be dropped,
      or None to drop the cached signatures of all pipelines.

  Returns:
    The evicted signature when a pipeline is given and a signature was cached
    for it; None otherwise.
  """
  if pipeline:
    # Signatures are keyed by the stringified id of the pipeline object.
    return self._cached_source_signature.pop(str(id(pipeline)), None)
  self._cached_source_signature.clear()
  return None

def track_user_pipelines(self):
"""Record references to all user-defined pipeline instances watched in
"""Record references to all user defined pipeline instances watched in
current environment.

Current static global singleton interactive environment holds references to
Expand All @@ -416,18 +447,35 @@ def track_user_pipelines(self):
then handle them differently.

This is invoked every time a PTransform is to be applied if the current
code execution is under ipython due to the possibility that any user-defined
code execution is under ipython due to the possibility that any user defined
pipeline can be re-evaluated through notebook cell re-execution at any time.

Each time this is invoked, it will check if there is a cache manager
already created for each user defined pipeline. If not, create one for it.

If a pipeline is no longer watched due to re-execution while its
PCollections are still in watched scope, the pipeline becomes anonymous but
still accessible indirectly through references to its PCollections. This
function also clears up internal states for those anonymous pipelines once
all their PCollections are anonymous.
"""
self._tracked_user_pipelines = set()
for watching in self.watching():
for _, val in watching:
if isinstance(val, beam.pipeline.Pipeline):
self._tracked_user_pipelines.add(val)
_ = self.get_cache_manager(val, create_if_absent=True)
all_tracked_pipeline_ids = set(self._background_caching_jobs.keys()).union(
set(self._test_stream_service_controllers.keys()),
set(self._cache_managers.keys()),
{str(id(pcoll.pipeline))
for pcoll in self._computed_pcolls},
set(self._cached_source_signature.keys()),
set(self._main_pipeline_results.keys()))
inspectable_pipelines = self._inspector.inspectable_pipelines
for pipeline in all_tracked_pipeline_ids:
if pipeline not in inspectable_pipelines:
self.cleanup(pipeline)

@property
def tracked_user_pipelines(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,14 +236,33 @@ def test_get_cache_manager_creates_cache_manager_if_absent(self):
@patch(
    'apache_beam.runners.interactive.interactive_environment'
    '.InteractiveEnvironment.cleanup')
def test_track_user_pipeline_cleanup_non_inspectable_pipeline(
    self, mocked_cleanup):
  """Tracking user pipelines triggers exactly one cleanup when state is held
  for a single pipeline that is not inspectable."""
  ie._interactive_beam_env = None
  ie.new_env()
  dummy_pipeline_1 = beam.Pipeline()
  dummy_pipeline_2 = beam.Pipeline()
  dummy_pipeline_3 = beam.Pipeline()
  dummy_pipeline_4 = beam.Pipeline()
  dummy_pcoll = dummy_pipeline_4 | beam.Create([1])
  dummy_pipeline_5 = beam.Pipeline()
  dummy_non_inspectable_pipeline = 'dummy'
  # Watches every local defined so far; adding or renaming locals above this
  # line changes what the environment watches.
  ie.current_env().watch(locals())
  from apache_beam.runners.interactive.background_caching_job import BackgroundCachingJob
  # Register one kind of per-pipeline state against each dummy pipeline.
  ie.current_env().set_background_caching_job(
      dummy_pipeline_1,
      BackgroundCachingJob(
          runner.PipelineResult(runner.PipelineState.DONE), limiters=[]))
  ie.current_env().set_test_stream_service_controller(dummy_pipeline_2, None)
  ie.current_env().set_cache_manager(
      cache.FileBasedCacheManager(), dummy_pipeline_3)
  ie.current_env().mark_pcollection_computed([dummy_pcoll])
  ie.current_env().set_cached_source_signature(
      dummy_non_inspectable_pipeline, None)
  ie.current_env().set_pipeline_result(
      dummy_pipeline_5, runner.PipelineResult(runner.PipelineState.RUNNING))
  # Registering state alone must not trigger any cleanup.
  mocked_cleanup.assert_not_called()
  ie.current_env().track_user_pipelines()
  mocked_cleanup.assert_called_once()


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class InteractiveEnvironmentInspector(object):
def __init__(self):
  """Initializes the inspector with empty caches."""
  # Refreshed by the `inspectables` property.
  self._inspectables = {}
  self._anonymous = {}
  # Maps str(id(pipeline)) -> pipeline; rebuilt by list_inspectables(). Starts
  # as an empty dict (not a set) so its type matches what list_inspectables()
  # assigns and what the inspectable_pipelines property documents.
  self._inspectable_pipelines = {}

@property
def inspectables(self):
Expand All @@ -49,6 +50,20 @@ def inspectables(self):
self._inspectables = inspect()
return self._inspectables

@property
def inspectable_pipelines(self):
  """Returns a dictionary of all inspectable pipelines, keyed by the
  stringified id of each pipeline instance.

  Both user defined pipelines assigned to variables and anonymous pipelines
  with inspectable PCollections are included.
  A user defined pipeline missing from the returned dict can be considered
  out of scope, and all resources and memory states related to it should be
  released.
  """
  # The listing itself is discarded; the call is made before reading the
  # cached attribute so that the attribute is read after a fresh listing.
  self.list_inspectables()
  return self._inspectable_pipelines

@as_json
def list_inspectables(self):
"""Lists inspectables in JSON format.
Expand Down Expand Up @@ -89,6 +104,8 @@ def list_inspectables(self):
pipeline_identifier = obfuscate(meta(pipelines[pipeline], pipeline))
listing[pipeline_identifier]['pcolls'][identifier] = inspectable[
'metadata']
self._inspectable_pipelines = dict(
(str(id(pipeline)), pipeline) for pipeline in pipelines)
return listing

def get_val(self, identifier):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
from datetime import timedelta

from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.runners.interactive import background_caching_job as bcj
from apache_beam.runners.interactive import interactive_environment as ie
from apache_beam.runners.interactive.options import capture_limiters

Expand Down Expand Up @@ -71,15 +70,7 @@ def evict_captured_data(pipeline=None):
runs, Interactive Beam will capture fresh data."""
if ie.current_env().options.enable_capture_replay:
_LOGGER.info(
'You have requested Interactive Beam to evict all captured '
'You have requested Interactive Beam to evict all recorded'
'data that could be deterministically replayed among multiple '
'pipeline runs.')
ie.current_env().track_user_pipelines()
if pipeline:
bcj.attempt_to_cancel_background_caching_job(pipeline)
bcj.attempt_to_stop_test_stream_service(pipeline)
else:
for user_pipeline in ie.current_env().tracked_user_pipelines:
bcj.attempt_to_cancel_background_caching_job(user_pipeline)
bcj.attempt_to_stop_test_stream_service(user_pipeline)
ie.current_env().cleanup(pipeline)