Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 0 additions & 11 deletions src/coord/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1192,17 +1192,6 @@ impl Coordinator {
fn persisted_table_allow_compaction(&self, since_updates: &[(GlobalId, Antichain<Timestamp>)]) {
let mut table_since_updates = vec![];
for (id, frontier) in since_updates.iter() {
// HACK: Avoid the "failed to compact persisted tables" error log at
// startup, by not trying to allow compaction on the minimum
// timestamp. Real fix in #7977.
if !frontier
.elements()
.iter()
.any(|x| *x > Timestamp::minimum())
{
continue;
}

// Not all ids will be present in the catalog however, those that are
// in the catalog must also have their dependencies in the catalog as
// well.
Expand Down
19 changes: 7 additions & 12 deletions src/persist/src/indexed/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -855,9 +855,8 @@ impl<L: Log, B: Blob> Indexed<L, B> {
/// This frontier represents a contract of time such that all updates with a
/// time less than it have arrived. This frontier is advanced though the
/// `seal` method. Once a time has been sealed for an id, it becomes an
/// error to later seal it at an time less than or equal to the sealed
/// frontier. It is also an error to write new data with a time less than
/// the sealed frontier.
/// error to later seal it at an time less than the sealed frontier. It is
/// also an error to write new data with a time less than the sealed frontier.
fn sealed_frontier(&self, id: Id) -> Result<Antichain<u64>, Error> {
let trace = self
.traces
Expand Down Expand Up @@ -910,16 +909,12 @@ impl<L: Log, B: Blob> Indexed<L, B> {
Ok(())
}

/// Permit compaction of updates at times < since to since.
/// Permit compaction of updates at times <= since to since.
///
/// The compaction frontier can only monotonically increase and it is an error
/// to call this function with a since argument that is less than or equal to
/// the current compaction frontier. It is also an error to advance the
/// compaction frontier beyond the current sealed frontier.
///
/// TODO: it's unclear whether this function needs to be so restrictive about
/// calls with a frontier <= current_compaction_frontier. We chose to mirror
/// the `seal` API here but if that doesn't make sense, remove the restrictions.
/// The compaction frontier can never decrease and it is an error to call
/// this function with a since argument that is less than the current compaction
/// frontier. It is also an error to advance the compaction frontier beyond the
/// current sealed frontier.
pub fn allow_compaction(
&mut self,
id_sinces: Vec<(Id, Antichain<u64>)>,
Expand Down
6 changes: 3 additions & 3 deletions src/persist/src/indexed/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -849,10 +849,10 @@ mod tests {
let ids = &[c1s1.stream_id(), c1s2.stream_id()];
multi.seal(ids, 2).recv()?;
// We don't expose reading the seal directly, so hack it a bit here by
// verifying that we can't re-seal at the same timestamp (which is
// verifying that we can't re-seal at a prior timestamp (which is
// disallowed).
assert_eq!(c1s1.seal(2).recv(), Err(Error::from("invalid seal for Id(0): 2 not in advance of current seal frontier Antichain { elements: [2] }")));
assert_eq!(c1s2.seal(2).recv(), Err(Error::from("invalid seal for Id(1): 2 not in advance of current seal frontier Antichain { elements: [2] }")));
assert_eq!(c1s1.seal(1).recv(), Err(Error::from("invalid seal for Id(0): 1 not at or in advance of current seal frontier Antichain { elements: [2] }")));
assert_eq!(c1s2.seal(1).recv(), Err(Error::from("invalid seal for Id(1): 1 not at or in advance of current seal frontier Antichain { elements: [2] }")));

// Cannot write to streams not specified during construction.
let (c1s3, _) = client1.create_or_load::<(), ()>("3")?;
Expand Down
48 changes: 41 additions & 7 deletions src/persist/src/indexed/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,9 @@ impl Trace {
/// [Trace::update_seal].
pub fn validate_seal(&self, ts: u64) -> Result<(), Error> {
let prev = self.get_seal();
if !prev.less_than(&ts) {
if !prev.less_equal(&ts) {
return Err(Error::from(format!(
"invalid seal for {:?}: {:?} not in advance of current seal frontier {:?}",
"invalid seal for {:?}: {:?} not at or in advance of current seal frontier {:?}",
self.id, ts, prev
)));
}
Expand All @@ -177,9 +177,9 @@ impl Trace {
)));
}

if PartialOrder::less_equal(since, &self.since) {
if PartialOrder::less_than(since, &self.since) {
return Err(Error::from(format!(
"invalid compaction less than or equal to trace since {:?}: {:?}",
"invalid compaction less than trace since {:?}: {:?}",
self.since, since
)));
}
Expand Down Expand Up @@ -391,12 +391,12 @@ mod tests {
t.allow_compaction(Antichain::from_elem(6));

// Repeat same since frontier.
assert_eq!(t.validate_allow_compaction(&Antichain::from_elem(6)),
Err(Error::from("invalid compaction less than or equal to trace since Antichain { elements: [6] }: Antichain { elements: [6] }")));
t.validate_allow_compaction(&Antichain::from_elem(6))?;
t.allow_compaction(Antichain::from_elem(6));

// Regress since frontier.
assert_eq!(t.validate_allow_compaction(&Antichain::from_elem(5)),
Err(Error::from("invalid compaction less than or equal to trace since Antichain { elements: [6] }: Antichain { elements: [5] }")));
Err(Error::from("invalid compaction less than trace since Antichain { elements: [6] }: Antichain { elements: [5] }")));

// Advance since frontier to seal
assert_eq!(t.validate_allow_compaction(&Antichain::from_elem(10)),
Expand All @@ -409,6 +409,40 @@ mod tests {
Ok(())
}

#[test]
fn trace_seal() -> Result<(), Error> {
let mut t: Trace = Trace::new(TraceMeta {
id: Id(0),
batches: vec![TraceBatchMeta {
key: "key1".to_string(),
desc: Description::new(
Antichain::from_elem(0),
Antichain::from_elem(10),
Antichain::from_elem(5),
),
level: 1,
size_bytes: 0,
}],
since: Antichain::from_elem(5),
seal: Antichain::from_elem(10),
next_blob_id: 0,
});

// Normal case: advance seal frontier.
t.validate_seal(11)?;
t.update_seal(11);

// Repeat same seal frontier.
t.validate_seal(11)?;
t.update_seal(11);

// Regress seal frontier.
assert_eq!(t.validate_seal(10),
Err(Error::from("invalid seal for Id(0): 10 not at or in advance of current seal frontier Antichain { elements: [11] }")));

Ok(())
}

#[test]
fn trace_compact() -> Result<(), Error> {
let mut blob = BlobCache::new(
Expand Down
4 changes: 2 additions & 2 deletions src/persist/src/nemesis/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ impl Validator {
let should_succeed = self.runtime_available
&& self.storage_available
&& req.ts
> self
>= self
.seal_frontier
.get(&req.stream)
.copied()
Expand All @@ -254,7 +254,7 @@ impl Validator {
let should_succeed = self.runtime_available
&& self.storage_available
&& req.ts
> self
>= self
.since_frontier
.get(&req.stream)
.copied()
Expand Down