diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java index 3cee338f7dc..042eedf6b36 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java @@ -91,6 +91,7 @@ public abstract class CacheableData extends Data public static final String CACHING_EVICTION_FILEEXTENSION = ".dat"; public static final boolean CACHING_ASYNC_FILECLEANUP = true; public static final boolean CACHING_ASYNC_SERIALIZE = false; + public static boolean CONCURRENT_FS_WRITE = true; //NOTE CACHING_ASYNC_SERIALIZE: // The serialization of matrices and frames (ultra-sparse matrices or diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/LazyWriteBuffer.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/LazyWriteBuffer.java index 53d281631db..61cefa06817 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/LazyWriteBuffer.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/LazyWriteBuffer.java @@ -20,11 +20,16 @@ package org.apache.sysds.runtime.controlprogram.caching; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Map.Entry; import java.util.concurrent.ExecutorService; import org.apache.sysds.api.DMLScript; import org.apache.sysds.hops.OptimizerUtils; +import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.matrix.data.Pair; +import org.apache.sysds.runtime.util.CommonThreadPool; import org.apache.sysds.runtime.util.LocalFileUtils; public class LazyWriteBuffer @@ -46,7 +51,7 @@ public enum RPolicy { //maintenance service for synchronous or asynchronous delete of evicted files private static CacheMaintenanceService _fClean; - + public static int writeBlock(String fname, CacheBlock cb) throws IOException { @@ -59,6 +64,8 @@ public static int writeBlock(String fname, CacheBlock cb) //handle caching/eviction if it fits in writebuffer if( !requiresWrite ) { + //HashMap evictionList = new HashMap<>(); + List> evictionList = new ArrayList<>(); //create byte buffer handle (no block allocation yet) ByteBuffer bbuff = new ByteBuffer( lSize ); @@ -74,17 +81,14 @@ public static int writeBlock(String fname, CacheBlock cb) ByteBuffer tmp = entry.getValue(); if( tmp != null ) { - //wait for pending serialization - tmp.checkSerialized(); - - //evict matrix - tmp.evictBuffer(ftmp); - tmp.freeMemory(); + //add to the list of entries to be evicted + evictionList.add(new Pair<>(ftmp, tmp)); _size -= tmp.getSize(); numEvicted++; } } - + evictBlocks(evictionList); + //put placeholder into buffer pool (reserve mem) _mQueue.addLast(fname, bbuff); _size += lSize; @@ -107,10 +111,46 @@ public static int writeBlock(String fname, CacheBlock cb) } numEvicted++; } - + return numEvicted; } + // Evict a list of cached objects to the local FS + private static void evictBlocks(List> objects) + { + if (CacheableData.CONCURRENT_FS_WRITE && objects.size() > 1) { + //concurrently serialize and write the entries + int k = OptimizerUtils.getConstrainedNumThreads(-1); + ExecutorService myPool = CommonThreadPool.get(k); + try { + myPool.submit(() -> { + objects.stream().parallel().forEach(LazyWriteBuffer::evictBlock); + }).get(); + } + catch(Exception ex) { + throw new DMLRuntimeException("Concurrent bufferpool eviction of "+objects.size()+" failed.", ex); + } + } + else + objects.forEach(LazyWriteBuffer::evictBlock); + } + + // Evict a single object to local FS + private static void evictBlock(Pair entry) { + String fname = entry.getKey(); + ByteBuffer bb = entry.getValue(); + try { + //wait for pending serialization + bb.checkSerialized(); + //evict matrix + bb.evictBuffer(fname); + bb.freeMemory(); + } + catch(Exception e) { + throw new DMLRuntimeException("Bufferpool eviction to " + fname + " failed."); + } + } + public static void deleteBlock(String fname) { boolean requiresDelete = true;