Kafka 12317: Relax non-null key requirement in Kafka Streams#14174
Kafka 12317: Relax non-null key requirement in Kafka Streams#14174wcarlson5 merged 17 commits intoapache:trunkfrom
Conversation
32c0370 to
88037b5
Compare
edf866a to
0006e20
Compare
|
@mjsax @guozhangwang I'd say this PR is ready to be reviewd. May I ask for your input or do you know who to poke? |
380dbb7 to
f17f6b0
Compare
|
Hey @florin-akermann I can take a look at this soon. I'll probably be able to take a first look next week if you are still willing to push this |
|
Hey @wcarlson5 |
wcarlson5
left a comment
There was a problem hiding this comment.
Hey I left some comments, I think thats a good place to start, but I might need to think about a few things more on a second pass. So sorry if I missed something that might come up later.
I didn't have a chance to really dig into the testing yet, but it mostly looked sufficient.
I don't think we are too far off and I hope this lets you make some progress!
There was a problem hiding this comment.
This is to prevent null keys to go into reparation topics, right? Will that effect results if a manual reparation is added?
There was a problem hiding this comment.
This is related to this change: d1301b1
Previously, as an optimization, null key records were filtered out upon repartitioning.
Now we need null-key records to propagate if there is a left- or outer-join operator further downstream.
@mjsax hinted in KAFKA-12317 that this optimization can no longer be applied everywhere.
So now, by default, the optimization is no longer applied.
rewriteRepartitionNodes adds this optimization if no left- or outer-join operation is downstream to the partitionNode in question.
In general, a manual repartition is covered by this as well.
Though, now I wonder whether there should be some kind of config/flag whether you want this optimization to be applied at all.
Because, you could define some topology with left joins etc. expecting null-key records to propagate all the way through.
However, if you chose to add a manual repartition at the very end of your topology they would then still be filtered out because no left- or outer-join operation is downstream of this last repartition.
I guess developers would like to be able to opt out of this optimization in this case?
There was a problem hiding this comment.
Hmm, I would think they would like to opt out. That would require an update to the kip. Maybe even a revote. I'm not sure what the odds are that someone manually repartitioning would be needing the null-keys to propagate. But its probably higher than you would think as manual repartitioner as are typically power-users.
I don't think we need to make it optional as we already filter all null keys and now we let some propagate. Maybe we should just make a ticket and we can come back to it. Being able to toggle the optimization should be pretty simple.
There was a problem hiding this comment.
Ok, so basically we remove this optimization completley for now?
Developers could still just filter out null keys with a 'filter' operator to achieve the old behavior.
And then we make a separate ticket where developers can opt in to this optimization?
There was a problem hiding this comment.
why should null keys not enter the window?
There was a problem hiding this comment.
Should a null key ever match? I thought/think in this context null != null?
Plus, my guess would be that most WindowStore implementations throw upon null as a key?
There was a problem hiding this comment.
That's likely true about the windowstore implemntations. Something to think about for later then. We should maybe clarify the semantics about null matching in the docs somewhere.
There was a problem hiding this comment.
I'm not a huge fan of splitting this out to a separate public method. I think you can just reuse the logic in skip record.
There was a problem hiding this comment.
agree - adjusted
There was a problem hiding this comment.
This is the check already in skip record, can you use that instead?
There was a problem hiding this comment.
agree - adjusted
There was a problem hiding this comment.
Thank you very much @wcarlson5. I have adjusted the part about the skipRecord and replied to the others.
There is also Jira-14174. I kept it separate as it is a separate Jira.
Would you like me to merge it into this PR?
There was a problem hiding this comment.
This is related to this change: d1301b1
Previously, as an optimization, null key records were filtered out upon repartitioning.
Now we need null-key records to propagate if there is a left- or outer-join operator further downstream.
@mjsax hinted in KAFKA-12317 that this optimization can no longer be applied everywhere.
So now, by default, the optimization is no longer applied.
rewriteRepartitionNodes adds this optimization if no left- or outer-join operation is downstream to the partitionNode in question.
In general, a manual repartition is covered by this as well.
Though, now I wonder whether there should be some kind of config/flag whether you want this optimization to be applied at all.
Because, you could define some topology with left joins etc. expecting null-key records to propagate all the way through.
However, if you chose to add a manual repartition at the very end of your topology they would then still be filtered out because no left- or outer-join operation is downstream of this last repartition.
I guess developers would like to be able to opt out of this optimization in this case?
There was a problem hiding this comment.
agree - adjusted
There was a problem hiding this comment.
agree - adjusted
There was a problem hiding this comment.
Should a null key ever match? I thought/think in this context null != null?
Plus, my guess would be that most WindowStore implementations throw upon null as a key?
wcarlson5
left a comment
There was a problem hiding this comment.
I think its really close, just a couple of follow ups and we should be done. Thanks for the changes @florin-akermann !
There was a problem hiding this comment.
Hmm, I would think they would like to opt out. That would require an update to the kip. Maybe even a revote. I'm not sure what the odds are that someone manually repartitioning would be needing the null-keys to propagate. But its probably higher than you would think as manual repartitioner as are typically power-users.
I don't think we need to make it optional as we already filter all null keys and now we let some propagate. Maybe we should just make a ticket and we can come back to it. Being able to toggle the optimization should be pretty simple.
There was a problem hiding this comment.
That's likely true about the windowstore implemntations. Something to think about for later then. We should maybe clarify the semantics about null matching in the docs somewhere.
024db29 to
ef9128a
Compare
wcarlson5
left a comment
There was a problem hiding this comment.
Test failures are unrelated
|
@wcarlson5 thanks for the merge. I think it would be good to tackle #14107 in the same release. On a different note, I noticed that the squashed commit message doesn't adhere to the typically Jira ref: 'Kafka 12317' vs 'KAFKA-12317'. |
…14174) [KIP-962](https://cwiki.apache.org/confluence/display/KAFKA/KIP-962%3A+Relax+non-null+key+requirement+in+Kafka+Streams) The key requirments got relaxed for the followinger streams dsl operator: left join Kstream-Kstream: no longer drop left records with null-key and call ValueJoiner with 'null' for right value. outer join Kstream-Kstream: no longer drop left/right records with null-key and call ValueJoiner with 'null' for right/left value. left-foreign-key join Ktable-Ktable: no longer drop left records with null-foreign-key returned by the ForeignKeyExtractor and call ValueJoiner with 'null' for right value. left join KStream-Ktable: no longer drop left records with null-key and call ValueJoiner with 'null' for right value. left join KStream-GlobalTable: no longer drop records when KeyValueMapper returns 'null' and call ValueJoiner with 'null' for right value. Reviewers: Walker Carlson <wcarlson@apache.org>
…14174) [KIP-962](https://cwiki.apache.org/confluence/display/KAFKA/KIP-962%3A+Relax+non-null+key+requirement+in+Kafka+Streams) The key requirments got relaxed for the followinger streams dsl operator: left join Kstream-Kstream: no longer drop left records with null-key and call ValueJoiner with 'null' for right value. outer join Kstream-Kstream: no longer drop left/right records with null-key and call ValueJoiner with 'null' for right/left value. left-foreign-key join Ktable-Ktable: no longer drop left records with null-foreign-key returned by the ForeignKeyExtractor and call ValueJoiner with 'null' for right value. left join KStream-Ktable: no longer drop left records with null-key and call ValueJoiner with 'null' for right value. left join KStream-GlobalTable: no longer drop records when KeyValueMapper returns 'null' and call ValueJoiner with 'null' for right value. Reviewers: Walker Carlson <wcarlson@apache.org>
KIP-962
The key requirments got relaxed for the followinger streams dsl operator:
Repartitioning:
If a repartition node is upstream to any of the above listed operators then it may not filter null-key-records.
Thus, the new default behavior for repartiton nodes is to no longer filter out null-key-records.
However, we would like to keep this optimization where possible.
Therefore, during the 'buildAndOptimizeTopology' the graph gets traversed to look for repartition nodes which have none of the above listed operators downstream.
These nodes then get updated with the filter perdicate to filter out null-key-records.
Test Strategy:
Tests for each of the targeted dsl operator on minimal topologies plus extending some of the existing Integration tests (e.g. StreamStreamJoinIntegrationTest.java)
Misc:
I suggest updating https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics after this has been merged.
Committer Checklist (excluded from commit message)