Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@
import org.apache.sysds.runtime.compress.lib.CLALibMatrixMult;
import org.apache.sysds.runtime.compress.lib.CLALibMerge;
import org.apache.sysds.runtime.compress.lib.CLALibRemoveEmpty;
import org.apache.sysds.runtime.compress.lib.CLALibReplace;
import org.apache.sysds.runtime.compress.lib.CLALibReorg;
import org.apache.sysds.runtime.compress.lib.CLALibReplace;
import org.apache.sysds.runtime.compress.lib.CLALibReshape;
import org.apache.sysds.runtime.compress.lib.CLALibRexpand;
import org.apache.sysds.runtime.compress.lib.CLALibScalar;
Expand Down Expand Up @@ -103,6 +103,7 @@
import org.apache.sysds.runtime.util.IndexRange;
import org.apache.sysds.utils.DMLCompressionStatistics;
import org.apache.sysds.utils.stats.InfrastructureAnalyzer;
import org.apache.sysds.utils.stats.Timing;

public class CompressedMatrixBlock extends MatrixBlock {
private static final Log LOG = LogFactory.getLog(CompressedMatrixBlock.class.getName());
Expand Down Expand Up @@ -477,16 +478,20 @@ public void readFields(DataInput in) throws IOException {
}

public static CompressedMatrixBlock read(DataInput in) throws IOException {
Timing t = new Timing();
int rlen = in.readInt();
int clen = in.readInt();
long nonZeros = in.readLong();
boolean overlappingColGroups = in.readBoolean();
List<AColGroup> groups = ColGroupIO.readGroups(in, rlen);
return new CompressedMatrixBlock(rlen, clen, nonZeros, overlappingColGroups, groups);
CompressedMatrixBlock ret = new CompressedMatrixBlock(rlen, clen, nonZeros, overlappingColGroups, groups);
LOG.debug("Compressed read serialization time: " + t.stop());
return ret;
}

@Override
public void write(DataOutput out) throws IOException {
Timing t = new Timing();
final long estimateUncompressed = nonZeros > 0 ? MatrixBlock.estimateSizeOnDisk(rlen, clen,
nonZeros) : Long.MAX_VALUE;
final long estDisk = nonZeros > 0 ? getExactSizeOnDisk() : Long.MAX_VALUE;
Expand Down Expand Up @@ -514,6 +519,7 @@ public void write(DataOutput out) throws IOException {
out.writeLong(nonZeros);
out.writeBoolean(overlappingColGroups);
ColGroupIO.writeGroups(out, _colGroups);
LOG.debug("Compressed write serialization time: " + t.stop());
}

/**
Expand Down Expand Up @@ -613,16 +619,7 @@ public MatrixBlock aggregateUnaryOperations(AggregateUnaryOperator op, MatrixVal
public MatrixBlock transposeSelfMatrixMultOperations(MatrixBlock out, MMTSJType tstype, int k) {
// check for transpose type
if(tstype == MMTSJType.LEFT) {
if(isEmpty())
return new MatrixBlock(clen, clen, true);
// create output matrix block
if(out == null)
out = new MatrixBlock(clen, clen, false);
else
out.reset(clen, clen, false);
out.allocateDenseBlock();
CLALibTSMM.leftMultByTransposeSelf(this, out, k);
return out;
return CLALibTSMM.leftMultByTransposeSelf(this, out, k);
}
else {
throw new DMLRuntimeException("Invalid MMTSJ type '" + tstype.toString() + "'.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ public class CompressedMatrixBlockFactory {

private static final Log LOG = LogFactory.getLog(CompressedMatrixBlockFactory.class.getName());

/** Global lock serializing all async compressions to bound concurrent compression memory/CPU. */
private static final Object asyncCompressLock = new Object();

/** Timing object to measure the time of each phase in the compression */
private final Timing time = new Timing(true);
/** Compression statistics gathered throughout the compression */
Expand Down Expand Up @@ -181,21 +184,23 @@ public static Future<Void> compressAsync(ExecutionContext ec, String varName) {
}

public static Future<Void> compressAsync(ExecutionContext ec, String varName, InstructionTypeCounter ins) {
LOG.debug("Compressing Async");
final ExecutorService pool = CommonThreadPool.get(); // We have to guarantee that a thread pool is allocated.
return CompletableFuture.runAsync(() -> {
// method call or code to be async
try {
CacheableData<?> data = ec.getCacheableData(varName);
if(data instanceof MatrixObject) {
MatrixObject mo = (MatrixObject) data;
MatrixBlock mb = mo.acquireReadAndRelease();
MatrixBlock mbc = CompressedMatrixBlockFactory.compress(mo.acquireReadAndRelease(), ins).getLeft();
if(mbc instanceof CompressedMatrixBlock) {
ExecutionContext.createCacheableData(mb);
mo.acquireModify(mbc);
mo.release();
mbc.sum(); // calculate sum to forcefully materialize counts
synchronized(asyncCompressLock) { // global lock: serialize all async compressions (not per-matrix)
if(data instanceof MatrixObject) {
LOG.debug("Compressing Async");
MatrixObject mo = (MatrixObject) data;
MatrixBlock mb = mo.acquireReadAndRelease();
MatrixBlock mbc = CompressedMatrixBlockFactory.compress(mb, ins).getLeft();
if(mbc instanceof CompressedMatrixBlock) {
ExecutionContext.createCacheableData(mb);
mo.acquireModify(mbc);
mo.release();
mbc.sum(); // calculate sum to forcefully materialize counts
}
}
}
}
Expand Down
Loading
Loading