diff --git a/src/operators/arrange/agent.rs b/src/operators/arrange/agent.rs index 4a9e8f242..7ed16d445 100644 --- a/src/operators/arrange/agent.rs +++ b/src/operators/arrange/agent.rs @@ -160,6 +160,17 @@ where pub fn operator(&self) -> &OperatorInfo { &self.operator } + + /// Downgrade the trace handle. + /// + /// The weak trace agent can be upgraded to inspect the trace if there are other agents still holding + /// on to the trace. The weak handle does not prevent logical and physical compaction, and cannot + /// be upgraded to a [`TraceAgent`]. + pub fn downgrade(&self) -> TraceAgentWeak { + TraceAgentWeak { + trace: Rc::downgrade(&self.trace), + } + } } impl TraceAgent @@ -593,3 +604,54 @@ where self.trace.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), empty_frontier.borrow()); } } + +/// A weak handle to a trace. +pub struct TraceAgentWeak +where + Tr: TraceReader, + Tr::Time: Lattice+Ord+Clone+'static, +{ + /// The weak trace handle + trace: Weak>>, +} + +/// A strong handle to a trace, but without preventing compaction. +pub struct TraceAgentGuard + where + Tr: TraceReader, + Tr::Time: Lattice+Ord+Clone+'static, +{ + trace: Rc>>, +} + +impl TraceAgentWeak + where + Tr: TraceReader, + Tr::Time: Timestamp+Lattice, +{ + /// Upgrade the weak handle to gain access to the trace. + /// + /// Returns a value when the trace still exists, and `None` otherwise. + #[inline] + pub fn upgrade(&self) -> Option> { + if let Some(trace) = self.trace.upgrade() { + Some(TraceAgentGuard { + trace, + }) + } else { + None + } + } +} + +impl TraceAgentGuard + where + Tr: TraceReader, + Tr::Time: Timestamp+Lattice, +{ + /// Access the underlying trace, only grant access to immutable functions. + #[inline] + pub fn as_deref(&self) -> impl std::ops::Deref + '_ { + std::cell::Ref::map(self.trace.borrow(), |trace_box| &trace_box.trace) + } +}