Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions src/operators/arrange/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,17 @@ where
pub fn operator(&self) -> &OperatorInfo {
&self.operator
}

/// Downgrade the trace handle.
///
/// The weak trace agent can be upgraded to inspect the trace if there are other agents still holding
/// on to the trace. The weak handle does not prevent logical and physical compaction, and cannot
/// be upgraded to a [`TraceAgent`].
pub fn downgrade(&self) -> TraceAgentWeak<Tr> {
TraceAgentWeak {
trace: Rc::downgrade(&self.trace),
}
}
}

impl<Tr> TraceAgent<Tr>
Expand Down Expand Up @@ -593,3 +604,54 @@ where
self.trace.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), empty_frontier.borrow());
}
}

/// A weak handle to a trace.
pub struct TraceAgentWeak<Tr>
where
Tr: TraceReader,
Tr::Time: Lattice+Ord+Clone+'static,
{
/// The weak trace handle
trace: Weak<RefCell<TraceBox<Tr>>>,
}

/// A strong handle to a trace, but without preventing compaction.
pub struct TraceAgentGuard<Tr>
where
Tr: TraceReader,
Tr::Time: Lattice+Ord+Clone+'static,
{
trace: Rc<RefCell<TraceBox<Tr>>>,
}

impl<Tr> TraceAgentWeak<Tr>
where
Tr: TraceReader,
Tr::Time: Timestamp+Lattice,
{
/// Upgrade the weak handle to gain access to the trace.
///
/// Returns a value when the trace still exists, and `None` otherwise.
#[inline]
pub fn upgrade(&self) -> Option<TraceAgentGuard<Tr>> {
if let Some(trace) = self.trace.upgrade() {
Some(TraceAgentGuard {
trace,
})
} else {
None
}
}
}

impl<Tr> TraceAgentGuard<Tr>
where
Tr: TraceReader,
Tr::Time: Timestamp+Lattice,
{
/// Access the underlying trace, only grant access to immutable functions.
#[inline]
pub fn as_deref(&self) -> impl std::ops::Deref<Target=Tr> + '_ {
std::cell::Ref::map(self.trace.borrow(), |trace_box| &trace_box.trace)
}
}