From c57edb0ea3224fa53cf4f7db70f550d708bc3f6f Mon Sep 17 00:00:00 2001 From: liyafan82 Date: Tue, 2 Jun 2020 17:26:37 +0800 Subject: [PATCH 1/8] ARROW-9010: [Java] Framework and interface changes for RecordBatch IPC buffer compression --- .../org/apache/arrow/flight/ArrowMessage.java | 16 +++++ .../org/apache/arrow/vector/VectorLoader.java | 18 +++-- .../apache/arrow/vector/VectorUnloader.java | 32 ++++++++- .../vector/compression/CompressionCodec.java | 68 ++++++++++++++++++ .../compression/CompressionUtility.java | 59 +++++++++++++++ .../ipc/message/ArrowBodyCompression.java | 71 +++++++++++++++++++ .../vector/ipc/message/ArrowRecordBatch.java | 30 ++++++-- .../vector/ipc/message/MessageSerializer.java | 38 ++++++++-- 8 files changed, 316 insertions(+), 16 deletions(-) create mode 100644 java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionCodec.java create mode 100644 java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtility.java create mode 100644 java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowBodyCompression.java diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java index 7c8066c0324..86f24706671 100644 --- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java +++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java @@ -35,6 +35,7 @@ import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.util.AutoCloseables; import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.ipc.message.ArrowBodyCompression; import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch; import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; import org.apache.arrow.vector.ipc.message.MessageMetadataResult; @@ -120,6 +121,7 @@ public static HeaderType getHeader(byte b) { private final MessageMetadataResult message; private final ArrowBuf appMetadata; private final List bufs; + private final ArrowBodyCompression bodyCompression; public ArrowMessage(FlightDescriptor descriptor, Schema schema) { ByteBuffer serializedMessage = MessageSerializer.serializeMetadata(schema); @@ -128,6 +130,7 @@ public ArrowMessage(FlightDescriptor descriptor, Schema schema) { bufs = ImmutableList.of(); this.descriptor = descriptor; this.appMetadata = null; + this.bodyCompression = null; } /** @@ -141,6 +144,7 @@ public ArrowMessage(ArrowRecordBatch batch, ArrowBuf appMetadata) { this.bufs = ImmutableList.copyOf(batch.getBuffers()); this.descriptor = null; this.appMetadata = appMetadata; + this.bodyCompression = batch.getBodyCompression(); } public ArrowMessage(ArrowDictionaryBatch batch) { @@ -152,6 +156,7 @@ public ArrowMessage(ArrowDictionaryBatch batch) { this.bufs = ImmutableList.copyOf(batch.getDictionary().getBuffers()); this.descriptor = null; this.appMetadata = null; + this.bodyCompression = batch.getDictionary().getBodyCompression(); } /** @@ -163,6 +168,7 @@ public ArrowMessage(ArrowBuf appMetadata) { this.bufs = ImmutableList.of(); this.descriptor = null; this.appMetadata = appMetadata; + this.bodyCompression = null; } public ArrowMessage(FlightDescriptor descriptor) { @@ -170,6 +176,7 @@ public ArrowMessage(FlightDescriptor descriptor) { this.bufs = ImmutableList.of(); this.descriptor = descriptor; this.appMetadata = null; + this.bodyCompression = null; } private ArrowMessage(FlightDescriptor descriptor, MessageMetadataResult message, ArrowBuf appMetadata, @@ -178,6 +185,7 @@ private ArrowMessage(FlightDescriptor descriptor, MessageMetadataResult message, this.descriptor = descriptor; this.appMetadata = appMetadata; this.bufs = buf == null ? ImmutableList.of() : ImmutableList.of(buf); + this.bodyCompression = null; } public MessageMetadataResult asSchemaMessage() { @@ -353,6 +361,14 @@ private InputStream asInputStream(BufferAllocator allocator) { // the reference count b.getReferenceManager().retain(); } + // add compression info + ArrowBuf compBuf = allocator.buffer(ArrowBodyCompression.BODY_COMPRESSION_LENGTH); + compBuf.setInt(0, bodyCompression.getCodec()); + compBuf.setInt(4, bodyCompression.getMethod()); + compBuf.writerIndex(ArrowBodyCompression.BODY_COMPRESSION_LENGTH); + size += ArrowBodyCompression.BODY_COMPRESSION_LENGTH; + allBufs.add(compBuf.asNettyBuffer()); + // rawvarint is used for length definition. cos.writeUInt32NoTag(size); cos.flush(); diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java index 2720219767a..d29443c9331 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java @@ -25,6 +25,8 @@ import org.apache.arrow.memory.ArrowBuf; import org.apache.arrow.util.Collections2; +import org.apache.arrow.vector.compression.CompressionCodec; +import org.apache.arrow.vector.compression.CompressionUtility; import org.apache.arrow.vector.ipc.message.ArrowFieldNode; import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; import org.apache.arrow.vector.types.pojo.Field; @@ -54,8 +56,10 @@ public VectorLoader(VectorSchemaRoot root) { public void load(ArrowRecordBatch recordBatch) { Iterator buffers = recordBatch.getBuffers().iterator(); Iterator nodes = recordBatch.getNodes().iterator(); + CompressionCodec codec = + CompressionUtility.createCodec(recordBatch.getBodyCompression().getCodec()); for (FieldVector fieldVector : root.getFieldVectors()) { - loadBuffers(fieldVector, fieldVector.getField(), buffers, nodes); + loadBuffers(fieldVector, fieldVector.getField(), buffers, nodes, codec); } root.setRowCount(recordBatch.getLength()); if (nodes.hasNext() || buffers.hasNext()) { @@ -68,13 +72,19 @@ private void loadBuffers( FieldVector vector, Field field, Iterator buffers, - Iterator nodes) { + Iterator nodes, + CompressionCodec codec) { checkArgument(nodes.hasNext(), "no more field nodes for for field %s and vector %s", field, vector); ArrowFieldNode fieldNode = nodes.next(); int bufferLayoutCount = TypeLayout.getTypeBufferCount(field.getType()); List ownBuffers = new ArrayList<>(bufferLayoutCount); for (int j = 0; j < bufferLayoutCount; j++) { - ownBuffers.add(buffers.next()); + ArrowBuf nextBuf = buffers.next(); + if (codec != null) { + long estimatedBufSize = codec.estimateDecompressedSize(nextBuf); + nextBuf = codec.decompress(nextBuf, estimatedBufSize); + } + ownBuffers.add(nextBuf); } try { vector.loadFieldBuffers(fieldNode, ownBuffers); @@ -91,7 +101,7 @@ private void loadBuffers( for (int i = 0; i < childrenFromFields.size(); i++) { Field child = children.get(i); FieldVector fieldVector = childrenFromFields.get(i); - loadBuffers(fieldVector, child, buffers, nodes); + loadBuffers(fieldVector, child, buffers, nodes, codec); } } } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java index 62d063b0fd8..6afa8a8fcb8 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java @@ -19,8 +19,11 @@ import java.util.ArrayList; import java.util.List; +import java.util.stream.Collectors; import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.vector.compression.CompressionCodec; +import org.apache.arrow.vector.compression.CompressionUtility; import org.apache.arrow.vector.ipc.message.ArrowFieldNode; import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; @@ -32,13 +35,14 @@ public class VectorUnloader { private final VectorSchemaRoot root; private final boolean includeNullCount; + private final CompressionCodec codec; private final boolean alignBuffers; /** * Constructs a new instance of the given set of vectors. */ public VectorUnloader(VectorSchemaRoot root) { - this(root, true, true); + this(root, true, null, true); } /** @@ -48,9 +52,24 @@ public VectorUnloader(VectorSchemaRoot root) { * @param includeNullCount Controls whether null count is copied to the {@link ArrowRecordBatch} * @param alignBuffers Controls if buffers get aligned to 8-byte boundaries. */ - public VectorUnloader(VectorSchemaRoot root, boolean includeNullCount, boolean alignBuffers) { + public VectorUnloader( + VectorSchemaRoot root, boolean includeNullCount, boolean alignBuffers) { + this(root, includeNullCount, null, alignBuffers); + } + + /** + * Constructs a new instance. + * + * @param root The set of vectors to serialize to an {@link ArrowRecordBatch}. + * @param includeNullCount Controls whether null count is copied to the {@link ArrowRecordBatch} + * @param codec the codec for compressing data. If it is null, then no compression is needed. + * @param alignBuffers Controls if buffers get aligned to 8-byte boundaries. + */ + public VectorUnloader( + VectorSchemaRoot root, boolean includeNullCount, CompressionCodec codec, boolean alignBuffers) { this.root = root; this.includeNullCount = includeNullCount; + this.codec = codec; this.alignBuffers = alignBuffers; } @@ -64,7 +83,8 @@ public ArrowRecordBatch getRecordBatch() { for (FieldVector vector : root.getFieldVectors()) { appendNodes(vector, nodes, buffers); } - return new ArrowRecordBatch(root.getRowCount(), nodes, buffers, alignBuffers); + return new ArrowRecordBatch( + root.getRowCount(), nodes, buffers, CompressionUtility.createBodyCompression(codec), alignBuffers); } private void appendNodes(FieldVector vector, List nodes, List buffers) { @@ -76,6 +96,12 @@ private void appendNodes(FieldVector vector, List nodes, List { + long estimatedSize = codec.estimateCompressedSize(buf); + return codec.compress(buf, estimatedSize); + }).collect(Collectors.toList()); + } buffers.addAll(fieldBuffers); for (FieldVector child : vector.getChildrenFromFields()) { appendNodes(child, nodes, buffers); diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionCodec.java b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionCodec.java new file mode 100644 index 00000000000..d83bfd760aa --- /dev/null +++ b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionCodec.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.vector.compression; + +import org.apache.arrow.memory.ArrowBuf; + +/** + * The codec for compression/decompression. + */ +public interface CompressionCodec { + + /** + * Given a buffer, estimate the compressed size. + * Please note this operation is optional, and some compression methods may not support it. + * + * @param input the input buffer to be estimated. + * @return the estimated size of the compressed data. + */ + long estimateCompressedSize(ArrowBuf input); + + /** + * Compress a buffer. + * @param input the buffer to compress. + * @param estimatedOutputSize the estimated size of the output data. + * Note that this is optional, and may not be equal to the actual output size. + * @return the compressed buffer. + */ + ArrowBuf compress(ArrowBuf input, long estimatedOutputSize); + + /** + * Given a compressed buffer, estimate the decompressed size. + * Please note this operation is optional, and some compression methods may not support it. + * + * @param input the input buffer to be estimated. + * @return the estimated size of the decompressed data. + */ + long estimateDecompressedSize(ArrowBuf input); + + /** + * Decompress a buffer. + * @param input the buffer to be decompressed. + * @param estimatedOutputSize the estimated size of the output data. + * Note that this is optional, and may not be equal to the actual output size. + * @return the decompressed buffer. + */ + ArrowBuf decompress(ArrowBuf input, long estimatedOutputSize); + + /** + * Gets the name of the codec. + * @return the name of the codec. + */ + String getCodecName(); +} diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtility.java b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtility.java new file mode 100644 index 00000000000..c9fbdc9c123 --- /dev/null +++ b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtility.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.vector.compression; + +import org.apache.arrow.flatbuf.BodyCompressionMethod; +import org.apache.arrow.flatbuf.CompressionType; +import org.apache.arrow.vector.ipc.message.ArrowBodyCompression; + +/** + * Utilities for data compression/decompression. + */ +public class CompressionUtility { + + private CompressionUtility() { + } + + /** + * Creates the {@link ArrowBodyCompression} object, given the {@link CompressionCodec}. + */ + public static ArrowBodyCompression createBodyCompression(CompressionCodec codec) { + if (codec == null) { + return ArrowBodyCompression.NO_BODY_COMPRESSION; + } + + for (int i = 0; i < CompressionType.names.length; i++) { + if (CompressionType.names[i].equals(codec.getCodecName())) { + return new ArrowBodyCompression((byte) i, BodyCompressionMethod.BUFFER); + } + } + throw new IllegalArgumentException("Unknown codec: " + codec.getCodecName()); + } + + /** + * Creates the {@link CompressionCodec} given the compression type. + */ + public static CompressionCodec createCodec(byte compressionType) { + switch (compressionType) { + case ArrowBodyCompression.NO_COMPRESSION_TYPE: + return null; + default: + throw new IllegalArgumentException("Compression type not supported: " + compressionType); + } + } +} diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowBodyCompression.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowBodyCompression.java new file mode 100644 index 00000000000..7f62df0497f --- /dev/null +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowBodyCompression.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.vector.ipc.message; + +import org.apache.arrow.flatbuf.BodyCompression; +import org.apache.arrow.flatbuf.BodyCompressionMethod; + +import com.google.flatbuffers.FlatBufferBuilder; + +/** + * Compression information about data written to a channel. + */ +public class ArrowBodyCompression implements FBSerializable { + + /** + * The codec type when there is no compression. + */ + public static final byte NO_COMPRESSION_TYPE = -1; + + /** + * The {@link ArrowBodyCompression} object for cases when there is no compression. + */ + public static final ArrowBodyCompression NO_BODY_COMPRESSION = + new ArrowBodyCompression(NO_COMPRESSION_TYPE, BodyCompressionMethod.BUFFER); + + /** + * Length of the serialized object. + */ + public static final long BODY_COMPRESSION_LENGTH = 8L; + + private final byte codec; + private final byte method; + + public ArrowBodyCompression(byte codec, byte method) { + this.codec = codec; + this.method = method; + } + + @Override + public int writeTo(FlatBufferBuilder builder) { + return BodyCompression.createBodyCompression(builder, codec, method); + } + + public byte getCodec() { + return codec; + } + + public byte getMethod() { + return method; + } + + @Override + public String toString() { + return "ArrowBodyCompression [codec=" + codec + ", method=" + method + "]"; + } +} diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java index d660a566454..1c6329d55c3 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java @@ -51,12 +51,21 @@ public class ArrowRecordBatch implements ArrowMessage { private final List buffers; + private final ArrowBodyCompression bodyCompression; + private final List buffersLayout; private boolean closed = false; - public ArrowRecordBatch(int length, List nodes, List buffers) { - this(length, nodes, buffers, true); + public ArrowRecordBatch( + int length, List nodes, List buffers) { + this(length, nodes, buffers, ArrowBodyCompression.NO_BODY_COMPRESSION, true); + } + + public ArrowRecordBatch( + int length, List nodes, List buffers, + ArrowBodyCompression bodyCompression) { + this(length, nodes, buffers, bodyCompression, true); } /** @@ -65,12 +74,16 @@ public ArrowRecordBatch(int length, List nodes, List b * @param length how many rows in this batch * @param nodes field level info * @param buffers will be retained until this recordBatch is closed + * @param bodyCompression compression info. */ - public ArrowRecordBatch(int length, List nodes, List buffers, boolean alignBuffers) { + public ArrowRecordBatch( + int length, List nodes, List buffers, + ArrowBodyCompression bodyCompression, boolean alignBuffers) { super(); this.length = length; this.nodes = nodes; this.buffers = buffers; + this.bodyCompression = bodyCompression; List arrowBuffers = new ArrayList<>(buffers.size()); long offset = 0; for (ArrowBuf arrowBuf : buffers) { @@ -92,10 +105,13 @@ public ArrowRecordBatch(int length, List nodes, List b // this constructor is different from the public ones in that the reference manager's // retain method is not called, so the first dummy parameter is used // to distinguish this from the public constructor. - private ArrowRecordBatch(boolean dummy, int length, List nodes, List buffers) { + private ArrowRecordBatch( + boolean dummy, int length, List nodes, + List buffers, ArrowBodyCompression bodyCompression) { this.length = length; this.nodes = nodes; this.buffers = buffers; + this.bodyCompression = bodyCompression; this.closed = false; List arrowBuffers = new ArrayList<>(); long offset = 0; @@ -115,6 +131,10 @@ public int getLength() { return length; } + public ArrowBodyCompression getBodyCompression() { + return bodyCompression; + } + /** * Get the nodes in this record batch. * @@ -152,7 +172,7 @@ public ArrowRecordBatch cloneWithTransfer(final BufferAllocator allocator) { .writerIndex(buf.writerIndex())) .collect(Collectors.toList()); close(); - return new ArrowRecordBatch(false, length, nodes, newBufs); + return new ArrowRecordBatch(false, length, nodes, newBufs, bodyCompression); } /** diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java index 59000317e62..2de980421ed 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java @@ -266,8 +266,12 @@ public static ArrowBlock serialize(WriteChannel out, ArrowRecordBatch batch, Ipc long bufferLength = writeBatchBuffers(out, batch); Preconditions.checkArgument(bufferLength % 8 == 0, "out is not aligned"); + long compLength = writeCompressionBody(out, batch.getBodyCompression()); + Preconditions.checkArgument(compLength == ArrowBodyCompression.BODY_COMPRESSION_LENGTH, + "deserialized compression body length not equal to ArrowBodyCompression#BODY_COMPRESSION_LENGTH"); + // Metadata size in the Block account for the size prefix - return new ArrowBlock(start, metadataLength + prefixSize, bufferLength); + return new ArrowBlock(start, metadataLength + prefixSize, bufferLength + compLength); } /** @@ -300,6 +304,18 @@ public static long writeBatchBuffers(WriteChannel out, ArrowRecordBatch batch) t return out.getCurrentPosition() - bufferStart; } + /** + * Serialize the compression body. + */ + public static long writeCompressionBody( + WriteChannel out, ArrowBodyCompression bodyCompression) throws IOException { + long bufferStart = out.getCurrentPosition(); + out.writeIntLittleEndian(bodyCompression.getCodec()); + out.writeIntLittleEndian(bodyCompression.getMethod()); + out.align(); + return out.getCurrentPosition() - bufferStart; + } + /** * Returns the serialized form of {@link RecordBatch} wrapped in a {@link org.apache.arrow.flatbuf.Message}. */ @@ -307,7 +323,7 @@ public static ByteBuffer serializeMetadata(ArrowMessage message) { FlatBufferBuilder builder = new FlatBufferBuilder(); int batchOffset = message.writeTo(builder); return serializeMessage(builder, message.getMessageType(), batchOffset, - message.computeBodyLength()); + message.computeBodyLength() + ArrowBodyCompression.BODY_COMPRESSION_LENGTH); } /** @@ -403,16 +419,25 @@ public static ArrowRecordBatch deserializeRecordBatch(RecordBatch recordBatchFB, nodes.add(new ArrowFieldNode(node.length(), node.nullCount())); } List buffers = new ArrayList<>(); + long curOffset = 0L; for (int i = 0; i < recordBatchFB.buffersLength(); ++i) { Buffer bufferFB = recordBatchFB.buffers(i); ArrowBuf vectorBuffer = body.slice(bufferFB.offset(), bufferFB.length()); + curOffset = bufferFB.offset() + bufferFB.length(); buffers.add(vectorBuffer); } + + if (curOffset % 8 != 0) { + curOffset += 8 - curOffset % 8; + } + ArrowBodyCompression bodyCompression = + new ArrowBodyCompression((byte) body.getInt(curOffset), (byte) body.getInt(curOffset + 4)); + if ((int) recordBatchFB.length() != recordBatchFB.length()) { throw new IOException("Cannot currently deserialize record batches with more than INT_MAX records."); } ArrowRecordBatch arrowRecordBatch = - new ArrowRecordBatch(checkedCastToInt(recordBatchFB.length()), nodes, buffers); + new ArrowRecordBatch(checkedCastToInt(recordBatchFB.length()), nodes, buffers, bodyCompression); body.getReferenceManager().release(); return arrowRecordBatch; } @@ -472,8 +497,13 @@ public static ArrowBlock serialize(WriteChannel out, ArrowDictionaryBatch batch, long bufferLength = writeBatchBuffers(out, batch.getDictionary()); Preconditions.checkArgument(bufferLength % 8 == 0, "out is not aligned"); + // write the compression info + long compLength = writeCompressionBody(out, batch.getDictionary().getBodyCompression()); + Preconditions.checkArgument(compLength == ArrowBodyCompression.BODY_COMPRESSION_LENGTH, + "deserialized compression body length not equal to ArrowBodyCompression#BODY_COMPRESSION_LENGTH"); + // Metadata size in the Block account for the size prefix - return new ArrowBlock(start, metadataLength + prefixSize, bufferLength); + return new ArrowBlock(start, metadataLength + prefixSize, bufferLength + compLength); } /** From 48d536550c5be4e5a5839c4621529ae17565b080 Mon Sep 17 00:00:00 2001 From: liyafan82 Date: Fri, 5 Jun 2020 10:42:21 +0800 Subject: [PATCH 2/8] ARROW-9010: [Java] Make compression body optional --- .../org/apache/arrow/flight/ArrowMessage.java | 23 ++++++++---- .../org/apache/arrow/vector/VectorLoader.java | 2 +- .../compression/CompressionUtility.java | 5 +-- .../ipc/message/ArrowBodyCompression.java | 14 +------- .../vector/ipc/message/ArrowRecordBatch.java | 8 ++++- .../vector/ipc/message/MessageSerializer.java | 36 ++++++++++++------- 6 files changed, 50 insertions(+), 38 deletions(-) diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java index 86f24706671..ab6c6d536cb 100644 --- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java +++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java @@ -361,13 +361,22 @@ private InputStream asInputStream(BufferAllocator allocator) { // the reference count b.getReferenceManager().retain(); } - // add compression info - ArrowBuf compBuf = allocator.buffer(ArrowBodyCompression.BODY_COMPRESSION_LENGTH); - compBuf.setInt(0, bodyCompression.getCodec()); - compBuf.setInt(4, bodyCompression.getMethod()); - compBuf.writerIndex(ArrowBodyCompression.BODY_COMPRESSION_LENGTH); - size += ArrowBodyCompression.BODY_COMPRESSION_LENGTH; - allBufs.add(compBuf.asNettyBuffer()); + + // add compression info, if any + if (bodyCompression != null) { + ArrowBuf compBuf = allocator.buffer(ArrowBodyCompression.BODY_COMPRESSION_LENGTH); + compBuf.setByte(0, bodyCompression.getCodec()); + compBuf.setByte(1, bodyCompression.getMethod()); + compBuf.writerIndex(ArrowBodyCompression.BODY_COMPRESSION_LENGTH); + size += ArrowBodyCompression.BODY_COMPRESSION_LENGTH; + allBufs.add(compBuf.asNettyBuffer()); + + // align + int paddingBytes = (int) (8 - ArrowBodyCompression.BODY_COMPRESSION_LENGTH); + assert paddingBytes > 0 && paddingBytes < 8; + size += paddingBytes; + allBufs.add(PADDING_BUFFERS.get(paddingBytes).retain()); + } // rawvarint is used for length definition. cos.writeUInt32NoTag(size); diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java index d29443c9331..edb6bf074d5 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java @@ -56,7 +56,7 @@ public VectorLoader(VectorSchemaRoot root) { public void load(ArrowRecordBatch recordBatch) { Iterator buffers = recordBatch.getBuffers().iterator(); Iterator nodes = recordBatch.getNodes().iterator(); - CompressionCodec codec = + CompressionCodec codec = recordBatch.getBodyCompression() == null ? null : CompressionUtility.createCodec(recordBatch.getBodyCompression().getCodec()); for (FieldVector fieldVector : root.getFieldVectors()) { loadBuffers(fieldVector, fieldVector.getField(), buffers, nodes, codec); diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtility.java b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtility.java index c9fbdc9c123..fcd4a4f43fb 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtility.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtility.java @@ -34,9 +34,8 @@ private CompressionUtility() { */ public static ArrowBodyCompression createBodyCompression(CompressionCodec codec) { if (codec == null) { - return ArrowBodyCompression.NO_BODY_COMPRESSION; + return null; } - for (int i = 0; i < CompressionType.names.length; i++) { if (CompressionType.names[i].equals(codec.getCodecName())) { return new ArrowBodyCompression((byte) i, BodyCompressionMethod.BUFFER); @@ -50,8 +49,6 @@ public static ArrowBodyCompression createBodyCompression(CompressionCodec codec) */ public static CompressionCodec createCodec(byte compressionType) { switch (compressionType) { - case ArrowBodyCompression.NO_COMPRESSION_TYPE: - return null; default: throw new IllegalArgumentException("Compression type not supported: " + compressionType); } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowBodyCompression.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowBodyCompression.java index 7f62df0497f..cb4fe19c05b 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowBodyCompression.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowBodyCompression.java @@ -18,7 +18,6 @@ package org.apache.arrow.vector.ipc.message; import org.apache.arrow.flatbuf.BodyCompression; -import org.apache.arrow.flatbuf.BodyCompressionMethod; import com.google.flatbuffers.FlatBufferBuilder; @@ -27,21 +26,10 @@ */ public class ArrowBodyCompression implements FBSerializable { - /** - * The codec type when there is no compression. - */ - public static final byte NO_COMPRESSION_TYPE = -1; - - /** - * The {@link ArrowBodyCompression} object for cases when there is no compression. - */ - public static final ArrowBodyCompression NO_BODY_COMPRESSION = - new ArrowBodyCompression(NO_COMPRESSION_TYPE, BodyCompressionMethod.BUFFER); - /** * Length of the serialized object. */ - public static final long BODY_COMPRESSION_LENGTH = 8L; + public static final long BODY_COMPRESSION_LENGTH = 2L; private final byte codec; private final byte method; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java index 1c6329d55c3..2b4f02fe476 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java @@ -59,7 +59,7 @@ public class ArrowRecordBatch implements ArrowMessage { public ArrowRecordBatch( int length, List nodes, List buffers) { - this(length, nodes, buffers, ArrowBodyCompression.NO_BODY_COMPRESSION, true); + this(length, nodes, buffers, null, true); } public ArrowRecordBatch( @@ -252,6 +252,12 @@ public long computeBodyLength() { // round up size to the next multiple of 8 size = DataSizeRoundingUtil.roundUpTo8Multiple(size); } + + if (bodyCompression != null) { + size += ArrowBodyCompression.BODY_COMPRESSION_LENGTH; + size = DataSizeRoundingUtil.roundUpTo8Multiple(size); + } + return size; } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java index 2de980421ed..0161aed342a 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java @@ -266,9 +266,12 @@ public static ArrowBlock serialize(WriteChannel out, ArrowRecordBatch batch, Ipc long bufferLength = writeBatchBuffers(out, batch); Preconditions.checkArgument(bufferLength % 8 == 0, "out is not aligned"); - long compLength = writeCompressionBody(out, batch.getBodyCompression()); - Preconditions.checkArgument(compLength == ArrowBodyCompression.BODY_COMPRESSION_LENGTH, - "deserialized compression body length not equal to ArrowBodyCompression#BODY_COMPRESSION_LENGTH"); + long compLength = 0L; + if (batch.getBodyCompression() != null) { + compLength = writeCompressionBody(out, batch.getBodyCompression()); + Preconditions.checkArgument(compLength == ArrowBodyCompression.BODY_COMPRESSION_LENGTH, + "deserialized compression body length not equal to ArrowBodyCompression#BODY_COMPRESSION_LENGTH"); + } // Metadata size in the Block account for the size prefix return new ArrowBlock(start, metadataLength + prefixSize, bufferLength + compLength); @@ -310,8 +313,10 @@ public static long writeBatchBuffers(WriteChannel out, ArrowRecordBatch batch) t public static long writeCompressionBody( WriteChannel out, ArrowBodyCompression bodyCompression) throws IOException { long bufferStart = out.getCurrentPosition(); - out.writeIntLittleEndian(bodyCompression.getCodec()); - out.writeIntLittleEndian(bodyCompression.getMethod()); + ByteBuffer buf = ByteBuffer.allocate(2); + buf.put(bodyCompression.getCodec()); + buf.put(bodyCompression.getMethod()); + out.write(buf); out.align(); return out.getCurrentPosition() - bufferStart; } @@ -323,7 +328,7 @@ public static ByteBuffer serializeMetadata(ArrowMessage message) { FlatBufferBuilder builder = new FlatBufferBuilder(); int batchOffset = message.writeTo(builder); return serializeMessage(builder, message.getMessageType(), batchOffset, - message.computeBodyLength() + ArrowBodyCompression.BODY_COMPRESSION_LENGTH); + message.computeBodyLength()); } /** @@ -430,8 +435,12 @@ public static ArrowRecordBatch deserializeRecordBatch(RecordBatch recordBatchFB, if (curOffset % 8 != 0) { curOffset += 8 - curOffset % 8; } - ArrowBodyCompression bodyCompression = - new ArrowBodyCompression((byte) body.getInt(curOffset), (byte) body.getInt(curOffset + 4)); + ArrowBodyCompression bodyCompression = null; + if (body.writerIndex() > curOffset) { + assert body.writerIndex() - curOffset >= 8; + bodyCompression = + new ArrowBodyCompression(body.getByte(curOffset), body.getByte(curOffset + 1)); + } if ((int) recordBatchFB.length() != recordBatchFB.length()) { throw new IOException("Cannot currently deserialize record batches with more than INT_MAX records."); @@ -497,10 +506,13 @@ public static ArrowBlock serialize(WriteChannel out, ArrowDictionaryBatch batch, long bufferLength = writeBatchBuffers(out, batch.getDictionary()); Preconditions.checkArgument(bufferLength % 8 == 0, "out is not aligned"); - // write the compression info - long compLength = writeCompressionBody(out, batch.getDictionary().getBodyCompression()); - Preconditions.checkArgument(compLength == ArrowBodyCompression.BODY_COMPRESSION_LENGTH, - "deserialized compression body length not equal to ArrowBodyCompression#BODY_COMPRESSION_LENGTH"); + // write the compression info, if any + long compLength = 0L; + if (batch.getDictionary().getBodyCompression() != null) { + compLength = writeCompressionBody(out, batch.getDictionary().getBodyCompression()); + Preconditions.checkArgument(compLength == ArrowBodyCompression.BODY_COMPRESSION_LENGTH, + "deserialized compression body length not equal to ArrowBodyCompression#BODY_COMPRESSION_LENGTH"); + } // Metadata size in the Block account for the size prefix return new ArrowBlock(start, metadataLength + prefixSize, bufferLength + compLength); From 75bfb52a479e948583e1550eb1c918e8d444987c Mon Sep 17 00:00:00 2001 From: liyafan82 Date: Mon, 8 Jun 2020 20:22:37 +0800 Subject: [PATCH 3/8] ARROW-9010: [Java] Revise the implementation of ArrowCompressionBody --- .../vector/ipc/message/ArrowBodyCompression.java | 15 +++++++-------- .../vector/ipc/message/MessageSerializer.java | 5 +---- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowBodyCompression.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowBodyCompression.java index cb4fe19c05b..27d4442971e 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowBodyCompression.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowBodyCompression.java @@ -31,29 +31,28 @@ public class ArrowBodyCompression implements FBSerializable { */ public static final long BODY_COMPRESSION_LENGTH = 2L; - private final byte codec; - private final byte method; + final byte[] data = new byte[(int) BODY_COMPRESSION_LENGTH]; public ArrowBodyCompression(byte codec, byte method) { - this.codec = codec; - this.method = method; + this.data[0] = codec; + this.data[1] = method; } @Override public int writeTo(FlatBufferBuilder builder) { - return BodyCompression.createBodyCompression(builder, codec, method); + return BodyCompression.createBodyCompression(builder, data[0], data[1]); } public byte getCodec() { - return codec; + return data[0]; } public byte getMethod() { - return method; + return data[1]; } @Override public String toString() { - return "ArrowBodyCompression [codec=" + codec + ", method=" + method + "]"; + return "ArrowBodyCompression [codec=" + data[0] + ", method=" + data[1] + "]"; } } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java index 0161aed342a..b758a781a06 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java @@ -313,10 +313,7 @@ public static long writeBatchBuffers(WriteChannel out, ArrowRecordBatch batch) t public static long writeCompressionBody( WriteChannel out, ArrowBodyCompression bodyCompression) throws IOException { long bufferStart = out.getCurrentPosition(); - ByteBuffer buf = ByteBuffer.allocate(2); - buf.put(bodyCompression.getCodec()); - buf.put(bodyCompression.getMethod()); - out.write(buf); + out.write(bodyCompression.data); out.align(); return out.getCurrentPosition() - bufferStart; } From c2d14e60557e7ded6029bb5a770471ad3d2b349d Mon Sep 17 00:00:00 2001 From: liyafan82 Date: Tue, 30 Jun 2020 14:31:13 +0800 Subject: [PATCH 4/8] ARROW-9010: [Java] Simplify the compression codec interface --- .../org/apache/arrow/vector/VectorLoader.java | 3 +-- .../apache/arrow/vector/VectorUnloader.java | 3 +-- .../vector/compression/CompressionCodec.java | 26 ++----------------- .../compression/CompressionUtility.java | 13 ++++++---- 4 files changed, 12 insertions(+), 33 deletions(-) diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java index edb6bf074d5..a55244483bd 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java @@ -81,8 +81,7 @@ private void loadBuffers( for (int j = 0; j < bufferLayoutCount; j++) { ArrowBuf nextBuf = buffers.next(); if (codec != null) { - long estimatedBufSize = codec.estimateDecompressedSize(nextBuf); - nextBuf = codec.decompress(nextBuf, estimatedBufSize); + nextBuf = codec.decompress(nextBuf); } ownBuffers.add(nextBuf); } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java index 6afa8a8fcb8..d867a0bd904 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java @@ -98,8 +98,7 @@ private void appendNodes(FieldVector vector, List nodes, List { - long estimatedSize = codec.estimateCompressedSize(buf); - return codec.compress(buf, estimatedSize); + return codec.compress(buf); }).collect(Collectors.toList()); } buffers.addAll(fieldBuffers); diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionCodec.java b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionCodec.java index d83bfd760aa..118d8bd1a9c 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionCodec.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionCodec.java @@ -24,41 +24,19 @@ */ public interface CompressionCodec { - /** - * Given a buffer, estimate the compressed size. - * Please note this operation is optional, and some compression methods may not support it. - * - * @param input the input buffer to be estimated. - * @return the estimated size of the compressed data. - */ - long estimateCompressedSize(ArrowBuf input); - /** * Compress a buffer. * @param input the buffer to compress. - * @param estimatedOutputSize the estimated size of the output data. - * Note that this is optional, and may not be equal to the actual output size. * @return the compressed buffer. */ - ArrowBuf compress(ArrowBuf input, long estimatedOutputSize); - - /** - * Given a compressed buffer, estimate the decompressed size. - * Please note this operation is optional, and some compression methods may not support it. - * - * @param input the input buffer to be estimated. - * @return the estimated size of the decompressed data. - */ - long estimateDecompressedSize(ArrowBuf input); + ArrowBuf compress(ArrowBuf input); /** * Decompress a buffer. * @param input the buffer to be decompressed. - * @param estimatedOutputSize the estimated size of the output data. - * Note that this is optional, and may not be equal to the actual output size. * @return the decompressed buffer. */ - ArrowBuf decompress(ArrowBuf input, long estimatedOutputSize); + ArrowBuf decompress(ArrowBuf input); /** * Gets the name of the codec. diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtility.java b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtility.java index fcd4a4f43fb..ef2d5fc4ca5 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtility.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtility.java @@ -31,17 +31,20 @@ private CompressionUtility() { /** * Creates the {@link ArrowBodyCompression} object, given the {@link CompressionCodec}. + * The implementation of this method should depend on the values of {@link CompressionType#names}. */ public static ArrowBodyCompression createBodyCompression(CompressionCodec codec) { if (codec == null) { return null; } - for (int i = 0; i < CompressionType.names.length; i++) { - if (CompressionType.names[i].equals(codec.getCodecName())) { - return new ArrowBodyCompression((byte) i, BodyCompressionMethod.BUFFER); - } + switch (codec.getCodecName()) { + case "LZ4_FRAME": + return new ArrowBodyCompression((byte) 0, BodyCompressionMethod.BUFFER); + case "ZSTD": + return new ArrowBodyCompression((byte) 1, BodyCompressionMethod.BUFFER); + default: + throw new IllegalArgumentException("Unknown codec: " + codec.getCodecName()); } - throw new IllegalArgumentException("Unknown codec: " + codec.getCodecName()); } /** From a842ce67ef1700085ffdd962ef7dbad27f00011c Mon Sep 17 00:00:00 2001 From: liyafan82 Date: Wed, 5 Aug 2020 17:58:12 +0800 Subject: [PATCH 5/8] ARROW-9010: [Java] Provide default compression codec --- .../org/apache/arrow/flight/ArrowMessage.java | 26 ++------- .../org/apache/arrow/vector/VectorLoader.java | 10 ++-- .../apache/arrow/vector/VectorUnloader.java | 18 +++---- .../vector/compression/CompressionCodec.java | 11 ++-- ...ssionUtility.java => CompressionUtil.java} | 11 ++-- .../compression/DefaultCompressionCodec.java | 54 +++++++++++++++++++ .../ipc/message/ArrowBodyCompression.java | 19 +++---- .../vector/ipc/message/ArrowRecordBatch.java | 14 ++--- .../vector/ipc/message/MessageSerializer.java | 43 ++------------- 9 files changed, 103 insertions(+), 103 deletions(-) rename java/vector/src/main/java/org/apache/arrow/vector/compression/{CompressionUtility.java => CompressionUtil.java} (89%) create mode 100644 java/vector/src/main/java/org/apache/arrow/vector/compression/DefaultCompressionCodec.java diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java index ab6c6d536cb..fc82f1a90f8 100644 --- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java +++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java @@ -35,6 +35,7 @@ import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.util.AutoCloseables; import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.compression.DefaultCompressionCodec; import org.apache.arrow.vector.ipc.message.ArrowBodyCompression; import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch; import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; @@ -130,7 +131,7 @@ public ArrowMessage(FlightDescriptor descriptor, Schema schema) { bufs = ImmutableList.of(); this.descriptor = descriptor; this.appMetadata = null; - this.bodyCompression = null; + this.bodyCompression = DefaultCompressionCodec.DEFAULT_BODY_COMPRESSION; } /** @@ -168,7 +169,7 @@ public ArrowMessage(ArrowBuf appMetadata) { this.bufs = ImmutableList.of(); this.descriptor = null; this.appMetadata = appMetadata; - this.bodyCompression = null; + this.bodyCompression = DefaultCompressionCodec.DEFAULT_BODY_COMPRESSION; } public ArrowMessage(FlightDescriptor descriptor) { @@ -176,7 +177,7 @@ public ArrowMessage(FlightDescriptor descriptor) { this.bufs = ImmutableList.of(); this.descriptor = descriptor; this.appMetadata = null; - this.bodyCompression = null; + this.bodyCompression = DefaultCompressionCodec.DEFAULT_BODY_COMPRESSION; } private ArrowMessage(FlightDescriptor descriptor, MessageMetadataResult message, ArrowBuf appMetadata, @@ -185,7 +186,7 @@ private ArrowMessage(FlightDescriptor descriptor, MessageMetadataResult message, this.descriptor = descriptor; this.appMetadata = appMetadata; this.bufs = buf == null ? ImmutableList.of() : ImmutableList.of(buf); - this.bodyCompression = null; + this.bodyCompression = DefaultCompressionCodec.DEFAULT_BODY_COMPRESSION; } public MessageMetadataResult asSchemaMessage() { @@ -361,23 +362,6 @@ private InputStream asInputStream(BufferAllocator allocator) { // the reference count b.getReferenceManager().retain(); } - - // add compression info, if any - if (bodyCompression != null) { - ArrowBuf compBuf = allocator.buffer(ArrowBodyCompression.BODY_COMPRESSION_LENGTH); - compBuf.setByte(0, bodyCompression.getCodec()); - compBuf.setByte(1, bodyCompression.getMethod()); - compBuf.writerIndex(ArrowBodyCompression.BODY_COMPRESSION_LENGTH); - size += ArrowBodyCompression.BODY_COMPRESSION_LENGTH; - allBufs.add(compBuf.asNettyBuffer()); - - // align - int paddingBytes = (int) (8 - ArrowBodyCompression.BODY_COMPRESSION_LENGTH); - assert paddingBytes > 0 && paddingBytes < 8; - size += paddingBytes; - allBufs.add(PADDING_BUFFERS.get(paddingBytes).retain()); - } - // rawvarint is used for length definition. cos.writeUInt32NoTag(size); cos.flush(); diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java index a55244483bd..3a4b00de5ef 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java @@ -26,7 +26,7 @@ import org.apache.arrow.memory.ArrowBuf; import org.apache.arrow.util.Collections2; import org.apache.arrow.vector.compression.CompressionCodec; -import org.apache.arrow.vector.compression.CompressionUtility; +import org.apache.arrow.vector.compression.CompressionUtil; import org.apache.arrow.vector.ipc.message.ArrowFieldNode; import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; import org.apache.arrow.vector.types.pojo.Field; @@ -56,8 +56,7 @@ public VectorLoader(VectorSchemaRoot root) { public void load(ArrowRecordBatch recordBatch) { Iterator buffers = recordBatch.getBuffers().iterator(); Iterator nodes = recordBatch.getNodes().iterator(); - CompressionCodec codec = recordBatch.getBodyCompression() == null ? null : - CompressionUtility.createCodec(recordBatch.getBodyCompression().getCodec()); + CompressionCodec codec = CompressionUtil.createCodec(recordBatch.getBodyCompression().getCodec()); for (FieldVector fieldVector : root.getFieldVectors()) { loadBuffers(fieldVector, fieldVector.getField(), buffers, nodes, codec); } @@ -80,10 +79,7 @@ private void loadBuffers( List ownBuffers = new ArrayList<>(bufferLayoutCount); for (int j = 0; j < bufferLayoutCount; j++) { ArrowBuf nextBuf = buffers.next(); - if (codec != null) { - nextBuf = codec.decompress(nextBuf); - } - ownBuffers.add(nextBuf); + ownBuffers.add(codec.decompress(vector.getAllocator(), nextBuf)); } try { vector.loadFieldBuffers(fieldNode, ownBuffers); diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java index d867a0bd904..07627dcfcb6 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java @@ -23,7 +23,8 @@ import org.apache.arrow.memory.ArrowBuf; import org.apache.arrow.vector.compression.CompressionCodec; -import org.apache.arrow.vector.compression.CompressionUtility; +import org.apache.arrow.vector.compression.CompressionUtil; +import org.apache.arrow.vector.compression.DefaultCompressionCodec; import org.apache.arrow.vector.ipc.message.ArrowFieldNode; import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; @@ -42,7 +43,7 @@ public class VectorUnloader { * Constructs a new instance of the given set of vectors. */ public VectorUnloader(VectorSchemaRoot root) { - this(root, true, null, true); + this(root, true, DefaultCompressionCodec.INSTANCE, true); } /** @@ -54,7 +55,7 @@ public VectorUnloader(VectorSchemaRoot root) { */ public VectorUnloader( VectorSchemaRoot root, boolean includeNullCount, boolean alignBuffers) { - this(root, includeNullCount, null, alignBuffers); + this(root, includeNullCount, DefaultCompressionCodec.INSTANCE, alignBuffers); } /** @@ -84,7 +85,7 @@ public ArrowRecordBatch getRecordBatch() { appendNodes(vector, nodes, buffers); } return new ArrowRecordBatch( - root.getRowCount(), nodes, buffers, CompressionUtility.createBodyCompression(codec), alignBuffers); + root.getRowCount(), nodes, buffers, CompressionUtil.createBodyCompression(codec), alignBuffers); } private void appendNodes(FieldVector vector, List nodes, List buffers) { @@ -96,11 +97,10 @@ private void appendNodes(FieldVector vector, List nodes, List { - return codec.compress(buf); - }).collect(Collectors.toList()); - } + + fieldBuffers = fieldBuffers.stream().map( + buf -> codec.compress(vector.getAllocator(), buf)).collect(Collectors.toList()); + buffers.addAll(fieldBuffers); for (FieldVector child : vector.getChildrenFromFields()) { appendNodes(child, nodes, buffers); diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionCodec.java b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionCodec.java index 118d8bd1a9c..df43b31ee69 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionCodec.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionCodec.java @@ -18,6 +18,7 @@ package org.apache.arrow.vector.compression; import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; /** * The codec for compression/decompression. @@ -26,17 +27,19 @@ public interface CompressionCodec { /** * Compress a buffer. - * @param input the buffer to compress. + * @param allocator the allocator for allocating memory for compressed buffer. + * @param unCompressedBuffer the buffer to compress. * @return the compressed buffer. */ - ArrowBuf compress(ArrowBuf input); + ArrowBuf compress(BufferAllocator allocator, ArrowBuf unCompressedBuffer); /** * Decompress a buffer. - * @param input the buffer to be decompressed. + * @param allocator the allocator for allocating memory for decompressed buffer. + * @param compressedBuffer the buffer to be decompressed. * @return the decompressed buffer. */ - ArrowBuf decompress(ArrowBuf input); + ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer); /** * Gets the name of the codec. diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtility.java b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java similarity index 89% rename from java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtility.java rename to java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java index ef2d5fc4ca5..c458c4d999a 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtility.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java @@ -24,9 +24,9 @@ /** * Utilities for data compression/decompression. */ -public class CompressionUtility { +public class CompressionUtil { - private CompressionUtility() { + private CompressionUtil() { } /** @@ -34,10 +34,9 @@ private CompressionUtility() { * The implementation of this method should depend on the values of {@link CompressionType#names}. */ public static ArrowBodyCompression createBodyCompression(CompressionCodec codec) { - if (codec == null) { - return null; - } switch (codec.getCodecName()) { + case "default": + return DefaultCompressionCodec.DEFAULT_BODY_COMPRESSION; case "LZ4_FRAME": return new ArrowBodyCompression((byte) 0, BodyCompressionMethod.BUFFER); case "ZSTD": @@ -52,6 +51,8 @@ public static ArrowBodyCompression createBodyCompression(CompressionCodec codec) */ public static CompressionCodec createCodec(byte compressionType) { switch (compressionType) { + case DefaultCompressionCodec.COMPRESSION_TYPE: + return DefaultCompressionCodec.INSTANCE; default: throw new IllegalArgumentException("Compression type not supported: " + compressionType); } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compression/DefaultCompressionCodec.java b/java/vector/src/main/java/org/apache/arrow/vector/compression/DefaultCompressionCodec.java new file mode 100644 index 00000000000..d50822bb259 --- /dev/null +++ b/java/vector/src/main/java/org/apache/arrow/vector/compression/DefaultCompressionCodec.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.vector.compression; + +import org.apache.arrow.flatbuf.BodyCompressionMethod; +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.ipc.message.ArrowBodyCompression; + +/** + * The default compression codec that does no compression. + */ +public class DefaultCompressionCodec implements CompressionCodec { + + public static final DefaultCompressionCodec INSTANCE = new DefaultCompressionCodec(); + + public static final byte COMPRESSION_TYPE = -1; + + public static final ArrowBodyCompression DEFAULT_BODY_COMPRESSION = + new ArrowBodyCompression(COMPRESSION_TYPE, BodyCompressionMethod.BUFFER); + + private DefaultCompressionCodec() { + } + + @Override + public ArrowBuf compress(BufferAllocator allocator, ArrowBuf unCompressedBuffer) { + return unCompressedBuffer; + } + + @Override + public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) { + return compressedBuffer; + } + + @Override + public String getCodecName() { + return "default"; + } +} diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowBodyCompression.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowBodyCompression.java index 27d4442971e..5370ddfa09d 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowBodyCompression.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowBodyCompression.java @@ -26,33 +26,30 @@ */ public class ArrowBodyCompression implements FBSerializable { - /** - * Length of the serialized object. - */ - public static final long BODY_COMPRESSION_LENGTH = 2L; + private final byte codec; - final byte[] data = new byte[(int) BODY_COMPRESSION_LENGTH]; + private final byte method; public ArrowBodyCompression(byte codec, byte method) { - this.data[0] = codec; - this.data[1] = method; + this.codec = codec; + this.method = method; } @Override public int writeTo(FlatBufferBuilder builder) { - return BodyCompression.createBodyCompression(builder, data[0], data[1]); + return BodyCompression.createBodyCompression(builder, codec, method); } public byte getCodec() { - return data[0]; + return codec; } public byte getMethod() { - return data[1]; + return method; } @Override public String toString() { - return "ArrowBodyCompression [codec=" + data[0] + ", method=" + data[1] + "]"; + return "ArrowBodyCompression [codec=" + codec + ", method=" + method + "]"; } } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java index 2b4f02fe476..9a4a1d6a1eb 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java @@ -27,6 +27,8 @@ import org.apache.arrow.memory.ArrowBuf; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.util.DataSizeRoundingUtil; +import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.compression.DefaultCompressionCodec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,7 +61,7 @@ public class ArrowRecordBatch implements ArrowMessage { public ArrowRecordBatch( int length, List nodes, List buffers) { - this(length, nodes, buffers, null, true); + this(length, nodes, buffers, DefaultCompressionCodec.DEFAULT_BODY_COMPRESSION, true); } public ArrowRecordBatch( @@ -84,6 +86,7 @@ public ArrowRecordBatch( this.nodes = nodes; this.buffers = buffers; this.bodyCompression = bodyCompression; + Preconditions.checkArgument(bodyCompression != null, "body compression cannot be null"); List arrowBuffers = new ArrayList<>(buffers.size()); long offset = 0; for (ArrowBuf arrowBuf : buffers) { @@ -112,6 +115,7 @@ private ArrowRecordBatch( this.nodes = nodes; this.buffers = buffers; this.bodyCompression = bodyCompression; + Preconditions.checkArgument(bodyCompression != null, "body compression cannot be null"); this.closed = false; List arrowBuffers = new ArrayList<>(); long offset = 0; @@ -190,10 +194,12 @@ public int writeTo(FlatBufferBuilder builder) { int nodesOffset = FBSerializables.writeAllStructsToVector(builder, nodes); RecordBatch.startBuffersVector(builder, buffers.size()); int buffersOffset = FBSerializables.writeAllStructsToVector(builder, buffersLayout); + int compressOffset = bodyCompression.writeTo(builder); RecordBatch.startRecordBatch(builder); RecordBatch.addLength(builder, length); RecordBatch.addNodes(builder, nodesOffset); RecordBatch.addBuffers(builder, buffersOffset); + RecordBatch.addCompression(builder, compressOffset); return RecordBatch.endRecordBatch(builder); } @@ -252,12 +258,6 @@ public long computeBodyLength() { // round up size to the next multiple of 8 size = DataSizeRoundingUtil.roundUpTo8Multiple(size); } - - if (bodyCompression != null) { - size += ArrowBodyCompression.BODY_COMPRESSION_LENGTH; - size = DataSizeRoundingUtil.roundUpTo8Multiple(size); - } - return size; } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java index b758a781a06..68b42eeac83 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java @@ -266,15 +266,8 @@ public static ArrowBlock serialize(WriteChannel out, ArrowRecordBatch batch, Ipc long bufferLength = writeBatchBuffers(out, batch); Preconditions.checkArgument(bufferLength % 8 == 0, "out is not aligned"); - long compLength = 0L; - if (batch.getBodyCompression() != null) { - compLength = writeCompressionBody(out, batch.getBodyCompression()); - Preconditions.checkArgument(compLength == ArrowBodyCompression.BODY_COMPRESSION_LENGTH, - "deserialized compression body length not equal to ArrowBodyCompression#BODY_COMPRESSION_LENGTH"); - } - // Metadata size in the Block account for the size prefix - return new ArrowBlock(start, metadataLength + prefixSize, bufferLength + compLength); + return new ArrowBlock(start, metadataLength + prefixSize, bufferLength); } /** @@ -307,17 +300,6 @@ public static long writeBatchBuffers(WriteChannel out, ArrowRecordBatch batch) t return out.getCurrentPosition() - bufferStart; } - /** - * Serialize the compression body. - */ - public static long writeCompressionBody( - WriteChannel out, ArrowBodyCompression bodyCompression) throws IOException { - long bufferStart = out.getCurrentPosition(); - out.write(bodyCompression.data); - out.align(); - return out.getCurrentPosition() - bufferStart; - } - /** * Returns the serialized form of {@link RecordBatch} wrapped in a {@link org.apache.arrow.flatbuf.Message}. */ @@ -421,23 +403,14 @@ public static ArrowRecordBatch deserializeRecordBatch(RecordBatch recordBatchFB, nodes.add(new ArrowFieldNode(node.length(), node.nullCount())); } List buffers = new ArrayList<>(); - long curOffset = 0L; for (int i = 0; i < recordBatchFB.buffersLength(); ++i) { Buffer bufferFB = recordBatchFB.buffers(i); ArrowBuf vectorBuffer = body.slice(bufferFB.offset(), bufferFB.length()); - curOffset = bufferFB.offset() + bufferFB.length(); buffers.add(vectorBuffer); } - if (curOffset % 8 != 0) { - curOffset += 8 - curOffset % 8; - } - ArrowBodyCompression bodyCompression = null; - if (body.writerIndex() > curOffset) { - assert body.writerIndex() - curOffset >= 8; - bodyCompression = - new ArrowBodyCompression(body.getByte(curOffset), body.getByte(curOffset + 1)); - } + ArrowBodyCompression bodyCompression = + new ArrowBodyCompression(recordBatchFB.compression().codec(), recordBatchFB.compression().method()); if ((int) recordBatchFB.length() != recordBatchFB.length()) { throw new IOException("Cannot currently deserialize record batches with more than INT_MAX records."); @@ -503,16 +476,8 @@ public static ArrowBlock serialize(WriteChannel out, ArrowDictionaryBatch batch, long bufferLength = writeBatchBuffers(out, batch.getDictionary()); Preconditions.checkArgument(bufferLength % 8 == 0, "out is not aligned"); - // write the compression info, if any - long compLength = 0L; - if (batch.getDictionary().getBodyCompression() != null) { - compLength = writeCompressionBody(out, batch.getDictionary().getBodyCompression()); - Preconditions.checkArgument(compLength == ArrowBodyCompression.BODY_COMPRESSION_LENGTH, - "deserialized compression body length not equal to ArrowBodyCompression#BODY_COMPRESSION_LENGTH"); - } - // Metadata size in the Block account for the size prefix - return new ArrowBlock(start, metadataLength + prefixSize, bufferLength + compLength); + return new ArrowBlock(start, metadataLength + prefixSize, bufferLength); } /** From bb9fab5846de75ba5934f107bf56e94572a67b0c Mon Sep 17 00:00:00 2001 From: liyafan82 Date: Wed, 12 Aug 2020 11:37:39 +0800 Subject: [PATCH 6/8] ARROW-9010: [Java] Resolve comments --- .../org/apache/arrow/flight/ArrowMessage.java | 10 +++++----- .../org/apache/arrow/vector/VectorUnloader.java | 15 ++++++--------- .../vector/compression/CompressionCodec.java | 2 ++ .../arrow/vector/compression/CompressionUtil.java | 10 +++++----- ...pressionCodec.java => NoCompressionCodec.java} | 6 +++--- .../vector/ipc/message/ArrowRecordBatch.java | 4 ++-- .../vector/ipc/message/MessageSerializer.java | 6 ++++-- 7 files changed, 27 insertions(+), 26 deletions(-) rename java/vector/src/main/java/org/apache/arrow/vector/compression/{DefaultCompressionCodec.java => NoCompressionCodec.java} (89%) diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java index fc82f1a90f8..7f441202ea6 100644 --- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java +++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java @@ -35,7 +35,7 @@ import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.util.AutoCloseables; import org.apache.arrow.util.Preconditions; -import org.apache.arrow.vector.compression.DefaultCompressionCodec; +import org.apache.arrow.vector.compression.NoCompressionCodec; import org.apache.arrow.vector.ipc.message.ArrowBodyCompression; import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch; import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; @@ -131,7 +131,7 @@ public ArrowMessage(FlightDescriptor descriptor, Schema schema) { bufs = ImmutableList.of(); this.descriptor = descriptor; this.appMetadata = null; - this.bodyCompression = DefaultCompressionCodec.DEFAULT_BODY_COMPRESSION; + this.bodyCompression = NoCompressionCodec.DEFAULT_BODY_COMPRESSION; } /** @@ -169,7 +169,7 @@ public ArrowMessage(ArrowBuf appMetadata) { this.bufs = ImmutableList.of(); this.descriptor = null; this.appMetadata = appMetadata; - this.bodyCompression = DefaultCompressionCodec.DEFAULT_BODY_COMPRESSION; + this.bodyCompression = NoCompressionCodec.DEFAULT_BODY_COMPRESSION; } public ArrowMessage(FlightDescriptor descriptor) { @@ -177,7 +177,7 @@ public ArrowMessage(FlightDescriptor descriptor) { this.bufs = ImmutableList.of(); this.descriptor = descriptor; this.appMetadata = null; - this.bodyCompression = DefaultCompressionCodec.DEFAULT_BODY_COMPRESSION; + this.bodyCompression = NoCompressionCodec.DEFAULT_BODY_COMPRESSION; } private ArrowMessage(FlightDescriptor descriptor, MessageMetadataResult message, ArrowBuf appMetadata, @@ -186,7 +186,7 @@ private ArrowMessage(FlightDescriptor descriptor, MessageMetadataResult message, this.descriptor = descriptor; this.appMetadata = appMetadata; this.bufs = buf == null ? ImmutableList.of() : ImmutableList.of(buf); - this.bodyCompression = DefaultCompressionCodec.DEFAULT_BODY_COMPRESSION; + this.bodyCompression = NoCompressionCodec.DEFAULT_BODY_COMPRESSION; } public MessageMetadataResult asSchemaMessage() { diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java index 07627dcfcb6..e2cbf3ec1d8 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java @@ -19,12 +19,11 @@ import java.util.ArrayList; import java.util.List; -import java.util.stream.Collectors; import org.apache.arrow.memory.ArrowBuf; import org.apache.arrow.vector.compression.CompressionCodec; import org.apache.arrow.vector.compression.CompressionUtil; -import org.apache.arrow.vector.compression.DefaultCompressionCodec; +import org.apache.arrow.vector.compression.NoCompressionCodec; import org.apache.arrow.vector.ipc.message.ArrowFieldNode; import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; @@ -43,7 +42,7 @@ public class VectorUnloader { * Constructs a new instance of the given set of vectors. */ public VectorUnloader(VectorSchemaRoot root) { - this(root, true, DefaultCompressionCodec.INSTANCE, true); + this(root, true, NoCompressionCodec.INSTANCE, true); } /** @@ -55,7 +54,7 @@ public VectorUnloader(VectorSchemaRoot root) { */ public VectorUnloader( VectorSchemaRoot root, boolean includeNullCount, boolean alignBuffers) { - this(root, includeNullCount, DefaultCompressionCodec.INSTANCE, alignBuffers); + this(root, includeNullCount, NoCompressionCodec.INSTANCE, alignBuffers); } /** @@ -97,11 +96,9 @@ private void appendNodes(FieldVector vector, List nodes, List codec.compress(vector.getAllocator(), buf)).collect(Collectors.toList()); - - buffers.addAll(fieldBuffers); + for (ArrowBuf buf : fieldBuffers) { + buffers.add(codec.compress(vector.getAllocator(), buf)); + } for (FieldVector child : vector.getChildrenFromFields()) { appendNodes(child, nodes, buffers); } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionCodec.java b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionCodec.java index df43b31ee69..ce2dd73aab5 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionCodec.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionCodec.java @@ -29,6 +29,7 @@ public interface CompressionCodec { * Compress a buffer. * @param allocator the allocator for allocating memory for compressed buffer. * @param unCompressedBuffer the buffer to compress. + * Implementation of this method should take care of releasing this buffer. * @return the compressed buffer. */ ArrowBuf compress(BufferAllocator allocator, ArrowBuf unCompressedBuffer); @@ -37,6 +38,7 @@ public interface CompressionCodec { * Decompress a buffer. * @param allocator the allocator for allocating memory for decompressed buffer. * @param compressedBuffer the buffer to be decompressed. + * Implementation of this method should take care of releasing this buffer. * @return the decompressed buffer. */ ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer); diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java index c458c4d999a..464f3aa8e9c 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java @@ -36,11 +36,11 @@ private CompressionUtil() { public static ArrowBodyCompression createBodyCompression(CompressionCodec codec) { switch (codec.getCodecName()) { case "default": - return DefaultCompressionCodec.DEFAULT_BODY_COMPRESSION; + return NoCompressionCodec.DEFAULT_BODY_COMPRESSION; case "LZ4_FRAME": - return new ArrowBodyCompression((byte) 0, BodyCompressionMethod.BUFFER); + return new ArrowBodyCompression(CompressionType.LZ4_FRAME, BodyCompressionMethod.BUFFER); case "ZSTD": - return new ArrowBodyCompression((byte) 1, BodyCompressionMethod.BUFFER); + return new ArrowBodyCompression(CompressionType.ZSTD, BodyCompressionMethod.BUFFER); default: throw new IllegalArgumentException("Unknown codec: " + codec.getCodecName()); } @@ -51,8 +51,8 @@ public static ArrowBodyCompression createBodyCompression(CompressionCodec codec) */ public static CompressionCodec createCodec(byte compressionType) { switch (compressionType) { - case DefaultCompressionCodec.COMPRESSION_TYPE: - return DefaultCompressionCodec.INSTANCE; + case NoCompressionCodec.COMPRESSION_TYPE: + return NoCompressionCodec.INSTANCE; default: throw new IllegalArgumentException("Compression type not supported: " + compressionType); } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compression/DefaultCompressionCodec.java b/java/vector/src/main/java/org/apache/arrow/vector/compression/NoCompressionCodec.java similarity index 89% rename from java/vector/src/main/java/org/apache/arrow/vector/compression/DefaultCompressionCodec.java rename to java/vector/src/main/java/org/apache/arrow/vector/compression/NoCompressionCodec.java index d50822bb259..72273de7630 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/compression/DefaultCompressionCodec.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/compression/NoCompressionCodec.java @@ -25,16 +25,16 @@ /** * The default compression codec that does no compression. */ -public class DefaultCompressionCodec implements CompressionCodec { +public class NoCompressionCodec implements CompressionCodec { - public static final DefaultCompressionCodec INSTANCE = new DefaultCompressionCodec(); + public static final NoCompressionCodec INSTANCE = new NoCompressionCodec(); public static final byte COMPRESSION_TYPE = -1; public static final ArrowBodyCompression DEFAULT_BODY_COMPRESSION = new ArrowBodyCompression(COMPRESSION_TYPE, BodyCompressionMethod.BUFFER); - private DefaultCompressionCodec() { + private NoCompressionCodec() { } @Override diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java index 9a4a1d6a1eb..e38bff4e089 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java @@ -28,7 +28,7 @@ import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.util.DataSizeRoundingUtil; import org.apache.arrow.util.Preconditions; -import org.apache.arrow.vector.compression.DefaultCompressionCodec; +import org.apache.arrow.vector.compression.NoCompressionCodec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,7 +61,7 @@ public class ArrowRecordBatch implements ArrowMessage { public ArrowRecordBatch( int length, List nodes, List buffers) { - this(length, nodes, buffers, DefaultCompressionCodec.DEFAULT_BODY_COMPRESSION, true); + this(length, nodes, buffers, NoCompressionCodec.DEFAULT_BODY_COMPRESSION, true); } public ArrowRecordBatch( diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java index 68b42eeac83..64712a2741b 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java @@ -34,6 +34,7 @@ import org.apache.arrow.memory.ArrowBuf; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.compression.NoCompressionCodec; import org.apache.arrow.vector.ipc.ReadChannel; import org.apache.arrow.vector.ipc.WriteChannel; import org.apache.arrow.vector.types.pojo.Schema; @@ -409,8 +410,9 @@ public static ArrowRecordBatch deserializeRecordBatch(RecordBatch recordBatchFB, buffers.add(vectorBuffer); } - ArrowBodyCompression bodyCompression = - new ArrowBodyCompression(recordBatchFB.compression().codec(), recordBatchFB.compression().method()); + ArrowBodyCompression bodyCompression = recordBatchFB.compression() == null ? + NoCompressionCodec.DEFAULT_BODY_COMPRESSION + : new ArrowBodyCompression(recordBatchFB.compression().codec(), recordBatchFB.compression().method()); if ((int) recordBatchFB.length() != recordBatchFB.length()) { throw new IOException("Cannot currently deserialize record batches with more than INT_MAX records."); From 55733524cb34d6ea51e2e57c755d81755bab65d5 Mon Sep 17 00:00:00 2001 From: liyafan82 Date: Mon, 24 Aug 2020 14:39:34 +0800 Subject: [PATCH 7/8] ARROW-9010: [Java] Stop using no-compression decoder during IPC --- .../org/apache/arrow/flight/ArrowMessage.java | 9 ++++----- .../org/apache/arrow/vector/VectorLoader.java | 4 +++- .../arrow/vector/compression/CompressionUtil.java | 4 +--- .../vector/ipc/message/ArrowRecordBatch.java | 15 ++++++++------- .../vector/ipc/message/MessageSerializer.java | 4 +--- 5 files changed, 17 insertions(+), 19 deletions(-) diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java index 7f441202ea6..0cf0b77d79f 100644 --- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java +++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java @@ -35,7 +35,6 @@ import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.util.AutoCloseables; import org.apache.arrow.util.Preconditions; -import org.apache.arrow.vector.compression.NoCompressionCodec; import org.apache.arrow.vector.ipc.message.ArrowBodyCompression; import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch; import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; @@ -131,7 +130,7 @@ public ArrowMessage(FlightDescriptor descriptor, Schema schema) { bufs = ImmutableList.of(); this.descriptor = descriptor; this.appMetadata = null; - this.bodyCompression = NoCompressionCodec.DEFAULT_BODY_COMPRESSION; + this.bodyCompression = null; } /** @@ -169,7 +168,7 @@ public ArrowMessage(ArrowBuf appMetadata) { this.bufs = ImmutableList.of(); this.descriptor = null; this.appMetadata = appMetadata; - this.bodyCompression = NoCompressionCodec.DEFAULT_BODY_COMPRESSION; + this.bodyCompression = null; } public ArrowMessage(FlightDescriptor descriptor) { @@ -177,7 +176,7 @@ public ArrowMessage(FlightDescriptor descriptor) { this.bufs = ImmutableList.of(); this.descriptor = descriptor; this.appMetadata = null; - this.bodyCompression = NoCompressionCodec.DEFAULT_BODY_COMPRESSION; + this.bodyCompression = null; } private ArrowMessage(FlightDescriptor descriptor, MessageMetadataResult message, ArrowBuf appMetadata, @@ -186,7 +185,7 @@ private ArrowMessage(FlightDescriptor descriptor, MessageMetadataResult message, this.descriptor = descriptor; this.appMetadata = appMetadata; this.bufs = buf == null ? ImmutableList.of() : ImmutableList.of(buf); - this.bodyCompression = NoCompressionCodec.DEFAULT_BODY_COMPRESSION; + this.bodyCompression = null; } public MessageMetadataResult asSchemaMessage() { diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java index 3a4b00de5ef..6bf3b334381 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java @@ -27,6 +27,7 @@ import org.apache.arrow.util.Collections2; import org.apache.arrow.vector.compression.CompressionCodec; import org.apache.arrow.vector.compression.CompressionUtil; +import org.apache.arrow.vector.compression.NoCompressionCodec; import org.apache.arrow.vector.ipc.message.ArrowFieldNode; import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; import org.apache.arrow.vector.types.pojo.Field; @@ -56,7 +57,8 @@ public VectorLoader(VectorSchemaRoot root) { public void load(ArrowRecordBatch recordBatch) { Iterator buffers = recordBatch.getBuffers().iterator(); Iterator nodes = recordBatch.getNodes().iterator(); - CompressionCodec codec = CompressionUtil.createCodec(recordBatch.getBodyCompression().getCodec()); + CompressionCodec codec = recordBatch.getBodyCompression() == null ? NoCompressionCodec.INSTANCE + : CompressionUtil.createCodec(recordBatch.getBodyCompression().getCodec()); for (FieldVector fieldVector : root.getFieldVectors()) { loadBuffers(fieldVector, fieldVector.getField(), buffers, nodes, codec); } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java index 464f3aa8e9c..247dab193ad 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java @@ -35,14 +35,12 @@ private CompressionUtil() { */ public static ArrowBodyCompression createBodyCompression(CompressionCodec codec) { switch (codec.getCodecName()) { - case "default": - return NoCompressionCodec.DEFAULT_BODY_COMPRESSION; case "LZ4_FRAME": return new ArrowBodyCompression(CompressionType.LZ4_FRAME, BodyCompressionMethod.BUFFER); case "ZSTD": return new ArrowBodyCompression(CompressionType.ZSTD, BodyCompressionMethod.BUFFER); default: - throw new IllegalArgumentException("Unknown codec: " + codec.getCodecName()); + return null; } } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java index e38bff4e089..6301b6b0e07 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java @@ -27,8 +27,6 @@ import org.apache.arrow.memory.ArrowBuf; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.util.DataSizeRoundingUtil; -import org.apache.arrow.util.Preconditions; -import org.apache.arrow.vector.compression.NoCompressionCodec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,7 +59,7 @@ public class ArrowRecordBatch implements ArrowMessage { public ArrowRecordBatch( int length, List nodes, List buffers) { - this(length, nodes, buffers, NoCompressionCodec.DEFAULT_BODY_COMPRESSION, true); + this(length, nodes, buffers, null, true); } public ArrowRecordBatch( @@ -86,7 +84,6 @@ public ArrowRecordBatch( this.nodes = nodes; this.buffers = buffers; this.bodyCompression = bodyCompression; - Preconditions.checkArgument(bodyCompression != null, "body compression cannot be null"); List arrowBuffers = new ArrayList<>(buffers.size()); long offset = 0; for (ArrowBuf arrowBuf : buffers) { @@ -115,7 +112,6 @@ private ArrowRecordBatch( this.nodes = nodes; this.buffers = buffers; this.bodyCompression = bodyCompression; - Preconditions.checkArgument(bodyCompression != null, "body compression cannot be null"); this.closed = false; List arrowBuffers = new ArrayList<>(); long offset = 0; @@ -194,12 +190,17 @@ public int writeTo(FlatBufferBuilder builder) { int nodesOffset = FBSerializables.writeAllStructsToVector(builder, nodes); RecordBatch.startBuffersVector(builder, buffers.size()); int buffersOffset = FBSerializables.writeAllStructsToVector(builder, buffersLayout); - int compressOffset = bodyCompression.writeTo(builder); + int compressOffset = 0; + if (bodyCompression != null) { + compressOffset = bodyCompression.writeTo(builder); + } RecordBatch.startRecordBatch(builder); RecordBatch.addLength(builder, length); RecordBatch.addNodes(builder, nodesOffset); RecordBatch.addBuffers(builder, buffersOffset); - RecordBatch.addCompression(builder, compressOffset); + if (bodyCompression != null) { + RecordBatch.addCompression(builder, compressOffset); + } return RecordBatch.endRecordBatch(builder); } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java index 64712a2741b..c09ec81fa6c 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java @@ -34,7 +34,6 @@ import org.apache.arrow.memory.ArrowBuf; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.util.Preconditions; -import org.apache.arrow.vector.compression.NoCompressionCodec; import org.apache.arrow.vector.ipc.ReadChannel; import org.apache.arrow.vector.ipc.WriteChannel; import org.apache.arrow.vector.types.pojo.Schema; @@ -410,8 +409,7 @@ public static ArrowRecordBatch deserializeRecordBatch(RecordBatch recordBatchFB, buffers.add(vectorBuffer); } - ArrowBodyCompression bodyCompression = recordBatchFB.compression() == null ? - NoCompressionCodec.DEFAULT_BODY_COMPRESSION + ArrowBodyCompression bodyCompression = recordBatchFB.compression() == null ? null : new ArrowBodyCompression(recordBatchFB.compression().codec(), recordBatchFB.compression().method()); if ((int) recordBatchFB.length() != recordBatchFB.length()) { From a1a08003feb267e84aa5319955ea97352f57af91 Mon Sep 17 00:00:00 2001 From: liyafan82 Date: Fri, 28 Aug 2020 16:46:17 +0800 Subject: [PATCH 8/8] ARROW-9010: [Java] Continue to use no-compression decoder outside IPC --- .../java/org/apache/arrow/flight/ArrowMessage.java | 9 +++++---- .../java/org/apache/arrow/vector/VectorLoader.java | 4 +--- .../arrow/vector/compression/CompressionUtil.java | 4 +++- .../arrow/vector/ipc/message/ArrowRecordBatch.java | 10 +++++++--- .../arrow/vector/ipc/message/MessageSerializer.java | 4 +++- 5 files changed, 19 insertions(+), 12 deletions(-) diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java index 0cf0b77d79f..7f441202ea6 100644 --- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java +++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java @@ -35,6 +35,7 @@ import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.util.AutoCloseables; import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.compression.NoCompressionCodec; import org.apache.arrow.vector.ipc.message.ArrowBodyCompression; import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch; import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; @@ -130,7 +131,7 @@ public ArrowMessage(FlightDescriptor descriptor, Schema schema) { bufs = ImmutableList.of(); this.descriptor = descriptor; this.appMetadata = null; - this.bodyCompression = null; + this.bodyCompression = NoCompressionCodec.DEFAULT_BODY_COMPRESSION; } /** @@ -168,7 +169,7 @@ public ArrowMessage(ArrowBuf appMetadata) { this.bufs = ImmutableList.of(); this.descriptor = null; this.appMetadata = appMetadata; - this.bodyCompression = null; + this.bodyCompression = NoCompressionCodec.DEFAULT_BODY_COMPRESSION; } public ArrowMessage(FlightDescriptor descriptor) { @@ -176,7 +177,7 @@ public ArrowMessage(FlightDescriptor descriptor) { this.bufs = ImmutableList.of(); this.descriptor = descriptor; this.appMetadata = null; - this.bodyCompression = null; + this.bodyCompression = NoCompressionCodec.DEFAULT_BODY_COMPRESSION; } private ArrowMessage(FlightDescriptor descriptor, MessageMetadataResult message, ArrowBuf appMetadata, @@ -185,7 +186,7 @@ private ArrowMessage(FlightDescriptor descriptor, MessageMetadataResult message, this.descriptor = descriptor; this.appMetadata = appMetadata; this.bufs = buf == null ? ImmutableList.of() : ImmutableList.of(buf); - this.bodyCompression = null; + this.bodyCompression = NoCompressionCodec.DEFAULT_BODY_COMPRESSION; } public MessageMetadataResult asSchemaMessage() { diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java index 6bf3b334381..3a4b00de5ef 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java @@ -27,7 +27,6 @@ import org.apache.arrow.util.Collections2; import org.apache.arrow.vector.compression.CompressionCodec; import org.apache.arrow.vector.compression.CompressionUtil; -import org.apache.arrow.vector.compression.NoCompressionCodec; import org.apache.arrow.vector.ipc.message.ArrowFieldNode; import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; import org.apache.arrow.vector.types.pojo.Field; @@ -57,8 +56,7 @@ public VectorLoader(VectorSchemaRoot root) { public void load(ArrowRecordBatch recordBatch) { Iterator buffers = recordBatch.getBuffers().iterator(); Iterator nodes = recordBatch.getNodes().iterator(); - CompressionCodec codec = recordBatch.getBodyCompression() == null ? NoCompressionCodec.INSTANCE - : CompressionUtil.createCodec(recordBatch.getBodyCompression().getCodec()); + CompressionCodec codec = CompressionUtil.createCodec(recordBatch.getBodyCompression().getCodec()); for (FieldVector fieldVector : root.getFieldVectors()) { loadBuffers(fieldVector, fieldVector.getField(), buffers, nodes, codec); } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java index 247dab193ad..464f3aa8e9c 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java @@ -35,12 +35,14 @@ private CompressionUtil() { */ public static ArrowBodyCompression createBodyCompression(CompressionCodec codec) { switch (codec.getCodecName()) { + case "default": + return NoCompressionCodec.DEFAULT_BODY_COMPRESSION; case "LZ4_FRAME": return new ArrowBodyCompression(CompressionType.LZ4_FRAME, BodyCompressionMethod.BUFFER); case "ZSTD": return new ArrowBodyCompression(CompressionType.ZSTD, BodyCompressionMethod.BUFFER); default: - return null; + throw new IllegalArgumentException("Unknown codec: " + codec.getCodecName()); } } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java index 6301b6b0e07..5abfb338e46 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java @@ -27,6 +27,8 @@ import org.apache.arrow.memory.ArrowBuf; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.util.DataSizeRoundingUtil; +import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.compression.NoCompressionCodec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,7 +61,7 @@ public class ArrowRecordBatch implements ArrowMessage { public ArrowRecordBatch( int length, List nodes, List buffers) { - this(length, nodes, buffers, null, true); + this(length, nodes, buffers, NoCompressionCodec.DEFAULT_BODY_COMPRESSION, true); } public ArrowRecordBatch( @@ -83,6 +85,7 @@ public ArrowRecordBatch( this.length = length; this.nodes = nodes; this.buffers = buffers; + Preconditions.checkArgument(bodyCompression != null, "body compression cannot be null"); this.bodyCompression = bodyCompression; List arrowBuffers = new ArrayList<>(buffers.size()); long offset = 0; @@ -111,6 +114,7 @@ private ArrowRecordBatch( this.length = length; this.nodes = nodes; this.buffers = buffers; + Preconditions.checkArgument(bodyCompression != null, "body compression cannot be null"); this.bodyCompression = bodyCompression; this.closed = false; List arrowBuffers = new ArrayList<>(); @@ -191,14 +195,14 @@ public int writeTo(FlatBufferBuilder builder) { RecordBatch.startBuffersVector(builder, buffers.size()); int buffersOffset = FBSerializables.writeAllStructsToVector(builder, buffersLayout); int compressOffset = 0; - if (bodyCompression != null) { + if (bodyCompression != null && bodyCompression != NoCompressionCodec.DEFAULT_BODY_COMPRESSION) { compressOffset = bodyCompression.writeTo(builder); } RecordBatch.startRecordBatch(builder); RecordBatch.addLength(builder, length); RecordBatch.addNodes(builder, nodesOffset); RecordBatch.addBuffers(builder, buffersOffset); - if (bodyCompression != null) { + if (bodyCompression != null && bodyCompression != NoCompressionCodec.DEFAULT_BODY_COMPRESSION) { RecordBatch.addCompression(builder, compressOffset); } return RecordBatch.endRecordBatch(builder); diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java index c09ec81fa6c..64712a2741b 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java @@ -34,6 +34,7 @@ import org.apache.arrow.memory.ArrowBuf; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.compression.NoCompressionCodec; import org.apache.arrow.vector.ipc.ReadChannel; import org.apache.arrow.vector.ipc.WriteChannel; import org.apache.arrow.vector.types.pojo.Schema; @@ -409,7 +410,8 @@ public static ArrowRecordBatch deserializeRecordBatch(RecordBatch recordBatchFB, buffers.add(vectorBuffer); } - ArrowBodyCompression bodyCompression = recordBatchFB.compression() == null ? null + ArrowBodyCompression bodyCompression = recordBatchFB.compression() == null ? + NoCompressionCodec.DEFAULT_BODY_COMPRESSION : new ArrowBodyCompression(recordBatchFB.compression().codec(), recordBatchFB.compression().method()); if ((int) recordBatchFB.length() != recordBatchFB.length()) {