Skip to content

Non-blocking out-of-core stream utilities#2347

Closed
janniklinde wants to merge 4 commits into
apache:mainfrom
janniklinde:OOCThreading
Closed

Non-blocking out-of-core stream utilities#2347
janniklinde wants to merge 4 commits into
apache:mainfrom
janniklinde:OOCThreading

Conversation

@janniklinde

Copy link
Copy Markdown
Contributor

This patch introduces common OOC stream processing functions such as mapOOC(in, out, mapper) and joinOOC(in1, in2, out, mapper, joinOn). It transitions to a non-blocking, item based scheduling model to eliminate blocking on dequeue operations. To support this, a subscriber system for LocalTaskQueue has been implemented.

To enable parallel processing of ResettableStream, a new CachingStream (replacing ResettableStream) and corresponding PlaybackStream are introduced. This design decouples cache storage from playback, allowing multiple concurrent playbacks over a shared stream. When a CachingStream is used, getStreamHandle() now returns a new PlaybackStream if a CachingStream is present.

@codecov

codecov Bot commented Nov 1, 2025

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 89.55823% with 26 lines in your changes missing coverage. Please review.
✅ Project coverage is 72.29%. Comparing base (d38e56c) to head (44c98c8).
⚠️ Report is 3 commits behind head on main.

Files with missing lines Patch % Lines
.../sysds/runtime/instructions/ooc/CachingStream.java 81.63% 6 Missing and 3 partials ⚠️
...sysds/runtime/instructions/ooc/OOCInstruction.java 90.36% 8 Missing ⚠️
.../runtime/controlprogram/parfor/LocalTaskQueue.java 91.17% 1 Missing and 2 partials ⚠️
...in/java/org/apache/sysds/runtime/util/OOCJoin.java 91.66% 1 Missing and 1 partial ⚠️
src/main/java/org/apache/sysds/hops/BinaryOp.java 0.00% 0 Missing and 1 partial ⚠️
.../runtime/controlprogram/caching/CacheableData.java 50.00% 0 Missing and 1 partial ⚠️
.../instructions/ooc/CentralMomentOOCInstruction.java 88.88% 1 Missing ⚠️
...che/sysds/runtime/matrix/operators/CMOperator.java 80.00% 0 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #2347      +/-   ##
============================================
+ Coverage     72.26%   72.29%   +0.02%     
- Complexity    46736    46786      +50     
============================================
  Files          1503     1505       +2     
  Lines        177262   177396     +134     
  Branches      34836    34853      +17     
============================================
+ Hits         128104   128248     +144     
+ Misses        39488    39475      -13     
- Partials       9670     9673       +3     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@mboehm7

mboehm7 commented Nov 1, 2025

Copy link
Copy Markdown
Contributor

Thanks @janniklinde for the initiative creating these primitives. Could you please look into resolving the merge conflicts with the new eviction logic we merged in yesterday? Furthermore, I ran a few experiments with parfor(i in 1:nrow(X), opt=CONSTRAINED, mode=LOCAL, taskpartitioner=NAIVE) to test the overhead (because parfor uses the same task queue), but the runtime fluctuations where larger than the differences between old and new.

@j143

j143 commented Nov 1, 2025

Copy link
Copy Markdown
Member

Hi @janniklinde - it's a great redesign! :) thanks

…e callback handling

Remove unnecessary TODO

Merge OOCEvicitionManager support into CachingStream

Update CachingStream.java

Create subscribable abstractions for OOC streams without affecting LocalTaskQueue

Remove unnecessary implement
@janniklinde

janniklinde commented Nov 3, 2025

Copy link
Copy Markdown
Contributor Author

I have now resolved the conflicts @mboehm7. Additionally, I reverted (most) of the changes of LocalTaskQueue, adding the subscription mechanism to SubscribableTaskQueue, which only extends LocalTaskQueue in the out-of-core context.

Further, I added two abstractions OOCStream<T> and OOCStreamable<T> to allow custom implementations of stream handles without having to extend LocalTaskQueue.

@mboehm7

mboehm7 commented Nov 9, 2025

Copy link
Copy Markdown
Contributor

Thanks @janniklinde for the improved patch. During the merge I only removed unnecessary imports and made some overwritten methods synchronized (which might have caused the flaky tests).

@mboehm7 mboehm7 closed this in 6514887 Nov 9, 2025
@github-project-automation github-project-automation Bot moved this from In Progress to Done in SystemDS PR Queue Nov 9, 2025
@janniklinde janniklinde deleted the OOCThreading branch February 20, 2026 08:55
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

Status: Done

Development

Successfully merging this pull request may close these issues.

3 participants