From d60158d83b467416ae4e9ea23516761497b1824f Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Tue, 4 Jul 2023 17:38:03 +0200 Subject: [PATCH] Downgrade/upgrade trace agents Add the ability to downgrade trace agents to only hold weak references to the trace. The weak reference can be upgraded and dereferenced to the trace, but not upgraded to a full agent. This is because the caller already gave up the right to prevent compaction, and re-acquiring it, while technically possible, introduces uncertainty about what time can be selected. ## Example ```rust let weak_trace = arranged.trace.downgrade(); if let Some(trace) = weak_trace.upgrade() { trace.as_deref().map_batches(|_| {}); } ``` Signed-off-by: Moritz Hoffmann --- src/operators/arrange/agent.rs | 62 ++++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/src/operators/arrange/agent.rs b/src/operators/arrange/agent.rs index 4a9e8f242..7ed16d445 100644 --- a/src/operators/arrange/agent.rs +++ b/src/operators/arrange/agent.rs @@ -160,6 +160,17 @@ where pub fn operator(&self) -> &OperatorInfo { &self.operator } + + /// Downgrade the trace handle. + /// + /// The weak trace agent can be upgraded to inspect the trace if there are other agents still holding + /// on to the trace. The weak handle does not prevent logical and physical compaction, and cannot + /// be upgraded to a [`TraceAgent`]. + pub fn downgrade(&self) -> TraceAgentWeak { + TraceAgentWeak { + trace: Rc::downgrade(&self.trace), + } + } } impl TraceAgent @@ -593,3 +604,54 @@ where self.trace.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), empty_frontier.borrow()); } } + +/// A weak handle to a trace. +pub struct TraceAgentWeak +where + Tr: TraceReader, + Tr::Time: Lattice+Ord+Clone+'static, +{ + /// The weak trace handle + trace: Weak>>, +} + +/// A strong handle to a trace, but without preventing compaction. +pub struct TraceAgentGuard + where + Tr: TraceReader, + Tr::Time: Lattice+Ord+Clone+'static, +{ + trace: Rc>>, +} + +impl TraceAgentWeak + where + Tr: TraceReader, + Tr::Time: Timestamp+Lattice, +{ + /// Upgrade the weak handle to gain access to the trace. + /// + /// Returns a value when the trace still exists, and `None` otherwise. + #[inline] + pub fn upgrade(&self) -> Option> { + if let Some(trace) = self.trace.upgrade() { + Some(TraceAgentGuard { + trace, + }) + } else { + None + } + } +} + +impl TraceAgentGuard + where + Tr: TraceReader, + Tr::Time: Timestamp+Lattice, +{ + /// Access the underlying trace, only grant access to immutable functions. + #[inline] + pub fn as_deref(&self) -> impl std::ops::Deref + '_ { + std::cell::Ref::map(self.trace.borrow(), |trace_box| &trace_box.trace) + } +}