Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.compression.NoCompressionCodec;
import org.apache.arrow.vector.ipc.message.ArrowBodyCompression;
import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.ipc.message.MessageMetadataResult;
Expand Down Expand Up @@ -120,6 +122,7 @@ public static HeaderType getHeader(byte b) {
private final MessageMetadataResult message;
private final ArrowBuf appMetadata;
private final List<ArrowBuf> bufs;
private final ArrowBodyCompression bodyCompression;

public ArrowMessage(FlightDescriptor descriptor, Schema schema) {
ByteBuffer serializedMessage = MessageSerializer.serializeMetadata(schema);
Expand All @@ -128,6 +131,7 @@ public ArrowMessage(FlightDescriptor descriptor, Schema schema) {
bufs = ImmutableList.of();
this.descriptor = descriptor;
this.appMetadata = null;
this.bodyCompression = NoCompressionCodec.DEFAULT_BODY_COMPRESSION;
}

/**
Expand All @@ -141,6 +145,7 @@ public ArrowMessage(ArrowRecordBatch batch, ArrowBuf appMetadata) {
this.bufs = ImmutableList.copyOf(batch.getBuffers());
this.descriptor = null;
this.appMetadata = appMetadata;
this.bodyCompression = batch.getBodyCompression();
}

public ArrowMessage(ArrowDictionaryBatch batch) {
Expand All @@ -152,6 +157,7 @@ public ArrowMessage(ArrowDictionaryBatch batch) {
this.bufs = ImmutableList.copyOf(batch.getDictionary().getBuffers());
this.descriptor = null;
this.appMetadata = null;
this.bodyCompression = batch.getDictionary().getBodyCompression();
}

/**
Expand All @@ -163,13 +169,15 @@ public ArrowMessage(ArrowBuf appMetadata) {
this.bufs = ImmutableList.of();
this.descriptor = null;
this.appMetadata = appMetadata;
this.bodyCompression = NoCompressionCodec.DEFAULT_BODY_COMPRESSION;
}

public ArrowMessage(FlightDescriptor descriptor) {
this.message = null;
this.bufs = ImmutableList.of();
this.descriptor = descriptor;
this.appMetadata = null;
this.bodyCompression = NoCompressionCodec.DEFAULT_BODY_COMPRESSION;
}

private ArrowMessage(FlightDescriptor descriptor, MessageMetadataResult message, ArrowBuf appMetadata,
Expand All @@ -178,6 +186,7 @@ private ArrowMessage(FlightDescriptor descriptor, MessageMetadataResult message,
this.descriptor = descriptor;
this.appMetadata = appMetadata;
this.bufs = buf == null ? ImmutableList.of() : ImmutableList.of(buf);
this.bodyCompression = NoCompressionCodec.DEFAULT_BODY_COMPRESSION;
}

public MessageMetadataResult asSchemaMessage() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@

import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.util.Collections2;
import org.apache.arrow.vector.compression.CompressionCodec;
import org.apache.arrow.vector.compression.CompressionUtil;
import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.types.pojo.Field;
Expand Down Expand Up @@ -54,8 +56,9 @@ public VectorLoader(VectorSchemaRoot root) {
public void load(ArrowRecordBatch recordBatch) {
Iterator<ArrowBuf> buffers = recordBatch.getBuffers().iterator();
Iterator<ArrowFieldNode> nodes = recordBatch.getNodes().iterator();
CompressionCodec codec = CompressionUtil.createCodec(recordBatch.getBodyCompression().getCodec());
for (FieldVector fieldVector : root.getFieldVectors()) {
loadBuffers(fieldVector, fieldVector.getField(), buffers, nodes);
loadBuffers(fieldVector, fieldVector.getField(), buffers, nodes, codec);
}
root.setRowCount(recordBatch.getLength());
if (nodes.hasNext() || buffers.hasNext()) {
Expand All @@ -68,13 +71,15 @@ private void loadBuffers(
FieldVector vector,
Field field,
Iterator<ArrowBuf> buffers,
Iterator<ArrowFieldNode> nodes) {
Iterator<ArrowFieldNode> nodes,
CompressionCodec codec) {
checkArgument(nodes.hasNext(), "no more field nodes for for field %s and vector %s", field, vector);
ArrowFieldNode fieldNode = nodes.next();
int bufferLayoutCount = TypeLayout.getTypeBufferCount(field.getType());
List<ArrowBuf> ownBuffers = new ArrayList<>(bufferLayoutCount);
for (int j = 0; j < bufferLayoutCount; j++) {
ownBuffers.add(buffers.next());
ArrowBuf nextBuf = buffers.next();
ownBuffers.add(codec.decompress(vector.getAllocator(), nextBuf));
}
try {
vector.loadFieldBuffers(fieldNode, ownBuffers);
Expand All @@ -91,7 +96,7 @@ private void loadBuffers(
for (int i = 0; i < childrenFromFields.size(); i++) {
Field child = children.get(i);
FieldVector fieldVector = childrenFromFields.get(i);
loadBuffers(fieldVector, child, buffers, nodes);
loadBuffers(fieldVector, child, buffers, nodes, codec);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import java.util.List;

import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.vector.compression.CompressionCodec;
import org.apache.arrow.vector.compression.CompressionUtil;
import org.apache.arrow.vector.compression.NoCompressionCodec;
import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;

Expand All @@ -32,13 +35,14 @@ public class VectorUnloader {

private final VectorSchemaRoot root;
private final boolean includeNullCount;
private final CompressionCodec codec;
private final boolean alignBuffers;

/**
* Constructs a new instance of the given set of vectors.
*/
public VectorUnloader(VectorSchemaRoot root) {
this(root, true, true);
this(root, true, NoCompressionCodec.INSTANCE, true);
}

/**
Expand All @@ -48,9 +52,24 @@ public VectorUnloader(VectorSchemaRoot root) {
* @param includeNullCount Controls whether null count is copied to the {@link ArrowRecordBatch}
* @param alignBuffers Controls if buffers get aligned to 8-byte boundaries.
*/
public VectorUnloader(VectorSchemaRoot root, boolean includeNullCount, boolean alignBuffers) {
public VectorUnloader(
VectorSchemaRoot root, boolean includeNullCount, boolean alignBuffers) {
this(root, includeNullCount, NoCompressionCodec.INSTANCE, alignBuffers);
}

/**
* Constructs a new instance.
*
* @param root The set of vectors to serialize to an {@link ArrowRecordBatch}.
* @param includeNullCount Controls whether null count is copied to the {@link ArrowRecordBatch}
* @param codec the codec for compressing data. If it is null, then no compression is needed.
* @param alignBuffers Controls if buffers get aligned to 8-byte boundaries.
*/
public VectorUnloader(
VectorSchemaRoot root, boolean includeNullCount, CompressionCodec codec, boolean alignBuffers) {
this.root = root;
this.includeNullCount = includeNullCount;
this.codec = codec;
this.alignBuffers = alignBuffers;
}

Expand All @@ -64,7 +83,8 @@ public ArrowRecordBatch getRecordBatch() {
for (FieldVector vector : root.getFieldVectors()) {
appendNodes(vector, nodes, buffers);
}
return new ArrowRecordBatch(root.getRowCount(), nodes, buffers, alignBuffers);
return new ArrowRecordBatch(
root.getRowCount(), nodes, buffers, CompressionUtil.createBodyCompression(codec), alignBuffers);
}

private void appendNodes(FieldVector vector, List<ArrowFieldNode> nodes, List<ArrowBuf> buffers) {
Expand All @@ -76,7 +96,9 @@ private void appendNodes(FieldVector vector, List<ArrowFieldNode> nodes, List<Ar
"wrong number of buffers for field %s in vector %s. found: %s",
vector.getField(), vector.getClass().getSimpleName(), fieldBuffers));
}
buffers.addAll(fieldBuffers);
for (ArrowBuf buf : fieldBuffers) {
buffers.add(codec.compress(vector.getAllocator(), buf));
}
for (FieldVector child : vector.getChildrenFromFields()) {
appendNodes(child, nodes, buffers);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.vector.compression;

import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;

/**
* The codec for compression/decompression.
*/
public interface CompressionCodec {

/**
* Compress a buffer.
* @param allocator the allocator for allocating memory for compressed buffer.
* @param unCompressedBuffer the buffer to compress.
* Implementation of this method should take care of releasing this buffer.
* @return the compressed buffer.
*/
ArrowBuf compress(BufferAllocator allocator, ArrowBuf unCompressedBuffer);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In thinking through this interface, something occurred to me. Given the way the current Message serialization code works is it forces a copy from an underlying channel. Given this, it seem like we would now have two copy like operations:

  1. Copy from channel to compressed buffer
  2. Uncompress the buffer when loading VectorSchemaRoot.

I was wondering what you thought about deferring compression until writing out the batch and doing decompression eagerly when reading in the batch to avoid one of these copies. I think one thing that would potentially help us decide on the course to take would be look at the possible libraries we would use for compression/decompression and whether they support off-heap, InputStream or some other interface for accomplishing the compression

@liyafan82 have you done more research in this area?

@rymurr do you have anything thoughts on this?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@emkornfield Thank you for starting this discussion and sharing your good ideas.
Your reasoning makes sense to me.

I guess I was looking at the problem from a different perspective.

IMO, the bottleneck of a compressing codec is the CPU resource, and the main purpose of compressing is to reduce memory/network bandwidth consumption.

Given the above assumptions, we should try to do the compression as early as possible. The earliest possible place should be in the getFieldBuffers method. In this PR, we do it in VectorUnLoader, which is not the best, but close enough to the best. Similarly, we should try to do the decompression as late as possible. In this PR, we do it in VectorLoader, which is close to the optimal.

Admittedly, we have additional copies after introducing the compression framework. However, both additional copies are based on the compressed data, with reduced data size, so the overhead should be small.

The above reasoning is based on the assumption that the compression codec could effectively reduce the data size, which is not always true in practice. So I think we can make the decision based on the specific compression codec, and real benchmark data?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@liyafan82 good points. I think the other use-case I didn't consider is Flight which I think will already be in off-heap memory.

In terms of copies, the should be smaller in general, but it would be good to to understand if there are any limitations. It looks like at least airlift supports off-heap decompression, so I think this API is probably OK from that perspective.

@rymurr did you have any other comments otherwise I can take another look and then I think we can merge.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@emkornfield Do you think we can merge this now?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I think so. i'll merge


/**
* Decompress a buffer.
* @param allocator the allocator for allocating memory for decompressed buffer.
* @param compressedBuffer the buffer to be decompressed.
* Implementation of this method should take care of releasing this buffer.
* @return the decompressed buffer.
*/
ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer);

/**
* Gets the name of the codec.
* @return the name of the codec.
*/
String getCodecName();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.vector.compression;

import org.apache.arrow.flatbuf.BodyCompressionMethod;
import org.apache.arrow.flatbuf.CompressionType;
import org.apache.arrow.vector.ipc.message.ArrowBodyCompression;

/**
* Utilities for data compression/decompression.
*/
public class CompressionUtil {

private CompressionUtil() {
}

/**
* Creates the {@link ArrowBodyCompression} object, given the {@link CompressionCodec}.
* The implementation of this method should depend on the values of {@link CompressionType#names}.
*/
public static ArrowBodyCompression createBodyCompression(CompressionCodec codec) {
switch (codec.getCodecName()) {
case "default":
return NoCompressionCodec.DEFAULT_BODY_COMPRESSION;
case "LZ4_FRAME":
return new ArrowBodyCompression(CompressionType.LZ4_FRAME, BodyCompressionMethod.BUFFER);
case "ZSTD":
return new ArrowBodyCompression(CompressionType.ZSTD, BodyCompressionMethod.BUFFER);
default:
throw new IllegalArgumentException("Unknown codec: " + codec.getCodecName());
}
}

/**
* Creates the {@link CompressionCodec} given the compression type.
*/
public static CompressionCodec createCodec(byte compressionType) {
switch (compressionType) {
case NoCompressionCodec.COMPRESSION_TYPE:
return NoCompressionCodec.INSTANCE;
default:
throw new IllegalArgumentException("Compression type not supported: " + compressionType);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.vector.compression;

import org.apache.arrow.flatbuf.BodyCompressionMethod;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.ipc.message.ArrowBodyCompression;

/**
* The default compression codec that does no compression.
*/
public class NoCompressionCodec implements CompressionCodec {

public static final NoCompressionCodec INSTANCE = new NoCompressionCodec();

public static final byte COMPRESSION_TYPE = -1;

public static final ArrowBodyCompression DEFAULT_BODY_COMPRESSION =
new ArrowBodyCompression(COMPRESSION_TYPE, BodyCompressionMethod.BUFFER);

private NoCompressionCodec() {
}

@Override
public ArrowBuf compress(BufferAllocator allocator, ArrowBuf unCompressedBuffer) {
return unCompressedBuffer;
}

@Override
public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) {
return compressedBuffer;
}

@Override
public String getCodecName() {
return "default";
}
}
Loading