ARROW-17735: [C++][Parquet] Optimize parquet reading for String/Binary type#14353
ARROW-17735: [C++][Parquet] Optimize parquet reading for String/Binary type#14353zhixingheyi-tian wants to merge 21 commits into
Conversation
|
|
|
Hi @zhixingheyi-tian, sorry this hasn't gotten reviewer attention for a while. Are you still interested in working on this? Would you mind rebasing and cleaning up any commented code? In addition, I noticed there aren't any benchmarks for binary or string types in either of these two benchmark files:
We'll want some benchmarks in order to evaluate the changes. If you add those, then you can compare benchmarks locally like so: BUILD_DIR=cpp/path/to/build
archery benchmark run $BUILD_DIR \
--suite-filter=parquet-arrow-reader-writer-benchmark \
--output=contender.json
git checkout master # Or some ref that has benchmarks but not your changes
archery benchmark run $BUILD_DIR \
--suite-filter=parquet-arrow-reader-writer-benchmark \
--output=baseline.json
archery benchmark diff contender.json baseline.json |
|
@wjones127 , Thanks! |
|
Hi @pitrou , Recently,have fixed remaining failed 7 UTs, and this patch is ready for reviw. Thanks! |
|
HI @jorisvandenbossche @wjones127 @pitrou @iajoiner This PR is ready to review, please have a look. |
|
New failed UT "RecordReaderByteArrayTest.SkipByteArray" , came from newly commit #14142. This UT used the GetBuilderChunks() interface : https://github.com/apache/arrow/blame/6cfe24633b9fe3c474137571940eca35d7a475dc/cpp/src/parquet/column_reader_test.cc#L1181-L1185 And this performance PR is avoiding this interface. So it failed. Any suggestions to fix this UT? Or change this UT? Thanks! |
Hi @wjones127 But how to add Binary/String benchmark in *benchmark.cc? In my local server end-2-end performance testing: CPU: Intel(R) Xeon(R) Platinum 8268 CPU @ 2.90GHz
|
Hi @pitrou , |
|
HI @cyb70289 @jorisvandenbossche |
|
Would you fix the CI failures? |
I am in fixing errors in github CI. |
Don't know the details. But it should be easy to find in the source code. |
|
Instead of defining entire separate classes for this, why not change template <>
struct EncodingTraits<ByteArrayType> {
// ...
struct Accumulator {
std::unique_ptr<::arrow::Int32Builder> offsets_builder;
std::unique_ptr<::arrow::BufferBuilder> data_builder;
std::vector<std::shared_ptr<::arrow::Array>> chunks;
}; |
New commits should fix all UTs. Please give a review! |
If use *Builder, may add extra data copy when accumulating element. arrow/cpp/src/parquet/encoding.cc Lines 1062 to 1070 in 5ce8d79 Thanks! |
cyb70289
left a comment
There was a problem hiding this comment.
Thanks @zhixingheyi-tian. Look it's a useful improvement.
Please see some comments from me.
Besides, did you compare benchmarks (use microbenchmarks in arrow source) of various cases (no nulls, with nulls, different data types, etc.) against current code? If some microbenchmarks are missing, will you add them? It is necessary to evaluate this PR.
| /// \brief Pre-allocate space for data. Results in better flat read performance | ||
| virtual void Reserve(int64_t num_values) = 0; | ||
|
|
||
| virtual void ReserveValues(int64_t capacity) {} |
There was a problem hiding this comment.
A new interface? Are these added data members and functions necessary for this base class? I suppose they are only for the new reader implementation.
There was a problem hiding this comment.
Previously. it's TypedRecordReader internal interface,
arrow/cpp/src/parquet/column_reader.cc
Lines 1698 to 1720 in f82501e
And ByteArrayChunkedOptRecordReader extends from TypedRecordReader, so extract it as public interface.
There was a problem hiding this comment.
Isn't it just a helper function specific to implementation?
There was a problem hiding this comment.
Since these are coming from TypeRecordReader, which is private, could you mark it's methods as virtual instead?
| virtual void ReserveValues(int64_t capacity) {} |
There was a problem hiding this comment.
Is it better to make it pure virtual? In addition, it helps to add a comment for public function.
| current_encoding_ = encoding; | ||
| current_decoder_->SetData(static_cast<int>(num_buffered_values_), buffer, | ||
| static_cast<int>(data_size)); | ||
| if (!hasSet_uses_opt_) { |
There was a problem hiding this comment.
Stick to snake_case for variables.
There was a problem hiding this comment.
Why different renaming ways for variables?
Thank!
There was a problem hiding this comment.
Don't mix camel and snake case.
has_set_uses_opt
| const auto last_offset = offsetArr[values_written_]; | ||
| int64_t binary_length = last_offset - first_offset; | ||
| binary_per_row_length_ = binary_length / values_written_ + 1; | ||
| // std::cout << "binary_per_row_length_:" << binary_per_row_length_ << std::endl; |
There was a problem hiding this comment.
Please remove debug code.
There was a problem hiding this comment.
Looks there are other debug code not removed. Do remove all of them.
// RETURN_NOT_OK(IndexInBounds(idx));
| bool hasSet_uses_opt_ = false; | ||
| bool uses_opt_ = false; |
There was a problem hiding this comment.
Are these two flags really necessary?
Looks to me a trivial helper function is better.
There was a problem hiding this comment.
These two flags is for
if (current_encoding_ == Encoding::PLAIN_DICTIONARY || current_encoding_ == Encoding::PLAIN || current_encoding_ == Encoding::RLE_DICTIONARY)
Just avoid comparing every time.
There was a problem hiding this comment.
Try to make it simple. I don't think there's any performance consideration here.
E.g., define a helper function
bool UsesOpt() const { return (current_encoding == xxx || ....; }
There was a problem hiding this comment.
Could you create a separate function for that, as Yibo suggested? If you do measure a meaningful performance difference, could you share your results then?
In addition, could you add a comment explaining why the optimization is only applicable to those those three encodings?
There was a problem hiding this comment.
Hmm, also please find a more descriptive name than "uses optimization". (which optimization?)
| if (current_encoding_ == Encoding::PLAIN_DICTIONARY || | ||
| current_encoding_ == Encoding::PLAIN || | ||
| current_encoding_ == Encoding::RLE_DICTIONARY) { |
There was a problem hiding this comment.
Are all these cases covered by UT?
There was a problem hiding this comment.
Yes,
This patch is from one customer's case. They want to boost String scan performance.
So this patch is just for parquet general encodings: PLAIN , RLE_DICTIONARY, PLAIN_DICTIONARY.
Other encodings will skip this optimization.
Existed UTs will cover all encodings cases.
There was a problem hiding this comment.
Just curious, why other encodings are not supported?
| } else { | ||
| ::arrow::ArrayVector result = accumulator_.chunks; | ||
| if (result.size() == 0 || accumulator_.builder->length() > 0) { | ||
| std::shared_ptr<::arrow::Array> last_chunk; | ||
| PARQUET_THROW_NOT_OK(accumulator_.builder->Finish(&last_chunk)); | ||
| result.push_back(std::move(last_chunk)); | ||
| } | ||
| accumulator_.chunks = {}; | ||
| return result; |
There was a problem hiding this comment.
Duplicates ByteArrayChunkedRecordReader? This doesn't look right.
There was a problem hiding this comment.
The optimized RecordReader implementation is ByteArrayChunkedOptRecordReader
And is just for Binary/String/LargeBinary/LargeString types.
There was a problem hiding this comment.
This probably means it's not good to create a new class for this optimized reader.
That said, I don't have deep knowlege of parquet code and lack bandwidth recently to investigate, someone else might comment.
There was a problem hiding this comment.
I agree, would shouldn't create another class. I don't see any reason this can't be used for the Decimal case.
There was a problem hiding this comment.
I've found locally that if I merge the implementations, the unit tests pass. Could you please merge them in the PR?
There was a problem hiding this comment.
@wjones127
Do you merge the two class: ByteArrayChunkedRecordReader and ByteArrayChunkedOptRecordReader, , all unit tests passed?
There was a problem hiding this comment.
Yes, those are the two classes I merged.
There was a problem hiding this comment.
Here is the changeset that allows the parquet-arrow-test and parquet-arrow-internals-test to pass. zhixingheyi-tian#2
Could you incorporate those changes?
There was a problem hiding this comment.
+1 for not adding a separate class. This would be difficult to maintain if more optimization will be added. It would be better if an option can be added so that user can manually turn it off when something goes wrong with the new feature.
| int32_t* offset, | ||
| std::shared_ptr<::arrow::ResizableBuffer>& values, | ||
| int64_t valid_bits_offset, int32_t* bianry_length) { | ||
| return 0; |
There was a problem hiding this comment.
I saw many dummy implementations like this in the PR. Probably they can be eliminated.
There was a problem hiding this comment.
Just follow previous DecodeArrow() function call stack usages.
There was a problem hiding this comment.
My gut feeling is still that they are not necessary. Though it might be wrong.
There was a problem hiding this comment.
It seems like we are adding this because the other methods are based on builders (in the accumulator), and builder don't provide a way to transfer multiple values in one memcpy. Does that sound right?
I wonder if we could add such a method on builders, and that might be a cleaner solution. Something like:
class BaseBinaryBuilder {
...
Status UnsafeAppendValues(const uint8_t* values, int64_t length, const uint8_t* valid_bytes = NULL_PTR) { ... }
};The reason getting back to builders might be good is that I don't think these changes handle the case where there are more values than can fit into a StringArray. The current implementation will split it up into chunks if it reaches capacity, and I think we need to keep that behavior.
There was a problem hiding this comment.
Also, I'll say it again, but another possibility is to change the definition of EncodingTraits<ByteArrayType>::Accumulator from:
template <>
struct EncodingTraits<ByteArrayType> {
using Encoder = ByteArrayEncoder;
using Decoder = ByteArrayDecoder;
/// \brief Internal helper class for decoding BYTE_ARRAY data where we can
/// overflow the capacity of a single arrow::BinaryArray
struct Accumulator {
std::unique_ptr<::arrow::BinaryBuilder> builder;
std::vector<std::shared_ptr<::arrow::Array>> chunks;
};
using ArrowType = ::arrow::BinaryType;
using DictAccumulator = ::arrow::Dictionary32Builder<::arrow::BinaryType>;
};to:
template <>
struct EncodingTraits<ByteArrayType> {
using Encoder = ByteArrayEncoder;
using Decoder = ByteArrayDecoder;
/// \brief Internal helper class for decoding BYTE_ARRAY data where we can
/// overflow the capacity of a single arrow::Int32Array
struct Accumulator {
std::unique_ptr<::arrow::Int32Builder> offsets_builder;
std::unique_ptr<::arrow::BufferBuilder> data_builder;
std::vector<std::shared_ptr<::arrow::Array>> chunks;
};
using ArrowType = ::arrow::BinaryType;
using DictAccumulator = ::arrow::Dictionary32Builder<::arrow::BinaryType>;
};Either this or @wjones127 's suggestion would be better than adding a specialized method, IMHO.
There was a problem hiding this comment.
Thanks Antoine. I guess I'm just catching to up your understanding 😄
There was a problem hiding this comment.
When I was at previous employer, we have implemented mutable arrow::Array to address similar issue of ORC reader. The idea is that we can know in advance the total length of all string/binary values in a single batch. Therefore we can pre-allocate the data buffer at once or even reuse previous buffer if it has enough capacity. The overhead of buffer allocation and resize operation are non-negligible.
@pitrou We have discussed the idea in https://issues.apache.org/jira/browse/ARROW-15289
There was a problem hiding this comment.
@wgtmac I'm not sure if you are objecting to my proposal above. A separate BufferBuilder for the string data allows to presize for the computed total length, so it should address your concern.
There was a problem hiding this comment.
@wgtmac I'm not sure if you are objecting to my proposal above. A separate
BufferBuilderfor the string data allows to presize for the computed total length, so it should address your concern.
Sorry I didn't make it clear. I'm not objecting your proposal. Instead I just summarized another optimization we have done before if reusing arrow::RecordBatch on the same reader is possible. Internal experiment reveals that repeated buffer allocation and resize operation are non-negligible overhead, especially for wide columns (e.g. 1000+ columns). @pitrou
| int32_t indices[kBufferSize]; | ||
| auto dst_value = values->mutable_data() + (*bianry_length); | ||
|
|
||
| ::arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, num_values); |
There was a problem hiding this comment.
Do we have benchmark result of this bitmap reader based implementation?
Is it better than original code (bitblock counter)?
Hi @cyb70289 , Good idea to show performance. But how to add Binary/String benchmark in *benchmark.cc? |
There are many examples you can reference. Please note it can be a separate PR to add new benchmarks, which can be reviewed and merged before this one. |
|
@cyb70289 @wjones127 |
|
Sorry to hear you were sick @zhixingheyi-tian. I'll soon have a benchmark merged (#15100), and that will give us relevant results in Conbench that will help the evaluate these changes. |
Thanks! |
|
@ursabot please benchmark command=cpp-micro --suite-filter=parquet-arrow-reader-writer-benchmark |
|
Benchmark runs are scheduled for baseline = f82501e and contender = 19e2ba3. Results will be available as each benchmark for each run completes. |
|
The failures in the continuous-integration/appveyor/pr seem relevant. This test is failing: arrow/python/pyarrow/tests/parquet/test_basic.py Lines 715 to 742 in 85b167c Perhaps we need to make sure that if there are null values that we don't copy those parts of the buffer? |
| // Helper data structure for accumulating builder chunks | ||
| typename EncodingTraits<ByteArrayType>::Accumulator accumulator_; | ||
|
|
||
| int32_t bianry_length_ = 0; |
There was a problem hiding this comment.
It's a prexisting issue, but could you fix this variable name?
| int32_t bianry_length_ = 0; | |
| int32_t binary_length_ = 0; |
| // 16 KB is the default expected page header size | ||
| static constexpr uint32_t kDefaultPageHeaderSize = 16 * 1024; | ||
|
|
||
| static constexpr int32_t kDefaultBinaryPerRowSzie = 20; |
There was a problem hiding this comment.
| static constexpr int32_t kDefaultBinaryPerRowSzie = 20; | |
| static constexpr int32_t kDefaultBinaryPerRowSize = 20; |
| void ResetValues() { | ||
| if (values_written_ > 0) { | ||
| // Resize to 0, but do not shrink to fit | ||
| PARQUET_THROW_NOT_OK(valid_bits_->Resize(0, false)); |
There was a problem hiding this comment.
I ran the unit tests (parquet-arrow-test) with a debugger, and found this branch was never hit. Does that seem right? Could you add a test that validates this branch?
There was a problem hiding this comment.
Just follow here:
arrow/cpp/src/parquet/column_reader.cc
Lines 1846 to 1851 in fc53ff8
|
@zhixingheyi-tian Once you've fixed the issues mentioned, could you also rebase on the latest master branch? Once you do that we can run the benchmark. |
Have rebased. Performance can refer to #14353 (comment) |
I haven't touched pyarrow before. pyarrow underlayer is arrow-cpp, right? |
I looked at it a bit more, and I'm actually not sure. It seems to be verifying the offsets buffer is all zeros when the array is composed only of empty string and nulls, which I can't get to fail in C++. I can help look at that further once other feedback is addressed in this PR. |
wjones127
left a comment
There was a problem hiding this comment.
The performance is looking promising. I've looked through part of it, and have some suggestions for simplifying the code. Will look through the rest soon.
| bool hasCal_average_len_ = false; | ||
| int64_t binary_per_row_length_ = kDefaultBinaryPerRowSize; |
There was a problem hiding this comment.
First, I think this would be clearer as a std::optional, rather than a boolean on the side.
Second, please document the purpose of these fields in the header file.
| bool hasCal_average_len_ = false; | |
| int64_t binary_per_row_length_ = kDefaultBinaryPerRowSize; | |
| /// \brief Typical size of single binary value, used for pre-allocating value buffer. | |
| /// | |
| /// Before this is set, kDefaultBinaryPerRowSize is used. After the first | |
| /// batch of values, this is set to the size of the values buffer divided by | |
| /// the number of values. | |
| std::optional<int64_t> binary_per_row_length_ = std::nullopt; |
| } | ||
| std::shared_ptr<ResizableBuffer> ReleaseOffsets() override { | ||
| auto result = offset_; | ||
| if (ARROW_PREDICT_FALSE(!hasCal_average_len_)) { |
There was a problem hiding this comment.
if we make it an optional:
| if (ARROW_PREDICT_FALSE(!hasCal_average_len_)) { | |
| if (ARROW_PREDICT_FALSE(!binary_per_row_length_.has_value())) { |
| PARQUET_THROW_NOT_OK( | ||
| values_->Resize(new_values_capacity * binary_per_row_length_, false)); |
There was a problem hiding this comment.
if we make this an option:
| PARQUET_THROW_NOT_OK( | |
| values_->Resize(new_values_capacity * binary_per_row_length_, false)); | |
| int64_t per_row_length = binary_per_row_length_.value_or(kDefaultBinaryPerRowSize); | |
| PARQUET_THROW_NOT_OK( | |
| values_->Resize(new_values_capacity * per_row_length, false)); |
| // 16 KB is the default expected page header size | ||
| static constexpr uint32_t kDefaultPageHeaderSize = 16 * 1024; | ||
|
|
||
| static constexpr int32_t kDefaultBinaryPerRowSize = 20; |
There was a problem hiding this comment.
Since this corresponds to binary_per_row_length_, could we make the names match? I'm thinking "bytes per row" is the best description here:
| static constexpr int32_t kDefaultBinaryPerRowSize = 20; | |
| static constexpr int32_t kDefaultBinaryBytesPerRow = 20; |
(Also change binary_per_row_length_ to binary_bytes_per_row_)
There was a problem hiding this comment.
It would be better to add a comment here.
| /// \brief Pre-allocate space for data. Results in better flat read performance | ||
| virtual void Reserve(int64_t num_values) = 0; | ||
|
|
||
| virtual void ReserveValues(int64_t capacity) {} |
There was a problem hiding this comment.
Since these are coming from TypeRecordReader, which is private, could you mark it's methods as virtual instead?
| virtual void ReserveValues(int64_t capacity) {} |
| } | ||
| } | ||
|
|
||
| std::shared_ptr<ResizableBuffer> ReleaseOffsets() override { return nullptr; } |
There was a problem hiding this comment.
| std::shared_ptr<ResizableBuffer> ReleaseOffsets() override { return nullptr; } | |
| virtual std::shared_ptr<ResizableBuffer> ReleaseOffsets() { return nullptr; } |
| } | ||
|
|
||
| void ReserveValues(int64_t extra_values) { | ||
| void ReserveValues(int64_t extra_values) override { |
There was a problem hiding this comment.
| void ReserveValues(int64_t extra_values) override { | |
| virtual void ReserveValues(int64_t extra_values) { |
| virtual std::shared_ptr<ResizableBuffer> ReleaseOffsets() = 0; | ||
|
|
There was a problem hiding this comment.
| virtual std::shared_ptr<ResizableBuffer> ReleaseOffsets() = 0; |
| bool hasSet_uses_opt_ = false; | ||
| bool uses_opt_ = false; |
There was a problem hiding this comment.
Could you create a separate function for that, as Yibo suggested? If you do measure a meaningful performance difference, could you share your results then?
In addition, could you add a comment explaining why the optimization is only applicable to those those three encodings?
| // 16 KB is the default expected page header size | ||
| static constexpr uint32_t kDefaultPageHeaderSize = 16 * 1024; | ||
|
|
||
| static constexpr int32_t kDefaultBinaryPerRowSize = 20; |
There was a problem hiding this comment.
It would be better to add a comment here.
| /// \brief Pre-allocate space for data. Results in better flat read performance | ||
| virtual void Reserve(int64_t num_values) = 0; | ||
|
|
||
| virtual void ReserveValues(int64_t capacity) {} |
There was a problem hiding this comment.
Is it better to make it pure virtual? In addition, it helps to add a comment for public function.
| virtual std::shared_ptr<ResizableBuffer> ReleaseOffsets() = 0; | ||
|
|
| int32_t* offset, | ||
| std::shared_ptr<::arrow::ResizableBuffer>& values, | ||
| int64_t valid_bits_offset, int32_t* bianry_length) { | ||
| return 0; |
There was a problem hiding this comment.
When I was at previous employer, we have implemented mutable arrow::Array to address similar issue of ORC reader. The idea is that we can know in advance the total length of all string/binary values in a single batch. Therefore we can pre-allocate the data buffer at once or even reuse previous buffer if it has enough capacity. The overhead of buffer allocation and resize operation are non-negligible.
@pitrou We have discussed the idea in https://issues.apache.org/jira/browse/ARROW-15289
| int32_t* offset, | ||
| std::shared_ptr<::arrow::ResizableBuffer>& values, | ||
| int64_t valid_bits_offset, int* out_values_decoded, | ||
| int32_t* bianry_length) { |
There was a problem hiding this comment.
| int32_t* bianry_length) { | |
| int32_t* binary_length) { |
| return Status::OK(); | ||
| } | ||
|
|
||
| Status DecodeArrowDenseNonNull_opt(int num_values, int32_t* offset, |
There was a problem hiding this comment.
Can we avoid this kind of name and give a meaningful one instead?
| while (values_decoded < num_values) { | ||
| int32_t batch_size = std::min<int32_t>(kBufferSize, num_values - values_decoded); | ||
| int num_indices = idx_decoder_.GetBatch(indices, batch_size); | ||
| if (num_indices == 0) ParquetException::EofException(); |
There was a problem hiding this comment.
It would be helpful to provide some concrete error message.
| if (current_encoding_ == Encoding::PLAIN_DICTIONARY || | ||
| current_encoding_ == Encoding::PLAIN || | ||
| current_encoding_ == Encoding::RLE_DICTIONARY) { |
There was a problem hiding this comment.
Just curious, why other encodings are not supported?
| std::vector<std::shared_ptr<Buffer>> buffers = {ReleaseIsValid(), ReleaseOffsets(), | ||
| ReleaseValues()}; | ||
| auto data = std::make_shared<::arrow::ArrayData>( | ||
| ::arrow::binary(), values_written(), buffers, null_count()); |
There was a problem hiding this comment.
Is ::arrow::binary() correct? Will it cause type mismatch if actual type is ::arrow::utf8()? For example, when checking equality with a StringArray, the type identity may be broken if one side is binary and the other is utf8.
| } else { | ||
| ::arrow::ArrayVector result = accumulator_.chunks; | ||
| if (result.size() == 0 || accumulator_.builder->length() > 0) { | ||
| std::shared_ptr<::arrow::Array> last_chunk; | ||
| PARQUET_THROW_NOT_OK(accumulator_.builder->Finish(&last_chunk)); | ||
| result.push_back(std::move(last_chunk)); | ||
| } | ||
| accumulator_.chunks = {}; | ||
| return result; |
There was a problem hiding this comment.
+1 for not adding a separate class. This would be difficult to maintain if more optimization will be added. It would be better if an option can be added so that user can manually turn it off when something goes wrong with the new feature.
|
Closing because it has been untouched for a while, in case it's still relevant feel free to reopen and move it forward 👍 |
|
@zhixingheyi-tian Do you plan to update this? |
Target
Improve parquet reading performance for String/Binary type based on Buffer operations instead of BinaryArrayBuilder.
Just like fixed-width types which take full advantage of using buffer,
here:
arrow/cpp/src/parquet/arrow/reader_internal.cc
Line 344 in 6cfe246
Performance evaluation
CPU: Intel(R) Xeon(R) Platinum 8268 CPU @ 2.90GHz
OS :CentOS 7.6
Data: Single parquet file with dictionary-encoding, gzip compression, 100M Rows, 10 Cols
Run:With one thread
Performance evaluation using binary benchmark #15100
upstream/master:
zhixingheyi-tian/arrow_parquet_string_opt:
@cyb70289 @wjones127 @pitrou
The performance improvement by this optimization is very obvious.