From a30a51c5ad83708cb902bd6bf2d11b0808c31025 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Sun, 26 Oct 2025 21:08:19 +0530 Subject: [PATCH 1/9] add base implementation for ooc eviction manager --- .../caching/OOCEvictionManager.java | 193 ++++++++++++++++++ 1 file changed, 193 insertions(+) create mode 100644 src/main/java/org/apache/sysds/runtime/controlprogram/caching/OOCEvictionManager.java diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/OOCEvictionManager.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/OOCEvictionManager.java new file mode 100644 index 00000000000..7888c12f050 --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/OOCEvictionManager.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.runtime.controlprogram.caching; + +import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.matrix.data.MatrixIndexes; +import org.apache.sysds.runtime.util.LocalFileUtils; + +import java.io.IOException; +import java.util.Map.Entry; + +/** + * Eviction Manager for the Out-Of-Core stream cache + * This is the base implementation for LRU, FIFO + *

+ * This manager runtime memory management by caching serialized + * ByteBuffers and spilling them to disk when needed. + *

+ * * core function: Caches ByteBuffers (off-heap/direct) and + * spills them to disk + * * Eviction: Evicts a ByteBuffer by writing its contents to a file + * * Granularity: Evicts one IndexedMatrixValue block at a time + * * Data replay: get() will always return the data either from memory or + * by falling back to the disk + * * Memory: Since the datablocks are off-heap (in ByteBuffer) or disk, + * there won't be OOM. + */ +public class OOCEvictionManager { + private static OOCEvictionManager _instance; + + // Configuration: OOC buffer limit as percentage of heap + private static final double OOC_BUFFER_PERCENTAGE = 0.15; // 15% of heap + private static final int MIN_PREFETCH_DEPTH = 1; + private static final int MAX_PREFETCH_DEPTH = 5; + + // Memory limit for ByteBuffers + private long _limit; + private long _size; + + // Cache of ByteBuffers (off-heap serialized blocks) + private CacheEvictionQueue _mQueue; + + // I/O service for async spill/load + private CacheMaintenanceService _fClean; + + // Spill directory for evicted blocks + private String _spillDir; + + public enum RPolicy { + FIFO, LRU + } + private RPolicy _policy = RPolicy.FIFO; + + private OOCEvictionManager() { + _mQueue = new CacheEvictionQueue(); + _fClean = new CacheMaintenanceService(); + _limit = (long)(Runtime.getRuntime().maxMemory() * OOC_BUFFER_PERCENTAGE); // e.g., 20% of heap + _size = 0; + _spillDir = LocalFileUtils.getUniqueWorkingDir("ooc_stream"); + LocalFileUtils.createLocalFileIfNotExist(_spillDir); + } + + public static synchronized OOCEvictionManager getInstance() { + if(_instance == null) + _instance = new OOCEvictionManager(); + return _instance; + } + + /** + * Store a block in the OOC cache (serialize once) + */ + public void put(String key, IndexedMatrixValue value) throws IOException { + MatrixBlock mb = (MatrixBlock) value.getValue(); + // Serialize to ByteBuffer + long size = estimateSerializedSize(mb); + ByteBuffer bbuff = new ByteBuffer(size); + + synchronized(_mQueue) { + // Make space + evict(size); + + // Add to cache + _mQueue.addLast(key, bbuff); + _size += size; + } + + // Serialize outside lock + _fClean.serializeData(bbuff, mb); + } + + /** + * Get a block from the OOC cache (deserialize on read) + */ + public IndexedMatrixValue get(String key) throws IOException { + ByteBuffer bbuff = null; + + synchronized(_mQueue) { + bbuff = _mQueue.get(key); + + // LRU: move to end + if(_policy == RPolicy.LRU && bbuff != null) { + _mQueue.remove(key); + _mQueue.addLast(key, bbuff); + } + } + + if(bbuff != null) { + // Cache hit: deserialize from ByteBuffer + bbuff.checkSerialized(); + MatrixBlock mb = (MatrixBlock) bbuff.deserializeBlock(); + + MatrixIndexes ix = parseIndexesFromKey(key); + return new IndexedMatrixValue(ix, mb); + } else { + // Cache miss: load from disk + return loadFromDisk(key); + } + } + + /** + * Evict ByteBuffers to disk + */ + private void evict(long requiredSize) throws IOException { + while(_size + requiredSize > _limit && !_mQueue.isEmpty()) { + Entry entry = _mQueue.removeFirst(); + String key = entry.getKey(); + ByteBuffer bbuff = entry.getValue(); + + if(bbuff != null) { + // Wait for serialization + bbuff.checkSerialized(); + + // Spill to disk + String filename = _spillDir + "/" + key; + bbuff.evictBuffer(filename); + bbuff.freeMemory(); + _size -= bbuff.getSize(); + } + } + } + + /** + * Load block from spill file + */ + private IndexedMatrixValue loadFromDisk(String key) throws IOException { + String filename = _spillDir + "/" + key; + + // check if file exists + if (!LocalFileUtils.isExisting(filename)) { + throw new IOException("File " + filename + " does not exist"); + } + + // Read from disk + MatrixBlock mb = LocalFileUtils.readMatrixBlockFromLocal(filename); + MatrixIndexes ix = parseIndexesFromKey(key); + + // Put back in cache (may trigger eviction) + put(key, new IndexedMatrixValue(ix, mb)); + + return new IndexedMatrixValue(ix, mb); + } + + private long estimateSerializedSize(MatrixBlock mb) { + return mb.getExactSerializedSize(); + } + + private MatrixIndexes parseIndexesFromKey(String key) { + // Key format: "streamId_blockId" + // For now, use simple sequential block IDs + String[] parts = key.split("_"); + long blockId = Long.parseLong(parts[parts.length - 1]); + // Assume row-major ordering with block size + return new MatrixIndexes(blockId + 1, 1); + } +} \ No newline at end of file From 73d9ceec982b70d4fcf09ef6482e795b277c7823 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Sun, 26 Oct 2025 21:36:49 +0530 Subject: [PATCH 2/9] use ooc eviction manager in ResettableStream --- .../instructions/ooc/ResettableStream.java | 52 +++++++++++++++---- 1 file changed, 42 insertions(+), 10 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/ResettableStream.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/ResettableStream.java index 038e1a8b983..7d5726038da 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/ResettableStream.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/ResettableStream.java @@ -19,14 +19,19 @@ package org.apache.sysds.runtime.instructions.ooc; +import org.apache.sysds.runtime.controlprogram.caching.OOCEvictionManager; import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue; import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; +import java.io.IOException; import java.util.ArrayList; +import java.util.UUID; /** * A wrapper around LocalTaskQueue to consume the source stream and reset to * consume again for other operators. + *

+ * Uses OOCEvictionManager for out-of-core caching. * */ public class ResettableStream extends LocalTaskQueue { @@ -34,16 +39,28 @@ public class ResettableStream extends LocalTaskQueue { // original live stream private final LocalTaskQueue _source; - // in-memory cache to store stream for re-play - private final ArrayList _cache; + // stream identifier + private final String _streamId; + + // list of block keys (only the keys) + private final ArrayList _blockKeys; + // state flags private boolean _cacheInProgress = true; // caching in progress, in the first pass. private int _replayPosition = 0; // slider position in the stream + private OOCEvictionManager _manager; + public ResettableStream(LocalTaskQueue source) { + this(source, UUID.randomUUID().toString()); + } + public ResettableStream(LocalTaskQueue source, String streamId) { _source = source; - _cache = new ArrayList<>(); + _streamId = streamId; + _blockKeys = new ArrayList<>(); +// _cache = new ArrayList<>(); + _manager = OOCEvictionManager.getInstance(); } /** @@ -60,18 +77,33 @@ public synchronized IndexedMatrixValue dequeueTask() // First pass: Read value from the source and cache it, and return. IndexedMatrixValue task = _source.dequeueTask(); if (task != NO_MORE_TASKS) { - _cache.add(new IndexedMatrixValue(task)); + String key = _streamId + "_" + _blockKeys.size(); +// _cache.add(new IndexedMatrixValue(task)); + _blockKeys.add(key); + + try { + _manager.put(key, task); // Serialize + } catch (IOException e) { + throw new RuntimeException(e); + } + return task; } else { _cacheInProgress = false; // caching is complete _source.closeInput(); // close source stream + + // Notify all the waiting consumers waiting for cache to fill with this stream + notifyAll(); + return (IndexedMatrixValue) NO_MORE_TASKS; } - notifyAll(); // Notify all the waiting consumers waiting for cache to fill with this stream - return task; } else { - // Replay pass: read directly from in-memory cache - if (_replayPosition < _cache.size()) { - // Return a copy to ensure consumer won't modify the cache - return new IndexedMatrixValue(_cache.get(_replayPosition++)); +// // Replay pass: read from the buffer + if (_replayPosition < _blockKeys.size()) { + String key = _blockKeys.get(_replayPosition++); + try { + return _manager.get(key); // Deserialize + } catch (IOException ex) { + throw new RuntimeException(ex); + } } else { return (IndexedMatrixValue) NO_MORE_TASKS; } From c90985bc69987571aa435c2532e7eb730b6cbf78 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Wed, 29 Oct 2025 20:27:53 +0530 Subject: [PATCH 3/9] use a static utility --- .../caching/OOCEvictionManager.java | 36 +++++++++---------- .../instructions/ooc/ResettableStream.java | 10 +++--- 2 files changed, 22 insertions(+), 24 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/OOCEvictionManager.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/OOCEvictionManager.java index 7888c12f050..7f48774cbb2 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/OOCEvictionManager.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/OOCEvictionManager.java @@ -44,7 +44,7 @@ * there won't be OOM. */ public class OOCEvictionManager { - private static OOCEvictionManager _instance; +// private static OOCEvictionManager _instance; // Configuration: OOC buffer limit as percentage of heap private static final double OOC_BUFFER_PERCENTAGE = 0.15; // 15% of heap @@ -52,24 +52,26 @@ public class OOCEvictionManager { private static final int MAX_PREFETCH_DEPTH = 5; // Memory limit for ByteBuffers - private long _limit; - private long _size; + private static long _limit; + private static long _size; // Cache of ByteBuffers (off-heap serialized blocks) - private CacheEvictionQueue _mQueue; + private static CacheEvictionQueue _mQueue; // I/O service for async spill/load - private CacheMaintenanceService _fClean; + private static CacheMaintenanceService _fClean; // Spill directory for evicted blocks - private String _spillDir; + private static String _spillDir; public enum RPolicy { FIFO, LRU } - private RPolicy _policy = RPolicy.FIFO; + private static RPolicy _policy = RPolicy.FIFO; - private OOCEvictionManager() { + private OOCEvictionManager() {} + + static { _mQueue = new CacheEvictionQueue(); _fClean = new CacheMaintenanceService(); _limit = (long)(Runtime.getRuntime().maxMemory() * OOC_BUFFER_PERCENTAGE); // e.g., 20% of heap @@ -78,16 +80,10 @@ private OOCEvictionManager() { LocalFileUtils.createLocalFileIfNotExist(_spillDir); } - public static synchronized OOCEvictionManager getInstance() { - if(_instance == null) - _instance = new OOCEvictionManager(); - return _instance; - } - /** * Store a block in the OOC cache (serialize once) */ - public void put(String key, IndexedMatrixValue value) throws IOException { + public static synchronized void put(String key, IndexedMatrixValue value) throws IOException { MatrixBlock mb = (MatrixBlock) value.getValue(); // Serialize to ByteBuffer long size = estimateSerializedSize(mb); @@ -109,7 +105,7 @@ public void put(String key, IndexedMatrixValue value) throws IOException { /** * Get a block from the OOC cache (deserialize on read) */ - public IndexedMatrixValue get(String key) throws IOException { + public static synchronized IndexedMatrixValue get(String key) throws IOException { ByteBuffer bbuff = null; synchronized(_mQueue) { @@ -138,7 +134,7 @@ public IndexedMatrixValue get(String key) throws IOException { /** * Evict ByteBuffers to disk */ - private void evict(long requiredSize) throws IOException { + private static void evict(long requiredSize) throws IOException { while(_size + requiredSize > _limit && !_mQueue.isEmpty()) { Entry entry = _mQueue.removeFirst(); String key = entry.getKey(); @@ -160,7 +156,7 @@ private void evict(long requiredSize) throws IOException { /** * Load block from spill file */ - private IndexedMatrixValue loadFromDisk(String key) throws IOException { + private static IndexedMatrixValue loadFromDisk(String key) throws IOException { String filename = _spillDir + "/" + key; // check if file exists @@ -178,11 +174,11 @@ private IndexedMatrixValue loadFromDisk(String key) throws IOException { return new IndexedMatrixValue(ix, mb); } - private long estimateSerializedSize(MatrixBlock mb) { + private static long estimateSerializedSize(MatrixBlock mb) { return mb.getExactSerializedSize(); } - private MatrixIndexes parseIndexesFromKey(String key) { + private static MatrixIndexes parseIndexesFromKey(String key) { // Key format: "streamId_blockId" // For now, use simple sequential block IDs String[] parts = key.split("_"); diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/ResettableStream.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/ResettableStream.java index 7d5726038da..dedbff0cdab 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/ResettableStream.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/ResettableStream.java @@ -50,7 +50,7 @@ public class ResettableStream extends LocalTaskQueue { private boolean _cacheInProgress = true; // caching in progress, in the first pass. private int _replayPosition = 0; // slider position in the stream - private OOCEvictionManager _manager; +// private OOCEvictionManager _manager; public ResettableStream(LocalTaskQueue source) { this(source, UUID.randomUUID().toString()); @@ -60,7 +60,7 @@ public ResettableStream(LocalTaskQueue source, String stream _streamId = streamId; _blockKeys = new ArrayList<>(); // _cache = new ArrayList<>(); - _manager = OOCEvictionManager.getInstance(); +// _manager = OOCEvictionManager.getInstance(); } /** @@ -82,7 +82,8 @@ public synchronized IndexedMatrixValue dequeueTask() _blockKeys.add(key); try { - _manager.put(key, task); // Serialize + OOCEvictionManager.put(key, task); +// _manager.put(key, task); // Serialize } catch (IOException e) { throw new RuntimeException(e); } @@ -100,7 +101,8 @@ public synchronized IndexedMatrixValue dequeueTask() if (_replayPosition < _blockKeys.size()) { String key = _blockKeys.get(_replayPosition++); try { - return _manager.get(key); // Deserialize + return OOCEvictionManager.get(key); +// return _manager.get(key); // Deserialize } catch (IOException ex) { throw new RuntimeException(ex); } From 06dbebe4a4abcca590b5155ee608bf6d71ac8f17 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Thu, 30 Oct 2025 07:53:58 +0530 Subject: [PATCH 4/9] use IDSequence instead of UUID --- .../runtime/instructions/ooc/ResettableStream.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/ResettableStream.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/ResettableStream.java index dedbff0cdab..153ad1d74e4 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/ResettableStream.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/ResettableStream.java @@ -21,11 +21,12 @@ import org.apache.sysds.runtime.controlprogram.caching.OOCEvictionManager; import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue; +import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence; import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; import java.io.IOException; import java.util.ArrayList; -import java.util.UUID; +//import java.util.UUID; /** * A wrapper around LocalTaskQueue to consume the source stream and reset to @@ -39,8 +40,9 @@ public class ResettableStream extends LocalTaskQueue { // original live stream private final LocalTaskQueue _source; + private static final IDSequence _streamSeq = new IDSequence(); // stream identifier - private final String _streamId; + private final long _streamId; // list of block keys (only the keys) private final ArrayList _blockKeys; @@ -53,9 +55,9 @@ public class ResettableStream extends LocalTaskQueue { // private OOCEvictionManager _manager; public ResettableStream(LocalTaskQueue source) { - this(source, UUID.randomUUID().toString()); + this(source, _streamSeq.getNextID()); } - public ResettableStream(LocalTaskQueue source, String streamId) { + public ResettableStream(LocalTaskQueue source, long streamId) { _source = source; _streamId = streamId; _blockKeys = new ArrayList<>(); From 2d697ef92f880c1fb738029dcf3f1ad214f6840a Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Thu, 30 Oct 2025 09:13:12 +0530 Subject: [PATCH 5/9] do error handling in get(), put() methods themselves --- .../caching/OOCEvictionManager.java | 82 +++++++++++-------- .../instructions/ooc/ResettableStream.java | 25 +----- 2 files changed, 51 insertions(+), 56 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/OOCEvictionManager.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/OOCEvictionManager.java index 7f48774cbb2..4c82ba98a23 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/OOCEvictionManager.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/OOCEvictionManager.java @@ -19,6 +19,7 @@ package org.apache.sysds.runtime.controlprogram.caching; +import org.apache.sysds.runtime.DMLRuntimeException; import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.matrix.data.MatrixIndexes; @@ -44,7 +45,6 @@ * there won't be OOM. */ public class OOCEvictionManager { -// private static OOCEvictionManager _instance; // Configuration: OOC buffer limit as percentage of heap private static final double OOC_BUFFER_PERCENTAGE = 0.15; // 15% of heap @@ -74,7 +74,7 @@ private OOCEvictionManager() {} static { _mQueue = new CacheEvictionQueue(); _fClean = new CacheMaintenanceService(); - _limit = (long)(Runtime.getRuntime().maxMemory() * OOC_BUFFER_PERCENTAGE); // e.g., 20% of heap + _limit = (long)(Runtime.getRuntime().maxMemory() * OOC_BUFFER_PERCENTAGE * 0.01); // e.g., 20% of heap _size = 0; _spillDir = LocalFileUtils.getUniqueWorkingDir("ooc_stream"); LocalFileUtils.createLocalFileIfNotExist(_spillDir); @@ -83,51 +83,61 @@ private OOCEvictionManager() {} /** * Store a block in the OOC cache (serialize once) */ - public static synchronized void put(String key, IndexedMatrixValue value) throws IOException { - MatrixBlock mb = (MatrixBlock) value.getValue(); - // Serialize to ByteBuffer - long size = estimateSerializedSize(mb); - ByteBuffer bbuff = new ByteBuffer(size); - - synchronized(_mQueue) { - // Make space - evict(size); - - // Add to cache - _mQueue.addLast(key, bbuff); - _size += size; - } + public static synchronized void put(String key, IndexedMatrixValue value) { + try { + MatrixBlock mb = (MatrixBlock) value.getValue(); + // Serialize to ByteBuffer + long size = estimateSerializedSize(mb); + ByteBuffer bbuff = new ByteBuffer(size); + + synchronized (_mQueue) { + // Make space + evict(size); + + // Add to cache + _mQueue.addLast(key, bbuff); + _size += size; + } - // Serialize outside lock - _fClean.serializeData(bbuff, mb); + // Serialize outside lock + _fClean.serializeData(bbuff, mb); + } + catch(Exception e) { + throw new DMLRuntimeException(e); + } } /** * Get a block from the OOC cache (deserialize on read) */ - public static synchronized IndexedMatrixValue get(String key) throws IOException { + public static synchronized IndexedMatrixValue get(String key) { ByteBuffer bbuff = null; - synchronized(_mQueue) { - bbuff = _mQueue.get(key); + try { + synchronized (_mQueue) { + bbuff = _mQueue.get(key); - // LRU: move to end - if(_policy == RPolicy.LRU && bbuff != null) { - _mQueue.remove(key); - _mQueue.addLast(key, bbuff); + // LRU: move to end + if (_policy == RPolicy.LRU && bbuff != null) { + _mQueue.remove(key); + _mQueue.addLast(key, bbuff); + } } - } - if(bbuff != null) { - // Cache hit: deserialize from ByteBuffer - bbuff.checkSerialized(); - MatrixBlock mb = (MatrixBlock) bbuff.deserializeBlock(); + if (bbuff != null) { + // Cache hit: deserialize from ByteBuffer + bbuff.checkSerialized(); + MatrixBlock mb = (MatrixBlock) bbuff.deserializeBlock(); - MatrixIndexes ix = parseIndexesFromKey(key); - return new IndexedMatrixValue(ix, mb); - } else { - // Cache miss: load from disk - return loadFromDisk(key); + MatrixIndexes ix = parseIndexesFromKey(key); + return new IndexedMatrixValue(ix, mb); + } else { + // Cache miss: load from disk + return loadFromDisk(key); + } + } + catch (IOException e) { + throw new DMLRuntimeException(e); } } @@ -136,6 +146,7 @@ public static synchronized IndexedMatrixValue get(String key) throws IOException */ private static void evict(long requiredSize) throws IOException { while(_size + requiredSize > _limit && !_mQueue.isEmpty()) { + System.out.println("_size + requiredSize: " + _size +" + "+ requiredSize + "; _limit: " + _limit); Entry entry = _mQueue.removeFirst(); String key = entry.getKey(); ByteBuffer bbuff = entry.getValue(); @@ -146,6 +157,7 @@ private static void evict(long requiredSize) throws IOException { // Spill to disk String filename = _spillDir + "/" + key; + System.out.println("Evicting directory: "+ filename); bbuff.evictBuffer(filename); bbuff.freeMemory(); _size -= bbuff.getSize(); diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/ResettableStream.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/ResettableStream.java index 153ad1d74e4..8b0de3c8f40 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/ResettableStream.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/ResettableStream.java @@ -24,9 +24,7 @@ import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence; import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; -import java.io.IOException; import java.util.ArrayList; -//import java.util.UUID; /** * A wrapper around LocalTaskQueue to consume the source stream and reset to @@ -52,8 +50,6 @@ public class ResettableStream extends LocalTaskQueue { private boolean _cacheInProgress = true; // caching in progress, in the first pass. private int _replayPosition = 0; // slider position in the stream -// private OOCEvictionManager _manager; - public ResettableStream(LocalTaskQueue source) { this(source, _streamSeq.getNextID()); } @@ -61,8 +57,6 @@ public ResettableStream(LocalTaskQueue source, long streamId _source = source; _streamId = streamId; _blockKeys = new ArrayList<>(); -// _cache = new ArrayList<>(); -// _manager = OOCEvictionManager.getInstance(); } /** @@ -70,7 +64,6 @@ public ResettableStream(LocalTaskQueue source, long streamId * For subsequent passes it reads from the memory. * * @return The next matrix value in the stream, or NO_MORE_TASKS - * @throws InterruptedException */ @Override public synchronized IndexedMatrixValue dequeueTask() @@ -80,15 +73,10 @@ public synchronized IndexedMatrixValue dequeueTask() IndexedMatrixValue task = _source.dequeueTask(); if (task != NO_MORE_TASKS) { String key = _streamId + "_" + _blockKeys.size(); -// _cache.add(new IndexedMatrixValue(task)); _blockKeys.add(key); - try { - OOCEvictionManager.put(key, task); -// _manager.put(key, task); // Serialize - } catch (IOException e) { - throw new RuntimeException(e); - } + OOCEvictionManager.put(key, task); + return task; } else { _cacheInProgress = false; // caching is complete @@ -99,15 +87,10 @@ public synchronized IndexedMatrixValue dequeueTask() return (IndexedMatrixValue) NO_MORE_TASKS; } } else { -// // Replay pass: read from the buffer + // Replay pass: read from the buffer if (_replayPosition < _blockKeys.size()) { String key = _blockKeys.get(_replayPosition++); - try { - return OOCEvictionManager.get(key); -// return _manager.get(key); // Deserialize - } catch (IOException ex) { - throw new RuntimeException(ex); - } + return OOCEvictionManager.get(key); } else { return (IndexedMatrixValue) NO_MORE_TASKS; } From ee76d7d8f161802fbe04e341a9da3fb20cc314c1 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Thu, 30 Oct 2025 09:45:02 +0530 Subject: [PATCH 6/9] use streamId, block number --- .../caching/OOCEvictionManager.java | 34 ++++++++----------- .../instructions/ooc/ResettableStream.java | 15 ++++---- 2 files changed, 22 insertions(+), 27 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/OOCEvictionManager.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/OOCEvictionManager.java index 4c82ba98a23..657dc34ba53 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/OOCEvictionManager.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/OOCEvictionManager.java @@ -26,6 +26,8 @@ import org.apache.sysds.runtime.util.LocalFileUtils; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; import java.util.Map.Entry; /** @@ -48,8 +50,6 @@ public class OOCEvictionManager { // Configuration: OOC buffer limit as percentage of heap private static final double OOC_BUFFER_PERCENTAGE = 0.15; // 15% of heap - private static final int MIN_PREFETCH_DEPTH = 1; - private static final int MAX_PREFETCH_DEPTH = 5; // Memory limit for ByteBuffers private static long _limit; @@ -58,6 +58,7 @@ public class OOCEvictionManager { // Cache of ByteBuffers (off-heap serialized blocks) private static CacheEvictionQueue _mQueue; +// private static Map> cache = new HashMap<>(); // I/O service for async spill/load private static CacheMaintenanceService _fClean; @@ -83,7 +84,7 @@ private OOCEvictionManager() {} /** * Store a block in the OOC cache (serialize once) */ - public static synchronized void put(String key, IndexedMatrixValue value) { + public static synchronized void put(long streamId, int blockId, IndexedMatrixValue value) { try { MatrixBlock mb = (MatrixBlock) value.getValue(); // Serialize to ByteBuffer @@ -95,7 +96,7 @@ public static synchronized void put(String key, IndexedMatrixValue value) { evict(size); // Add to cache - _mQueue.addLast(key, bbuff); + _mQueue.addLast(streamId + "_" + blockId, bbuff); _size += size; } @@ -110,8 +111,9 @@ public static synchronized void put(String key, IndexedMatrixValue value) { /** * Get a block from the OOC cache (deserialize on read) */ - public static synchronized IndexedMatrixValue get(String key) { + public static synchronized IndexedMatrixValue get(long streamId, int blockId) { ByteBuffer bbuff = null; + String key = streamId + "_" + blockId; try { synchronized (_mQueue) { @@ -129,11 +131,11 @@ public static synchronized IndexedMatrixValue get(String key) { bbuff.checkSerialized(); MatrixBlock mb = (MatrixBlock) bbuff.deserializeBlock(); - MatrixIndexes ix = parseIndexesFromKey(key); + MatrixIndexes ix = new MatrixIndexes(blockId + 1, 1); return new IndexedMatrixValue(ix, mb); } else { // Cache miss: load from disk - return loadFromDisk(key); + return loadFromDisk(streamId, blockId); } } catch (IOException e) { @@ -168,8 +170,8 @@ private static void evict(long requiredSize) throws IOException { /** * Load block from spill file */ - private static IndexedMatrixValue loadFromDisk(String key) throws IOException { - String filename = _spillDir + "/" + key; + private static IndexedMatrixValue loadFromDisk(long streamId, int blockId) throws IOException { + String filename = _spillDir + "/" + streamId + "_" + blockId; // check if file exists if (!LocalFileUtils.isExisting(filename)) { @@ -178,10 +180,12 @@ private static IndexedMatrixValue loadFromDisk(String key) throws IOException { // Read from disk MatrixBlock mb = LocalFileUtils.readMatrixBlockFromLocal(filename); - MatrixIndexes ix = parseIndexesFromKey(key); + + MatrixIndexes ix = new MatrixIndexes(blockId + 1, 1); // Put back in cache (may trigger eviction) - put(key, new IndexedMatrixValue(ix, mb)); + // get() operation should not modify cache + // put(streamId, blockId, new IndexedMatrixValue(ix, mb)); return new IndexedMatrixValue(ix, mb); } @@ -190,12 +194,4 @@ private static long estimateSerializedSize(MatrixBlock mb) { return mb.getExactSerializedSize(); } - private static MatrixIndexes parseIndexesFromKey(String key) { - // Key format: "streamId_blockId" - // For now, use simple sequential block IDs - String[] parts = key.split("_"); - long blockId = Long.parseLong(parts[parts.length - 1]); - // Assume row-major ordering with block size - return new MatrixIndexes(blockId + 1, 1); - } } \ No newline at end of file diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/ResettableStream.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/ResettableStream.java index 8b0de3c8f40..f54df974e97 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/ResettableStream.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/ResettableStream.java @@ -43,7 +43,8 @@ public class ResettableStream extends LocalTaskQueue { private final long _streamId; // list of block keys (only the keys) - private final ArrayList _blockKeys; +// private final ArrayList _blockKeys; + private int _numBlocks = 0; // state flags @@ -56,7 +57,7 @@ public ResettableStream(LocalTaskQueue source) { public ResettableStream(LocalTaskQueue source, long streamId) { _source = source; _streamId = streamId; - _blockKeys = new ArrayList<>(); +// _blockKeys = new ArrayList<>(); } /** @@ -72,10 +73,9 @@ public synchronized IndexedMatrixValue dequeueTask() // First pass: Read value from the source and cache it, and return. IndexedMatrixValue task = _source.dequeueTask(); if (task != NO_MORE_TASKS) { - String key = _streamId + "_" + _blockKeys.size(); - _blockKeys.add(key); - OOCEvictionManager.put(key, task); + OOCEvictionManager.put(_streamId, _numBlocks, task); + _numBlocks++; return task; } else { @@ -88,9 +88,8 @@ public synchronized IndexedMatrixValue dequeueTask() } } else { // Replay pass: read from the buffer - if (_replayPosition < _blockKeys.size()) { - String key = _blockKeys.get(_replayPosition++); - return OOCEvictionManager.get(key); + if (_replayPosition < _numBlocks) { + return OOCEvictionManager.get(_streamId, _replayPosition++); } else { return (IndexedMatrixValue) NO_MORE_TASKS; } From f54908492e9edc9700409cef118b21bf802ca326 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Thu, 30 Oct 2025 22:55:47 +0530 Subject: [PATCH 7/9] working tests both --- .../controlprogram/caching/OOCEvictionManager.java | 8 +++++++- .../sysds/runtime/instructions/ooc/ResettableStream.java | 5 +---- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/OOCEvictionManager.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/OOCEvictionManager.java index 657dc34ba53..72acc2951de 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/OOCEvictionManager.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/OOCEvictionManager.java @@ -25,10 +25,12 @@ import org.apache.sysds.runtime.matrix.data.MatrixIndexes; import org.apache.sysds.runtime.util.LocalFileUtils; +import java.io.File; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.atomic.AtomicLong; /** * Eviction Manager for the Out-Of-Core stream cache @@ -53,7 +55,7 @@ public class OOCEvictionManager { // Memory limit for ByteBuffers private static long _limit; - private static long _size; + private static AtomicLong _size; // Cache of ByteBuffers (off-heap serialized blocks) private static CacheEvictionQueue _mQueue; @@ -159,6 +161,10 @@ private static void evict(long requiredSize) throws IOException { // Spill to disk String filename = _spillDir + "/" + key; + File spillDirFile = new File(_spillDir); + if (!spillDirFile.exists()) { + spillDirFile.mkdirs(); + } System.out.println("Evicting directory: "+ filename); bbuff.evictBuffer(filename); bbuff.freeMemory(); diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/ResettableStream.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/ResettableStream.java index f54df974e97..ebf2a94be5d 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/ResettableStream.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/ResettableStream.java @@ -24,7 +24,6 @@ import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence; import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; -import java.util.ArrayList; /** * A wrapper around LocalTaskQueue to consume the source stream and reset to @@ -42,8 +41,7 @@ public class ResettableStream extends LocalTaskQueue { // stream identifier private final long _streamId; - // list of block keys (only the keys) -// private final ArrayList _blockKeys; + // block counter private int _numBlocks = 0; @@ -57,7 +55,6 @@ public ResettableStream(LocalTaskQueue source) { public ResettableStream(LocalTaskQueue source, long streamId) { _source = source; _streamId = streamId; -// _blockKeys = new ArrayList<>(); } /** From 8e745882324dbef0aa7e2a8157d9cf6ad960d12b Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Thu, 30 Oct 2025 23:53:17 +0530 Subject: [PATCH 8/9] use jvm cache design --- .../caching/OOCEvictionManager.java | 129 ++++++++++-------- 1 file changed, 69 insertions(+), 60 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/OOCEvictionManager.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/OOCEvictionManager.java index 72acc2951de..a6483b3850f 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/OOCEvictionManager.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/OOCEvictionManager.java @@ -27,14 +27,28 @@ import java.io.File; import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Deque; import java.util.HashMap; import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.atomic.AtomicLong; /** * Eviction Manager for the Out-Of-Core stream cache * This is the base implementation for LRU, FIFO + * + * Design choice 1: Pure JVM-memory cache + * What: Store MatrixBlock objects in a synchronized in-memory cache + * (Map + Deque for LRU/FIFO). Spill to disk by serializing MatrixBlock + * only when evicting. + * Pros: Simple to implement; no off-heap management; easy to debug; + * no serialization race since you serialize only when evicting; + * fast cache hits (direct object access). + * Cons: Heap usage counted roughly via serialized-size estimate — actual + * JVM object overhead not accounted; risk of GC pressure and OOM if + * estimates are off or if many small objects cause fragmentation; + * eviction may be more expensive (serialize on eviction). + *

+ * Design choice 2: *

* This manager runtime memory management by caching serialized * ByteBuffers and spilling them to disk when needed. @@ -47,6 +61,12 @@ * by falling back to the disk * * Memory: Since the datablocks are off-heap (in ByteBuffer) or disk, * there won't be OOM. + * + * Pros: Avoids heap OOM by keeping large data off-heap; predictable + * memory usage; good for very large blocks. + * Cons: More complex synchronization; need robust off-heap allocator/free; + * must ensure serialization finishes before adding to queue or make evict + * wait on serialization; careful with native memory leaks. */ public class OOCEvictionManager { @@ -55,14 +75,14 @@ public class OOCEvictionManager { // Memory limit for ByteBuffers private static long _limit; - private static AtomicLong _size; + private static long _size; - // Cache of ByteBuffers (off-heap serialized blocks) - private static CacheEvictionQueue _mQueue; + // Cache structures: map key -> MatrixBlock and eviction deque (head=oldest block) + private static final Map _cache = new HashMap<>(); + private static final Deque _evictDeque = new ArrayDeque<>(); -// private static Map> cache = new HashMap<>(); - // I/O service for async spill/load - private static CacheMaintenanceService _fClean; + // Single lock for synchronization + private static final Object lock = new Object(); // Spill directory for evicted blocks private static String _spillDir; @@ -75,8 +95,6 @@ public enum RPolicy { private OOCEvictionManager() {} static { - _mQueue = new CacheEvictionQueue(); - _fClean = new CacheMaintenanceService(); _limit = (long)(Runtime.getRuntime().maxMemory() * OOC_BUFFER_PERCENTAGE * 0.01); // e.g., 20% of heap _size = 0; _spillDir = LocalFileUtils.getUniqueWorkingDir("ooc_stream"); @@ -87,26 +105,26 @@ private OOCEvictionManager() {} * Store a block in the OOC cache (serialize once) */ public static synchronized void put(long streamId, int blockId, IndexedMatrixValue value) { - try { - MatrixBlock mb = (MatrixBlock) value.getValue(); - // Serialize to ByteBuffer - long size = estimateSerializedSize(mb); - ByteBuffer bbuff = new ByteBuffer(size); - - synchronized (_mQueue) { - // Make space - evict(size); + MatrixBlock mb = (MatrixBlock) value.getValue(); + long size = estimateSerializedSize(mb); + String key = streamId + "_" + blockId; - // Add to cache - _mQueue.addLast(streamId + "_" + blockId, bbuff); - _size += size; + synchronized (lock) { + MatrixBlock old = _cache.remove(key); + if (old != null) { + _evictDeque.remove(key); + _size -= estimateSerializedSize(old); } - // Serialize outside lock - _fClean.serializeData(bbuff, mb); - } - catch(Exception e) { - throw new DMLRuntimeException(e); + try { + evict(size); + } catch (IOException e) { + throw new DMLRuntimeException(e); + } + + _cache.put(key, mb); + _evictDeque.addLast(key); // add to end for FIFO/LRU + _size += size; } } @@ -114,61 +132,52 @@ public static synchronized void put(long streamId, int blockId, IndexedMatrixVal * Get a block from the OOC cache (deserialize on read) */ public static synchronized IndexedMatrixValue get(long streamId, int blockId) { - ByteBuffer bbuff = null; String key = streamId + "_" + blockId; + MatrixBlock mb = (MatrixBlock) _cache.get(key); - try { - synchronized (_mQueue) { - bbuff = _mQueue.get(key); - - // LRU: move to end - if (_policy == RPolicy.LRU && bbuff != null) { - _mQueue.remove(key); - _mQueue.addLast(key, bbuff); - } + synchronized (lock) { + if (mb != null && _policy == RPolicy.LRU) { + _evictDeque.remove(key); + _evictDeque.addLast(key); } + } - if (bbuff != null) { - // Cache hit: deserialize from ByteBuffer - bbuff.checkSerialized(); - MatrixBlock mb = (MatrixBlock) bbuff.deserializeBlock(); - - MatrixIndexes ix = new MatrixIndexes(blockId + 1, 1); - return new IndexedMatrixValue(ix, mb); - } else { - // Cache miss: load from disk + if (mb != null) { + MatrixIndexes ix = new MatrixIndexes(blockId + 1, 1); + return new IndexedMatrixValue(ix, mb); + } else { + try { return loadFromDisk(streamId, blockId); + } catch (IOException e) { + throw new DMLRuntimeException(e); } } - catch (IOException e) { - throw new DMLRuntimeException(e); - } + } /** * Evict ByteBuffers to disk */ private static void evict(long requiredSize) throws IOException { - while(_size + requiredSize > _limit && !_mQueue.isEmpty()) { + while(_size + requiredSize > _limit && !_evictDeque.isEmpty()) { System.out.println("_size + requiredSize: " + _size +" + "+ requiredSize + "; _limit: " + _limit); - Entry entry = _mQueue.removeFirst(); - String key = entry.getKey(); - ByteBuffer bbuff = entry.getValue(); + String oldKey = _evictDeque.removeLast(); + MatrixBlock mbToEvict = (MatrixBlock) _cache.remove(oldKey); - if(bbuff != null) { - // Wait for serialization - bbuff.checkSerialized(); + if(mbToEvict != null) { // Spill to disk - String filename = _spillDir + "/" + key; + String filename = _spillDir + "/" + oldKey; File spillDirFile = new File(_spillDir); if (!spillDirFile.exists()) { spillDirFile.mkdirs(); } + + LocalFileUtils.writeMatrixBlockToLocal(filename, mbToEvict); System.out.println("Evicting directory: "+ filename); - bbuff.evictBuffer(filename); - bbuff.freeMemory(); - _size -= bbuff.getSize(); + + long freedSize = estimateSerializedSize(mbToEvict); + _size -= freedSize; } } } From 3926c1c3d77b83b1d28607df07aa4805d1d6fa48 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Fri, 31 Oct 2025 18:37:34 +0530 Subject: [PATCH 9/9] use indexedmatrixvalue and check with actions --- .../caching/OOCEvictionManager.java | 44 +++++++++---------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/OOCEvictionManager.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/OOCEvictionManager.java index a6483b3850f..2707b9e6eb0 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/OOCEvictionManager.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/OOCEvictionManager.java @@ -78,7 +78,7 @@ public class OOCEvictionManager { private static long _size; // Cache structures: map key -> MatrixBlock and eviction deque (head=oldest block) - private static final Map _cache = new HashMap<>(); + private static final Map _cache = new HashMap<>(); private static final Deque _evictDeque = new ArrayDeque<>(); // Single lock for synchronization @@ -110,10 +110,10 @@ public static synchronized void put(long streamId, int blockId, IndexedMatrixVal String key = streamId + "_" + blockId; synchronized (lock) { - MatrixBlock old = _cache.remove(key); + IndexedMatrixValue old = _cache.remove(key); // remove old value if (old != null) { _evictDeque.remove(key); - _size -= estimateSerializedSize(old); + _size -= estimateSerializedSize((MatrixBlock) old.getValue()); } try { @@ -122,7 +122,7 @@ public static synchronized void put(long streamId, int blockId, IndexedMatrixVal throw new DMLRuntimeException(e); } - _cache.put(key, mb); + _cache.put(key, value); // put new value _evictDeque.addLast(key); // add to end for FIFO/LRU _size += size; } @@ -132,19 +132,19 @@ public static synchronized void put(long streamId, int blockId, IndexedMatrixVal * Get a block from the OOC cache (deserialize on read) */ public static synchronized IndexedMatrixValue get(long streamId, int blockId) { + String key = streamId + "_" + blockId; - MatrixBlock mb = (MatrixBlock) _cache.get(key); + IndexedMatrixValue imv = _cache.get(key); synchronized (lock) { - if (mb != null && _policy == RPolicy.LRU) { + if (imv != null && _policy == RPolicy.LRU) { _evictDeque.remove(key); _evictDeque.addLast(key); } } - if (mb != null) { - MatrixIndexes ix = new MatrixIndexes(blockId + 1, 1); - return new IndexedMatrixValue(ix, mb); + if (imv != null) { + return imv; } else { try { return loadFromDisk(streamId, blockId); @@ -162,23 +162,23 @@ private static void evict(long requiredSize) throws IOException { while(_size + requiredSize > _limit && !_evictDeque.isEmpty()) { System.out.println("_size + requiredSize: " + _size +" + "+ requiredSize + "; _limit: " + _limit); String oldKey = _evictDeque.removeLast(); - MatrixBlock mbToEvict = (MatrixBlock) _cache.remove(oldKey); + IndexedMatrixValue toEvict = _cache.remove(oldKey); + + if (toEvict == null) { continue;} + MatrixBlock mbToEvict = (MatrixBlock) toEvict.getValue(); - if(mbToEvict != null) { + // Spill to disk + String filename = _spillDir + "/" + oldKey; + File spillDirFile = new File(_spillDir); + if (!spillDirFile.exists()) { + spillDirFile.mkdirs(); + } - // Spill to disk - String filename = _spillDir + "/" + oldKey; - File spillDirFile = new File(_spillDir); - if (!spillDirFile.exists()) { - spillDirFile.mkdirs(); - } + LocalFileUtils.writeMatrixBlockToLocal(filename, mbToEvict); - LocalFileUtils.writeMatrixBlockToLocal(filename, mbToEvict); - System.out.println("Evicting directory: "+ filename); + long freedSize = estimateSerializedSize(mbToEvict); + _size -= freedSize; - long freedSize = estimateSerializedSize(mbToEvict); - _size -= freedSize; - } } }