Skip to content

Commit 542f3c6

Browse files
committed
apache#61 [euphoria-flink] Unit test covering out-of-order stream with allowed lateness
1 parent 1293823 commit 542f3c6

File tree

1 file changed

+45
-0
lines changed
  • sdks/java/extensions/euphoria/euphoria-flink/src/test/java/cz/seznam/euphoria/flink/streaming

1 file changed

+45
-0
lines changed

sdks/java/extensions/euphoria/euphoria-flink/src/test/java/cz/seznam/euphoria/flink/streaming/RBKTimeWindowTest.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,5 +226,50 @@ private List<String> fmt(List<Triple<TimeInterval, String, String>> xs) {
226226
.collect(Collectors.toList());
227227
}
228228

229+
@Test
230+
public void testEventWindowingWithAllowedLateness() throws Exception {
231+
ListDataSink<Triple<TimeInterval, String, Long>> output = ListDataSink.get(1);
232+
233+
ListDataSource<Pair<String, Integer>> source =
234+
ListDataSource.unbounded(
235+
asList(
236+
Pair.of("one", 1),
237+
Pair.of("one", 2),
238+
Pair.of("two", 7),
239+
Pair.of("two", 3), // latecomer, but in limit of allowed lateness
240+
Pair.of("two", 1), // latecomer, will be dropped
241+
Pair.of("two", 8),
242+
Pair.of("three", 8)))
243+
.withReadDelay(Duration.ofMillis(200));
244+
245+
Flow f = Flow.create("test-attached-windowing");
246+
Dataset<Pair<String, Long>> reduced =
247+
ReduceByKey.of(f.createInput(source))
248+
.keyBy(Pair::getFirst)
249+
.valueBy(e -> 1L)
250+
.combineBy(Sums.ofLongs())
251+
.windowBy(Time.of(Duration.ofMillis(5)),
252+
// ~ event time
253+
e -> (long) e.getSecond())
254+
.setNumPartitions(1)
255+
.output();
256+
257+
Util.extractWindows(reduced, TimeInterval.class).persist(output);
258+
259+
new TestFlinkExecutor()
260+
.setStateBackend(new RocksDBStateBackend("file:///tmp/flink-checkpoint"))
261+
.setAllowedLateness(Duration.ofMillis(4))
262+
.submit(f)
263+
.get();
264+
265+
assertEquals(
266+
asList("0:one-2", "0:two-1", "5:three-1", "5:two-2"),
267+
output.getOutput(0)
268+
.stream()
269+
.map(p -> p.getFirst().getStartMillis() + ":" + p.getSecond() + "-" + p.getThird())
270+
.sorted()
271+
.collect(Collectors.toList()));
272+
273+
}
229274

230275
}

0 commit comments

Comments
 (0)