diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java index 7c8066c0324..7f441202ea6 100644 --- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java +++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java @@ -35,6 +35,8 @@ import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.util.AutoCloseables; import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.compression.NoCompressionCodec; +import org.apache.arrow.vector.ipc.message.ArrowBodyCompression; import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch; import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; import org.apache.arrow.vector.ipc.message.MessageMetadataResult; @@ -120,6 +122,7 @@ public static HeaderType getHeader(byte b) { private final MessageMetadataResult message; private final ArrowBuf appMetadata; private final List bufs; + private final ArrowBodyCompression bodyCompression; public ArrowMessage(FlightDescriptor descriptor, Schema schema) { ByteBuffer serializedMessage = MessageSerializer.serializeMetadata(schema); @@ -128,6 +131,7 @@ public ArrowMessage(FlightDescriptor descriptor, Schema schema) { bufs = ImmutableList.of(); this.descriptor = descriptor; this.appMetadata = null; + this.bodyCompression = NoCompressionCodec.DEFAULT_BODY_COMPRESSION; } /** @@ -141,6 +145,7 @@ public ArrowMessage(ArrowRecordBatch batch, ArrowBuf appMetadata) { this.bufs = ImmutableList.copyOf(batch.getBuffers()); this.descriptor = null; this.appMetadata = appMetadata; + this.bodyCompression = batch.getBodyCompression(); } public ArrowMessage(ArrowDictionaryBatch batch) { @@ -152,6 +157,7 @@ public ArrowMessage(ArrowDictionaryBatch batch) { this.bufs = ImmutableList.copyOf(batch.getDictionary().getBuffers()); this.descriptor = null; this.appMetadata = null; + this.bodyCompression = batch.getDictionary().getBodyCompression(); } /** @@ -163,6 +169,7 @@ public ArrowMessage(ArrowBuf appMetadata) { this.bufs = ImmutableList.of(); this.descriptor = null; this.appMetadata = appMetadata; + this.bodyCompression = NoCompressionCodec.DEFAULT_BODY_COMPRESSION; } public ArrowMessage(FlightDescriptor descriptor) { @@ -170,6 +177,7 @@ public ArrowMessage(FlightDescriptor descriptor) { this.bufs = ImmutableList.of(); this.descriptor = descriptor; this.appMetadata = null; + this.bodyCompression = NoCompressionCodec.DEFAULT_BODY_COMPRESSION; } private ArrowMessage(FlightDescriptor descriptor, MessageMetadataResult message, ArrowBuf appMetadata, @@ -178,6 +186,7 @@ private ArrowMessage(FlightDescriptor descriptor, MessageMetadataResult message, this.descriptor = descriptor; this.appMetadata = appMetadata; this.bufs = buf == null ? ImmutableList.of() : ImmutableList.of(buf); + this.bodyCompression = NoCompressionCodec.DEFAULT_BODY_COMPRESSION; } public MessageMetadataResult asSchemaMessage() { diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java index 2720219767a..3a4b00de5ef 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java @@ -25,6 +25,8 @@ import org.apache.arrow.memory.ArrowBuf; import org.apache.arrow.util.Collections2; +import org.apache.arrow.vector.compression.CompressionCodec; +import org.apache.arrow.vector.compression.CompressionUtil; import org.apache.arrow.vector.ipc.message.ArrowFieldNode; import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; import org.apache.arrow.vector.types.pojo.Field; @@ -54,8 +56,9 @@ public VectorLoader(VectorSchemaRoot root) { public void load(ArrowRecordBatch recordBatch) { Iterator buffers = recordBatch.getBuffers().iterator(); Iterator nodes = recordBatch.getNodes().iterator(); + CompressionCodec codec = CompressionUtil.createCodec(recordBatch.getBodyCompression().getCodec()); for (FieldVector fieldVector : root.getFieldVectors()) { - loadBuffers(fieldVector, fieldVector.getField(), buffers, nodes); + loadBuffers(fieldVector, fieldVector.getField(), buffers, nodes, codec); } root.setRowCount(recordBatch.getLength()); if (nodes.hasNext() || buffers.hasNext()) { @@ -68,13 +71,15 @@ private void loadBuffers( FieldVector vector, Field field, Iterator buffers, - Iterator nodes) { + Iterator nodes, + CompressionCodec codec) { checkArgument(nodes.hasNext(), "no more field nodes for for field %s and vector %s", field, vector); ArrowFieldNode fieldNode = nodes.next(); int bufferLayoutCount = TypeLayout.getTypeBufferCount(field.getType()); List ownBuffers = new ArrayList<>(bufferLayoutCount); for (int j = 0; j < bufferLayoutCount; j++) { - ownBuffers.add(buffers.next()); + ArrowBuf nextBuf = buffers.next(); + ownBuffers.add(codec.decompress(vector.getAllocator(), nextBuf)); } try { vector.loadFieldBuffers(fieldNode, ownBuffers); @@ -91,7 +96,7 @@ private void loadBuffers( for (int i = 0; i < childrenFromFields.size(); i++) { Field child = children.get(i); FieldVector fieldVector = childrenFromFields.get(i); - loadBuffers(fieldVector, child, buffers, nodes); + loadBuffers(fieldVector, child, buffers, nodes, codec); } } } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java index 62d063b0fd8..e2cbf3ec1d8 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java @@ -21,6 +21,9 @@ import java.util.List; import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.vector.compression.CompressionCodec; +import org.apache.arrow.vector.compression.CompressionUtil; +import org.apache.arrow.vector.compression.NoCompressionCodec; import org.apache.arrow.vector.ipc.message.ArrowFieldNode; import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; @@ -32,13 +35,14 @@ public class VectorUnloader { private final VectorSchemaRoot root; private final boolean includeNullCount; + private final CompressionCodec codec; private final boolean alignBuffers; /** * Constructs a new instance of the given set of vectors. */ public VectorUnloader(VectorSchemaRoot root) { - this(root, true, true); + this(root, true, NoCompressionCodec.INSTANCE, true); } /** @@ -48,9 +52,24 @@ public VectorUnloader(VectorSchemaRoot root) { * @param includeNullCount Controls whether null count is copied to the {@link ArrowRecordBatch} * @param alignBuffers Controls if buffers get aligned to 8-byte boundaries. */ - public VectorUnloader(VectorSchemaRoot root, boolean includeNullCount, boolean alignBuffers) { + public VectorUnloader( + VectorSchemaRoot root, boolean includeNullCount, boolean alignBuffers) { + this(root, includeNullCount, NoCompressionCodec.INSTANCE, alignBuffers); + } + + /** + * Constructs a new instance. + * + * @param root The set of vectors to serialize to an {@link ArrowRecordBatch}. + * @param includeNullCount Controls whether null count is copied to the {@link ArrowRecordBatch} + * @param codec the codec for compressing data. If it is null, then no compression is needed. + * @param alignBuffers Controls if buffers get aligned to 8-byte boundaries. + */ + public VectorUnloader( + VectorSchemaRoot root, boolean includeNullCount, CompressionCodec codec, boolean alignBuffers) { this.root = root; this.includeNullCount = includeNullCount; + this.codec = codec; this.alignBuffers = alignBuffers; } @@ -64,7 +83,8 @@ public ArrowRecordBatch getRecordBatch() { for (FieldVector vector : root.getFieldVectors()) { appendNodes(vector, nodes, buffers); } - return new ArrowRecordBatch(root.getRowCount(), nodes, buffers, alignBuffers); + return new ArrowRecordBatch( + root.getRowCount(), nodes, buffers, CompressionUtil.createBodyCompression(codec), alignBuffers); } private void appendNodes(FieldVector vector, List nodes, List buffers) { @@ -76,7 +96,9 @@ private void appendNodes(FieldVector vector, List nodes, List buffers; + private final ArrowBodyCompression bodyCompression; + private final List buffersLayout; private boolean closed = false; - public ArrowRecordBatch(int length, List nodes, List buffers) { - this(length, nodes, buffers, true); + public ArrowRecordBatch( + int length, List nodes, List buffers) { + this(length, nodes, buffers, NoCompressionCodec.DEFAULT_BODY_COMPRESSION, true); + } + + public ArrowRecordBatch( + int length, List nodes, List buffers, + ArrowBodyCompression bodyCompression) { + this(length, nodes, buffers, bodyCompression, true); } /** @@ -65,12 +76,17 @@ public ArrowRecordBatch(int length, List nodes, List b * @param length how many rows in this batch * @param nodes field level info * @param buffers will be retained until this recordBatch is closed + * @param bodyCompression compression info. */ - public ArrowRecordBatch(int length, List nodes, List buffers, boolean alignBuffers) { + public ArrowRecordBatch( + int length, List nodes, List buffers, + ArrowBodyCompression bodyCompression, boolean alignBuffers) { super(); this.length = length; this.nodes = nodes; this.buffers = buffers; + Preconditions.checkArgument(bodyCompression != null, "body compression cannot be null"); + this.bodyCompression = bodyCompression; List arrowBuffers = new ArrayList<>(buffers.size()); long offset = 0; for (ArrowBuf arrowBuf : buffers) { @@ -92,10 +108,14 @@ public ArrowRecordBatch(int length, List nodes, List b // this constructor is different from the public ones in that the reference manager's // retain method is not called, so the first dummy parameter is used // to distinguish this from the public constructor. - private ArrowRecordBatch(boolean dummy, int length, List nodes, List buffers) { + private ArrowRecordBatch( + boolean dummy, int length, List nodes, + List buffers, ArrowBodyCompression bodyCompression) { this.length = length; this.nodes = nodes; this.buffers = buffers; + Preconditions.checkArgument(bodyCompression != null, "body compression cannot be null"); + this.bodyCompression = bodyCompression; this.closed = false; List arrowBuffers = new ArrayList<>(); long offset = 0; @@ -115,6 +135,10 @@ public int getLength() { return length; } + public ArrowBodyCompression getBodyCompression() { + return bodyCompression; + } + /** * Get the nodes in this record batch. * @@ -152,7 +176,7 @@ public ArrowRecordBatch cloneWithTransfer(final BufferAllocator allocator) { .writerIndex(buf.writerIndex())) .collect(Collectors.toList()); close(); - return new ArrowRecordBatch(false, length, nodes, newBufs); + return new ArrowRecordBatch(false, length, nodes, newBufs, bodyCompression); } /** @@ -170,10 +194,17 @@ public int writeTo(FlatBufferBuilder builder) { int nodesOffset = FBSerializables.writeAllStructsToVector(builder, nodes); RecordBatch.startBuffersVector(builder, buffers.size()); int buffersOffset = FBSerializables.writeAllStructsToVector(builder, buffersLayout); + int compressOffset = 0; + if (bodyCompression != null && bodyCompression != NoCompressionCodec.DEFAULT_BODY_COMPRESSION) { + compressOffset = bodyCompression.writeTo(builder); + } RecordBatch.startRecordBatch(builder); RecordBatch.addLength(builder, length); RecordBatch.addNodes(builder, nodesOffset); RecordBatch.addBuffers(builder, buffersOffset); + if (bodyCompression != null && bodyCompression != NoCompressionCodec.DEFAULT_BODY_COMPRESSION) { + RecordBatch.addCompression(builder, compressOffset); + } return RecordBatch.endRecordBatch(builder); } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java index 59000317e62..64712a2741b 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java @@ -34,6 +34,7 @@ import org.apache.arrow.memory.ArrowBuf; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.compression.NoCompressionCodec; import org.apache.arrow.vector.ipc.ReadChannel; import org.apache.arrow.vector.ipc.WriteChannel; import org.apache.arrow.vector.types.pojo.Schema; @@ -408,11 +409,16 @@ public static ArrowRecordBatch deserializeRecordBatch(RecordBatch recordBatchFB, ArrowBuf vectorBuffer = body.slice(bufferFB.offset(), bufferFB.length()); buffers.add(vectorBuffer); } + + ArrowBodyCompression bodyCompression = recordBatchFB.compression() == null ? + NoCompressionCodec.DEFAULT_BODY_COMPRESSION + : new ArrowBodyCompression(recordBatchFB.compression().codec(), recordBatchFB.compression().method()); + if ((int) recordBatchFB.length() != recordBatchFB.length()) { throw new IOException("Cannot currently deserialize record batches with more than INT_MAX records."); } ArrowRecordBatch arrowRecordBatch = - new ArrowRecordBatch(checkedCastToInt(recordBatchFB.length()), nodes, buffers); + new ArrowRecordBatch(checkedCastToInt(recordBatchFB.length()), nodes, buffers, bodyCompression); body.getReferenceManager().release(); return arrowRecordBatch; }