From f77547dc3f789ea2292f46fa2363008d54f0dacf Mon Sep 17 00:00:00 2001 From: Ruchir Khaitan Date: Fri, 20 Aug 2021 15:34:21 -0400 Subject: [PATCH] persist: relax the requirements on allow_compaction and seal to >= Previously, seal and allow_compaction both required their arguments to be strictly greater than any previous argument passed to that function. This caused, among other things, this log line: ERROR coord::coord: failed to compact persisted tables: invalid compaction less than or equal to trace since Antichain { elements: [0] }: Antichain { elements: [0] } to be emitted when the coordinator tries to allow compactions to 0 at dataflow initialization time. allow_compaction and seal map to differential's set_{logical,physical}_compaction and timely's Capability::downgrade. Capability downgrades already accept arguments that are greater than or equal to the previous argument and set_{logical,physical}_compaction will be changed to do so as well. We can also revert the temporary fix, introduced in #7984, that avoided the noisy log lines. --- src/coord/src/coord.rs | 11 ------- src/persist/src/indexed/mod.rs | 19 ++++------- src/persist/src/indexed/runtime.rs | 6 ++-- src/persist/src/indexed/trace.rs | 48 ++++++++++++++++++++++++---- src/persist/src/nemesis/validator.rs | 4 +-- 5 files changed, 53 insertions(+), 35 deletions(-) diff --git a/src/coord/src/coord.rs b/src/coord/src/coord.rs index 51eb5347dba5d..7fb276286de56 100644 --- a/src/coord/src/coord.rs +++ b/src/coord/src/coord.rs @@ -1192,17 +1192,6 @@ impl Coordinator { fn persisted_table_allow_compaction(&self, since_updates: &[(GlobalId, Antichain)]) { let mut table_since_updates = vec![]; for (id, frontier) in since_updates.iter() { - // HACK: Avoid the "failed to compact persisted tables" error log at - // startup, by not trying to allow compaction on the minimum - // timestamp. Real fix in #7977.
- if !frontier - .elements() - .iter() - .any(|x| *x > Timestamp::minimum()) - { - continue; - } - // Not all ids will be present in the catalog however, those that are // in the catalog must also have their dependencies in the catalog as // well. diff --git a/src/persist/src/indexed/mod.rs b/src/persist/src/indexed/mod.rs index b691d64ba8be2..7b549787570bc 100644 --- a/src/persist/src/indexed/mod.rs +++ b/src/persist/src/indexed/mod.rs @@ -855,9 +855,8 @@ impl Indexed { /// This frontier represents a contract of time such that all updates with a /// time less than it have arrived. This frontier is advanced though the /// `seal` method. Once a time has been sealed for an id, it becomes an - /// error to later seal it at an time less than or equal to the sealed - /// frontier. It is also an error to write new data with a time less than - /// the sealed frontier. + /// error to later seal it at a time less than the sealed frontier. It is + /// also an error to write new data with a time less than the sealed frontier. fn sealed_frontier(&self, id: Id) -> Result, Error> { let trace = self .traces @@ -910,16 +909,12 @@ impl Indexed { Ok(()) } - /// Permit compaction of updates at times < since to since. + /// Permit compaction of updates at times <= since to since. /// - /// The compaction frontier can only monotonically increase and it is an error - /// to call this function with a since argument that is less than or equal to - /// the current compaction frontier. It is also an error to advance the - /// compaction frontier beyond the current sealed frontier. - /// - /// TODO: it's unclear whether this function needs to be so restrictive about - /// calls with a frontier <= current_compaction_frontier. We chose to mirror - /// the `seal` API here but if that doesn't make sense, remove the restrictions.
+ /// The compaction frontier can never decrease and it is an error to call + /// this function with a since argument that is less than the current compaction + /// frontier. It is also an error to advance the compaction frontier beyond the + /// current sealed frontier. pub fn allow_compaction( &mut self, id_sinces: Vec<(Id, Antichain)>, diff --git a/src/persist/src/indexed/runtime.rs b/src/persist/src/indexed/runtime.rs index b6db981f73cdd..adc9ed2a80ff7 100644 --- a/src/persist/src/indexed/runtime.rs +++ b/src/persist/src/indexed/runtime.rs @@ -849,10 +849,10 @@ mod tests { let ids = &[c1s1.stream_id(), c1s2.stream_id()]; multi.seal(ids, 2).recv()?; // We don't expose reading the seal directly, so hack it a bit here by - // verifying that we can't re-seal at the same timestamp (which is + // verifying that we can't re-seal at a prior timestamp (which is // disallowed). - assert_eq!(c1s1.seal(2).recv(), Err(Error::from("invalid seal for Id(0): 2 not in advance of current seal frontier Antichain { elements: [2] }"))); - assert_eq!(c1s2.seal(2).recv(), Err(Error::from("invalid seal for Id(1): 2 not in advance of current seal frontier Antichain { elements: [2] }"))); + assert_eq!(c1s1.seal(1).recv(), Err(Error::from("invalid seal for Id(0): 1 not at or in advance of current seal frontier Antichain { elements: [2] }"))); + assert_eq!(c1s2.seal(1).recv(), Err(Error::from("invalid seal for Id(1): 1 not at or in advance of current seal frontier Antichain { elements: [2] }"))); // Cannot write to streams not specified during construction. let (c1s3, _) = client1.create_or_load::<(), ()>("3")?; diff --git a/src/persist/src/indexed/trace.rs b/src/persist/src/indexed/trace.rs index d5e41611149e2..928f7ecba753f 100644 --- a/src/persist/src/indexed/trace.rs +++ b/src/persist/src/indexed/trace.rs @@ -152,9 +152,9 @@ impl Trace { /// [Trace::update_seal]. 
pub fn validate_seal(&self, ts: u64) -> Result<(), Error> { let prev = self.get_seal(); - if !prev.less_than(&ts) { + if !prev.less_equal(&ts) { return Err(Error::from(format!( - "invalid seal for {:?}: {:?} not in advance of current seal frontier {:?}", + "invalid seal for {:?}: {:?} not at or in advance of current seal frontier {:?}", self.id, ts, prev ))); } @@ -177,9 +177,9 @@ impl Trace { ))); } - if PartialOrder::less_equal(since, &self.since) { + if PartialOrder::less_than(since, &self.since) { return Err(Error::from(format!( - "invalid compaction less than or equal to trace since {:?}: {:?}", + "invalid compaction less than trace since {:?}: {:?}", self.since, since ))); } @@ -391,12 +391,12 @@ mod tests { t.allow_compaction(Antichain::from_elem(6)); // Repeat same since frontier. - assert_eq!(t.validate_allow_compaction(&Antichain::from_elem(6)), - Err(Error::from("invalid compaction less than or equal to trace since Antichain { elements: [6] }: Antichain { elements: [6] }"))); + t.validate_allow_compaction(&Antichain::from_elem(6))?; + t.allow_compaction(Antichain::from_elem(6)); // Regress since frontier. 
assert_eq!(t.validate_allow_compaction(&Antichain::from_elem(5)), - Err(Error::from("invalid compaction less than or equal to trace since Antichain { elements: [6] }: Antichain { elements: [5] }"))); + Err(Error::from("invalid compaction less than trace since Antichain { elements: [6] }: Antichain { elements: [5] }"))); // Advance since frontier to seal assert_eq!(t.validate_allow_compaction(&Antichain::from_elem(10)), @@ -409,6 +409,40 @@ mod tests { Ok(()) } + #[test] + fn trace_seal() -> Result<(), Error> { + let mut t: Trace = Trace::new(TraceMeta { + id: Id(0), + batches: vec![TraceBatchMeta { + key: "key1".to_string(), + desc: Description::new( + Antichain::from_elem(0), + Antichain::from_elem(10), + Antichain::from_elem(5), + ), + level: 1, + size_bytes: 0, + }], + since: Antichain::from_elem(5), + seal: Antichain::from_elem(10), + next_blob_id: 0, + }); + + // Normal case: advance seal frontier. + t.validate_seal(11)?; + t.update_seal(11); + + // Repeat same seal frontier. + t.validate_seal(11)?; + t.update_seal(11); + + // Regress seal frontier. + assert_eq!(t.validate_seal(10), + Err(Error::from("invalid seal for Id(0): 10 not at or in advance of current seal frontier Antichain { elements: [11] }"))); + + Ok(()) + } + #[test] fn trace_compact() -> Result<(), Error> { let mut blob = BlobCache::new( diff --git a/src/persist/src/nemesis/validator.rs b/src/persist/src/nemesis/validator.rs index 886c6e25baa11..205eaacc8f9af 100644 --- a/src/persist/src/nemesis/validator.rs +++ b/src/persist/src/nemesis/validator.rs @@ -234,7 +234,7 @@ impl Validator { let should_succeed = self.runtime_available && self.storage_available && req.ts - > self + >= self .seal_frontier .get(&req.stream) .copied() @@ -254,7 +254,7 @@ impl Validator { let should_succeed = self.runtime_available && self.storage_available && req.ts - > self + >= self .since_frontier .get(&req.stream) .copied()