2020import dev .responsive .examples .regression .RegressionSchema ;
2121import dev .responsive .examples .regression .model .GroupedOrder ;
2222import dev .responsive .examples .regression .model .Order ;
23- import dev .responsive .examples .regression .model .StoredOrder ;
23+ import dev .responsive .examples .regression .model .OrderMetadata ;
2424import dev .responsive .kafka .api .stores .ResponsiveStores ;
2525import java .time .Duration ;
2626import java .util .ArrayList ;
2727import java .util .Map ;
28- import java .util .Optional ;
28+ import java .util .Set ;
2929import org .apache .kafka .common .serialization .Serdes ;
3030import org .apache .kafka .streams .KeyValue ;
3131import org .apache .kafka .streams .StreamsBuilder ;
3232import org .apache .kafka .streams .Topology ;
3333import org .apache .kafka .streams .kstream .Consumed ;
3434import org .apache .kafka .streams .kstream .Produced ;
35- import org .apache .kafka .streams .kstream .Transformer ;
36- import org .apache .kafka .streams .processor .ProcessorContext ;
3735import org .apache .kafka .streams .processor .PunctuationType ;
36+ import org .apache .kafka .streams .processor .api .Processor ;
37+ import org .apache .kafka .streams .processor .api .ProcessorContext ;
38+ import org .apache .kafka .streams .processor .api .ProcessorSupplier ;
39+ import org .apache .kafka .streams .processor .api .Record ;
3840import org .apache .kafka .streams .state .KeyValueIterator ;
3941import org .apache .kafka .streams .state .KeyValueStore ;
42+ import org .apache .kafka .streams .state .StoreBuilder ;
4043import org .apache .kafka .streams .state .Stores ;
4144
4245public class KeyBatchExample extends AbstractKSExampleService {
4346
47+ private static final String METADATA_STORE_NAME = "metadata" ;
48+ private static final String PURCHASES_STORE_NAME = "purchases" ;
49+
4450 private final UrandomGenerator randomGenerator = new UrandomGenerator ();
4551
4652 public KeyBatchExample (final Map <String , Object > props , final boolean responsive ) {
@@ -51,26 +57,12 @@ public KeyBatchExample(final Map<String, Object> props, final boolean responsive
5157 );
5258 }
5359
54- @ SuppressWarnings ("deprecation" ) // using Transformer interface for compatibility testing
5560 @ Override
5661 protected Topology buildTopology () {
5762 final StreamsBuilder builder = new StreamsBuilder ();
58- if (responsive ) {
59- builder .addStateStore (ResponsiveStores .keyValueStoreBuilder (
60- ResponsiveStores .keyValueStore ("grouped-orders-store" ),
61- Serdes .String (),
62- RegressionSchema .storedOrderSerde ()
63- ));
64- } else {
65- builder .addStateStore (Stores .keyValueStoreBuilder (
66- Stores .inMemoryKeyValueStore ("grouped-orders-store" ),
67- Serdes .String (),
68- RegressionSchema .storedOrderSerde ()
69- ));
70- }
7163
7264 builder .stream (ORDERS , Consumed .with (Serdes .String (), RegressionSchema .orderSerde ()))
73- .transform ( BatchTransformer :: new , "grouped-orders-store" )
65+ .process ( new BatchProcessorSupplier ( responsive ), PURCHASES_STORE_NAME , METADATA_STORE_NAME )
7466 .peek ((k , v ) -> {
7567 if (responsive ) {
7668 final var random = Math .abs (randomGenerator .nextLong () % 10000 );
@@ -84,117 +76,145 @@ protected Topology buildTopology() {
8476 return builder .build ();
8577 }
8678
87- private static class BatchTransformer
88- implements Transformer <String , Order , KeyValue <String , GroupedOrder >> {
79+ private static class BatchProcessorSupplier implements
80+ ProcessorSupplier <String , Order , String , GroupedOrder > {
81+
82+ final boolean responsive ;
8983
90- private ProcessorContext context ;
91- private KeyValueStore <String , StoredOrder > store ;
84+ public BatchProcessorSupplier (final boolean responsive ) {
85+ this .responsive = responsive ;
86+ }
87+
88+ @ Override
89+ public Processor <String , Order , String , GroupedOrder > get () {
90+ return new BatchProcessor ();
91+ }
9292
9393 @ Override
94- public void init (final ProcessorContext context ) {
94+ public Set <StoreBuilder <?>> stores () {
95+ if (responsive ) {
96+ return Set .of (
97+ ResponsiveStores .keyValueStoreBuilder (
98+ ResponsiveStores .keyValueStore (PURCHASES_STORE_NAME ),
99+ Serdes .String (),
100+ RegressionSchema .orderSerde ()
101+ ),
102+ ResponsiveStores .keyValueStoreBuilder (
103+ ResponsiveStores .keyValueStore (METADATA_STORE_NAME ),
104+ Serdes .String (),
105+ RegressionSchema .orderMetadataSerde ()
106+ )
107+ );
108+ } else {
109+ return Set .of (
110+ Stores .keyValueStoreBuilder (
111+ Stores .inMemoryKeyValueStore (PURCHASES_STORE_NAME ),
112+ Serdes .String (),
113+ RegressionSchema .orderSerde ()
114+ ),
115+ Stores .keyValueStoreBuilder (
116+ Stores .inMemoryKeyValueStore (METADATA_STORE_NAME ),
117+ Serdes .String (),
118+ RegressionSchema .orderMetadataSerde ()
119+ )
120+ );
121+ }
122+ }
123+ }
124+
125+ // TODO: use FixedKeyProcessor after fixing https://issues.apache.org/jira/browse/KAFKA-16585
126+ private static class BatchProcessor implements Processor <String , Order , String , GroupedOrder > {
127+
128+ private ProcessorContext <String , GroupedOrder > context ;
129+ private KeyValueStore <String , Order > purchasesStore ;
130+ private KeyValueStore <String , OrderMetadata > metadataStore ;
131+
132+ @ Override
133+ public void init (final ProcessorContext <String , GroupedOrder > context ) {
95134 this .context = context ;
96- this .store = context .getStateStore ("grouped-orders-store" );
135+ this .purchasesStore = context .getStateStore (PURCHASES_STORE_NAME );
136+ this .metadataStore = context .getStateStore (METADATA_STORE_NAME );
97137 this .context .schedule (
98138 Duration .ofSeconds (30 ),
99139 PunctuationType .STREAM_TIME ,
100- this ::flushExpired
140+ this ::flushReadyOrders
101141 );
102142 }
103143
104144 @ Override
105- public KeyValue <String , GroupedOrder > transform (final String key , final Order value ) {
106- final long ts = context .timestamp ();
107-
108- // first add the order to the list of orders that are stored
109- store .put (storedKey (key , ts ), new StoredOrder (Optional .of (value ), Optional .empty ()));
110-
111- // next, we need to update the tracked metadata row to
112- // check whether the value ought to be emitted
113- final String mKey = metaKey (key );
114- final StoredOrder .Meta meta = Optional .ofNullable (store .get (mKey ))
115- .orElse (new StoredOrder (Optional .empty (), Optional .of (new StoredOrder .Meta (ts , 0 , 0 ))))
116- .meta ()
117- .orElseThrow ();
118-
119- // instead of computing the actual size, for now just
120- // use the value amount and assume that it should be emitted
121- // after a certain amount of $$$ is spent
122- final StoredOrder .Meta newMeta = new StoredOrder .Meta (
123- ts ,
124- meta .count () + 1 ,
125- meta .size () + (long ) value .amount ()
126- );
127-
128- if (shouldFlush (newMeta , ts )) {
129- doFlush (key );
130- store .delete (mKey );
145+ public void process (final Record <String , Order > newPurchase ) {
146+ final String key = newPurchase .key ();
147+ final long newPurchaseTimestamp = newPurchase .timestamp ();
148+ final long newPurchaseSize = (long ) newPurchase .value ().amount ();
149+
150+ // first store the purchase under the key+timestamp
151+ purchasesStore .put (storedKey (key , newPurchaseTimestamp ), newPurchase .value ());
152+
153+ // next, we need to look up and update the tracked metadata for this key
154+ final OrderMetadata orderMetadata = metadataStore .get (key );
155+
156+ final OrderMetadata newOrderMetadata =
157+ orderMetadata == null
158+ ? new OrderMetadata (newPurchaseTimestamp , 1 , newPurchaseSize )
159+ : new OrderMetadata (
160+ orderMetadata .timestamp (),
161+ orderMetadata .count () + 1 ,
162+ orderMetadata .size () + newPurchaseSize
163+ );
164+
165+ // check if the key's purchases are ready to be batched and flushed,
166+ // otherwise just overwrite the metadata row with the new info
167+ if (shouldFlush (newOrderMetadata , newPurchaseTimestamp )) {
168+ doFlush (key , newOrderMetadata .timestamp ());
131169 } else {
132- store .put (mKey , new StoredOrder ( Optional . empty (), Optional . of ( newMeta )) );
170+ metadataStore .put (key , newOrderMetadata );
133171 }
134-
135- return null ;
136172 }
137173
138- private void flushExpired (long ts ) {
139- // iterate through all the metadata keys and check whether
140- // the corresponding values should be flushed - we can end
141- // at "null" because all keys that are not metadata keys
142- // start with "k." (which is less than "m.")
143- try (KeyValueIterator <String , StoredOrder > range = store .range ("m." , "n." )) {
174+ private void flushReadyOrders (long ts ) {
175+ // iterate through all the metadata rows and check whether the purchases
176+ // for each key are ready to be batched and flushed
177+ try (KeyValueIterator <String , OrderMetadata > range = metadataStore .all ()) {
144178 while (range .hasNext ()) {
145- final KeyValue <String , StoredOrder > kv = range .next ();
146- final StoredOrder .Meta meta = kv .value .meta ()
147- .orElseThrow (() -> new IllegalStateException (
148- "Got stored meta key with no meta: " + kv ));
149- if (shouldFlush (meta , ts )) {
150- doFlush (kv .key .split ("\\ ." )[1 ]);
151- store .delete (kv .key );
179+ final KeyValue <String , OrderMetadata > kv = range .next ();
180+ final OrderMetadata orderMetadata = kv .value ;
181+ if (shouldFlush (orderMetadata , ts )) {
182+ doFlush (kv .key , orderMetadata .timestamp ());
152183 }
153184 }
154185 }
155186 }
156187
157- private void doFlush (final String key ) {
188+ private void doFlush (final String key , final long batchTimestamp ) {
158189 try (
159- KeyValueIterator <String , StoredOrder > range = store .range (
190+ KeyValueIterator <String , Order > range = purchasesStore .range (
160191 storedKey (key , 0 ),
161192 storedKey (key , Long .MAX_VALUE )
162193 )
163194 ) {
164- final GroupedOrder result = new GroupedOrder (new ArrayList <>());
195+ final GroupedOrder groupedOrder = new GroupedOrder (new ArrayList <>());
165196
166197 while (range .hasNext ()) {
167- final KeyValue <String , StoredOrder > kv = range .next ();
168- store .delete (kv .key );
169-
170- final StoredOrder value = kv .value ;
171- result .orders ()
172- .add (value .order ()
173- .orElseThrow (() -> new IllegalStateException (
174- "Got stored order with no order! %s" .formatted (value ))));
198+ final KeyValue <String , Order > kv = range .next ();
199+ purchasesStore .delete (kv .key );
200+ groupedOrder .orders ().add (kv .value );
175201 }
176202
177- context .forward (key , result );
203+ context .forward (new Record <>( key , groupedOrder , batchTimestamp ) );
178204 }
179- }
180-
181- @ Override
182- public void close () {
183-
184- }
185205
186- static boolean shouldFlush (final StoredOrder .Meta meta , final long now ) {
187- return ((meta .timestamp () - now ) > 60_000 )
188- || (meta .count () > 50 )
189- || (meta .size () > 1_000 );
206+ // make sure to delete from the metadata store once the key is fully flushed
207+ metadataStore .delete (key );
190208 }
191209
192- static String metaKey (final String key ) {
193- return "m." + key ;
210+ private static boolean shouldFlush (final OrderMetadata orderMetadata , final long now ) {
211+ return ((orderMetadata .timestamp () - now ) > 60_000 )
212+ || (orderMetadata .count () > 50 )
213+ || (orderMetadata .size () > 1_000 );
194214 }
195215
196- static String storedKey (final String key , final long ts ) {
197- return "s. %s.%d" .formatted (key , ts );
216+ private static String storedKey (final String key , final long ts ) {
217+ return "%s.%d" .formatted (key , ts );
198218 }
199219 }
200220}
0 commit comments