From 8c667b56b4ead43c1fc2fe7663f2a7d3f5850dc5 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Fri, 31 Oct 2025 15:10:10 +0100 Subject: [PATCH 1/2] [SYSTEMDS-3930] Basic OOC eviction of intermediate streams Closes #2343. --- .../instructions/ooc/OOCEvictionManager.java | 211 ++++++++++++++++++ .../instructions/ooc/ResettableStream.java | 38 +++- 2 files changed, 237 insertions(+), 12 deletions(-) create mode 100644 src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java new file mode 100644 index 00000000000..8a94209a9ec --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.runtime.instructions.ooc; + +import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.matrix.data.MatrixIndexes; +import org.apache.sysds.runtime.util.LocalFileUtils; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.HashMap; +import java.util.Map; + +/** + * Eviction Manager for the Out-Of-Core stream cache + * This is the base implementation for LRU, FIFO + * + * Design choice 1: Pure JVM-memory cache + * What: Store MatrixBlock objects in a synchronized in-memory cache + * (Map + Deque for LRU/FIFO). Spill to disk by serializing MatrixBlock + * only when evicting. + * Pros: Simple to implement; no off-heap management; easy to debug; + * no serialization race since you serialize only when evicting; + * fast cache hits (direct object access). + * Cons: Heap usage counted roughly via serialized-size estimate — actual + * JVM object overhead not accounted; risk of GC pressure and OOM if + * estimates are off or if many small objects cause fragmentation; + * eviction may be more expensive (serialize on eviction). + *

+ * Design choice 2: + *

+ * This manager runtime memory management by caching serialized + * ByteBuffers and spilling them to disk when needed. + *

+ * * core function: Caches ByteBuffers (off-heap/direct) and + * spills them to disk + * * Eviction: Evicts a ByteBuffer by writing its contents to a file + * * Granularity: Evicts one IndexedMatrixValue block at a time + * * Data replay: get() will always return the data either from memory or + * by falling back to the disk + * * Memory: Since the datablocks are off-heap (in ByteBuffer) or disk, + * there won't be OOM. + * + * Pros: Avoids heap OOM by keeping large data off-heap; predictable + * memory usage; good for very large blocks. + * Cons: More complex synchronization; need robust off-heap allocator/free; + * must ensure serialization finishes before adding to queue or make evict + * wait on serialization; careful with native memory leaks. + */ +public class OOCEvictionManager { + + // Configuration: OOC buffer limit as percentage of heap + private static final double OOC_BUFFER_PERCENTAGE = 0.15; // 15% of heap + + // Memory limit for ByteBuffers + private static long _limit; + private static long _size; + + // Cache structures: map key -> MatrixBlock and eviction deque (head=oldest block) + private static final Map _cache = new HashMap<>(); + private static final Deque _evictDeque = new ArrayDeque<>(); + + // Single lock for synchronization + private static final Object lock = new Object(); + + // Spill directory for evicted blocks + private static String _spillDir; + + public enum RPolicy { + FIFO, LRU + } + private static RPolicy _policy = RPolicy.FIFO; + + private OOCEvictionManager() {} + + static { + _limit = (long)(Runtime.getRuntime().maxMemory() * OOC_BUFFER_PERCENTAGE * 0.01); // e.g., 20% of heap + _size = 0; + _spillDir = LocalFileUtils.getUniqueWorkingDir("ooc_stream"); + LocalFileUtils.createLocalFileIfNotExist(_spillDir); + } + + /** + * Store a block in the OOC cache (serialize once) + */ + public static synchronized void put(long streamId, int blockId, IndexedMatrixValue value) { + MatrixBlock mb = (MatrixBlock) value.getValue(); + long size = estimateSerializedSize(mb); + String key = streamId + "_" + blockId; + + synchronized (lock) { + IndexedMatrixValue old = _cache.remove(key); // remove old value + if (old != null) { + _evictDeque.remove(key); + _size -= estimateSerializedSize((MatrixBlock) old.getValue()); + } + + try { + evict(size); + } catch (IOException e) { + throw new DMLRuntimeException(e); + } + + _cache.put(key, value); // put new value + _evictDeque.addLast(key); // add to end for FIFO/LRU + _size += size; + } + } + + /** + * Get a block from the OOC cache (deserialize on read) + */ + public static synchronized IndexedMatrixValue get(long streamId, int blockId) { + + String key = streamId + "_" + blockId; + IndexedMatrixValue imv = _cache.get(key); + + synchronized (lock) { + if (imv != null && _policy == RPolicy.LRU) { + _evictDeque.remove(key); + _evictDeque.addLast(key); + } + } + + if (imv != null) { + return imv; + } else { + try { + return loadFromDisk(streamId, blockId); + } catch (IOException e) { + throw new DMLRuntimeException(e); + } + } + + } + + /** + * Evict ByteBuffers to disk + */ + private static void evict(long requiredSize) throws IOException { + while(_size + requiredSize > _limit && !_evictDeque.isEmpty()) { + System.out.println("_size + requiredSize: " + _size +" + "+ requiredSize + "; _limit: " + _limit); + String oldKey = _evictDeque.removeLast(); + IndexedMatrixValue toEvict = _cache.remove(oldKey); + + if (toEvict == null) { continue;} + MatrixBlock mbToEvict = (MatrixBlock) toEvict.getValue(); + + // Spill to disk + String filename = _spillDir + "/" + oldKey; + File spillDirFile = new File(_spillDir); + if (!spillDirFile.exists()) { + spillDirFile.mkdirs(); + } + + LocalFileUtils.writeMatrixBlockToLocal(filename, mbToEvict); + + long freedSize = estimateSerializedSize(mbToEvict); + _size -= freedSize; + + } + } + + /** + * Load block from spill file + */ + private static IndexedMatrixValue loadFromDisk(long streamId, int blockId) throws IOException { + String filename = _spillDir + "/" + streamId + "_" + blockId; + + // check if file exists + if (!LocalFileUtils.isExisting(filename)) { + throw new IOException("File " + filename + " does not exist"); + } + + // Read from disk + MatrixBlock mb = LocalFileUtils.readMatrixBlockFromLocal(filename); + + MatrixIndexes ix = new MatrixIndexes(blockId + 1, 1); + + // Put back in cache (may trigger eviction) + // get() operation should not modify cache + // put(streamId, blockId, new IndexedMatrixValue(ix, mb)); + + return new IndexedMatrixValue(ix, mb); + } + + private static long estimateSerializedSize(MatrixBlock mb) { + return mb.getExactSerializedSize(); + } +} diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/ResettableStream.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/ResettableStream.java index 038e1a8b983..6179811f7a7 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/ResettableStream.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/ResettableStream.java @@ -20,13 +20,15 @@ package org.apache.sysds.runtime.instructions.ooc; import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue; +import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence; import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; -import java.util.ArrayList; /** * A wrapper around LocalTaskQueue to consume the source stream and reset to * consume again for other operators. + *

