persist: take persistence since into account when calculating source since #9656
danhhz
left a comment
lgtm but we should get a coord person to look at it
// because `seal` is `seal(u64)` but the return value suggests there could be more. Also: if
// the upper is a true multi-dimensional frontier in the future, the logic that determines a
// common upper seal timestamp will become a bit more complicated.
// TODO: We have a mismatch here: we know that these frontiers always contains only one
super nit: now it should be "these frontiers always contain" (don't bother redoing ci just for this)
SourceConnector::External { persist, .. } => {
    persist.as_ref().map(|p| p.primary_stream.since_ts)
}
_ => None,
nit: seems like you could save yourself the unwrap and make this Some(0) for the default case (also below)
Yep, I thought about that, but I wanted to keep it that way because the Some(since_ts) really is a weird way of turning the one since_ts into an iterator. I wanted to keep that signalling, though I can understand why it's weird.
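For readers following the exchange above, here is a minimal sketch of the two options being discussed. The types are hypothetical stand-ins that only mirror the names visible in the diff (`SourceConnector`, `persist`, `primary_stream`, `since_ts`); they are not the real Materialize definitions, and the `Local` variant, `frontier_from_option`, and `since_or_zero` are invented for illustration. A frontier is modelled as a plain `Vec<u64>`, which is also an assumption.

```rust
// Hypothetical stand-ins for the types in the diff; not the real definitions.
struct PrimaryStream {
    since_ts: u64,
}

struct Persist {
    primary_stream: PrimaryStream,
}

enum SourceConnector {
    External { persist: Option<Persist> },
    // Invented variant standing in for "every other kind of connector".
    Local,
}

/// Mirrors the match arm from the diff: the persisted since, if there is one.
fn persisted_since(connector: &SourceConnector) -> Option<u64> {
    match connector {
        SourceConnector::External { persist, .. } => {
            persist.as_ref().map(|p| p.primary_stream.since_ts)
        }
        _ => None,
    }
}

/// The "Option as iterator" reading: an `Option<u64>` yields zero or one
/// timestamps, which can be collected into a frontier (modelled as a Vec).
fn frontier_from_option(since_ts: Option<u64>) -> Vec<u64> {
    since_ts.into_iter().collect()
}

/// The alternative from the review: fall back to 0 so no unwrap is needed.
fn since_or_zero(connector: &SourceConnector) -> u64 {
    persisted_since(connector).unwrap_or(0)
}
```

Under these assumptions, keeping the `Option` preserves the difference between "no persisted since" and "a since of 0", while the `Some(0)` default folds both cases into one value.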
b220bf8 to adc4fb9
        self.sources.insert(entry.id(), frontiers);
    }
    CatalogItem::Table(table) => {
        println!("bootstrapping table: {:?}", table);
intentional or debugging info?
Sorry, this one's not meant for merging. I have to clean this up once #9659 is in because this PR partially contains changes from that other one.
/// And when we restart, we start from step 1., at which time we are guaranteed not to have a
/// source running already.
pub upper_seal_ts: u64,
// TODO: This duplicates the above description but seems fine?
yeah seems fine, let's remove the TODO :)
…since
Before, all sources started out with a since frontier of [0] when created, without looking at the since frontier (aka compaction frontier) of persisted streams that are involved in a persistent source.
Now, we look at the since of the primary stream when starting to read from a source. We only need to look at the primary stream because this is what carries the actual data of the source that we replay when starting up.
This was not strictly a bug, because we never compact persistent sources. We do have to resolve this issue, though, in order to enable compaction.
Fixes #9655
adc4fb9 to 39cca42
Before, all sources started out with a since frontier of [0] when
created, without looking at the since frontier (aka compaction
frontier) of persisted streams that are involved in a persistent
source.
Now, we look at the since of the primary stream when starting to read from a
source. We only need to look at the primary stream because this is what
carries the actual data of the source that we replay when starting up.
This was not strictly a bug, because we never compact persistent
sources. We do have to resolve this issue, though, in order to
enable compaction.
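To illustrate why the starting since matters once compaction is enabled, here is a small self-contained model. It is a sketch under stated assumptions, not the persist or coordinator code: updates are modelled as `(data, timestamp, diff)` triples, compaction is modelled as advancing every timestamp below `since` up to `since`, and all function names (`compact`, `count_as_of`, `initial_source_since`) are hypothetical.

```rust
/// A hypothetical update triple: (data, timestamp, diff).
type Update = (&'static str, u64, i64);

/// Models logical compaction: timestamps below `since` are advanced to `since`.
fn compact(updates: &[Update], since: u64) -> Vec<Update> {
    updates
        .iter()
        .map(|&(data, ts, diff)| (data, ts.max(since), diff))
        .collect()
}

/// Accumulated count of `data` when reading as of time `as_of`.
fn count_as_of(updates: &[Update], data: &str, as_of: u64) -> i64 {
    updates
        .iter()
        .filter(|(d, ts, _)| *d == data && *ts <= as_of)
        .map(|(_, _, diff)| *diff)
        .sum()
}

/// Sketch of the behaviour described above: start from the since of the
/// persisted primary stream when there is one, otherwise from 0.
fn initial_source_since(primary_stream_since: Option<u64>) -> u64 {
    primary_stream_since.unwrap_or(0)
}
```

In this model, a read at a time below the compaction frontier can disagree with the uncompacted data, while reads at or beyond it agree. A source that claimed a since of 0 on top of a stream compacted to a later time would therefore advertise reads it cannot answer correctly.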
Fixes MaterializeInc/database-issues#2940
Note: I'll have to find someone from the coordinator team to also take a look at this because I'm messing with sinces and that often has the potential to introduce incorrectness.