-
Notifications
You must be signed in to change notification settings - Fork 4.5k
Description
What happened?
The Python trigger AfterProcessingTime behaves differently from Java's AfterProcessingTime.pastFirstElementInPane().plusDelayOf.
While Java behaves as "wait X time after the first element in the pane, then trigger", Python behaves more like a session window: the wait is measured from the previous element rather than from the first element in the pane:
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/trigger.py#L387
# Sets the '' REAL_TIME timer to context.get_current_time() + self.delay.
# Nothing in this method checks whether a timer is already set for the pane;
# the report attributes the "delay restarts on every element" behavior to this.
def on_element(self, element, window, context):
context.set_timer(
'', TimeDomain.REAL_TIME, context.get_current_time() + self.delay)
You can see this in this example:
Python
def get_input_stream():
    """Build the TestStream that reproduces the trigger behavior.

    Ten timestamped string elements ("1" .. "10") are added one at a time.
    After each of the first nine, the watermark is moved and processing time
    is advanced; after the tenth, processing time is advanced by 2 and the
    watermark is moved to infinity.
    """
    # Each row: (value, event timestamp, watermark target, processing-time advance).
    # "total" in the notes is the sum of the processing-time advances so far.
    steps = [
        ("1", 0, 1.5, 1.5),
        ("2", 1, 2.5, 1),    # total 2.5 (it should trigger now)
        ("3", 3, 3, 0.5),    # total 3
        ("4", 4, 5, 2),      # total 5, it triggers now since it's 2s since last element
        ("5", 4, 6, 1.5),
        ("6", 8, 9, 1.5),
        ("7", 7, 10, 1.5),
        ("8", 8, 11, 1.5),
        # The advances sum to 12.5 here (the original note says 11.5);
        # per the report there has been no firing since 5.
        ("9", 7, 12, 1.5),
    ]

    test_stream = TestStream()
    for value, event_ts, watermark, advance in steps:
        test_stream = test_stream.add_elements(
            [TimestampedValue(value, timestamp=event_ts)])
        test_stream = test_stream.advance_watermark_to(watermark)
        test_stream = test_stream.advance_processing_time(advance)

    # Final element: processing time moves first, then the watermark goes to infinity.
    test_stream = test_stream.add_elements(
        [TimestampedValue("10", timestamp=9)])
    test_stream = test_stream.advance_processing_time(2)
    return test_stream.advance_watermark_to_infinity()
# Reproduction pipeline: fixed windows with allowed lateness, discarding panes,
# and a repeated processing-time trigger; each fired pane is printed.
options = PipelineOptions(streaming=True)
p = TestPipeline(options=options)

window_size_seconds = 10
window_allowed_lateness_seconds = 5
count_pass = 3  # not referenced anywhere in this snippet
delay = 2

stream = get_input_stream()

windowing = WindowInto(
    FixedWindows(size=window_size_seconds),
    allowed_lateness=window_allowed_lateness_seconds,
    accumulation_mode=trigger.AccumulationMode.DISCARDING,
    trigger=trigger.Repeatedly(trigger.AfterProcessingTime(delay)))

# Key every element with the same constant key so a single GroupByKey
# collects the contents of each pane.
(
    p
    | stream
    | windowing
    | Map(lambda e: ("key", e))
    | GroupByKey()
    | Map(print)
)

p.run()
Java
// Same scenario on the Java SDK: fixed windows with allowed lateness, discarding panes, and a
// repeated AfterProcessingTime.pastFirstElementInPane() trigger with a delay.
Pipeline pipeline = Pipeline.create(options);

Integer windowSeconds = 10;
Integer allowedLatenessSeconds = 5;
Integer triggerDelaySeconds = 2;

// One element per step, followed by a watermark move and a processing-time advance.
// Trailing comments give the sum of the processing-time advances so far.
TestStream<String> events =
    TestStream.create(StringUtf8Coder.of())
        .addElements(TimestampedValue.of("1", new Instant(0)))
        .advanceWatermarkTo(new Instant(1500))
        .advanceProcessingTime(Duration.millis(1500)) // total 1.5 s
        .addElements(TimestampedValue.of("2", new Instant(1000)))
        .advanceWatermarkTo(new Instant(2500))
        .advanceProcessingTime(Duration.millis(1000)) // total 2.5 s, it triggers here
        .addElements(TimestampedValue.of("3", new Instant(3000)))
        .advanceWatermarkTo(new Instant(3000))
        .advanceProcessingTime(Duration.millis(1500)) // total 4.0 s
        .addElements(TimestampedValue.of("4", new Instant(4000)))
        .advanceWatermarkTo(new Instant(5000))
        .advanceProcessingTime(Duration.standardSeconds(2)) // total 6.0 s
        .addElements(TimestampedValue.of("5", new Instant(4000)))
        .advanceWatermarkTo(new Instant(6000))
        .advanceProcessingTime(Duration.millis(1500)) // total 7.5 s
        .addElements(TimestampedValue.of("6", new Instant(8000)))
        .advanceWatermarkTo(new Instant(9000))
        .advanceProcessingTime(Duration.millis(1500)) // total 9.0 s
        .addElements(TimestampedValue.of("7", new Instant(7000)))
        .advanceWatermarkTo(new Instant(10000))
        .advanceProcessingTime(Duration.millis(1500)) // total 10.5 s
        .addElements(TimestampedValue.of("8", new Instant(8000)))
        .advanceWatermarkTo(new Instant(11000))
        .advanceProcessingTime(Duration.millis(1500)) // total 12.0 s
        .addElements(TimestampedValue.of("9", new Instant(7000)))
        .advanceWatermarkTo(new Instant(12000))
        .advanceProcessingTime(Duration.millis(1500)) // total 13.5 s
        .addElements(TimestampedValue.of("10", new Instant(9000)))
        .advanceWatermarkToInfinity();

// Windowing under test: the trigger delay is counted from the first element in the pane.
Window<KV<String, String>> windowing =
    Window.<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(windowSeconds)))
        .withAllowedLateness(Duration.standardSeconds(allowedLatenessSeconds))
        .triggering(
            Repeatedly.forever(
                AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(triggerDelaySeconds))))
        .discardingFiredPanes();

pipeline
    .apply(events)
    .apply(
        "KVs",
        ParDo.of(
            new DoFn<String, KV<String, String>>() {
              // Puts every element under the same constant key.
              @ProcessElement
              public void processElement(ProcessContext ctx) {
                ctx.output(KV.of("key", ctx.element()));
              }
            }))
    .apply(windowing)
    .apply(GroupByKey.create())
    .apply(
        "Log",
        ParDo.of(
            new DoFn<KV<String, Iterable<String>>, String>() {
              // Logs the grouped values of each fired pane and emits the pane info as a string.
              @ProcessElement
              public void processElement(ProcessContext ctx) {
                LOG.info("\n TRIGGER " + ctx.element().getValue().toString());
                ctx.output(ctx.pane().toString());
              }
            }));

pipeline.run();
The output in Python is two panes, ['1', '2', '3', '4'] and ['5', '6', '7', '8', '9', '10'], whereas Java produces the "right" output: ['1', '2'], ['3', '4'], ['5', '6'], ['7', '8'], ['9', '10'].
The fix doesn't seem hard (the worst thing to say ever), but given that users may already be relying on this trigger's current behavior, I am not sure how to proceed.
Issue Priority
Priority: 2
Issue Component
Component: sdk-py-core