+ * Uses OOCEvictionManager for out-of-core caching. * */ public class ResettableStream extends LocalTaskQueue { @@ -34,16 +36,24 @@ public class ResettableStream extends LocalTaskQueue { // original live stream private final LocalTaskQueue _source; - // in-memory cache to store stream for re-play - private final ArrayList _cache; + private static final IDSequence _streamSeq = new IDSequence(); + // stream identifier + private final long _streamId; + + // block counter + private int _numBlocks = 0; + // state flags private boolean _cacheInProgress = true; // caching in progress, in the first pass. private int _replayPosition = 0; // slider position in the stream public ResettableStream(LocalTaskQueue source) { + this(source, _streamSeq.getNextID()); + } + public ResettableStream(LocalTaskQueue source, long streamId) { _source = source; - _cache = new ArrayList<>(); + _streamId = streamId; } /** @@ -51,7 +61,6 @@ public ResettableStream(LocalTaskQueue source) { * For subsequent passes it reads from the memory. * * @return The next matrix value in the stream, or NO_MORE_TASKS - * @throws InterruptedException */ @Override public synchronized IndexedMatrixValue dequeueTask() @@ -60,18 +69,23 @@ public synchronized IndexedMatrixValue dequeueTask() // First pass: Read value from the source and cache it, and return. IndexedMatrixValue task = _source.dequeueTask(); if (task != NO_MORE_TASKS) { - _cache.add(new IndexedMatrixValue(task)); + + OOCEvictionManager.put(_streamId, _numBlocks, task); + _numBlocks++; + + return task; } else { _cacheInProgress = false; // caching is complete _source.closeInput(); // close source stream + + // Notify all the waiting consumers waiting for cache to fill with this stream + notifyAll(); + return (IndexedMatrixValue) NO_MORE_TASKS; } - notifyAll(); // Notify all the waiting consumers waiting for cache to fill with this stream - return task; } else { - // Replay pass: read directly from in-memory cache - if (_replayPosition < _cache.size()) { - // Return a copy to ensure consumer won't modify the cache - return new IndexedMatrixValue(_cache.get(_replayPosition++)); + // Replay pass: read from the buffer + if (_replayPosition < _numBlocks) { + return OOCEvictionManager.get(_streamId, _replayPosition++); } else { return (IndexedMatrixValue) NO_MORE_TASKS; } From 3cfdb3e5e5b502d237b5d32b56ea24eebcbce145 Mon Sep 17 00:00:00 2001 From: Matthias Boehm Date: Fri, 31 Oct 2025 17:04:40 +0100 Subject: [PATCH 2/2] [SYSTEMDS-3930] Cleanup out-of-core backend and buffer pool * Fix order-dependent restore of indexed matrix blocks (keep indexes) * Fix potential race condition in unguarded cache modifications * Fix warnings and unnecessary imports --- .../ooc/AggregateUnaryOOCInstruction.java | 2 - .../ooc/BinaryOOCInstruction.java | 3 - .../ooc/MatrixVectorBinaryOOCInstruction.java | 2 - .../instructions/ooc/OOCEvictionManager.java | 161 +++++++++--------- .../ooc/ReblockOOCInstruction.java | 3 - .../ooc/TransposeOOCInstruction.java | 3 - .../instructions/ooc/UnaryOOCInstruction.java | 3 - .../spark/data/IndexedMatrixValue.java | 4 + .../runtime/matrix/data/MatrixIndexes.java | 5 + 9 files changed, 87 insertions(+), 99 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/AggregateUnaryOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/AggregateUnaryOOCInstruction.java index 8c8a64b0225..c87b3c99cf2 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/AggregateUnaryOOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/AggregateUnaryOOCInstruction.java @@ -36,10 +36,8 @@ import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator; import org.apache.sysds.runtime.matrix.operators.Operator; import org.apache.sysds.runtime.meta.DataCharacteristics; -import org.apache.sysds.runtime.util.CommonThreadPool; import java.util.HashMap; -import java.util.concurrent.ExecutorService; public class AggregateUnaryOOCInstruction extends ComputationOOCInstruction { private AggregateOperator _aop = null; diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/BinaryOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/BinaryOOCInstruction.java index 82ad12ae554..1dfc99be811 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/BinaryOOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/BinaryOOCInstruction.java @@ -19,8 +19,6 @@ package org.apache.sysds.runtime.instructions.ooc; -import java.util.concurrent.ExecutorService; - import org.apache.sysds.common.Types.DataType; import org.apache.sysds.runtime.DMLRuntimeException; import org.apache.sysds.runtime.controlprogram.caching.MatrixObject; @@ -33,7 +31,6 @@ import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.matrix.operators.Operator; import org.apache.sysds.runtime.matrix.operators.ScalarOperator; -import org.apache.sysds.runtime.util.CommonThreadPool; public class BinaryOOCInstruction extends ComputationOOCInstruction { diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/MatrixVectorBinaryOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/MatrixVectorBinaryOOCInstruction.java index c1d1ed6ace7..aa215e83e90 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/MatrixVectorBinaryOOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/MatrixVectorBinaryOOCInstruction.java @@ -20,7 +20,6 @@ package org.apache.sysds.runtime.instructions.ooc; import java.util.HashMap; -import java.util.concurrent.ExecutorService; import org.apache.sysds.common.Opcodes; import org.apache.sysds.conf.ConfigurationManager; @@ -39,7 +38,6 @@ import org.apache.sysds.runtime.matrix.operators.AggregateOperator; import org.apache.sysds.runtime.matrix.operators.BinaryOperator; import org.apache.sysds.runtime.matrix.operators.Operator; -import org.apache.sysds.runtime.util.CommonThreadPool; public class MatrixVectorBinaryOOCInstruction extends ComputationOOCInstruction { diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java index 8a94209a9ec..747167d5102 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java @@ -22,14 +22,12 @@ import org.apache.sysds.runtime.DMLRuntimeException; import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; import org.apache.sysds.runtime.matrix.data.MatrixBlock; -import org.apache.sysds.runtime.matrix.data.MatrixIndexes; import org.apache.sysds.runtime.util.LocalFileUtils; import java.io.File; import java.io.IOException; -import java.util.ArrayDeque; -import java.util.Deque; -import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.Map; /** @@ -71,19 +69,15 @@ public class OOCEvictionManager { // Configuration: OOC buffer limit as percentage of heap - private static final double OOC_BUFFER_PERCENTAGE = 0.15; // 15% of heap + private static final double OOC_BUFFER_PERCENTAGE = 0.00015; // 15% of heap // Memory limit for ByteBuffers private static long _limit; private static long _size; // Cache structures: map key -> MatrixBlock and eviction deque (head=oldest block) - private static final Map _cache = new HashMap<>(); - private static final Deque _evictDeque = new ArrayDeque<>(); - - // Single lock for synchronization - private static final Object lock = new Object(); - + private static LinkedHashMap _cache = new LinkedHashMap<>(); + // Spill directory for evicted blocks private static String _spillDir; @@ -92,10 +86,8 @@ public enum RPolicy { } private static RPolicy _policy = RPolicy.FIFO; - private OOCEvictionManager() {} - static { - _limit = (long)(Runtime.getRuntime().maxMemory() * OOC_BUFFER_PERCENTAGE * 0.01); // e.g., 20% of heap + _limit = (long)(Runtime.getRuntime().maxMemory() * OOC_BUFFER_PERCENTAGE); // e.g., 20% of heap _size = 0; _spillDir = LocalFileUtils.getUniqueWorkingDir("ooc_stream"); LocalFileUtils.createLocalFileIfNotExist(_spillDir); @@ -109,103 +101,106 @@ public static synchronized void put(long streamId, int blockId, IndexedMatrixVal long size = estimateSerializedSize(mb); String key = streamId + "_" + blockId; - synchronized (lock) { - IndexedMatrixValue old = _cache.remove(key); // remove old value - if (old != null) { - _evictDeque.remove(key); - _size -= estimateSerializedSize((MatrixBlock) old.getValue()); - } - - try { - evict(size); - } catch (IOException e) { - throw new DMLRuntimeException(e); - } - - _cache.put(key, value); // put new value - _evictDeque.addLast(key); // add to end for FIFO/LRU - _size += size; + IndexedMatrixValue old = _cache.remove(key); // remove old value + if (old != null) { + _size -= estimateSerializedSize((MatrixBlock) old.getValue()); } + + //make room if needed + evict(size); + + _cache.put(key, value); // put new value last + _size += size; } /** * Get a block from the OOC cache (deserialize on read) */ public static synchronized IndexedMatrixValue get(long streamId, int blockId) { - String key = streamId + "_" + blockId; IndexedMatrixValue imv = _cache.get(key); - synchronized (lock) { - if (imv != null && _policy == RPolicy.LRU) { - _evictDeque.remove(key); - _evictDeque.addLast(key); - } + if (imv != null && _policy == RPolicy.LRU) { + _cache.remove(key); + _cache.put(key, imv); //add last semantic } - - if (imv != null) { - return imv; - } else { - try { - return loadFromDisk(streamId, blockId); - } catch (IOException e) { - throw new DMLRuntimeException(e); - } - } - + + //restore if needed + return (imv.getValue() != null) ? imv : + loadFromDisk(streamId, blockId); } /** * Evict ByteBuffers to disk */ - private static void evict(long requiredSize) throws IOException { - while(_size + requiredSize > _limit && !_evictDeque.isEmpty()) { - System.out.println("_size + requiredSize: " + _size +" + "+ requiredSize + "; _limit: " + _limit); - String oldKey = _evictDeque.removeLast(); - IndexedMatrixValue toEvict = _cache.remove(oldKey); - - if (toEvict == null) { continue;} - MatrixBlock mbToEvict = (MatrixBlock) toEvict.getValue(); - - // Spill to disk - String filename = _spillDir + "/" + oldKey; - File spillDirFile = new File(_spillDir); - if (!spillDirFile.exists()) { - spillDirFile.mkdirs(); + private static void evict(long requiredSize) { + try { + int pos = 0; + while(_size + requiredSize > _limit && pos++ < _cache.size()) { + //System.out.println("BUFFER: "+_size+"/"+_limit+" size="+_cache.size()); + Map.Entry tmp = removeFirstFromCache(); + if( tmp == null || tmp.getValue().getValue() == null ) { + if( tmp != null ) + _cache.put(tmp.getKey(), tmp.getValue()); + continue; + } + + // Spill to disk + String filename = _spillDir + "/" + tmp.getKey(); + File spillDirFile = new File(_spillDir); + if (!spillDirFile.exists()) { + spillDirFile.mkdirs(); + } + LocalFileUtils.writeMatrixBlockToLocal(filename, (MatrixBlock)tmp.getValue().getValue()); + + // Evict from memory + long freedSize = estimateSerializedSize((MatrixBlock)tmp.getValue().getValue()); + tmp.getValue().setValue(null); + _cache.put(tmp.getKey(), tmp.getValue()); // add last semantic + _size -= freedSize; } - - LocalFileUtils.writeMatrixBlockToLocal(filename, mbToEvict); - - long freedSize = estimateSerializedSize(mbToEvict); - _size -= freedSize; - + } + catch(Exception ex) { + throw new DMLRuntimeException(ex); } } /** * Load block from spill file */ - private static IndexedMatrixValue loadFromDisk(long streamId, int blockId) throws IOException { - String filename = _spillDir + "/" + streamId + "_" + blockId; + private static IndexedMatrixValue loadFromDisk(long streamId, int blockId) { + String key = streamId + "_" + blockId; + String filename = _spillDir + "/" + key; - // check if file exists - if (!LocalFileUtils.isExisting(filename)) { - throw new IOException("File " + filename + " does not exist"); + try { + // check if file exists + if (!LocalFileUtils.isExisting(filename)) { + throw new IOException("File " + filename + " does not exist"); + } + + // Read from disk and put into original indexed matrix value + MatrixBlock mb = LocalFileUtils.readMatrixBlockFromLocal(filename); + IndexedMatrixValue imv = _cache.get(key); + imv.setValue(mb); + return imv; + } + catch(Exception ex) { + throw new DMLRuntimeException(ex); } - - // Read from disk - MatrixBlock mb = LocalFileUtils.readMatrixBlockFromLocal(filename); - - MatrixIndexes ix = new MatrixIndexes(blockId + 1, 1); - - // Put back in cache (may trigger eviction) - // get() operation should not modify cache - // put(streamId, blockId, new IndexedMatrixValue(ix, mb)); - - return new IndexedMatrixValue(ix, mb); } private static long estimateSerializedSize(MatrixBlock mb) { return mb.getExactSerializedSize(); } + + private static Map.Entry removeFirstFromCache() { + //move iterator to first entry + Iterator> iter = _cache.entrySet().iterator(); + Map.Entry entry = iter.next(); + + //remove current iterator entry + iter.remove(); + + return entry; + } } diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/ReblockOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/ReblockOOCInstruction.java index 06386c5d66c..3c78879b45d 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/ReblockOOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/ReblockOOCInstruction.java @@ -19,8 +19,6 @@ package org.apache.sysds.runtime.instructions.ooc; -import java.util.concurrent.ExecutorService; - import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.SequenceFile; @@ -40,7 +38,6 @@ import org.apache.sysds.runtime.matrix.data.MatrixIndexes; import org.apache.sysds.runtime.matrix.operators.Operator; import org.apache.sysds.runtime.meta.DataCharacteristics; -import org.apache.sysds.runtime.util.CommonThreadPool; public class ReblockOOCInstruction extends ComputationOOCInstruction { private int blen; diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeOOCInstruction.java index fce5408960e..05e31830a56 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeOOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeOOCInstruction.java @@ -30,9 +30,6 @@ import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.matrix.data.MatrixIndexes; import org.apache.sysds.runtime.matrix.operators.ReorgOperator; -import org.apache.sysds.runtime.util.CommonThreadPool; - -import java.util.concurrent.ExecutorService; public class TransposeOOCInstruction extends ComputationOOCInstruction { diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java index 63f42f5bf15..173486844a6 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java @@ -28,9 +28,6 @@ import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.matrix.operators.UnaryOperator; -import org.apache.sysds.runtime.util.CommonThreadPool; - -import java.util.concurrent.ExecutorService; public class UnaryOOCInstruction extends ComputationOOCInstruction { private UnaryOperator _uop = null; diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/data/IndexedMatrixValue.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/data/IndexedMatrixValue.java index 8f82a99abff..7b20fe2f9e5 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/spark/data/IndexedMatrixValue.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/data/IndexedMatrixValue.java @@ -66,6 +66,10 @@ public MatrixIndexes getIndexes() { public MatrixValue getValue() { return _value; } + + public void setValue(MatrixValue value) { + _value = value; + } public void set(MatrixIndexes indexes2, MatrixValue block2) { _indexes.setIndexes(indexes2); diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixIndexes.java b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixIndexes.java index 5a3d0a64306..7f0f3b7a658 100644 --- a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixIndexes.java +++ b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixIndexes.java @@ -108,6 +108,11 @@ public int hashCode() { public String toString() { return "("+_row+", "+_col+")"; } + + public MatrixIndexes fromString(String ix) { + String[] parts = ix.substring(1, ix.length()-1).split(","); + return new MatrixIndexes(Long.parseLong(parts[0]), Long.parseLong(parts[1].trim())); + } //////////////////////////////////////////////////// // implementation of Writable read/write