KAFKA-8410: KTableProcessor migration groundwork#10744
Conversation
| class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> { | ||
| private final KTableImpl<K, ?, V> parent; | ||
| private final Predicate<? super K, ? super V> predicate; | ||
| class KTableFilter<KIn, VIn> implements KTableNewProcessorSupplier<KIn, VIn, KIn, VIn> { |
There was a problem hiding this comment.
This is the only processor we migrate here. The point is to use this processor to make sure that the groundwork in the rest of these changes is sufficient.
| public void init(final org.apache.kafka.streams.processor.ProcessorContext context) { | ||
| // This is the old processor context for compatibility with the other KTable processors. | ||
| // Once we migrte them all, we can swap this out. |
There was a problem hiding this comment.
This particular interface was too much trouble to migrate now, and it's not terribly significant, since the value getter never forwards.
| // Temporarily setting the processorSupplier to type Object so that we can transition from the | ||
| // old ProcessorSupplier to the new api.ProcessorSupplier. This works because all accesses to | ||
| // this field are guarded by typechecks anyway. | ||
| private final Object processorSupplier; |
There was a problem hiding this comment.
Calling this out as well. Hopefully the comment itself is clear.
| } else if (processorSupplier instanceof KTableNewProcessorSupplier) { | ||
| return ((KTableNewProcessorSupplier<?, ?, K, V>) processorSupplier).view(); |
There was a problem hiding this comment.
We have to add a new typecheck for the new supplier type.
| } else if (processorSupplier instanceof KTableNewProcessorSupplier) { | ||
| final KTableNewProcessorSupplier<?, ?, ?, ?> tableProcessorSupplier = | ||
| (KTableNewProcessorSupplier<?, ?, ?, ?>) processorSupplier; | ||
| if (!tableProcessorSupplier.enableSendingOldValues(forceMaterialization)) { | ||
| return false; | ||
| } |
There was a problem hiding this comment.
We have to add a new typecheck for the new supplier type.
| import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareValueSerializer; | ||
|
|
||
| public class SinkNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut, VOut> { | ||
| public class SinkNode<KIn, VIn> extends ProcessorNode<KIn, VIn, Void, Void> { |
There was a problem hiding this comment.
Here's where we declare the sink node cannot forward and hence only needs input parameters.
There was a problem hiding this comment.
That brought up a good question, as whether we need to override getChild and getChildren in SinkNode to throw as it should never be called?
There was a problem hiding this comment.
We already throw an exception if you try and add a child. I think it would complicate any of our processor graph traversal algorithms if we made it illegal to even call getChildren, as they would have to type-check the nodes before traversing.
| import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareValueDeserializer; | ||
|
|
||
| public class SourceNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut, VOut> { | ||
| public class SourceNode<KIn, VIn> extends ProcessorNode<KIn, VIn, KIn, VIn> { |
There was a problem hiding this comment.
Here, we declare that the source node can only forward the same type it receives.
| rawOldValue != null ? serdes.valueFrom(rawOldValue) : null, | ||
| timestamp | ||
| ), | ||
| new CacheFlushListener<byte[], byte[]>() { |
There was a problem hiding this comment.
Since the interface has two methods, it can't be a lambda anymore.
| replay(context); | ||
|
|
||
| new TimestampedCacheFlushListener<>(context).apply( | ||
| new TimestampedCacheFlushListener<>((ProcessorContext<String, Change<String>>) context).apply( |
There was a problem hiding this comment.
Lines like this are because we have to cast to differentiate between the two constructors. Since the context is an IPC, it actually implements both interfaces, and it doesn't matter which one we cast to.
| final InternalMockProcessorContext context = new InternalMockProcessorContext(streamsMetrics); | ||
| final ProcessorNode<Object, Object, ?, ?> node = new ProcessorNode<>("name", new NoOpProcessor(), Collections.<String>emptySet()); | ||
| final InternalMockProcessorContext<Object, Object> context = new InternalMockProcessorContext<>(streamsMetrics); | ||
| final ProcessorNode<Object, Object, Object, Object> node = new ProcessorNode<>("name", new NoOpProcessor(), Collections.<String>emptySet()); |
There was a problem hiding this comment.
For complicated java-type-system reasons, I had to switch from wildcards to Object in some places.
|
Filed ticket for Connect test: https://issues.apache.org/jira/browse/KAFKA-12842 |
| private static final String SINK_NAME = "KTABLE-SINK-"; | ||
|
|
||
| private final ProcessorSupplier<?, ?> processorSupplier; | ||
| // Temporarily setting the processorSupplier to type Object so that we can transition from the |
|
|
||
| import org.apache.kafka.streams.processor.api.ProcessorSupplier; | ||
|
|
||
| public interface KTableNewProcessorSupplier<KIn, VIn, KOut, VOut> extends ProcessorSupplier<KIn, Change<VIn>, KOut, Change<VOut>> { |
There was a problem hiding this comment.
Could we add TODO to the old interface to easily remind the potential removal work? Or we already have tickets to do it?
| final VOut oldValue, | ||
| final long timestamp) { | ||
| final ProcessorNode prev = context.currentNode(); | ||
| @SuppressWarnings("rawtypes") final ProcessorNode prev = context.currentNode(); |
There was a problem hiding this comment.
Why do we put suppression inline, instead of putting it on the top of function?
|
|
||
| @Override | ||
| public String toString() { | ||
| return "To{" + |
There was a problem hiding this comment.
nit: could we do a string format for this to read easier?
There was a problem hiding this comment.
I just didn't bother because there's no place it would actually be printed unless a test is failing. We can give more thought to the string format later on as needed.
| import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareValueSerializer; | ||
|
|
||
| public class SinkNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut, VOut> { | ||
| public class SinkNode<KIn, VIn> extends ProcessorNode<KIn, VIn, Void, Void> { |
There was a problem hiding this comment.
That brought up a good question, as whether we need to override getChild and getChildren in SinkNode to throw as it should never be called?
| /** | ||
| * Called when records are flushed from the {@link ThreadCache} | ||
| */ | ||
| void apply(final Record<K, Change<V>> record); |
There was a problem hiding this comment.
Do we want to to deprecate the old apply method and use the new one? If so, could we rename one of them to applyOld or applyNew to differentiate?
Additionally, we want a parameter signature for record
There was a problem hiding this comment.
I don't want to deprecate it right now, but as with the rest of the "compatibility mode" changes, the old member should become unused by the time @jeqo is done and we can remove it at that time.
…ls/KTableNewProcessorSupplier.java
This is an effort to help break up #10507
into multiple PRs.
Committer Checklist (excluded from commit message)