ARROW-9010: [Java] Framework and interface changes for RecordBatch IPC buffer compression#7326
ARROW-9010: [Java] Framework and interface changes for RecordBatch IPC buffer compression#7326liyafan82 wants to merge 8 commits into
Conversation
…C buffer compression
rymurr
left a comment
There was a problem hiding this comment.
Looks good, few minor comments. It would be good to see this in a test (if that is reasonable/possible) w/ a NoOp compressor or something as its hard for me to envisoin how the compression actually will function.
Also, excuse my ignorance of the compression scheme but is there any negotiation of the compresson scheme or an opt-out for older clients? A client recieving a compressed buffer without taking it into account will likely crash hard.
| return null; | ||
| } | ||
| for (int i = 0; i < CompressionType.names.length; i++) { | ||
| if (CompressionType.names[i].equals(codec.getCodecName())) { |
There was a problem hiding this comment.
I feel like a switch statement and/or enum (similar to Type/ArrowType conversions would be better here than a tight loop of string comparisons. Not a huge deal as its a small array but I think its a bit easier to read.
There was a problem hiding this comment.
Thanks for your comment. I agree with you.
The problem is that class org.apache.arrow.flatbuf.CompressionType is automatically generated by flatbuf, and it is not implemented by an enum. Instead, it has separate fields for ordinals and names. In addition, it only provides function to convert ordinals to names, but not vice versa.
There was a problem hiding this comment.
Agreed, I thought a pattern similar to Types.java (and an added enum) or a switch statement on the byte value might be more clear. Not a blocker for me though
There was a problem hiding this comment.
Thanks for the suggestion. I have revised the code to implement it through a switch statement. Please check if it looks good.
It looks clearer. However, we may need to change the code whenever we support a new compression method.
| * @param input the input buffer to be estimated. | ||
| * @return the estimated size of the compressed data. | ||
| */ | ||
| long estimateCompressedSize(ArrowBuf input); |
There was a problem hiding this comment.
In the vector loader/unloader the estimated size is only needed by the compress/decompress methods. Is there a further use case where this would need to be externalised? Would simply compress/decompress be ok?
There was a problem hiding this comment.
Simply compress/decompress would be OK, of course.
The estimateCompressedSize/estimateDecompressedSize APIs are mainly provided for performance benefits. In particular, if we can estimate the sizes with accuracy, we will
- Avoid allocating memory spaces larger than necessary.
- Avoid reallocating memory space during compression/decompression
- Avoid data copy caused by reallocations.
Due to the above benefits, I would prefer to add the APIs to the interface, but I am open to others' opinons and willing to change my mind if necessary.
There was a problem hiding this comment.
Maybe you mean we should remove the estimateCompressedSize/estimateDecompressedSize APIs from the CompressionCodec interface, so that the sizes will be estimated internally when calling the compress/decompress APIs.
The problem is that for some scenarios, the size should be enforced from the outside, rather than being computed internally. For example, in the decompression phase, if we know the buffer type (e.g. validity buffer) and the vector length, we can estimate the decompressed buffer size with absolute certainty.
There was a problem hiding this comment.
Yeah my thinking was that estimation could be done internally. I am not sure where the estimated value is useful externally. I think that the compression codec will be allocating its own buffers so should know exact/estimated sizes and be able to enforce/validate those numbers. In what scenario does the external world need to be informed of sizes?
There was a problem hiding this comment.
Sounds reasonable. Thank you.
I have revised the code to simplify the interface. Maybe we can add the optimization in the future, if we belive it is necessary.
| public static long writeCompressionBody( | ||
| WriteChannel out, ArrowBodyCompression bodyCompression) throws IOException { | ||
| long bufferStart = out.getCurrentPosition(); | ||
| ByteBuffer buf = ByteBuffer.allocate(2); |
There was a problem hiding this comment.
I wonder if rather than allocating a new ByteBuffer it might be easier to just use a byte[] and the write(byte[]) method. Potentially you could then use a byte[] to store the data in ArrowBodyCompression so that you don't have to allocate anything while writing the message.
There was a problem hiding this comment.
I have revised the implementation of ArrowBodyCompression so that it is based on byte[]. Thanks a lot for the good suggestion.
|
Thanks a lot for your good comments. Please see my reply in line.
Sounds good. In fact, I have added some tests for NoOp compressor when preparing the PR. However, the changes are reverted before submitting the PR, because it would involve changes to the Message.fbs file. IMO, we should be careful when changing this file, as it involves protocol changes, and may break the code for other languages.
I agree with you that compression scheme negotiation is a useful and widely used feature. Maybe we will support it in the futrue, but not this this PR. According to the discussion in the ML, this issue only deals with some standard compression schemes, and these schemes are encoded in the protocol and should be supported universally. |
|
Hi, I have a question... probably not related to what this PR focus on. |
@rongma1997 I am not sure, as some implementation issues are not determined yet. It seems there are some Java libraries for basic compression algorithms (e.g. zstd-jni and lz4-java), and we may prefer them. If feel strong about the RecordBatch level JNI bridge, maybe you can open an issue/discussion for it. |
| * @param input the buffer to compress. | ||
| * @return the compressed buffer. | ||
| */ | ||
| ArrowBuf compress(ArrowBuf input); |
There was a problem hiding this comment.
Should allocator be part of this interface?
There was a problem hiding this comment.
Sounds reasonable. I have revised the code to add allocator to the interface.
| /** | ||
| * Utilities for data compression/decompression. | ||
| */ | ||
| public class CompressionUtility { |
There was a problem hiding this comment.
I don't think Utility matches naming conventions that is used elsewhere in the code base for util classes?
There was a problem hiding this comment.
Agreed. I have changed the name to CompressionUtil.
| size = DataSizeRoundingUtil.roundUpTo8Multiple(size); | ||
| } | ||
|
|
||
| if (bodyCompression != null) { |
There was a problem hiding this comment.
Did you consider making bodyCompression never null (have a static instance that represents zero compression) and making a instance method on it like 'updateMetadataSize(long size)` instead of putting this check here? Also I'm not familiar with this code but its strange to have to perform this calculation outside of serializing flatbuffers? Is there something analogous in the C++ code?
There was a problem hiding this comment.
Sounds reasonable. @rymurr had a similar suggestion.
To make the bodyCompression non-null, we need a default compression codec that makes no compression/decompression. However, our specification does not support such an option now (Please see https://github.com/apache/arrow/blob/master/format/Message.fbs#L45-L53). Providing one would make the implementation not aligning with the specification.
However, we have revised the PR to provide one, because it seems the C++ implementation is not aligning with the specification either (pls see https://github.com/apache/arrow/blob/master/cpp/src/arrow/util/compression.h#L33)
After that, we can guarantee that the bodyCompression is never null, and some other problems also go away.
| long compLength = 0L; | ||
| if (batch.getBodyCompression() != null) { | ||
| compLength = writeCompressionBody(out, batch.getBodyCompression()); | ||
| Preconditions.checkArgument(compLength == ArrowBodyCompression.BODY_COMPRESSION_LENGTH, |
There was a problem hiding this comment.
Could you add a comment here on why we need to verify this value here?. Also, it looks like align() is called in this method, so couldn't that cause these values to be unequal
There was a problem hiding this comment.
You are right. This check is removed. Thanks.
| if (batch.getBodyCompression() != null) { | ||
| compLength = writeCompressionBody(out, batch.getBodyCompression()); | ||
| Preconditions.checkArgument(compLength == ArrowBodyCompression.BODY_COMPRESSION_LENGTH, | ||
| "deserialized compression body length not equal to ArrowBodyCompression#BODY_COMPRESSION_LENGTH"); |
There was a problem hiding this comment.
included sizes here would be useful.
There was a problem hiding this comment.
This check has been removed too. Thanks.
| if (curOffset % 8 != 0) { | ||
| curOffset += 8 - curOffset % 8; | ||
| } | ||
| ArrowBodyCompression bodyCompression = null; |
There was a problem hiding this comment.
It looks like this is manually parsing the buffer stream? I would have expected to directly reference A body compression object directly
There was a problem hiding this comment.
In the revised PR, we no longer parse the buffer stream. Thanks for the suggestion.
| * @param input the buffer to compress. | ||
| * @return the compressed buffer. | ||
| */ | ||
| ArrowBuf compress(ArrowBuf input); |
There was a problem hiding this comment.
would uncompressedBuffer be a better name for the parameter? Similar question for the one below?
There was a problem hiding this comment.
Sure. The suggested name is better. Thank you.
| // add compression info, if any | ||
| if (bodyCompression != null) { | ||
| ArrowBuf compBuf = allocator.buffer(ArrowBodyCompression.BODY_COMPRESSION_LENGTH); | ||
| compBuf.setByte(0, bodyCompression.getCodec()); |
There was a problem hiding this comment.
it looks like this is manually serializing the table, shouldn't this be delegated to flatbuffers?
There was a problem hiding this comment.
In the reivsed PR, we no longer manually serilize/deserialize the table. Thanks.
| */ | ||
| public static final long BODY_COMPRESSION_LENGTH = 2L; | ||
|
|
||
| final byte[] data = new byte[(int) BODY_COMPRESSION_LENGTH]; |
There was a problem hiding this comment.
Is there a reason to not have separately named variables here?
There was a problem hiding this comment.
We have restored to separate named variables in the revised PR.
| /** | ||
| * Length of the serialized object. | ||
| */ | ||
| public static final long BODY_COMPRESSION_LENGTH = 2L; |
There was a problem hiding this comment.
this seems like it would break if a field as ever added to the table?
There was a problem hiding this comment.
Sounds reasonable. We have removed it in the revised PR.
emkornfield
left a comment
There was a problem hiding this comment.
At the very least I think this needs more comments as to why it appears that manual serializion of flatbuffer data is being used.
It would also be nice to avoid introducing (if (x == null)) checks if at all possible, instead having default implementations would be better (unless there is a real performance reason for doing this). Avoid writing out compression settings when there are none seems like it could handled in the reverse direction (by checking for things that are non compressed and avoiding writing it out.
I think parquet is considering moving to Airlift which doesn't use JNI. We should discuss specific library choices on the mailing list. Also it might be nice but not a hard requirement to follow an SPI pattern (similar to what we do for allocators) to avoid a hard dependency on any particular allocator. |
@emkornfield Thanks a lot for your valuable feedback. |
332f62d to
a842ce6
Compare
| /** | ||
| * The default compression codec that does no compression. | ||
| */ | ||
| public class DefaultCompressionCodec implements CompressionCodec { |
There was a problem hiding this comment.
nit: I think NoCompressionCodec or something similar to indicate that no compression/decompression happens might this easier to understand for future readers.
There was a problem hiding this comment.
Agreed and accepted. NoCompressionCodec is a better name.
| } | ||
|
|
||
| @Override | ||
| public ArrowBuf compress(BufferAllocator allocator, ArrowBuf unCompressedBuffer) { |
There was a problem hiding this comment.
we should clarify in the contract of the interface that implementations take ownership of unCompressedBuffer (i.e. I think in order for this to work properly for real codecs is to free uncompressedBuffer here). Does that make sense or were you imagining something else?
There was a problem hiding this comment.
I agree with your point. This makes it easier to incorporate the codec into the framework.
Added comments in the JavaDoc.
| } | ||
|
|
||
| ArrowBodyCompression bodyCompression = | ||
| new ArrowBodyCompression(recordBatchFB.compression().codec(), recordBatchFB.compression().method()); |
There was a problem hiding this comment.
does compression need to be checked for nullness?
There was a problem hiding this comment.
According to our current implementation. The compression object cannot be null.
For the sake of safety, we added check here.
| case "default": | ||
| return DefaultCompressionCodec.DEFAULT_BODY_COMPRESSION; | ||
| case "LZ4_FRAME": | ||
| return new ArrowBodyCompression((byte) 0, BodyCompressionMethod.BUFFER); |
There was a problem hiding this comment.
these isn't an enum of 0 and 1 below?
There was a problem hiding this comment.
Changed to CompressionType.LZ4_FRAME and CompressionType.ZSTD, respectively. Thanks for your kind reminder.
| vector.getField(), vector.getClass().getSimpleName(), fieldBuffers)); | ||
| } | ||
|
|
||
| fieldBuffers = fieldBuffers.stream().map( |
There was a problem hiding this comment.
small nit: if we aren't using streams elsewhere we should avoid using them here as well. they can add some unncessary overhead for serialization/deserialization path.
There was a problem hiding this comment.
Agreed. I have changed it to a for loop.
|
The conda integration test is failing, because the default no compression option is not supported by the specification. Maybe we need to start a discussion in the ML. |
I think we should handle this in the implementation. If default codec is pass through the writer (or at the appropriate level), we shouldn't populate the flatbuffer with it. |
@emkornfield Sounds reasonable. Thanks for your feedback. I will revise the PR accordingly. |
7daf2e5 to
e0b620c
Compare
e0b620c to
5573352
Compare
@emkornfield Thanks a lot for your feedback. I have revised the code accordingly. Sorry for my late response. |
| int buffersOffset = FBSerializables.writeAllStructsToVector(builder, buffersLayout); | ||
| int compressOffset = bodyCompression.writeTo(builder); | ||
| int compressOffset = 0; | ||
| if (bodyCompression != null) { |
There was a problem hiding this comment.
I would think the only change really necessary here would have been to compare bodyCompression == NO_COMPRESSION_CODEC that way you can keep things non-null everyplace else within the code?
There was a problem hiding this comment.
@emkornfield Thanks for your good idea. I have updated the code accordingly.
|
thanks @liyafan82 I added a comment I think the change puts back more places then necessary to check for null? I might be missing something though. |
| * Implementation of this method should take care of releasing this buffer. | ||
| * @return the compressed buffer. | ||
| */ | ||
| ArrowBuf compress(BufferAllocator allocator, ArrowBuf unCompressedBuffer); |
There was a problem hiding this comment.
In thinking through this interface, something occurred to me. Given the way the current Message serialization code works is it forces a copy from an underlying channel. Given this, it seem like we would now have two copy like operations:
- Copy from channel to compressed buffer
- Uncompress the buffer when loading VectorSchemaRoot.
I was wondering what you thought about deferring compression until writing out the batch and doing decompression eagerly when reading in the batch to avoid one of these copies. I think one thing that would potentially help us decide on the course to take would be look at the possible libraries we would use for compression/decompression and whether they support off-heap, InputStream or some other interface for accomplishing the compression
@liyafan82 have you done more research in this area?
@rymurr do you have anything thoughts on this?
There was a problem hiding this comment.
@emkornfield Thank you for starting this discussion and sharing your good ideas.
Your reasoning makes sense to me.
I guess I was looking at the problem from a different perspective.
IMO, the bottleneck of a compressing codec is the CPU resource, and the main purpose of compressing is to reduce memory/network bandwidth consumption.
Given the above assumptions, we should try to do the compression as early as possible. The earliest possible place should be in the getFieldBuffers method. In this PR, we do it in VectorUnLoader, which is not the best, but close enough to the best. Similarly, we should try to do the decompression as late as possible. In this PR, we do it in VectorLoader, which is close to the optimal.
Admittedly, we have additional copies after introducing the compression framework. However, both additional copies are based on the compressed data, with reduced data size, so the overhead should be small.
The above reasoning is based on the assumption that the compression codec could effectively reduce the data size, which is not always true in practice. So I think we can make the decision based on the specific compression codec, and real benchmark data?
There was a problem hiding this comment.
@liyafan82 good points. I think the other use-case I didn't consider is Flight which I think will already be in off-heap memory.
In terms of copies, the should be smaller in general, but it would be good to to understand if there are any limitations. It looks like at least airlift supports off-heap decompression, so I think this API is probably OK from that perspective.
@rymurr did you have any other comments otherwise I can take another look and then I think we can merge.
There was a problem hiding this comment.
@emkornfield Do you think we can merge this now?
There was a problem hiding this comment.
yes, I think so. i'll merge
…C buffer compression This is the first sub-work item of ARROW-8672 ( [Java] Implement RecordBatch IPC buffer compression from ARROW-300). However, it does not involve any concrete compression algorithms. The purpose of this PR is to establish basic interfaces for data compression, and make changes to the IPC framework so that different compression algorithms can be plug-in smoothly. Closes apache#7326 from liyafan82/fly_0514_comp Authored-by: liyafan82 <fan_li_ya@foxmail.com> Signed-off-by: Micah Kornfield <emkornfield@gmail.com>
This is the first sub-work item of ARROW-8672 (
[Java] Implement RecordBatch IPC buffer compression from ARROW-300). However, it does not involve any concrete compression algorithms. The purpose of this PR is to establish basic interfaces for data compression, and make changes to the IPC framework so that different compression algorithms can be plug-in smoothly.