
[WIP][BEAM-10670] Make SparkRunner opt-out for using an SDF powered Read transform. #12603

Closed

lukecwik wants to merge 3 commits into apache:master from lukecwik:beam10670

Conversation

@lukecwik
Member


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make the review process smoother.

Post-Commit Tests Status (on master branch)

[Build-status badge table: rows Go, Java, Python, XLang; columns SDK, Dataflow, Flink, Samza, Spark, Twister2]

Pre-Commit Tests Status (on master branch)

[Build-status badge table: rows Non-portable, Portable; columns Java, Python, Go, Website]

See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.

GitHub Actions Tests Status (on master branch)

[Build-status badge: Build python source distribution and wheels]

See CI.md for more information about GitHub Actions CI.

@lukecwik
Member Author

Run Spark ValidatesRunner

@lukecwik
Member Author

Run Spark StructuredStreaming ValidatesRunner

@lukecwik
Member Author

CC: @iemejia I got this passing all the tests but what is the state of streaming pipelines? I see that there is code for evaluating UnboundedSources but don't see any test coverage.

@lukecwik
Member Author

Run Spark Runner Nexmark Tests

@lukecwik lukecwik changed the title [WIP][BEAM-10670] Make SparkRunner opt-out for using an SDF powered Read transform. [BEAM-10670] Make SparkRunner opt-out for using an SDF powered Read transform. Aug 18, 2020
@lukecwik lukecwik changed the title [BEAM-10670] Make SparkRunner opt-out for using an SDF powered Read transform. [WIP][BEAM-10670] Make SparkRunner opt-out for using an SDF powered Read transform. Aug 18, 2020
Member

@iemejia iemejia left a comment


You are right, streaming pipelines are not well tested on the Spark runner. At the moment there is no full run of the ValidatesRunner suite for streaming on the classic runner (the only one supporting streaming at the moment). I remember there were issues with test pipelines never stopping when we tried to enable them ~2.5 years ago.

The 'consistency' of watermark handling is validated using a Spark-specific transform called CreateStream that predates TestStream, but that's probably not really useful for this use case, where I suppose we intend to validate that Read is not broken for both the direct translation and the SDF-based one.

I don't immediately have a suggestion for how to do so. Maybe try to enable the Read VR test only for streaming, though I doubt it will work easily out of the box; otherwise, maybe add a runner-specific test temporarily.

Member


👍

@ibzib

ibzib commented Aug 19, 2020

cc: @ibzib @annaqin418

@lukecwik lukecwik force-pushed the beam10670 branch 3 times, most recently from 920da35 to 92b6e67 Compare September 16, 2020 17:47
@lukecwik
Member Author

@iemejia I have updated the code and added a SparkProcessedKeyedElements that uses updateStateByKey to evaluate a splittable DoFn. I based the logic on SparkGroupAlsoByWindowViaWindowSet. I also added some SplittableDoFnTests and have been using org.apache.beam.runners.spark.translation.streaming.SplittableDoFnTest#testPairWithIndexBasicUnbounded as my base test to get it working. So far I am able to get output produced, including having the splittable DoFn state saved and restored for the next round of execution. I do see most of the output via the LoggingDoFn that I added to the test, but the PAssert is failing because it doesn't see any of the output, and it is also triggering too early, before all the output has been produced. Any suggestions as to what to take a look at?
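For readers unfamiliar with updateStateByKey, the per-key state carry described above can be sketched as follows. This is a toy model of the semantics in plain Python, not the Spark or Beam API; the running sum here just stands in for the saved splittable DoFn residual that gets restored on the next round.

```python
def update_state_by_key(batch, prev_state, update_fn):
    """One micro-batch step modeled after Spark's updateStateByKey: for
    every key with new values, fold them into that key's previous state."""
    state = dict(prev_state)
    for key, values in batch.items():
        state[key] = update_fn(values, prev_state.get(key))
    return state

# The "state" here is a running sum standing in for a saved residual.
sum_fn = lambda values, prev: (prev or 0) + sum(values)
state = update_state_by_key({"a": [1, 2]}, {}, sum_fn)
state = update_state_by_key({"a": [4], "b": [5]}, state, sum_fn)
assert state == {"a": 7, "b": 5}  # state carried across batches
```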

@lukecwik
Member Author

lukecwik commented Sep 19, 2020

@iemejia I figured out that the issue is that watermark holds aren't implemented for Spark: the first batch completes and computes new watermarks, so the watermark hold that was set by the splittable DoFn implementation is ignored. This leads to timers being dropped and hence only some of the results being produced.

This is also likely why the PAssert is dropping the elements that were produced, but I haven't validated this yet.

Can you explain how the GlobalWatermarkHolder works? Can I register anything as a sourceId?

Since watermark holds don't seem to be implemented, does the GroupAlsoViaWindowSet hold back the watermark for elements that it currently has buffered?
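The diagnosis above can be modeled concretely. A minimal sketch, assuming the usual definition that the output watermark is the input watermark held back by the earliest outstanding hold; this is illustrative only, not the GlobalWatermarkHolder API:

```python
def output_watermark(input_watermark, holds):
    """Output watermark tracks the input watermark but is held back by the
    earliest outstanding hold, e.g. one set by a splittable DoFn that still
    has a residual to process. Toy model, not Beam's actual API."""
    return min([input_watermark] + list(holds))

# Without hold support the watermark races ahead after the first batch,
# so a timer the SDF set for t=20 is already behind it and gets dropped:
assert output_watermark(100, []) == 100    # hold ignored: jumps to 100
assert output_watermark(100, [20]) == 20   # hold honored: stays at 20
```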

@iemejia
Member

iemejia commented Sep 23, 2020

I also noticed the phenomenon of micro-batches producing results early in the past, when trying to enable the Read.Unbounded tests. I could not understand why; I thought it was probably due to some glitch in Spark's implementation, or to us breaking their scheduling, but I struggled to debug the issue properly.

> Since watermark holds don't seem to be implemented, does the GroupAlsoViaWindowSet hold back the watermark for elements that it currently has buffered?

Probably, at least that may explain some of the inconsistencies.

> Can you explain how the GlobalWatermarkHolder works? Can I register anything as a sourceId?

In all honesty I am not so familiar with watermark handling on the Spark runner. I took a look at the GlobalWatermarkHolder class and tried to figure it out, but it was not really evident.

My impression is that the sourceId is aligned somehow with Spark's assigned streamId, but I might be misinterpreting it.
https://github.com/apache/spark/blob/13664434387e338a5029e73a4388943f34e3fc07/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala#L30

I wish I could help more, but that part of the code is also not so well documented. I doubt that the original authors of the code still remember the details, but maybe they remember at least the intentions of GlobalWatermarkHolder and its use, and maybe whether there were any open issues. Just in case 🤞 maybe: @amitsela @aviemzur @staslev

@lukecwik
Member Author

lukecwik commented Oct 1, 2020

> I also noticed the phenomenon of micro-batches producing results early in the past, when trying to enable the Read.Unbounded tests. I could not understand why; I thought it was probably due to some glitch in Spark's implementation, or to us breaking their scheduling, but I struggled to debug the issue properly.

> > Since watermark holds don't seem to be implemented, does the GroupAlsoViaWindowSet hold back the watermark for elements that it currently has buffered?

> Probably, at least that may explain some of the inconsistencies.

The Java-based trigger implementation relies on this to produce correct results. Implementing this would likely enable a bunch of streaming use cases.

> > Can you explain how the GlobalWatermarkHolder works? Can I register anything as a sourceId?

> In all honesty I am not so familiar with watermark handling on the Spark runner. I took a look at the GlobalWatermarkHolder class and tried to figure it out, but it was not really evident.

> My impression is that the sourceId is aligned somehow with Spark's assigned streamId, but I might be misinterpreting it.
> https://github.com/apache/spark/blob/13664434387e338a5029e73a4388943f34e3fc07/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala#L30

> I wish I could help more, but that part of the code is also not so well documented. I doubt that the original authors of the code still remember the details, but maybe they remember at least the intentions of GlobalWatermarkHolder and its use, and maybe whether there were any open issues. Just in case 🤞 maybe: @amitsela @aviemzur @staslev

That would be great if someone could give guidance here.

@lukecwik
Member Author

lukecwik commented Oct 1, 2020

@iemejia Since streaming is effectively broken due to lack of support for watermark holds, what do you think about enabling SDF for Spark with it only working in batch?

I'll see what I can do about watermark holds but decoupling the two would be convenient.

@iemejia
Member

iemejia commented Oct 5, 2020

> @iemejia Since streaming is effectively broken due to lack of support for watermark holds, what do you think about enabling SDF for Spark with it only working in batch?

Can you be more explicit about what you mean by 'it only working in batch'? Isn't that the current case for bounded SDF?
Or do you mean to have unbounded Reads translated using SDF, still with the same level of functionality as the old translation (no holds)?

@lukecwik
Member Author

lukecwik commented Oct 5, 2020

> > @iemejia Since streaming is effectively broken due to lack of support for watermark holds, what do you think about enabling SDF for Spark with it only working in batch?

> Can you be more explicit about what you mean by 'it only working in batch'? Isn't that the current case for bounded SDF?
> Or do you mean to have unbounded Reads translated using SDF, still with the same level of functionality as the old translation (no holds)?

BoundedSource converted to an SDF will work just as well as the current BoundedSource implementation, since neither relies on watermark holds. The current implementation of UnboundedSource and the new implementation using UnboundedSource as an SDF both set the watermark, but triggers don't honor it since there is no support for watermark holds.
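The difference can be illustrated with a toy model of late-data dropping (illustrative only, not Beam's actual late-data handling):

```python
def deliver(pending_timestamps, watermark, allowed_lateness=0):
    """Toy model: elements whose timestamps fall behind the watermark
    (beyond the allowed lateness) are discarded as late."""
    threshold = watermark - allowed_lateness
    kept = [t for t in pending_timestamps if t >= threshold]
    dropped = [t for t in pending_timestamps if t < threshold]
    return kept, dropped

# A bounded read as an SDF: results are only finalized once the pipeline
# finishes, so the watermark never overtakes in-flight data:
assert deliver([5, 6, 7], watermark=0) == ([5, 6, 7], [])
# An unbounded read as an SDF with its hold ignored: the watermark jumps
# ahead while elements 5..7 are still in flight, and all are dropped:
assert deliver([5, 6, 7], watermark=100) == ([], [5, 6, 7])
```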

@iemejia
Member

iemejia commented Oct 6, 2020

I see, so it is the full switch from Read.Bounded/Unbounded to SDF by default. Can you get this one green so we can test it and then merge it? I would like to see if there is any perf impact, and we should probably document how to get the previous Unbounded translation in case any existing users find any difference.

If I understood correctly, you might intend to tackle watermark holds in the 'future'? Just out of curiosity, I assume this will be done in SparkProcessKeyedElements for the GBK/stateful translation; might this need some extra changes? I ask because I am reading the translation of the portable streaming runner and I see that watermarks are taken into account from Impulse, so I was wondering if something is missing here, or if this is done in a different place, maybe in core.

@lukecwik
Member Author

lukecwik commented Oct 6, 2020

> I see, so it is the full switch from Read.Bounded/Unbounded to SDF by default. Can you get this one green so we can test it and then merge it? I would like to see if there is any perf impact, and we should probably document how to get the previous Unbounded translation in case any existing users find any difference.

> If I understood correctly, you might intend to tackle watermark holds in the 'future'? Just out of curiosity, I assume this will be done in SparkProcessKeyedElements for the GBK/stateful translation; might this need some extra changes? I ask because I am reading the translation of the portable streaming runner and I see that watermarks are taken into account from Impulse, so I was wondering if something is missing here, or if this is done in a different place, maybe in core.

I'll try to see what I can get working with the existing GlobalWatermarkHolder implementation. I think we should be able to use arbitrary ids in it; it just might be really slow, since readers/writers should really only care about their upstream watermarks (main and side input), so having a global broadcast seems less than desirable.

For now, let's break this change up into multiple PRs (Spark already supports bounded SDFs via the SplittableParDoNaiveBounded.OverrideFactory):

  1. Enable impulse ([BEAM-10670] Support impulse within non-portable Spark implementations. #13018)
  2. Swap bounded reads to use SDFs (you can test bounded read as an SDF impact) ([BEAM-10670] Make Spark by default execute Read.Bounded using SplittableDoFn. #13021)
  3. Add support for watermark holds ([WIP] Spark Watermarks #13101)
  4. Enable streaming SDFs using the SparkProcessKeyedElements implementation that is part of this PR
  5. Swap unbounded reads to use SDFs (you can test unbounded read as an SDF impact)
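The opt-out idea behind the staged plan above can be sketched as a translation dispatch. The experiment name below follows Beam's `use_deprecated_read` convention, but the function and stage numbering are hypothetical, purely for illustration:

```python
def choose_read_translation(is_bounded, experiments, stage):
    """Hypothetical dispatch mirroring the staged plan: SDF-backed reads
    become the default per stage, with an experiment flag to opt back out
    to the legacy direct translation."""
    if "use_deprecated_read" in experiments:
        return "legacy"  # explicit opt-out always wins
    if is_bounded:
        return "sdf" if stage >= 2 else "legacy"  # stage 2 swaps bounded reads
    return "sdf" if stage >= 5 else "legacy"      # stage 5 swaps unbounded reads

assert choose_read_translation(True, set(), stage=2) == "sdf"
assert choose_read_translation(True, {"use_deprecated_read"}, stage=2) == "legacy"
```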

@iemejia
Member

iemejia commented Oct 6, 2020

Excellent idea to break the PR into little ones; first one merged. Waiting for the next!

@stale

stale bot commented Dec 13, 2020

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.

@stale stale bot added the stale label Dec 13, 2020
@stale

stale bot commented Dec 25, 2020

This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

@stale stale bot closed this Dec 25, 2020