From d36de4ad8d85f5105abb2a87f8c41b3467ef4393 Mon Sep 17 00:00:00 2001 From: arnabp Date: Sun, 10 Apr 2022 11:29:32 +0200 Subject: [PATCH 1/5] [SYSTEMDS-3346] Multi-threaded buffer pool evictions This patch implements a simple technique to exploit multithreading while evicting multiple buffer pool entries. Here we distribute the files to be evicted among threads. More sophisticated strategies include using our parallel readers/writers or multithreaded writes to an output stream; however, they have more implementation overhead and are left as future work. This simple strategy is already beneficial for unified memory as UMM tends to evict multiple objects in each eviction call while making space for worst-case output sizes. --- .../controlprogram/caching/CacheableData.java | 1 + .../caching/LazyWriteBuffer.java | 55 ++++++++++++++++--- 2 files changed, 47 insertions(+), 9 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java index 3cee338f7dc..042eedf6b36 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java @@ -91,6 +91,7 @@ public abstract class CacheableData extends Data public static final String CACHING_EVICTION_FILEEXTENSION = ".dat"; public static final boolean CACHING_ASYNC_FILECLEANUP = true; public static final boolean CACHING_ASYNC_SERIALIZE = false; + public static boolean CONCURRENT_FS_WRITE = true; //NOTE CACHING_ASYNC_SERIALIZE: // The serialization of matrices and frames (ultra-sparse matrices or diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/LazyWriteBuffer.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/LazyWriteBuffer.java index 53d281631db..5580a4d8c1b 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/LazyWriteBuffer.java +++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/LazyWriteBuffer.java @@ -20,11 +20,15 @@ package org.apache.sysds.runtime.controlprogram.caching; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ExecutorService; import org.apache.sysds.api.DMLScript; import org.apache.sysds.hops.OptimizerUtils; +import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.util.CommonThreadPool; import org.apache.sysds.runtime.util.LocalFileUtils; public class LazyWriteBuffer @@ -46,7 +50,7 @@ public enum RPolicy { //maintenance service for synchronous or asynchronous delete of evicted files private static CacheMaintenanceService _fClean; - + public static int writeBlock(String fname, CacheBlock cb) throws IOException { @@ -59,6 +63,7 @@ public static int writeBlock(String fname, CacheBlock cb) //handle caching/eviction if it fits in writebuffer if( !requiresWrite ) { + HashMap evictionList = new HashMap<>(); //create byte buffer handle (no block allocation yet) ByteBuffer bbuff = new ByteBuffer( lSize ); @@ -74,17 +79,13 @@ public static int writeBlock(String fname, CacheBlock cb) ByteBuffer tmp = entry.getValue(); if( tmp != null ) { - //wait for pending serialization - tmp.checkSerialized(); - - //evict matrix - tmp.evictBuffer(ftmp); - tmp.freeMemory(); + evictionList.put(ftmp, tmp); _size -= tmp.getSize(); numEvicted++; } } - + evictBlocks(evictionList); + //put placeholder into buffer pool (reserve mem) _mQueue.addLast(fname, bbuff); _size += lSize; @@ -107,10 +108,46 @@ public static int writeBlock(String fname, CacheBlock cb) } numEvicted++; } - + return numEvicted; } + private static void evictBlocks(HashMap objects) + { + if (CacheableData.CONCURRENT_FS_WRITE && objects.size() > 1) { + int k = OptimizerUtils.getConstrainedNumThreads(-1); + ExecutorService myPool = CommonThreadPool.get(k); + try { + myPool.submit(() -> { + 
objects.entrySet().stream().parallel().forEach(LazyWriteBuffer::evictBlock); + }).get(); + } + catch(Exception ex) { + throw new DMLRuntimeException(ex); + } + } + else { + for(Map.Entry entry : objects.entrySet()) + evictBlock(entry); + //objects.entrySet().forEach(LazyWriteBuffer::evictBlock); + } + } + + private static void evictBlock(Map.Entry entry) { + String fname = entry.getKey(); + ByteBuffer bb = entry.getValue(); + try { + //wait for pending serialization + bb.checkSerialized(); + //evict matrix + bb.evictBuffer(fname); + bb.freeMemory(); + } + catch(Exception e) { + throw new DMLRuntimeException("Eviction of " + fname + " failed"); + } + } + public static void deleteBlock(String fname) { boolean requiresDelete = true; From 3d13cf49f2860ce8bd71575c52e88750420178df Mon Sep 17 00:00:00 2001 From: arnabp Date: Sun, 10 Apr 2022 14:00:54 +0200 Subject: [PATCH 2/5] Disable soft ref, lineage reconstruction and reduce cache --- src/main/java/org/apache/sysds/conf/DMLConfig.java | 2 +- src/main/java/org/apache/sysds/hops/OptimizerUtils.java | 2 +- .../sysds/runtime/controlprogram/caching/CacheableData.java | 2 +- .../sysds/runtime/controlprogram/caching/LazyWriteBuffer.java | 1 + .../sysds/runtime/controlprogram/context/ExecutionContext.java | 2 +- 5 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/apache/sysds/conf/DMLConfig.java b/src/main/java/org/apache/sysds/conf/DMLConfig.java index 03a0afb2334..f864707dc26 100644 --- a/src/main/java/org/apache/sysds/conf/DMLConfig.java +++ b/src/main/java/org/apache/sysds/conf/DMLConfig.java @@ -165,7 +165,7 @@ public class DMLConfig _defaultVals.put(NATIVE_BLAS_DIR, "none" ); _defaultVals.put(LINEAGECACHESPILL, "true" ); _defaultVals.put(COMPILERASSISTED_RW, "true" ); - _defaultVals.put(BUFFERPOOL_LIMIT, "15"); // % of total heap + _defaultVals.put(BUFFERPOOL_LIMIT, "10"); // % of total heap _defaultVals.put(MEMORY_MANAGER, "static"); // static/unified partitioning of heap 
_defaultVals.put(PRINT_GPU_MEMORY_INFO, "false" ); _defaultVals.put(EVICTION_SHADOW_BUFFERSIZE, "0.0" ); diff --git a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java index a58450d5f3b..14b44c63e80 100644 --- a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java +++ b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java @@ -494,7 +494,7 @@ public static long getBufferPoolLimit() { return BUFFER_POOL_SIZE; DMLConfig conf = ConfigurationManager.getDMLConfig(); double bufferPoolFactor = (double)(conf.getIntValue(DMLConfig.BUFFERPOOL_LIMIT))/100; - bufferPoolFactor = Math.max(bufferPoolFactor, DEFAULT_MEM_UTIL_FACTOR); + //bufferPoolFactor = Math.max(bufferPoolFactor, DEFAULT_MEM_UTIL_FACTOR); long maxMem = InfrastructureAnalyzer.getLocalMaxMemory(); return (long)(bufferPoolFactor * maxMem); } diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java index 042eedf6b36..d11ffdf083f 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java @@ -732,7 +732,7 @@ && isCached(true) //not empty and not read/modify CacheStatistics.incrementLinWrites(); //create cache - createCache(); + //createCache(); _data = null; } } diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/LazyWriteBuffer.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/LazyWriteBuffer.java index 5580a4d8c1b..096671b64eb 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/LazyWriteBuffer.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/LazyWriteBuffer.java @@ -84,6 +84,7 @@ public static int writeBlock(String fname, CacheBlock cb) numEvicted++; } } + //System.out.println("#Evicted entries = "+numEvicted); 
evictBlocks(evictionList); //put placeholder into buffer pool (reserve mem) diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/context/ExecutionContext.java b/src/main/java/org/apache/sysds/runtime/controlprogram/context/ExecutionContext.java index 925b34b46d7..97ccf3ebd09 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/context/ExecutionContext.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/context/ExecutionContext.java @@ -574,7 +574,7 @@ public void setMatrixOutputAndLineage(String varName, MatrixBlock outputData, Li setVariable(varName, createMatrixObject(outputData)); MatrixObject mo = getMatrixObject(varName); mo.acquireModify(outputData); - mo.setCacheLineage(li); + //mo.setCacheLineage(li); mo.release(); } From 350a2b68f1ff27caf125caafe9b6d520f2fbb378 Mon Sep 17 00:00:00 2001 From: arnabp Date: Sun, 10 Apr 2022 14:51:56 +0200 Subject: [PATCH 3/5] Reenable lineage reconstruction and soft ref --- .../sysds/runtime/controlprogram/caching/CacheableData.java | 2 +- .../sysds/runtime/controlprogram/context/ExecutionContext.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java index d11ffdf083f..042eedf6b36 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java @@ -732,7 +732,7 @@ && isCached(true) //not empty and not read/modify CacheStatistics.incrementLinWrites(); //create cache - //createCache(); + createCache(); _data = null; } } diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/context/ExecutionContext.java b/src/main/java/org/apache/sysds/runtime/controlprogram/context/ExecutionContext.java index 97ccf3ebd09..925b34b46d7 100644 --- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/context/ExecutionContext.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/context/ExecutionContext.java @@ -574,7 +574,7 @@ public void setMatrixOutputAndLineage(String varName, MatrixBlock outputData, Li setVariable(varName, createMatrixObject(outputData)); MatrixObject mo = getMatrixObject(varName); mo.acquireModify(outputData); - //mo.setCacheLineage(li); + mo.setCacheLineage(li); mo.release(); } From f988e497bec490ad0a19b4819e4027d0543fdbdf Mon Sep 17 00:00:00 2001 From: arnabp Date: Sun, 10 Apr 2022 15:23:46 +0200 Subject: [PATCH 4/5] Reduce buffer pool to 5% of heap --- src/main/java/org/apache/sysds/conf/DMLConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/apache/sysds/conf/DMLConfig.java b/src/main/java/org/apache/sysds/conf/DMLConfig.java index f864707dc26..f634023a16e 100644 --- a/src/main/java/org/apache/sysds/conf/DMLConfig.java +++ b/src/main/java/org/apache/sysds/conf/DMLConfig.java @@ -165,7 +165,7 @@ public class DMLConfig _defaultVals.put(NATIVE_BLAS_DIR, "none" ); _defaultVals.put(LINEAGECACHESPILL, "true" ); _defaultVals.put(COMPILERASSISTED_RW, "true" ); - _defaultVals.put(BUFFERPOOL_LIMIT, "10"); // % of total heap + _defaultVals.put(BUFFERPOOL_LIMIT, "5"); // % of total heap _defaultVals.put(MEMORY_MANAGER, "static"); // static/unified partitioning of heap _defaultVals.put(PRINT_GPU_MEMORY_INFO, "false" ); _defaultVals.put(EVICTION_SHADOW_BUFFERSIZE, "0.0" ); From 95d6e6700c4d8733fb42467af6740593f05d30c8 Mon Sep 17 00:00:00 2001 From: arnabp Date: Mon, 11 Apr 2022 18:24:26 +0200 Subject: [PATCH 5/5] Get back to 15% buffer pool --- .../java/org/apache/sysds/conf/DMLConfig.java | 2 +- .../org/apache/sysds/hops/OptimizerUtils.java | 2 +- .../caching/LazyWriteBuffer.java | 32 ++++++++++--------- 3 files changed, 19 insertions(+), 17 deletions(-) diff --git a/src/main/java/org/apache/sysds/conf/DMLConfig.java 
b/src/main/java/org/apache/sysds/conf/DMLConfig.java index f634023a16e..03a0afb2334 100644 --- a/src/main/java/org/apache/sysds/conf/DMLConfig.java +++ b/src/main/java/org/apache/sysds/conf/DMLConfig.java @@ -165,7 +165,7 @@ public class DMLConfig _defaultVals.put(NATIVE_BLAS_DIR, "none" ); _defaultVals.put(LINEAGECACHESPILL, "true" ); _defaultVals.put(COMPILERASSISTED_RW, "true" ); - _defaultVals.put(BUFFERPOOL_LIMIT, "5"); // % of total heap + _defaultVals.put(BUFFERPOOL_LIMIT, "15"); // % of total heap _defaultVals.put(MEMORY_MANAGER, "static"); // static/unified partitioning of heap _defaultVals.put(PRINT_GPU_MEMORY_INFO, "false" ); _defaultVals.put(EVICTION_SHADOW_BUFFERSIZE, "0.0" ); diff --git a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java index 14b44c63e80..a58450d5f3b 100644 --- a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java +++ b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java @@ -494,7 +494,7 @@ public static long getBufferPoolLimit() { return BUFFER_POOL_SIZE; DMLConfig conf = ConfigurationManager.getDMLConfig(); double bufferPoolFactor = (double)(conf.getIntValue(DMLConfig.BUFFERPOOL_LIMIT))/100; - //bufferPoolFactor = Math.max(bufferPoolFactor, DEFAULT_MEM_UTIL_FACTOR); + bufferPoolFactor = Math.max(bufferPoolFactor, DEFAULT_MEM_UTIL_FACTOR); long maxMem = InfrastructureAnalyzer.getLocalMaxMemory(); return (long)(bufferPoolFactor * maxMem); } diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/LazyWriteBuffer.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/LazyWriteBuffer.java index 096671b64eb..61cefa06817 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/LazyWriteBuffer.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/LazyWriteBuffer.java @@ -20,14 +20,15 @@ package org.apache.sysds.runtime.controlprogram.caching; import java.io.IOException; -import java.util.HashMap; 
-import java.util.Map; +import java.util.ArrayList; +import java.util.List; import java.util.Map.Entry; import java.util.concurrent.ExecutorService; import org.apache.sysds.api.DMLScript; import org.apache.sysds.hops.OptimizerUtils; import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.matrix.data.Pair; import org.apache.sysds.runtime.util.CommonThreadPool; import org.apache.sysds.runtime.util.LocalFileUtils; @@ -63,7 +64,8 @@ public static int writeBlock(String fname, CacheBlock cb) //handle caching/eviction if it fits in writebuffer if( !requiresWrite ) { - HashMap evictionList = new HashMap<>(); + //HashMap evictionList = new HashMap<>(); + List> evictionList = new ArrayList<>(); //create byte buffer handle (no block allocation yet) ByteBuffer bbuff = new ByteBuffer( lSize ); @@ -79,12 +81,12 @@ public static int writeBlock(String fname, CacheBlock cb) ByteBuffer tmp = entry.getValue(); if( tmp != null ) { - evictionList.put(ftmp, tmp); + //add to the list of entries to be evicted + evictionList.add(new Pair<>(ftmp, tmp)); _size -= tmp.getSize(); numEvicted++; } } - //System.out.println("#Evicted entries = "+numEvicted); evictBlocks(evictionList); //put placeholder into buffer pool (reserve mem) @@ -113,28 +115,28 @@ public static int writeBlock(String fname, CacheBlock cb) return numEvicted; } - private static void evictBlocks(HashMap objects) + // Evict a list of cached objects to the local FS + private static void evictBlocks(List> objects) { if (CacheableData.CONCURRENT_FS_WRITE && objects.size() > 1) { + //concurrently serialize and write the entries int k = OptimizerUtils.getConstrainedNumThreads(-1); ExecutorService myPool = CommonThreadPool.get(k); try { myPool.submit(() -> { - objects.entrySet().stream().parallel().forEach(LazyWriteBuffer::evictBlock); + objects.stream().parallel().forEach(LazyWriteBuffer::evictBlock); }).get(); } catch(Exception ex) { - throw new DMLRuntimeException(ex); + throw new 
DMLRuntimeException("Concurrent bufferpool eviction of "+objects.size()+" failed.", ex); } } - else { - for(Map.Entry entry : objects.entrySet()) - evictBlock(entry); - //objects.entrySet().forEach(LazyWriteBuffer::evictBlock); - } + else + objects.forEach(LazyWriteBuffer::evictBlock); } - private static void evictBlock(Map.Entry entry) { + // Evict a single object to local FS + private static void evictBlock(Pair entry) { String fname = entry.getKey(); ByteBuffer bb = entry.getValue(); try { @@ -145,7 +147,7 @@ private static void evictBlock(Map.Entry entry) { bb.freeMemory(); } catch(Exception e) { - throw new DMLRuntimeException("Eviction of " + fname + " failed"); + throw new DMLRuntimeException("Bufferpool eviction to " + fname + " failed."); } }