diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 29334572874..2e74a2cac12 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -407,6 +407,7 @@ if(ARROW_COMPUTE) compute/function.cc compute/function_internal.cc compute/kernel.cc + compute/light_array.cc compute/registry.cc compute/kernels/aggregate_basic.cc compute/kernels/aggregate_mode.cc diff --git a/cpp/src/arrow/compute/CMakeLists.txt b/cpp/src/arrow/compute/CMakeLists.txt index 897dc32f357..27e693d9a34 100644 --- a/cpp/src/arrow/compute/CMakeLists.txt +++ b/cpp/src/arrow/compute/CMakeLists.txt @@ -63,6 +63,7 @@ add_arrow_compute_test(internals_test function_test.cc exec_test.cc kernel_test.cc + light_array_test.cc registry_test.cc) add_arrow_benchmark(function_benchmark PREFIX "arrow-compute") diff --git a/cpp/src/arrow/compute/exec/key_compare.cc b/cpp/src/arrow/compute/exec/key_compare.cc index ed94bf72301..d873aec692e 100644 --- a/cpp/src/arrow/compute/exec/key_compare.cc +++ b/cpp/src/arrow/compute/exec/key_compare.cc @@ -34,7 +34,7 @@ void KeyCompare::NullUpdateColumnToRow(uint32_t id_col, uint32_t num_rows_to_com const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx, - const KeyEncoder::KeyColumnArray& col, + const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector) { if (!rows.has_any_nulls(ctx) && !col.data(0)) { @@ -91,7 +91,7 @@ void KeyCompare::CompareBinaryColumnToRowHelper( uint32_t offset_within_row, uint32_t first_row_to_compare, uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx, - const KeyEncoder::KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows, + const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector, COMPARE_FN compare_fn) { bool is_fixed_length = rows.metadata().is_fixed_length; if (is_fixed_length) { @@ -121,7 +121,7 @@ template void KeyCompare::CompareBinaryColumnToRow( uint32_t offset_within_row, uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map, - KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col, + KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector) { uint32_t num_processed = 0; #if defined(ARROW_HAVE_AVX2) @@ -231,7 +231,7 @@ template void KeyCompare::CompareVarBinaryColumnToRow( uint32_t id_varbinary_col, uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map, - KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col, + KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector) { #if defined(ARROW_HAVE_AVX2) if (ctx->has_avx2()) { @@ -306,14 +306,11 @@ void KeyCompare::AndByteVectors(KeyEncoder::KeyEncoderContext* ctx, uint32_t num } } -void KeyCompare::CompareColumnsToRows(uint32_t num_rows_to_compare, - const uint16_t* sel_left_maybe_null, - const uint32_t* left_to_right_map, - KeyEncoder::KeyEncoderContext* ctx, - uint32_t* out_num_rows, - uint16_t* out_sel_left_maybe_same, - const std::vector& cols, - const KeyEncoder::KeyRowArray& rows) { +void KeyCompare::CompareColumnsToRows( + uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null, + const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx, + uint32_t* out_num_rows, uint16_t* out_sel_left_maybe_same, + const std::vector& cols, const KeyEncoder::KeyRowArray& rows) { if (num_rows_to_compare == 0) { *out_num_rows = 0; return; @@ -333,7 +330,7 @@ void KeyCompare::CompareColumnsToRows(uint32_t num_rows_to_compare, bool is_first_column = true; for (size_t icol = 0; icol < cols.size(); ++icol) { - const KeyEncoder::KeyColumnArray& col = cols[icol]; + const KeyColumnArray& col = cols[icol]; if (col.metadata().is_null_type) { // If this null type col is the first column, the match_bytevector_A needs to be // initialized with 0xFF. Otherwise, the calculation can be skipped @@ -374,7 +371,7 @@ void KeyCompare::CompareColumnsToRows(uint32_t num_rows_to_compare, uint32_t ivarbinary = 0; for (size_t icol = 0; icol < cols.size(); ++icol) { - const KeyEncoder::KeyColumnArray& col = cols[icol]; + const KeyColumnArray& col = cols[icol]; if (!col.metadata().is_fixed_length) { // Process varbinary and nulls if (sel_left_maybe_null) { diff --git a/cpp/src/arrow/compute/exec/key_compare.h b/cpp/src/arrow/compute/exec/key_compare.h index aeb5abbdd14..773b32d46c6 100644 --- a/cpp/src/arrow/compute/exec/key_compare.h +++ b/cpp/src/arrow/compute/exec/key_compare.h @@ -33,14 +33,11 @@ class KeyCompare { // Returns a single 16-bit selection vector of rows that failed comparison. // If there is input selection on the left, the resulting selection is a filtered image // of input selection. - static void CompareColumnsToRows(uint32_t num_rows_to_compare, - const uint16_t* sel_left_maybe_null, - const uint32_t* left_to_right_map, - KeyEncoder::KeyEncoderContext* ctx, - uint32_t* out_num_rows, - uint16_t* out_sel_left_maybe_same, - const std::vector& cols, - const KeyEncoder::KeyRowArray& rows); + static void CompareColumnsToRows( + uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null, + const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx, + uint32_t* out_num_rows, uint16_t* out_sel_left_maybe_same, + const std::vector& cols, const KeyEncoder::KeyRowArray& rows); private: template @@ -48,7 +45,7 @@ class KeyCompare { const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx, - const KeyEncoder::KeyColumnArray& col, + const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector); @@ -57,21 +54,21 @@ class KeyCompare { uint32_t offset_within_row, uint32_t first_row_to_compare, uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx, - const KeyEncoder::KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows, + const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector, COMPARE_FN compare_fn); template static void CompareBinaryColumnToRow( uint32_t offset_within_row, uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map, - KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col, + KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector); template static void CompareVarBinaryColumnToRow( uint32_t id_varlen_col, uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map, - KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col, + KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector); static void AndByteVectors(KeyEncoder::KeyEncoderContext* ctx, uint32_t num_elements, @@ -83,14 +80,14 @@ class KeyCompare { static uint32_t NullUpdateColumnToRowImp_avx2( uint32_t id_col, uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx, - const KeyEncoder::KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows, + const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector); template static uint32_t CompareBinaryColumnToRowHelper_avx2( uint32_t offset_within_row, uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map, - KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col, + KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector, COMPARE8_FN compare8_fn); @@ -98,14 +95,14 @@ class KeyCompare { static uint32_t CompareBinaryColumnToRowImp_avx2( uint32_t offset_within_row, uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map, - KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col, + KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector); template static void CompareVarBinaryColumnToRowImp_avx2( uint32_t id_varlen_col, uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map, - KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col, + KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector); static uint32_t AndByteVectors_avx2(uint32_t num_elements, uint8_t* bytevector_A, @@ -114,20 +111,20 @@ class KeyCompare { static uint32_t NullUpdateColumnToRow_avx2( bool use_selection, uint32_t id_col, uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map, - KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col, + KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector); static uint32_t CompareBinaryColumnToRow_avx2( bool use_selection, uint32_t offset_within_row, uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map, - KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col, + KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector); static void CompareVarBinaryColumnToRow_avx2( bool use_selection, bool is_first_varbinary_col, uint32_t id_varlen_col, uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx, - const KeyEncoder::KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows, + const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector); #endif diff --git a/cpp/src/arrow/compute/exec/key_compare_avx2.cc b/cpp/src/arrow/compute/exec/key_compare_avx2.cc index df13e8cae3c..e45486b2ebb 100644 --- a/cpp/src/arrow/compute/exec/key_compare_avx2.cc +++ b/cpp/src/arrow/compute/exec/key_compare_avx2.cc @@ -40,7 +40,7 @@ template uint32_t KeyCompare::NullUpdateColumnToRowImp_avx2( uint32_t id_col, uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx, - const KeyEncoder::KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows, + const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector) { if (!rows.has_any_nulls(ctx) && !col.data(0)) { return num_rows_to_compare; @@ -180,7 +180,7 @@ template uint32_t KeyCompare::CompareBinaryColumnToRowHelper_avx2( uint32_t offset_within_row, uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map, - KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col, + KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector, COMPARE8_FN compare8_fn) { bool is_fixed_length = rows.metadata().is_fixed_length; @@ -419,7 +419,7 @@ template uint32_t KeyCompare::CompareBinaryColumnToRowImp_avx2( uint32_t offset_within_row, uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map, - KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col, + KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector) { uint32_t col_width = col.metadata().fixed_length; if (col_width == 0) { @@ -503,7 +503,7 @@ template void KeyCompare::CompareVarBinaryColumnToRowImp_avx2( uint32_t id_varbinary_col, uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map, - KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col, + KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector) { const uint32_t* offsets_left = col.offsets(); const uint32_t* offsets_right = rows.offsets(); @@ -569,7 +569,7 @@ uint32_t KeyCompare::AndByteVectors_avx2(uint32_t num_elements, uint8_t* bytevec uint32_t KeyCompare::NullUpdateColumnToRow_avx2( bool use_selection, uint32_t id_col, uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map, - KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col, + KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector) { if (use_selection) { return NullUpdateColumnToRowImp_avx2(id_col, num_rows_to_compare, @@ -585,7 +585,7 @@ uint32_t KeyCompare::NullUpdateColumnToRow_avx2( uint32_t KeyCompare::CompareBinaryColumnToRow_avx2( bool use_selection, uint32_t offset_within_row, uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map, - KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col, + KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector) { if (use_selection) { return CompareBinaryColumnToRowImp_avx2(offset_within_row, num_rows_to_compare, @@ -602,7 +602,7 @@ void KeyCompare::CompareVarBinaryColumnToRow_avx2( bool use_selection, bool is_first_varbinary_col, uint32_t id_varlen_col, uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx, - const KeyEncoder::KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows, + const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector) { if (use_selection) { if (is_first_varbinary_col) { diff --git a/cpp/src/arrow/compute/exec/key_encode.cc b/cpp/src/arrow/compute/exec/key_encode.cc index f8bd7c2503e..3d92c77b09c 100644 --- a/cpp/src/arrow/compute/exec/key_encode.cc +++ b/cpp/src/arrow/compute/exec/key_encode.cc @@ -271,87 +271,8 @@ bool KeyEncoder::KeyRowArray::has_any_nulls(const KeyEncoderContext* ctx) const return has_any_nulls_; } -KeyEncoder::KeyColumnArray::KeyColumnArray(const KeyColumnMetadata& metadata, - const KeyColumnArray& left, - const KeyColumnArray& right, - int buffer_id_to_replace) { - metadata_ = metadata; - length_ = left.length(); - for (int i = 0; i < max_buffers_; ++i) { - buffers_[i] = left.buffers_[i]; - mutable_buffers_[i] = left.mutable_buffers_[i]; - } - buffers_[buffer_id_to_replace] = right.buffers_[buffer_id_to_replace]; - mutable_buffers_[buffer_id_to_replace] = right.mutable_buffers_[buffer_id_to_replace]; - bit_offset_[0] = left.bit_offset_[0]; - bit_offset_[1] = left.bit_offset_[1]; - if (buffer_id_to_replace < max_buffers_ - 1) { - bit_offset_[buffer_id_to_replace] = right.bit_offset_[buffer_id_to_replace]; - } -} - -KeyEncoder::KeyColumnArray::KeyColumnArray(const KeyColumnMetadata& metadata, - int64_t length, const uint8_t* buffer0, - const uint8_t* buffer1, const uint8_t* buffer2, - int bit_offset0, int bit_offset1) { - metadata_ = metadata; - length_ = length; - buffers_[0] = buffer0; - buffers_[1] = buffer1; - buffers_[2] = buffer2; - mutable_buffers_[0] = mutable_buffers_[1] = mutable_buffers_[2] = nullptr; - bit_offset_[0] = bit_offset0; - bit_offset_[1] = bit_offset1; -} - -KeyEncoder::KeyColumnArray::KeyColumnArray(const KeyColumnMetadata& metadata, - int64_t length, uint8_t* buffer0, - uint8_t* buffer1, uint8_t* buffer2, - int bit_offset0, int bit_offset1) { - metadata_ = metadata; - length_ = length; - buffers_[0] = mutable_buffers_[0] = buffer0; - buffers_[1] = mutable_buffers_[1] = buffer1; - buffers_[2] = mutable_buffers_[2] = buffer2; - bit_offset_[0] = bit_offset0; - bit_offset_[1] = bit_offset1; -} - -KeyEncoder::KeyColumnArray::KeyColumnArray(const KeyColumnArray& from, int64_t start, - int64_t length) { - metadata_ = from.metadata_; - length_ = length; - uint32_t fixed_size = - !metadata_.is_fixed_length ? sizeof(uint32_t) : metadata_.fixed_length; - - buffers_[0] = - from.buffers_[0] ? from.buffers_[0] + (from.bit_offset_[0] + start) / 8 : nullptr; - mutable_buffers_[0] = from.mutable_buffers_[0] - ? from.mutable_buffers_[0] + (from.bit_offset_[0] + start) / 8 - : nullptr; - bit_offset_[0] = (from.bit_offset_[0] + start) % 8; - - if (fixed_size == 0 && !metadata_.is_null_type) { - buffers_[1] = - from.buffers_[1] ? from.buffers_[1] + (from.bit_offset_[1] + start) / 8 : nullptr; - mutable_buffers_[1] = from.mutable_buffers_[1] ? from.mutable_buffers_[1] + - (from.bit_offset_[1] + start) / 8 - : nullptr; - bit_offset_[1] = (from.bit_offset_[1] + start) % 8; - } else { - buffers_[1] = from.buffers_[1] ? from.buffers_[1] + start * fixed_size : nullptr; - mutable_buffers_[1] = from.mutable_buffers_[1] - ? from.mutable_buffers_[1] + start * fixed_size - : nullptr; - bit_offset_[1] = 0; - } - - buffers_[2] = from.buffers_[2]; - mutable_buffers_[2] = from.mutable_buffers_[2]; -} - -KeyEncoder::KeyColumnArray KeyEncoder::TransformBoolean::ArrayReplace( - const KeyColumnArray& column, const KeyColumnArray& temp) { +KeyColumnArray KeyEncoder::TransformBoolean::ArrayReplace(const KeyColumnArray& column, + const KeyColumnArray& temp) { // Make sure that the temp buffer is large enough DCHECK(temp.length() >= column.length() && temp.metadata().is_fixed_length && temp.metadata().fixed_length >= sizeof(uint8_t)); @@ -359,8 +280,7 @@ KeyEncoder::KeyColumnArray KeyEncoder::TransformBoolean::ArrayReplace( metadata.is_fixed_length = true; metadata.fixed_length = sizeof(uint8_t); constexpr int buffer_index = 1; - KeyColumnArray result = KeyColumnArray(metadata, column, temp, buffer_index); - return result; + return column.WithBufferFrom(temp, buffer_index).WithMetadata(metadata); } void KeyEncoder::TransformBoolean::PostDecode(const KeyColumnArray& input, @@ -387,8 +307,8 @@ bool KeyEncoder::EncoderInteger::UsesTransform(const KeyColumnArray& column) { return IsBoolean(column.metadata()); } -KeyEncoder::KeyColumnArray KeyEncoder::EncoderInteger::ArrayReplace( - const KeyColumnArray& column, const KeyColumnArray& temp) { +KeyColumnArray KeyEncoder::EncoderInteger::ArrayReplace(const KeyColumnArray& column, + const KeyColumnArray& temp) { if (IsBoolean(column.metadata())) { return TransformBoolean::ArrayReplace(column, temp); } @@ -955,7 +875,8 @@ void KeyEncoder::PrepareKeyColumnArrays(int64_t start_row, int64_t num_rows, uint32_t num_varbinary_visited = 0; for (uint32_t i = 0; i < num_cols; ++i) { const KeyColumnArray& col = cols_in[row_metadata_.column_order[i]]; - KeyColumnArray col_window(col, start_row, num_rows); + KeyColumnArray col_window = col.Slice(start_row, num_rows); + batch_all_cols_[i] = col_window; if (!col.metadata().is_fixed_length) { DCHECK(num_varbinary_visited < batch_varbinary_cols_.size()); diff --git a/cpp/src/arrow/compute/exec/key_encode.h b/cpp/src/arrow/compute/exec/key_encode.h index da533434d39..f9de31d9c21 100644 --- a/cpp/src/arrow/compute/exec/key_encode.h +++ b/cpp/src/arrow/compute/exec/key_encode.h @@ -22,6 +22,7 @@ #include #include "arrow/compute/exec/util.h" +#include "arrow/compute/light_array.h" #include "arrow/memory_pool.h" #include "arrow/result.h" #include "arrow/status.h" @@ -30,8 +31,6 @@ namespace arrow { namespace compute { -class KeyColumnMetadata; - /// Converts between key representation as a collection of arrays for /// individual columns and another representation as a single array of rows /// combining data from all columns into one value. @@ -49,27 +48,6 @@ class KeyEncoder { util::TempVectorStack* stack; }; - /// Description of a storage format of a single key column as needed - /// for the purpose of row encoding. - struct KeyColumnMetadata { - KeyColumnMetadata() = default; - KeyColumnMetadata(bool is_fixed_length_in, uint32_t fixed_length_in, - bool is_null_type_in = false) - : is_fixed_length(is_fixed_length_in), - is_null_type(is_null_type_in), - fixed_length(fixed_length_in) {} - /// Is column storing a varying-length binary, using offsets array - /// to find a beginning of a value, or is it a fixed-length binary. - bool is_fixed_length; - /// Is column null type - bool is_null_type; - /// For a fixed-length binary column: number of bytes per value. - /// Zero has a special meaning, indicating a bit vector with one bit per value if it - /// isn't a null type column. - /// For a varying-length binary column: number of bytes per offset. - uint32_t fixed_length; - }; - /// Description of a storage format for rows produced by encoder. struct KeyRowMetadata { /// Is row a varying-length binary, using offsets array to find a beginning of a row, @@ -241,57 +219,6 @@ class KeyEncoder { mutable bool has_any_nulls_; }; - /// A lightweight description of an array representing one of key columns. - class KeyColumnArray { - public: - KeyColumnArray() = default; - /// Create as a mix of buffers according to the mask from two descriptions - /// (Nth bit is set to 0 if Nth buffer from the first input - /// should be used and is set to 1 otherwise). - /// Metadata is inherited from the first input. - KeyColumnArray(const KeyColumnMetadata& metadata, const KeyColumnArray& left, - const KeyColumnArray& right, int buffer_id_to_replace); - /// Create for reading - KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length, - const uint8_t* buffer0, const uint8_t* buffer1, const uint8_t* buffer2, - int bit_offset0 = 0, int bit_offset1 = 0); - /// Create for writing - KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length, uint8_t* buffer0, - uint8_t* buffer1, uint8_t* buffer2, int bit_offset0 = 0, - int bit_offset1 = 0); - /// Create as a window view of original description that is offset - /// by a given number of rows. - /// The number of rows used in offset must be divisible by 8 - /// in order to not split bit vectors within a single byte. - KeyColumnArray(const KeyColumnArray& from, int64_t start, int64_t length); - uint8_t* mutable_data(int i) { - ARROW_DCHECK(i >= 0 && i <= max_buffers_); - return mutable_buffers_[i]; - } - const uint8_t* data(int i) const { - ARROW_DCHECK(i >= 0 && i <= max_buffers_); - return buffers_[i]; - } - uint32_t* mutable_offsets() { return reinterpret_cast(mutable_data(1)); } - const uint32_t* offsets() const { return reinterpret_cast(data(1)); } - const KeyColumnMetadata& metadata() const { return metadata_; } - int64_t length() const { return length_; } - int bit_offset(int i) const { - ARROW_DCHECK(i >= 0 && i < max_buffers_); - return bit_offset_[i]; - } - - private: - static constexpr int max_buffers_ = 3; - const uint8_t* buffers_[max_buffers_]; - uint8_t* mutable_buffers_[max_buffers_]; - KeyColumnMetadata metadata_; - int64_t length_; - // Starting bit offset within the first byte (between 0 and 7) - // to be used when accessing buffers that store bit vectors. - int bit_offset_[max_buffers_ - 1]; - }; - void Init(const std::vector& cols, KeyEncoderContext* ctx, int row_alignment, int string_alignment); diff --git a/cpp/src/arrow/compute/exec/key_hash.cc b/cpp/src/arrow/compute/exec/key_hash.cc index bc4cae74ddc..125fd3912e1 100644 --- a/cpp/src/arrow/compute/exec/key_hash.cc +++ b/cpp/src/arrow/compute/exec/key_hash.cc @@ -375,7 +375,7 @@ void Hashing32::HashFixed(int64_t hardware_flags, bool combine_hashes, uint32_t } } -void Hashing32::HashMultiColumn(const std::vector& cols, +void Hashing32::HashMultiColumn(const std::vector& cols, KeyEncoder::KeyEncoderContext* ctx, uint32_t* hashes) { uint32_t num_rows = static_cast(cols[0].length()); @@ -799,7 +799,7 @@ void Hashing64::HashFixed(bool combine_hashes, uint32_t num_rows, uint64_t lengt } } -void Hashing64::HashMultiColumn(const std::vector& cols, +void Hashing64::HashMultiColumn(const std::vector& cols, KeyEncoder::KeyEncoderContext* ctx, uint64_t* hashes) { uint32_t num_rows = static_cast(cols[0].length()); diff --git a/cpp/src/arrow/compute/exec/key_hash.h b/cpp/src/arrow/compute/exec/key_hash.h index 88f77be1a4f..719f3dfd460 100644 --- a/cpp/src/arrow/compute/exec/key_hash.h +++ b/cpp/src/arrow/compute/exec/key_hash.h @@ -45,7 +45,7 @@ class ARROW_EXPORT Hashing32 { friend void TestBloomSmall(BloomFilterBuildStrategy, int64_t, int, bool, bool); public: - static void HashMultiColumn(const std::vector& cols, + static void HashMultiColumn(const std::vector& cols, KeyEncoder::KeyEncoderContext* ctx, uint32_t* out_hash); private: @@ -153,7 +153,7 @@ class ARROW_EXPORT Hashing64 { friend void TestBloomSmall(BloomFilterBuildStrategy, int64_t, int, bool, bool); public: - static void HashMultiColumn(const std::vector& cols, + static void HashMultiColumn(const std::vector& cols, KeyEncoder::KeyEncoderContext* ctx, uint64_t* hashes); private: diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate.cc b/cpp/src/arrow/compute/kernels/hash_aggregate.cc index ab8e6cd77d1..db34ee6c596 100644 --- a/cpp/src/arrow/compute/kernels/hash_aggregate.cc +++ b/cpp/src/arrow/compute/kernels/hash_aggregate.cc @@ -237,19 +237,18 @@ struct GrouperFastImpl : Grouper { auto bit_width = checked_cast(*key).bit_width(); ARROW_DCHECK(bit_width % 8 == 0); impl->col_metadata_[icol] = - arrow::compute::KeyEncoder::KeyColumnMetadata(true, bit_width / 8); + arrow::compute::KeyColumnMetadata(true, bit_width / 8); } else if (key->id() == Type::BOOL) { - impl->col_metadata_[icol] = - arrow::compute::KeyEncoder::KeyColumnMetadata(true, 0); + impl->col_metadata_[icol] = arrow::compute::KeyColumnMetadata(true, 0); } else if (is_fixed_width(key->id())) { - impl->col_metadata_[icol] = arrow::compute::KeyEncoder::KeyColumnMetadata( + impl->col_metadata_[icol] = arrow::compute::KeyColumnMetadata( true, checked_cast(*key).bit_width() / 8); } else if (is_binary_like(key->id())) { impl->col_metadata_[icol] = - arrow::compute::KeyEncoder::KeyColumnMetadata(false, sizeof(uint32_t)); + arrow::compute::KeyColumnMetadata(false, sizeof(uint32_t)); } else if (key->id() == Type::NA) { - impl->col_metadata_[icol] = arrow::compute::KeyEncoder::KeyColumnMetadata( - true, 0, /*is_null_type_in=*/true); + impl->col_metadata_[icol] = + arrow::compute::KeyColumnMetadata(true, 0, /*is_null_type_in=*/true); } else { return Status::NotImplemented("Keys of type ", *key); } @@ -352,11 +351,10 @@ struct GrouperFastImpl : Grouper { int64_t offset = batch[icol].array()->offset; - auto col_base = arrow::compute::KeyEncoder::KeyColumnArray( + auto col_base = arrow::compute::KeyColumnArray( col_metadata_[icol], offset + num_rows, non_nulls, fixedlen, varlen); - cols_[icol] = - arrow::compute::KeyEncoder::KeyColumnArray(col_base, offset, num_rows); + cols_[icol] = col_base.Slice(offset, num_rows); } // Split into smaller mini-batches @@ -434,8 +432,8 @@ struct GrouperFastImpl : Grouper { if (col_metadata_[i].is_null_type) { uint8_t* non_nulls = NULLPTR; uint8_t* fixedlen = NULLPTR; - cols_[i] = arrow::compute::KeyEncoder::KeyColumnArray( - col_metadata_[i], num_groups, non_nulls, fixedlen, NULLPTR); + cols_[i] = arrow::compute::KeyColumnArray(col_metadata_[i], num_groups, non_nulls, + fixedlen, NULLPTR); continue; } ARROW_ASSIGN_OR_RAISE(non_null_bufs[i], AllocatePaddedBitmap(num_groups)); @@ -451,7 +449,7 @@ struct GrouperFastImpl : Grouper { ARROW_ASSIGN_OR_RAISE(fixedlen_bufs[i], AllocatePaddedBuffer((num_groups + 1) * sizeof(uint32_t))); } - cols_[i] = arrow::compute::KeyEncoder::KeyColumnArray( + cols_[i] = arrow::compute::KeyColumnArray( col_metadata_[i], num_groups, non_null_bufs[i]->mutable_data(), fixedlen_bufs[i]->mutable_data(), nullptr); } @@ -470,7 +468,7 @@ struct GrouperFastImpl : Grouper { auto varlen_size = reinterpret_cast(fixedlen_bufs[i]->data())[num_groups]; ARROW_ASSIGN_OR_RAISE(varlen_bufs[i], AllocatePaddedBuffer(varlen_size)); - cols_[i] = arrow::compute::KeyEncoder::KeyColumnArray( + cols_[i] = arrow::compute::KeyColumnArray( col_metadata_[i], num_groups, non_null_bufs[i]->mutable_data(), fixedlen_bufs[i]->mutable_data(), varlen_bufs[i]->mutable_data()); } @@ -534,8 +532,8 @@ struct GrouperFastImpl : Grouper { arrow::compute::KeyEncoder::KeyEncoderContext encode_ctx_; std::vector> key_types_; - std::vector col_metadata_; - std::vector cols_; + std::vector col_metadata_; + std::vector cols_; std::vector minibatch_hashes_; std::vector> dictionaries_; diff --git a/cpp/src/arrow/compute/light_array.cc b/cpp/src/arrow/compute/light_array.cc new file mode 100644 index 00000000000..390dcc6cbf1 --- /dev/null +++ b/cpp/src/arrow/compute/light_array.cc @@ -0,0 +1,729 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/compute/light_array.h" + +#include + +#include "arrow/util/bitmap_ops.h" + +namespace arrow { +namespace compute { + +KeyColumnArray::KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length, + const uint8_t* validity_buffer, + const uint8_t* fixed_length_buffer, + const uint8_t* var_length_buffer, int bit_offset_validity, + int bit_offset_fixed) { + static_assert(std::is_pod::value, + "This class was intended to be a POD type"); + metadata_ = metadata; + length_ = length; + buffers_[kValidityBuffer] = validity_buffer; + buffers_[kFixedLengthBuffer] = fixed_length_buffer; + buffers_[kVariableLengthBuffer] = var_length_buffer; + mutable_buffers_[kValidityBuffer] = mutable_buffers_[kFixedLengthBuffer] = + mutable_buffers_[kVariableLengthBuffer] = nullptr; + bit_offset_[kValidityBuffer] = bit_offset_validity; + bit_offset_[kFixedLengthBuffer] = bit_offset_fixed; +} + +KeyColumnArray::KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length, + uint8_t* validity_buffer, uint8_t* fixed_length_buffer, + uint8_t* var_length_buffer, int bit_offset_validity, + int bit_offset_fixed) { + metadata_ = metadata; + length_ = length; + buffers_[kValidityBuffer] = mutable_buffers_[kValidityBuffer] = validity_buffer; + buffers_[kFixedLengthBuffer] = mutable_buffers_[kFixedLengthBuffer] = + fixed_length_buffer; + buffers_[kVariableLengthBuffer] = mutable_buffers_[kVariableLengthBuffer] = + var_length_buffer; + bit_offset_[kValidityBuffer] = bit_offset_validity; + bit_offset_[kFixedLengthBuffer] = bit_offset_fixed; +} + +KeyColumnArray KeyColumnArray::WithBufferFrom(const KeyColumnArray& other, + int buffer_id_to_replace) const { + KeyColumnArray copy = *this; + copy.mutable_buffers_[buffer_id_to_replace] = + other.mutable_buffers_[buffer_id_to_replace]; + copy.buffers_[buffer_id_to_replace] = other.buffers_[buffer_id_to_replace]; + if (buffer_id_to_replace < kMaxBuffers - 1) { + copy.bit_offset_[buffer_id_to_replace] = other.bit_offset_[buffer_id_to_replace]; + } + return copy; +} + +KeyColumnArray KeyColumnArray::WithMetadata(const KeyColumnMetadata& metadata) const { + KeyColumnArray copy = *this; + copy.metadata_ = metadata; + return copy; +} + +KeyColumnArray KeyColumnArray::Slice(int64_t offset, int64_t length) const { + KeyColumnArray sliced; + sliced.metadata_ = metadata_; + sliced.length_ = length; + uint32_t fixed_size = + !metadata_.is_fixed_length ? sizeof(uint32_t) : metadata_.fixed_length; + + sliced.buffers_[0] = + buffers_[0] ? buffers_[0] + (bit_offset_[0] + offset) / 8 : nullptr; + sliced.mutable_buffers_[0] = + mutable_buffers_[0] ? mutable_buffers_[0] + (bit_offset_[0] + offset) / 8 : nullptr; + sliced.bit_offset_[0] = (bit_offset_[0] + offset) % 8; + + if (fixed_size == 0 && !metadata_.is_null_type) { + sliced.buffers_[1] = + buffers_[1] ? buffers_[1] + (bit_offset_[1] + offset) / 8 : nullptr; + sliced.mutable_buffers_[1] = mutable_buffers_[1] + ? mutable_buffers_[1] + (bit_offset_[1] + offset) / 8 + : nullptr; + sliced.bit_offset_[1] = (bit_offset_[1] + offset) % 8; + } else { + sliced.buffers_[1] = buffers_[1] ? buffers_[1] + offset * fixed_size : nullptr; + sliced.mutable_buffers_[1] = + mutable_buffers_[1] ? mutable_buffers_[1] + offset * fixed_size : nullptr; + sliced.bit_offset_[1] = 0; + } + + sliced.buffers_[2] = buffers_[2]; + sliced.mutable_buffers_[2] = mutable_buffers_[2]; + return sliced; +} + +Result ColumnMetadataFromDataType( + const std::shared_ptr& type) { + if (type->id() == Type::DICTIONARY) { + auto bit_width = + arrow::internal::checked_cast(*type).bit_width(); + ARROW_DCHECK(bit_width % 8 == 0); + return KeyColumnMetadata(true, bit_width / 8); + } + if (type->id() == Type::BOOL) { + return KeyColumnMetadata(true, 0); + } + if (is_fixed_width(type->id())) { + return KeyColumnMetadata( + true, + arrow::internal::checked_cast(*type).bit_width() / 8); + } + if (is_binary_like(type->id())) { + return KeyColumnMetadata(false, sizeof(uint32_t)); + } + if (is_large_binary_like(type->id())) { + return KeyColumnMetadata(false, sizeof(uint64_t)); + } + if (type->id() == Type::NA) { + return KeyColumnMetadata(true, 0, true); + } + // Caller attempted to create a KeyColumnArray from an invalid type + return Status::TypeError("Unsupported column data type ", type->name(), + " used with KeyColumnMetadata"); +} + +Result ColumnArrayFromArrayData( + const std::shared_ptr& array_data, int start_row, int num_rows) { + ARROW_ASSIGN_OR_RAISE(KeyColumnMetadata metadata, + ColumnMetadataFromDataType(array_data->type)); + KeyColumnArray column_array = KeyColumnArray( + metadata, array_data->offset + start_row + num_rows, + array_data->buffers[0] != NULLPTR ? array_data->buffers[0]->data() : nullptr, + array_data->buffers[1]->data(), + (array_data->buffers.size() > 2 && array_data->buffers[2] != NULLPTR) + ? array_data->buffers[2]->data() + : nullptr); + return column_array.Slice(array_data->offset + start_row, num_rows); +} + +Status ColumnMetadatasFromExecBatch(const ExecBatch& batch, + std::vector* column_metadatas) { + int num_columns = static_cast(batch.values.size()); + column_metadatas->resize(num_columns); + for (int i = 0; i < num_columns; ++i) { + const Datum& data = batch.values[i]; + ARROW_DCHECK(data.is_array()); + const std::shared_ptr& array_data = data.array(); + ARROW_ASSIGN_OR_RAISE((*column_metadatas)[i], + ColumnMetadataFromDataType(array_data->type)); + } + return Status::OK(); +} + +Status ColumnArraysFromExecBatch(const ExecBatch& batch, int start_row, int num_rows, + std::vector* column_arrays) { + int num_columns = static_cast(batch.values.size()); + column_arrays->resize(num_columns); + for (int i = 0; i < num_columns; ++i) { + const Datum& data = batch.values[i]; + ARROW_DCHECK(data.is_array()); + const std::shared_ptr& array_data = data.array(); + ARROW_ASSIGN_OR_RAISE((*column_arrays)[i], + ColumnArrayFromArrayData(array_data, start_row, num_rows)); + } + return Status::OK(); +} + +Status ColumnArraysFromExecBatch(const ExecBatch& batch, + std::vector* column_arrays) { + return ColumnArraysFromExecBatch(batch, 0, static_cast(batch.length), + column_arrays); +} + +void ResizableArrayData::Init(const std::shared_ptr& data_type, + MemoryPool* pool, int log_num_rows_min) { +#ifndef NDEBUG + if (num_rows_allocated_ > 0) { + ARROW_DCHECK(data_type_ != NULLPTR); + KeyColumnMetadata metadata_before = + ColumnMetadataFromDataType(data_type_).ValueOrDie(); + KeyColumnMetadata metadata_after = ColumnMetadataFromDataType(data_type).ValueOrDie(); + ARROW_DCHECK(metadata_before.is_fixed_length == metadata_after.is_fixed_length && + metadata_before.fixed_length == metadata_after.fixed_length); + } +#endif + Clear(/*release_buffers=*/false); + log_num_rows_min_ = log_num_rows_min; + data_type_ = data_type; + pool_ = pool; +} + +void ResizableArrayData::Clear(bool release_buffers) { + num_rows_ = 0; + if (release_buffers) { + buffers_[kValidityBuffer].reset(); + buffers_[kFixedLengthBuffer].reset(); + buffers_[kVariableLengthBuffer].reset(); + num_rows_allocated_ = 0; + var_len_buf_size_ = 0; + } +} + +Status ResizableArrayData::ResizeFixedLengthBuffers(int num_rows_new) { + ARROW_DCHECK(num_rows_new >= 0); + if (num_rows_new <= num_rows_allocated_) { + num_rows_ = num_rows_new; + return Status::OK(); + } + + int num_rows_allocated_new = 1 << log_num_rows_min_; + while (num_rows_allocated_new < num_rows_new) { + num_rows_allocated_new *= 2; + } + + KeyColumnMetadata column_metadata = ColumnMetadataFromDataType(data_type_).ValueOrDie(); + + if (buffers_[kFixedLengthBuffer] == NULLPTR) { + ARROW_DCHECK(buffers_[kValidityBuffer] == NULLPTR && + buffers_[kVariableLengthBuffer] == NULLPTR); + + ARROW_ASSIGN_OR_RAISE( + buffers_[kValidityBuffer], + AllocateResizableBuffer( + bit_util::BytesForBits(num_rows_allocated_new) + kNumPaddingBytes, pool_)); + if (column_metadata.is_fixed_length) { + if (column_metadata.fixed_length == 0) { + ARROW_ASSIGN_OR_RAISE( + buffers_[kFixedLengthBuffer], + AllocateResizableBuffer( + bit_util::BytesForBits(num_rows_allocated_new) + kNumPaddingBytes, + pool_)); + } else { + ARROW_ASSIGN_OR_RAISE( + buffers_[kFixedLengthBuffer], + AllocateResizableBuffer( + num_rows_allocated_new * column_metadata.fixed_length + kNumPaddingBytes, + pool_)); + } + } else { + ARROW_ASSIGN_OR_RAISE( + buffers_[kFixedLengthBuffer], + AllocateResizableBuffer( + (num_rows_allocated_new + 1) * sizeof(uint32_t) + kNumPaddingBytes, pool_)); + } + + ARROW_ASSIGN_OR_RAISE( + buffers_[kVariableLengthBuffer], + AllocateResizableBuffer(sizeof(uint64_t) + kNumPaddingBytes, pool_)); + + var_len_buf_size_ = sizeof(uint64_t); + } else { + ARROW_DCHECK(buffers_[kValidityBuffer] != NULLPTR && + buffers_[kVariableLengthBuffer] != NULLPTR); + + RETURN_NOT_OK(buffers_[kValidityBuffer]->Resize( + bit_util::BytesForBits(num_rows_allocated_new) + kNumPaddingBytes)); + + if (column_metadata.is_fixed_length) { + if (column_metadata.fixed_length == 0) { + RETURN_NOT_OK(buffers_[kFixedLengthBuffer]->Resize( + bit_util::BytesForBits(num_rows_allocated_new) + kNumPaddingBytes)); + } else { + RETURN_NOT_OK(buffers_[kFixedLengthBuffer]->Resize( + num_rows_allocated_new * column_metadata.fixed_length + kNumPaddingBytes)); + } + } else { + RETURN_NOT_OK(buffers_[kFixedLengthBuffer]->Resize( + (num_rows_allocated_new + 1) * sizeof(uint32_t) + kNumPaddingBytes)); + } + } + + num_rows_allocated_ = num_rows_allocated_new; + num_rows_ = num_rows_new; + + return Status::OK(); +} + +Status ResizableArrayData::ResizeVaryingLengthBuffer() { + KeyColumnMetadata column_metadata; + column_metadata = ColumnMetadataFromDataType(data_type_).ValueOrDie(); + + if (!column_metadata.is_fixed_length) { + int min_new_size = static_cast(reinterpret_cast( + buffers_[kFixedLengthBuffer]->data())[num_rows_]); + ARROW_DCHECK(var_len_buf_size_ > 0); + if (var_len_buf_size_ < min_new_size) { + int new_size = var_len_buf_size_; + while (new_size < min_new_size) { + new_size *= 2; + } + RETURN_NOT_OK(buffers_[kVariableLengthBuffer]->Resize(new_size + kNumPaddingBytes)); + var_len_buf_size_ = new_size; + } + } + + return Status::OK(); +} + +KeyColumnArray ResizableArrayData::column_array() const { + KeyColumnMetadata column_metadata; + column_metadata = ColumnMetadataFromDataType(data_type_).ValueOrDie(); + return KeyColumnArray(column_metadata, num_rows_, + buffers_[kValidityBuffer]->mutable_data(), + buffers_[kFixedLengthBuffer]->mutable_data(), + buffers_[kVariableLengthBuffer]->mutable_data()); +} + +std::shared_ptr ResizableArrayData::array_data() const { + KeyColumnMetadata column_metadata; + column_metadata = ColumnMetadataFromDataType(data_type_).ValueOrDie(); + + auto valid_count = arrow::internal::CountSetBits( + buffers_[kValidityBuffer]->data(), /*offset=*/0, static_cast(num_rows_)); + int null_count = static_cast(num_rows_) - static_cast(valid_count); + + if (column_metadata.is_fixed_length) { + return ArrayData::Make(data_type_, num_rows_, + {buffers_[kValidityBuffer], buffers_[kFixedLengthBuffer]}, + null_count); + } else { + return ArrayData::Make(data_type_, num_rows_, + {buffers_[kValidityBuffer], buffers_[kFixedLengthBuffer], + buffers_[kVariableLengthBuffer]}, + null_count); + } +} + +int ExecBatchBuilder::NumRowsToSkip(const std::shared_ptr& column, + int num_rows, const uint16_t* row_ids, + int num_tail_bytes_to_skip) { +#ifndef NDEBUG + // Ids must be in non-decreasing order + // + for (int i = 1; i < num_rows; ++i) { + ARROW_DCHECK(row_ids[i] >= row_ids[i - 1]); + } +#endif + + KeyColumnMetadata column_metadata = + ColumnMetadataFromDataType(column->type).ValueOrDie(); + + int num_rows_left = num_rows; + int num_bytes_skipped = 0; + while (num_rows_left > 0 && num_bytes_skipped < num_tail_bytes_to_skip) { + if (column_metadata.is_fixed_length) { + if (column_metadata.fixed_length == 0) { + num_rows_left = std::max(num_rows_left, 8) - 8; + ++num_bytes_skipped; + } else { + --num_rows_left; + num_bytes_skipped += column_metadata.fixed_length; + } + } else { + --num_rows_left; + int row_id_removed = row_ids[num_rows_left]; + const uint32_t* offsets = + reinterpret_cast(column->buffers[1]->data()); + num_bytes_skipped += offsets[row_id_removed + 1] - offsets[row_id_removed]; + } + } + + return num_rows - num_rows_left; +} + +template +void ExecBatchBuilder::CollectBitsImp(const uint8_t* input_bits, + int64_t input_bits_offset, uint8_t* output_bits, + int64_t output_bits_offset, int num_rows, + const uint16_t* row_ids) { + if (!OUTPUT_BYTE_ALIGNED) { + ARROW_DCHECK(output_bits_offset % 8 > 0); + output_bits[output_bits_offset / 8] &= + static_cast((1 << (output_bits_offset % 8)) - 1); + } else { + ARROW_DCHECK(output_bits_offset % 8 == 0); + } + constexpr int unroll = 8; + for (int i = 0; i < num_rows / unroll; ++i) { + const uint16_t* row_ids_base = row_ids + unroll * i; + uint8_t result; + result = bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[0]) ? 1 : 0; + result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[1]) ? 2 : 0; + result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[2]) ? 4 : 0; + result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[3]) ? 8 : 0; + result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[4]) ? 16 : 0; + result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[5]) ? 32 : 0; + result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[6]) ? 64 : 0; + result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[7]) ? 128 : 0; + if (OUTPUT_BYTE_ALIGNED) { + output_bits[output_bits_offset / 8 + i] = result; + } else { + output_bits[output_bits_offset / 8 + i] |= + static_cast(result << (output_bits_offset % 8)); + output_bits[output_bits_offset / 8 + i + 1] = + static_cast(result >> (8 - (output_bits_offset % 8))); + } + } + if (num_rows % unroll > 0) { + for (int i = num_rows - (num_rows % unroll); i < num_rows; ++i) { + bit_util::SetBitTo(output_bits, output_bits_offset + i, + bit_util::GetBit(input_bits, input_bits_offset + row_ids[i])); + } + } +} + +void ExecBatchBuilder::CollectBits(const uint8_t* input_bits, int64_t input_bits_offset, + uint8_t* output_bits, int64_t output_bits_offset, + int num_rows, const uint16_t* row_ids) { + if (output_bits_offset % 8 > 0) { + CollectBitsImp(input_bits, input_bits_offset, output_bits, output_bits_offset, + num_rows, row_ids); + } else { + CollectBitsImp(input_bits, input_bits_offset, output_bits, output_bits_offset, + num_rows, row_ids); + } +} + +template +void ExecBatchBuilder::Visit(const std::shared_ptr& column, int num_rows, + const uint16_t* row_ids, PROCESS_VALUE_FN process_value_fn) { + KeyColumnMetadata metadata = ColumnMetadataFromDataType(column->type).ValueOrDie(); + + if (!metadata.is_fixed_length) { + const uint8_t* ptr_base = column->buffers[2]->data(); + const uint32_t* offsets = + reinterpret_cast(column->buffers[1]->data()) + column->offset; + for (int i = 0; i < num_rows; ++i) { + uint16_t row_id = row_ids[i]; + const uint8_t* field_ptr = ptr_base + offsets[row_id]; + uint32_t field_length = offsets[row_id + 1] - offsets[row_id]; + process_value_fn(i, field_ptr, field_length); + } + } else { + ARROW_DCHECK(metadata.fixed_length > 0); + for (int i = 0; i < num_rows; ++i) { + uint16_t row_id = row_ids[i]; + const uint8_t* field_ptr = + column->buffers[1]->data() + + (column->offset + row_id) * static_cast(metadata.fixed_length); + process_value_fn(i, field_ptr, metadata.fixed_length); + } + } +} + +Status ExecBatchBuilder::AppendSelected(const std::shared_ptr& source, + ResizableArrayData* target, + int num_rows_to_append, const uint16_t* row_ids, + MemoryPool* pool) { + int num_rows_before = target->num_rows(); + ARROW_DCHECK(num_rows_before >= 0); + int num_rows_after = num_rows_before + num_rows_to_append; + if (target->num_rows() == 0) { + target->Init(source->type, pool, kLogNumRows); + } + RETURN_NOT_OK(target->ResizeFixedLengthBuffers(num_rows_after)); + + KeyColumnMetadata column_metadata = + ColumnMetadataFromDataType(source->type).ValueOrDie(); + + if (column_metadata.is_fixed_length) { + // Fixed length column + // + uint32_t fixed_length = column_metadata.fixed_length; + switch (fixed_length) { + case 0: + CollectBits(source->buffers[1]->data(), source->offset, target->mutable_data(1), + num_rows_before, num_rows_to_append, row_ids); + break; + case 1: + Visit(source, num_rows_to_append, row_ids, + [&](int i, const uint8_t* ptr, uint32_t num_bytes) { + target->mutable_data(1)[num_rows_before + i] = *ptr; + }); + break; + case 2: + Visit( + source, num_rows_to_append, row_ids, + [&](int i, const uint8_t* ptr, uint32_t num_bytes) { + reinterpret_cast(target->mutable_data(1))[num_rows_before + i] = + *reinterpret_cast(ptr); + }); + break; + case 4: + Visit( + source, num_rows_to_append, row_ids, + [&](int i, const uint8_t* ptr, uint32_t num_bytes) { + reinterpret_cast(target->mutable_data(1))[num_rows_before + i] = + *reinterpret_cast(ptr); + }); + break; + case 8: + Visit( + source, num_rows_to_append, row_ids, + [&](int i, const uint8_t* ptr, uint32_t num_bytes) { + reinterpret_cast(target->mutable_data(1))[num_rows_before + i] = + *reinterpret_cast(ptr); + }); + break; + default: { + int num_rows_to_process = + num_rows_to_append - + NumRowsToSkip(source, num_rows_to_append, row_ids, sizeof(uint64_t)); + Visit(source, num_rows_to_process, row_ids, + [&](int i, const uint8_t* ptr, uint32_t num_bytes) { + uint64_t* dst = reinterpret_cast( + target->mutable_data(1) + + static_cast(num_bytes) * (num_rows_before + i)); + const uint64_t* src = reinterpret_cast(ptr); + for (uint32_t word_id = 0; + word_id < bit_util::CeilDiv(num_bytes, sizeof(uint64_t)); + ++word_id) { + util::SafeStore(dst + word_id, util::SafeLoad(src + word_id)); + } + }); + if (num_rows_to_append > num_rows_to_process) { + Visit(source, num_rows_to_append - num_rows_to_process, + row_ids + num_rows_to_process, + [&](int i, const uint8_t* ptr, uint32_t num_bytes) { + uint64_t* dst = reinterpret_cast( + target->mutable_data(1) + + static_cast(num_bytes) * + (num_rows_before + num_rows_to_process + i)); + const uint64_t* src = reinterpret_cast(ptr); + memcpy(dst, src, num_bytes); + }); + } + } + } + } else { + // Varying length column + // + + // Step 1: calculate target offsets + // + uint32_t* offsets = reinterpret_cast(target->mutable_data(1)); + uint32_t sum = num_rows_before == 0 ? 0 : offsets[num_rows_before]; + Visit(source, num_rows_to_append, row_ids, + [&](int i, const uint8_t* ptr, uint32_t num_bytes) { + offsets[num_rows_before + i] = num_bytes; + }); + for (int i = 0; i < num_rows_to_append; ++i) { + uint32_t length = offsets[num_rows_before + i]; + offsets[num_rows_before + i] = sum; + sum += length; + } + offsets[num_rows_before + num_rows_to_append] = sum; + + // Step 2: resize output buffers + // + RETURN_NOT_OK(target->ResizeVaryingLengthBuffer()); + + // Step 3: copy varying-length data + // + int num_rows_to_process = + num_rows_to_append - + NumRowsToSkip(source, num_rows_to_append, row_ids, sizeof(uint64_t)); + Visit(source, num_rows_to_process, row_ids, + [&](int i, const uint8_t* ptr, uint32_t num_bytes) { + uint64_t* dst = reinterpret_cast(target->mutable_data(2) + + offsets[num_rows_before + i]); + const uint64_t* src = reinterpret_cast(ptr); + for (uint32_t word_id = 0; + word_id < bit_util::CeilDiv(num_bytes, sizeof(uint64_t)); ++word_id) { + util::SafeStore(dst + word_id, util::SafeLoad(src + word_id)); + } + }); + Visit(source, num_rows_to_append - num_rows_to_process, row_ids + num_rows_to_process, + [&](int i, const uint8_t* ptr, uint32_t num_bytes) { + uint64_t* dst = reinterpret_cast( + target->mutable_data(2) + + offsets[num_rows_before + num_rows_to_process + i]); + const uint64_t* src = reinterpret_cast(ptr); + memcpy(dst, src, num_bytes); + }); + } + + // Process nulls + // + if (source->buffers[0] == NULLPTR) { + uint8_t* dst = target->mutable_data(0); + dst[num_rows_before / 8] |= static_cast(~0ULL << (num_rows_before & 7)); + for (int i = num_rows_before / 8 + 1; + i < bit_util::BytesForBits(num_rows_before + num_rows_to_append); ++i) { + dst[i] = 0xff; + } + } else { + CollectBits(source->buffers[0]->data(), source->offset, target->mutable_data(0), + num_rows_before, num_rows_to_append, row_ids); + } + + return Status::OK(); +} + +Status ExecBatchBuilder::AppendNulls(const std::shared_ptr& type, + ResizableArrayData& target, int num_rows_to_append, + MemoryPool* pool) { + int num_rows_before = target.num_rows(); + int num_rows_after = num_rows_before + num_rows_to_append; + if (target.num_rows() == 0) { + target.Init(type, pool, kLogNumRows); + } + RETURN_NOT_OK(target.ResizeFixedLengthBuffers(num_rows_after)); + + KeyColumnMetadata column_metadata = ColumnMetadataFromDataType(type).ValueOrDie(); + + // Process fixed length buffer + // + if (column_metadata.is_fixed_length) { + uint8_t* dst = target.mutable_data(1); + if (column_metadata.fixed_length == 0) { + dst[num_rows_before / 8] &= static_cast((1 << (num_rows_before % 8)) - 1); + int64_t offset_begin = num_rows_before / 8 + 1; + int64_t offset_end = bit_util::BytesForBits(num_rows_after); + if (offset_end > offset_begin) { + memset(dst + offset_begin, 0, offset_end - offset_begin); + } + } else { + memset(dst + num_rows_before * static_cast(column_metadata.fixed_length), + 0, static_cast(column_metadata.fixed_length) * num_rows_to_append); + } + } else { + uint32_t* dst = reinterpret_cast(target.mutable_data(1)); + uint32_t sum = num_rows_before == 0 ? 0 : dst[num_rows_before]; + for (int64_t i = num_rows_before; i <= num_rows_after; ++i) { + dst[i] = sum; + } + } + + // Process nulls + // + uint8_t* dst = target.mutable_data(0); + dst[num_rows_before / 8] &= static_cast((1 << (num_rows_before % 8)) - 1); + int64_t offset_begin = num_rows_before / 8 + 1; + int64_t offset_end = bit_util::BytesForBits(num_rows_after); + if (offset_end > offset_begin) { + memset(dst + offset_begin, 0, offset_end - offset_begin); + } + + return Status::OK(); +} + +Status ExecBatchBuilder::AppendSelected(MemoryPool* pool, const ExecBatch& batch, + int num_rows_to_append, const uint16_t* row_ids, + int num_cols, const int* col_ids) { + if (num_rows_to_append == 0) { + return Status::OK(); + } + + if (num_rows() + num_rows_to_append > num_rows_max()) { + return Status::CapacityError("ExecBatch builder exceeded limit of accumulated rows"); + } + + // If this is the first time we append rows, then initialize output buffers. + // + if (values_.empty()) { + values_.resize(num_cols); + for (int i = 0; i < num_cols; ++i) { + const Datum& data = batch.values[col_ids ? col_ids[i] : i]; + ARROW_DCHECK(data.is_array()); + const std::shared_ptr& array_data = data.array(); + values_[i].Init(array_data->type, pool, kLogNumRows); + } + } + + for (size_t i = 0; i < values_.size(); ++i) { + const Datum& data = batch.values[col_ids ? col_ids[i] : i]; + ARROW_DCHECK(data.is_array()); + const std::shared_ptr& array_data = data.array(); + RETURN_NOT_OK( + AppendSelected(array_data, &values_[i], num_rows_to_append, row_ids, pool)); + } + + return Status::OK(); +} + +Status ExecBatchBuilder::AppendNulls(MemoryPool* pool, + const std::vector>& types, + int num_rows_to_append) { + if (num_rows_to_append == 0) { + return Status::OK(); + } + + if (num_rows() + num_rows_to_append > num_rows_max()) { + return Status::CapacityError("ExecBatch builder exceeded limit of accumulated rows."); + } + + // If this is the first time we append rows, then initialize output buffers. + // + if (values_.empty()) { + values_.resize(types.size()); + for (size_t i = 0; i < types.size(); ++i) { + values_[i].Init(types[i], pool, kLogNumRows); + } + } + + for (size_t i = 0; i < values_.size(); ++i) { + RETURN_NOT_OK(AppendNulls(types[i], values_[i], num_rows_to_append, pool)); + } + + return Status::OK(); +} + +ExecBatch ExecBatchBuilder::Flush() { + ARROW_DCHECK(num_rows() > 0); + ExecBatch out({}, num_rows()); + out.values.resize(values_.size()); + for (size_t i = 0; i < values_.size(); ++i) { + out.values[i] = values_[i].array_data(); + values_[i].Clear(true); + } + return out; +} + +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/light_array.h b/cpp/src/arrow/compute/light_array.h new file mode 100644 index 00000000000..0856e3e8aa5 --- /dev/null +++ b/cpp/src/arrow/compute/light_array.h @@ -0,0 +1,382 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "arrow/array.h" +#include "arrow/compute/exec.h" +#include "arrow/type.h" +#include "arrow/util/logging.h" + +/// This file contains lightweight containers for Arrow buffers. These containers +/// makes compromises in terms of strong ownership and the range of data types supported +/// in order to gain performance and reduced overhead. + +namespace arrow { +namespace compute { + +/// \brief Description of the layout of a "key" column +/// +/// A "key" column is a non-nested, non-union column. +/// Every key column has either 0 (null), 2 (e.g. int32) or 3 (e.g. string) buffers +/// and no children. +/// +/// This metadata object is a zero-allocation analogue of arrow::DataType +struct ARROW_EXPORT KeyColumnMetadata { + KeyColumnMetadata() = default; + KeyColumnMetadata(bool is_fixed_length_in, uint32_t fixed_length_in, + bool is_null_type_in = false) + : is_fixed_length(is_fixed_length_in), + is_null_type(is_null_type_in), + fixed_length(fixed_length_in) {} + /// \brief True if the column is not a varying-length binary type + /// + /// If this is true the column will have a validity buffer and + /// a data buffer and the third buffer will be unused. + bool is_fixed_length; + /// \brief True if this column is the null type + bool is_null_type; + /// \brief The number of bytes for each item + /// + /// Zero has a special meaning, indicating a bit vector with one bit per value if it + /// isn't a null type column. + /// + /// For a varying-length binary column this represents the number of bytes per offset. + uint32_t fixed_length; +}; + +/// \brief A lightweight view into a "key" array +/// +/// A "key" column is a non-nested, non-union column \see KeyColumnMetadata +/// +/// This metadata object is a zero-allocation analogue of arrow::ArrayData +class ARROW_EXPORT KeyColumnArray { + public: + /// \brief Create an uninitialized KeyColumnArray + KeyColumnArray() = default; + /// \brief Create a read-only view from buffers + /// + /// This is a view only and does not take ownership of the buffers. The lifetime + /// of the buffers must exceed the lifetime of this view + KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length, + const uint8_t* validity_buffer, const uint8_t* fixed_length_buffer, + const uint8_t* var_length_buffer, int bit_offset_validity = 0, + int bit_offset_fixed = 0); + /// \brief Create a mutable view from buffers + /// + /// This is a view only and does not take ownership of the buffers. The lifetime + /// of the buffers must exceed the lifetime of this view + KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length, + uint8_t* validity_buffer, uint8_t* fixed_length_buffer, + uint8_t* var_length_buffer, int bit_offset_validity = 0, + int bit_offset_fixed = 0); + /// \brief Create a sliced view of `this` + /// + /// The number of rows used in offset must be divisible by 8 + /// in order to not split bit vectors within a single byte. + KeyColumnArray Slice(int64_t offset, int64_t length) const; + /// \brief Create a copy of `this` with a buffer from `other` + /// + /// The copy will be identical to `this` except the buffer at buffer_id_to_replace + /// will be replaced by the corresponding buffer in `other`. + KeyColumnArray WithBufferFrom(const KeyColumnArray& other, + int buffer_id_to_replace) const; + + /// \brief Create a copy of `this` with new metadata + KeyColumnArray WithMetadata(const KeyColumnMetadata& metadata) const; + + // Constants used for accessing buffers using data() and mutable_data(). + static constexpr int kValidityBuffer = 0; + static constexpr int kFixedLengthBuffer = 1; + static constexpr int kVariableLengthBuffer = 2; + + /// \brief Return one of the underlying mutable buffers + uint8_t* mutable_data(int i) { + ARROW_DCHECK(i >= 0 && i <= kMaxBuffers); + return mutable_buffers_[i]; + } + /// \brief Return one of the underlying read-only buffers + const uint8_t* data(int i) const { + ARROW_DCHECK(i >= 0 && i <= kMaxBuffers); + return buffers_[i]; + } + /// \brief Return a mutable version of the offsets buffer + /// + /// Only valid if this is a view into a varbinary type + uint32_t* mutable_offsets() { + DCHECK(!metadata_.is_fixed_length); + return reinterpret_cast(mutable_data(kFixedLengthBuffer)); + } + /// \brief Return a read-only version of the offsets buffer + /// + /// Only valid if this is a view into a varbinary type + const uint32_t* offsets() const { + DCHECK(!metadata_.is_fixed_length); + return reinterpret_cast(data(kFixedLengthBuffer)); + } + /// \brief Return the type metadata + const KeyColumnMetadata& metadata() const { return metadata_; } + /// \brief Return the length (in rows) of the array + int64_t length() const { return length_; } + /// \brief Return the bit offset into the corresponding vector + /// + /// if i == 1 then this must be a bool array + int bit_offset(int i) const { + ARROW_DCHECK(i >= 0 && i < kMaxBuffers); + return bit_offset_[i]; + } + + private: + static constexpr int kMaxBuffers = 3; + const uint8_t* buffers_[kMaxBuffers]; + uint8_t* mutable_buffers_[kMaxBuffers]; + KeyColumnMetadata metadata_; + int64_t length_; + // Starting bit offset within the first byte (between 0 and 7) + // to be used when accessing buffers that store bit vectors. + int bit_offset_[kMaxBuffers - 1]; +}; + +/// \brief Create KeyColumnMetadata from a DataType +/// +/// If `type` is a dictionary type then this will return the KeyColumnMetadata for +/// the indices type +/// +/// This should only be called on "key" columns. Calling this with +/// a non-key column will return Status::TypeError. +ARROW_EXPORT Result ColumnMetadataFromDataType( + const std::shared_ptr& type); + +/// \brief Create KeyColumnArray from ArrayData +/// +/// If `type` is a dictionary type then this will return the KeyColumnArray for +/// the indices array +/// +/// The caller should ensure this is only called on "key" columns. +/// \see ColumnMetadataFromDataType for details +ARROW_EXPORT Result ColumnArrayFromArrayData( + const std::shared_ptr& array_data, int start_row, int num_rows); + +/// \brief Create KeyColumnMetadata instances from an ExecBatch +/// +/// column_metadatas will be resized to fit +/// +/// All columns in `batch` must be eligible "key" columns and have an array shape +/// \see ColumnMetadataFromDataType for more details +ARROW_EXPORT Status ColumnMetadatasFromExecBatch( + const ExecBatch& batch, std::vector* column_metadatas); + +/// \brief Create KeyColumnArray instances from a slice of an ExecBatch +/// +/// column_arrays will be resized to fit +/// +/// All columns in `batch` must be eligible "key" columns and have an array shape +/// \see ColumnArrayFromArrayData for more details +ARROW_EXPORT Status ColumnArraysFromExecBatch(const ExecBatch& batch, int start_row, + int num_rows, + std::vector* column_arrays); + +/// \brief Create KeyColumnArray instances from an ExecBatch +/// +/// column_arrays will be resized to fit +/// +/// All columns in `batch` must be eligible "key" columns and have an array shape +/// \see ColumnArrayFromArrayData for more details +ARROW_EXPORT Status ColumnArraysFromExecBatch(const ExecBatch& batch, + std::vector* column_arrays); + +/// A lightweight resizable array for "key" columns +/// +/// Unlike KeyColumnArray this instance owns its buffers +/// +/// Resizing is handled by arrow::ResizableBuffer and a doubling approach is +/// used so that resizes will always grow up to the next power of 2 +class ARROW_EXPORT ResizableArrayData { + public: + /// \brief Create an uninitialized instance + /// + /// Init must be called before calling any other operations + ResizableArrayData() + : log_num_rows_min_(0), + pool_(NULLPTR), + num_rows_(0), + num_rows_allocated_(0), + var_len_buf_size_(0) {} + + ~ResizableArrayData() { Clear(true); } + + /// \brief Initialize the array + /// \param data_type The data type this array is holding data for. + /// \param pool The pool to make allocations on + /// \param log_num_rows_min All resize operations will allocate at least enough + /// space for (1 << log_num_rows_min) rows + void Init(const std::shared_ptr& data_type, MemoryPool* pool, + int log_num_rows_min); + + /// \brief Resets the array back to an empty state + /// \param release_buffers If true then allocated memory is released and the + /// next resize operation will have to reallocate memory + void Clear(bool release_buffers); + + /// \brief Resize the fixed length buffers + /// + /// The buffers will be resized to hold at least `num_rows_new` rows of data + Status ResizeFixedLengthBuffers(int num_rows_new); + + /// \brief Resize the varying length buffer if this array is a variable binary type + /// + /// This must be called after offsets have been populated and the buffer will be + /// resized to hold at least as much data as the offsets require + /// + /// Does nothing if the array is not a variable binary type + Status ResizeVaryingLengthBuffer(); + + /// \brief The current length (in rows) of the array + int num_rows() const { return num_rows_; } + + /// \brief A non-owning view into this array + KeyColumnArray column_array() const; + + /// \brief A lightweight descriptor of the data held by this array + Result column_metadata() const { + return ColumnMetadataFromDataType(data_type_); + } + + /// \brief Convert the data to an arrow::ArrayData + /// + /// This is a zero copy operation and the created ArrayData will reference the + /// buffers held by this instance. + std::shared_ptr array_data() const; + + // Constants used for accessing buffers using mutable_data(). + static constexpr int kValidityBuffer = 0; + static constexpr int kFixedLengthBuffer = 1; + static constexpr int kVariableLengthBuffer = 2; + + /// \brief A raw pointer to the requested buffer + /// + /// If i is 0 (kValidityBuffer) then this returns the validity buffer + /// If i is 1 (kFixedLengthBuffer) then this returns the buffer used for values (if this + /// is a fixed + /// length data type) or offsets (if this is a variable binary type) + /// If i is 2 (kVariableLengthBuffer) then this returns the buffer used for variable + /// length binary data + uint8_t* mutable_data(int i) { return buffers_[i]->mutable_data(); } + + private: + static constexpr int64_t kNumPaddingBytes = 64; + int log_num_rows_min_; + std::shared_ptr data_type_; + MemoryPool* pool_; + int num_rows_; + int num_rows_allocated_; + int var_len_buf_size_; + static constexpr int kMaxBuffers = 3; + std::shared_ptr buffers_[kMaxBuffers]; +}; + +/// \brief A builder to concatenate batches of data into a larger batch +/// +/// Will only store num_rows_max() rows +class ARROW_EXPORT ExecBatchBuilder { + public: + /// \brief Add rows from `source` into `target` column + /// + /// If `target` is uninitialized or cleared it will be initialized to use + /// the given pool. + static Status AppendSelected(const std::shared_ptr& source, + ResizableArrayData* target, int num_rows_to_append, + const uint16_t* row_ids, MemoryPool* pool); + + /// \brief Add nulls into `target` column + /// + /// If `target` is uninitialized or cleared it will be initialized to use + /// the given pool. + static Status AppendNulls(const std::shared_ptr& type, + ResizableArrayData& target, int num_rows_to_append, + MemoryPool* pool); + + /// \brief Add selected rows from `batch` + /// + /// If `col_ids` is null then `num_cols` should less than batch.num_values() and + /// the first `num_cols` columns of batch will be appended. + /// + /// All columns in `batch` must have array shape + Status AppendSelected(MemoryPool* pool, const ExecBatch& batch, int num_rows_to_append, + const uint16_t* row_ids, int num_cols, + const int* col_ids = NULLPTR); + + /// \brief Add all-null rows + Status AppendNulls(MemoryPool* pool, + const std::vector>& types, + int num_rows_to_append); + + /// \brief Create an ExecBatch with the data that has been appended so far + /// and clear this builder to be used again + /// + /// Should only be called if num_rows() returns non-zero. + ExecBatch Flush(); + + int num_rows() const { return values_.empty() ? 0 : values_[0].num_rows(); } + + static int num_rows_max() { return 1 << kLogNumRows; } + + private: + static constexpr int kLogNumRows = 15; + + // Calculate how many rows to skip from the tail of the + // sequence of selected rows, such that the total size of skipped rows is at + // least equal to the size specified by the caller. + // + // Skipping of the tail rows + // is used to allow for faster processing by the caller of remaining rows + // without checking buffer bounds (useful with SIMD or fixed size memory loads + // and stores). + // + // The sequence of row_ids provided must be non-decreasing. + // + static int NumRowsToSkip(const std::shared_ptr& column, int num_rows, + const uint16_t* row_ids, int num_tail_bytes_to_skip); + + // The supplied lambda will be called for each row in the given list of rows. + // The arguments given to it will be: + // - index of a row (within the set of selected rows), + // - pointer to the value, + // - byte length of the value. + // + // The information about nulls (validity bitmap) is not used in this call and + // has to be processed separately. + // + template + static void Visit(const std::shared_ptr& column, int num_rows, + const uint16_t* row_ids, PROCESS_VALUE_FN process_value_fn); + + template + static void CollectBitsImp(const uint8_t* input_bits, int64_t input_bits_offset, + uint8_t* output_bits, int64_t output_bits_offset, + int num_rows, const uint16_t* row_ids); + static void CollectBits(const uint8_t* input_bits, int64_t input_bits_offset, + uint8_t* output_bits, int64_t output_bits_offset, int num_rows, + const uint16_t* row_ids); + + std::vector values_; +}; + +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/light_array_test.cc b/cpp/src/arrow/compute/light_array_test.cc new file mode 100644 index 00000000000..3f6d4780352 --- /dev/null +++ b/cpp/src/arrow/compute/light_array_test.cc @@ -0,0 +1,481 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/compute/light_array.h" + +#include + +#include "arrow/compute/exec/test_util.h" +#include "arrow/testing/generator.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/type.h" +#include "arrow/util/checked_cast.h" + +namespace arrow { +namespace compute { + +const std::vector> kSampleFixedDataTypes = { + int8(), int16(), int32(), int64(), uint8(), + uint16(), uint32(), uint64(), decimal128(38, 6), decimal256(76, 6)}; +const std::vector> kSampleBinaryTypes = {utf8(), binary()}; + +TEST(KeyColumnMetadata, FromDataType) { + KeyColumnMetadata metadata = ColumnMetadataFromDataType(boolean()).ValueOrDie(); + ASSERT_EQ(0, metadata.fixed_length); + ASSERT_EQ(true, metadata.is_fixed_length); + ASSERT_EQ(false, metadata.is_null_type); + + metadata = ColumnMetadataFromDataType(null()).ValueOrDie(); + ASSERT_EQ(true, metadata.is_null_type); + + for (const auto& type : kSampleFixedDataTypes) { + int byte_width = + arrow::internal::checked_pointer_cast(type)->bit_width() / 8; + metadata = ColumnMetadataFromDataType(type).ValueOrDie(); + ASSERT_EQ(byte_width, metadata.fixed_length); + ASSERT_EQ(true, metadata.is_fixed_length); + ASSERT_EQ(false, metadata.is_null_type); + } + + for (const auto& type : {binary(), utf8()}) { + metadata = ColumnMetadataFromDataType(type).ValueOrDie(); + ASSERT_EQ(4, metadata.fixed_length); + ASSERT_EQ(false, metadata.is_fixed_length); + ASSERT_EQ(false, metadata.is_null_type); + } + + for (const auto& type : {large_binary(), large_utf8()}) { + metadata = ColumnMetadataFromDataType(type).ValueOrDie(); + ASSERT_EQ(8, metadata.fixed_length); + ASSERT_EQ(false, metadata.is_fixed_length); + ASSERT_EQ(false, metadata.is_null_type); + } +} + +TEST(KeyColumnArray, FromArrayData) { + for (const auto& type : kSampleFixedDataTypes) { + ARROW_SCOPED_TRACE("Type: ", type->ToString()); + // `array_offset` is the offset of the source array (e.g. if we are given a sliced + // source array) while `offset` is the offset we pass when constructing the + // KeyColumnArray + for (auto array_offset : {0, 1}) { + ARROW_SCOPED_TRACE("Array offset: ", array_offset); + for (auto offset : {0, 1}) { + ARROW_SCOPED_TRACE("Constructor offset: ", offset); + std::shared_ptr array; + int byte_width = + arrow::internal::checked_pointer_cast(type)->bit_width() / 8; + if (is_decimal(type->id())) { + array = ArrayFromJSON(type, R"(["1.123123", "2.123123", null])"); + } else { + array = ArrayFromJSON(type, "[1, 2, null]"); + } + array = array->Slice(array_offset); + int length = static_cast(array->length()) - offset - array_offset; + int buffer_offset_bytes = (offset + array_offset) * byte_width; + KeyColumnArray kc_array = + ColumnArrayFromArrayData(array->data(), offset, length).ValueOrDie(); + // Maximum tested offset is < 8 so validity is just bit offset + ASSERT_EQ(offset + array_offset, kc_array.bit_offset(0)); + ASSERT_EQ(0, kc_array.bit_offset(1)); + ASSERT_EQ(array->data()->buffers[0]->data(), kc_array.data(0)); + ASSERT_EQ(array->data()->buffers[1]->data() + buffer_offset_bytes, + kc_array.data(1)); + ASSERT_EQ(nullptr, kc_array.data(2)); + ASSERT_EQ(length, kc_array.length()); + // When creating from ArrayData always create read-only + ASSERT_EQ(nullptr, kc_array.mutable_data(0)); + ASSERT_EQ(nullptr, kc_array.mutable_data(1)); + ASSERT_EQ(nullptr, kc_array.mutable_data(2)); + } + } + } +} + +TEST(KeyColumnArray, FromArrayDataBinary) { + for (const auto& type : kSampleBinaryTypes) { + ARROW_SCOPED_TRACE("Type: ", type->ToString()); + for (auto array_offset : {0, 1}) { + ARROW_SCOPED_TRACE("Array offset: ", array_offset); + for (auto offset : {0, 1}) { + ARROW_SCOPED_TRACE("Constructor offset: ", offset); + std::shared_ptr array = ArrayFromJSON(type, R"(["xyz", "abcabc", null])"); + int offsets_width = + static_cast(arrow::internal::checked_pointer_cast(type) + ->layout() + .buffers[1] + .byte_width); + array = array->Slice(array_offset); + int length = static_cast(array->length()) - offset - array_offset; + int buffer_offset_bytes = (offset + array_offset) * offsets_width; + KeyColumnArray kc_array = + ColumnArrayFromArrayData(array->data(), offset, length).ValueOrDie(); + ASSERT_EQ(offset + array_offset, kc_array.bit_offset(0)); + ASSERT_EQ(0, kc_array.bit_offset(1)); + ASSERT_EQ(array->data()->buffers[0]->data(), kc_array.data(0)); + ASSERT_EQ(array->data()->buffers[1]->data() + buffer_offset_bytes, + kc_array.data(1)); + ASSERT_EQ(array->data()->buffers[2]->data(), kc_array.data(2)); + ASSERT_EQ(length, kc_array.length()); + // When creating from ArrayData always create read-only + ASSERT_EQ(nullptr, kc_array.mutable_data(0)); + ASSERT_EQ(nullptr, kc_array.mutable_data(1)); + ASSERT_EQ(nullptr, kc_array.mutable_data(2)); + } + } + } +} + +TEST(KeyColumnArray, FromArrayDataBool) { + for (auto array_offset : {0, 1}) { + ARROW_SCOPED_TRACE("Array offset: ", array_offset); + for (auto offset : {0, 1}) { + ARROW_SCOPED_TRACE("Constructor offset: ", offset); + std::shared_ptr array = ArrayFromJSON(boolean(), "[true, false, null]"); + array = array->Slice(array_offset); + int length = static_cast(array->length()) - offset - array_offset; + KeyColumnArray kc_array = + ColumnArrayFromArrayData(array->data(), offset, length).ValueOrDie(); + ASSERT_EQ(offset + array_offset, kc_array.bit_offset(0)); + ASSERT_EQ(offset + array_offset, kc_array.bit_offset(1)); + ASSERT_EQ(array->data()->buffers[0]->data(), kc_array.data(0)); + ASSERT_EQ(array->data()->buffers[1]->data(), kc_array.data(1)); + ASSERT_EQ(length, kc_array.length()); + ASSERT_EQ(nullptr, kc_array.mutable_data(0)); + ASSERT_EQ(nullptr, kc_array.mutable_data(1)); + } + } +} + +TEST(KeyColumnArray, Slice) { + constexpr int kValuesByteLength = 128; + // Size needed for validity depends on byte_width but 16 will always be big enough + constexpr int kValidityByteLength = 16; + uint8_t validity_buffer[kValidityByteLength]; + uint8_t values_buffer[kValuesByteLength]; + for (auto byte_width : {2, 4}) { + ARROW_SCOPED_TRACE("Byte Width: ", byte_width); + int64_t length = kValuesByteLength / byte_width; + KeyColumnMetadata metadata(true, byte_width); + KeyColumnArray array(metadata, length, validity_buffer, values_buffer, nullptr); + + for (int offset : {0, 4, 12}) { + ARROW_SCOPED_TRACE("Offset: ", offset); + for (int length : {0, 4}) { + ARROW_SCOPED_TRACE("Length: ", length); + KeyColumnArray sliced = array.Slice(offset, length); + int expected_validity_bit_offset = (offset == 0) ? 0 : 4; + int expected_validity_byte_offset = (offset == 12) ? 1 : 0; + int expected_values_byte_offset = byte_width * offset; + ASSERT_EQ(expected_validity_bit_offset, sliced.bit_offset(0)); + ASSERT_EQ(0, sliced.bit_offset(1)); + ASSERT_EQ(validity_buffer + expected_validity_byte_offset, + sliced.mutable_data(0)); + ASSERT_EQ(values_buffer + expected_values_byte_offset, sliced.mutable_data(1)); + } + } + } +} + +TEST(KeyColumnArray, SliceBool) { + constexpr int kValuesByteLength = 2; + constexpr int kValidityByteLength = 2; + uint8_t validity_buffer[kValidityByteLength]; + uint8_t values_buffer[kValuesByteLength]; + int length = 16; + KeyColumnMetadata metadata(true, /*byte_width=*/0); + KeyColumnArray array(metadata, length, validity_buffer, values_buffer, nullptr); + + for (int offset : {0, 4, 12}) { + ARROW_SCOPED_TRACE("Offset: ", offset); + for (int length : {0, 4}) { + ARROW_SCOPED_TRACE("Length: ", length); + KeyColumnArray sliced = array.Slice(offset, length); + int expected_bit_offset = (offset == 0) ? 0 : 4; + int expected_byte_offset = (offset == 12) ? 1 : 0; + ASSERT_EQ(expected_bit_offset, sliced.bit_offset(0)); + ASSERT_EQ(expected_bit_offset, sliced.bit_offset(1)); + ASSERT_EQ(validity_buffer + expected_byte_offset, sliced.mutable_data(0)); + ASSERT_EQ(values_buffer + expected_byte_offset, sliced.mutable_data(1)); + } + } +} + +TEST(KeyColumnArray, FromExecBatch) { + ExecBatch batch = + ExecBatchFromJSON({int64(), boolean()}, "[[1, true], [2, false], [null, null]]"); + std::vector arrays; + ASSERT_OK(ColumnArraysFromExecBatch(batch, &arrays)); + + ASSERT_EQ(2, arrays.size()); + ASSERT_EQ(8, arrays[0].metadata().fixed_length); + ASSERT_EQ(0, arrays[1].metadata().fixed_length); + ASSERT_EQ(3, arrays[0].length()); + ASSERT_EQ(3, arrays[1].length()); + + ASSERT_OK(ColumnArraysFromExecBatch(batch, 1, 1, &arrays)); + + ASSERT_EQ(2, arrays.size()); + ASSERT_EQ(8, arrays[0].metadata().fixed_length); + ASSERT_EQ(0, arrays[1].metadata().fixed_length); + ASSERT_EQ(1, arrays[0].length()); + ASSERT_EQ(1, arrays[1].length()); +} + +TEST(ResizableArrayData, Basic) { + std::unique_ptr pool = MemoryPool::CreateDefault(); + for (const auto& type : kSampleFixedDataTypes) { + ARROW_SCOPED_TRACE("Type: ", type->ToString()); + int byte_width = + arrow::internal::checked_pointer_cast(type)->bit_width() / 8; + { + ResizableArrayData array; + array.Init(type, pool.get(), /*log_num_rows_min=*/16); + ASSERT_EQ(0, array.num_rows()); + ASSERT_OK(array.ResizeFixedLengthBuffers(2)); + ASSERT_EQ(2, array.num_rows()); + // Even though we are only asking for 2 rows we specified a rather high + // log_num_rows_min so it should allocate at least that many rows. Padding + // and rounding up to a power of 2 will make the allocations larger. + int min_bytes_needed_for_values = byte_width * (1 << 16); + int min_bytes_needed_for_validity = (1 << 16) / 8; + int min_bytes_needed = min_bytes_needed_for_values + min_bytes_needed_for_validity; + ASSERT_LT(min_bytes_needed, pool->bytes_allocated()); + ASSERT_GT(min_bytes_needed * 2, pool->bytes_allocated()); + + ASSERT_OK(array.ResizeFixedLengthBuffers(1 << 17)); + ASSERT_LT(min_bytes_needed * 2, pool->bytes_allocated()); + ASSERT_GT(min_bytes_needed * 4, pool->bytes_allocated()); + ASSERT_EQ(1 << 17, array.num_rows()); + + // Shrinking array won't shrink allocated RAM + ASSERT_OK(array.ResizeFixedLengthBuffers(2)); + ASSERT_LT(min_bytes_needed * 2, pool->bytes_allocated()); + ASSERT_GT(min_bytes_needed * 4, pool->bytes_allocated()); + ASSERT_EQ(2, array.num_rows()); + } + // After array is destroyed buffers should be freed + ASSERT_EQ(0, pool->bytes_allocated()); + } +} + +TEST(ResizableArrayData, Binary) { + std::unique_ptr pool = MemoryPool::CreateDefault(); + for (const auto& type : kSampleBinaryTypes) { + ARROW_SCOPED_TRACE("Type: ", type->ToString()); + { + ResizableArrayData array; + array.Init(type, pool.get(), /*log_num_rows_min=*/4); + ASSERT_EQ(0, array.num_rows()); + ASSERT_OK(array.ResizeFixedLengthBuffers(2)); + ASSERT_EQ(2, array.num_rows()); + // At this point the offets memory has been allocated and needs to be filled + // in before we allocate the variable length memory + int offsets_width = + static_cast(arrow::internal::checked_pointer_cast(type) + ->layout() + .buffers[1] + .byte_width); + if (offsets_width == 4) { + auto offsets = reinterpret_cast(array.mutable_data(1)); + offsets[0] = 0; + offsets[1] = 1000; + offsets[2] = 2000; + } else if (offsets_width == 8) { + auto offsets = reinterpret_cast(array.mutable_data(1)); + offsets[0] = 0; + offsets[1] = 1000; + offsets[2] = 2000; + } else { + FAIL() << "Unexpected offsets_width: " << offsets_width; + } + ASSERT_OK(array.ResizeVaryingLengthBuffer()); + // Each string is 1000 bytes. The offsets, padding, etc. should be less than 1000 + // bytes + ASSERT_LT(2000, pool->bytes_allocated()); + ASSERT_GT(3000, pool->bytes_allocated()); + } + // After array is destroyed buffers should be freed + ASSERT_EQ(0, pool->bytes_allocated()); + } +} + +TEST(ExecBatchBuilder, AppendBatches) { + std::unique_ptr owned_pool = MemoryPool::CreateDefault(); + MemoryPool* pool = owned_pool.get(); + ExecBatch batch_one = + ExecBatchFromJSON({int64(), boolean()}, "[[1, true], [2, false], [null, null]]"); + ExecBatch batch_two = + ExecBatchFromJSON({int64(), boolean()}, "[[null, true], [5, true], [6, false]]"); + ExecBatch combined = ExecBatchFromJSON( + {int64(), boolean()}, + "[[1, true], [2, false], [null, null], [null, true], [5, true], [6, false]]"); + { + ExecBatchBuilder builder; + uint16_t row_ids[3] = {0, 1, 2}; + ASSERT_OK(builder.AppendSelected(pool, batch_one, 3, row_ids, /*num_cols=*/2)); + ASSERT_OK(builder.AppendSelected(pool, batch_two, 3, row_ids, /*num_cols=*/2)); + ExecBatch built = builder.Flush(); + ASSERT_EQ(combined, built); + ASSERT_NE(0, pool->bytes_allocated()); + } + ASSERT_EQ(0, pool->bytes_allocated()); +} + +TEST(ExecBatchBuilder, AppendBatchesSomeRows) { + std::unique_ptr owned_pool = MemoryPool::CreateDefault(); + MemoryPool* pool = owned_pool.get(); + ExecBatch batch_one = + ExecBatchFromJSON({int64(), boolean()}, "[[1, true], [2, false], [null, null]]"); + ExecBatch batch_two = + ExecBatchFromJSON({int64(), boolean()}, "[[null, true], [5, true], [6, false]]"); + ExecBatch combined = ExecBatchFromJSON( + {int64(), boolean()}, "[[1, true], [2, false], [null, true], [5, true]]"); + { + ExecBatchBuilder builder; + uint16_t row_ids[2] = {0, 1}; + ASSERT_OK(builder.AppendSelected(pool, batch_one, 2, row_ids, /*num_cols=*/2)); + ASSERT_OK(builder.AppendSelected(pool, batch_two, 2, row_ids, /*num_cols=*/2)); + ExecBatch built = builder.Flush(); + ASSERT_EQ(combined, built); + ASSERT_NE(0, pool->bytes_allocated()); + } + ASSERT_EQ(0, pool->bytes_allocated()); +} + +TEST(ExecBatchBuilder, AppendBatchesSomeCols) { + std::unique_ptr owned_pool = MemoryPool::CreateDefault(); + MemoryPool* pool = owned_pool.get(); + ExecBatch batch_one = + ExecBatchFromJSON({int64(), boolean()}, "[[1, true], [2, false], [null, null]]"); + ExecBatch batch_two = + ExecBatchFromJSON({int64(), boolean()}, "[[null, true], [5, true], [6, false]]"); + ExecBatch first_col_only = + ExecBatchFromJSON({int64()}, "[[1], [2], [null], [null], [5], [6]]"); + ExecBatch last_col_only = ExecBatchFromJSON( + {boolean()}, "[[true], [false], [null], [true], [true], [false]]"); + { + ExecBatchBuilder builder; + uint16_t row_ids[3] = {0, 1, 2}; + int first_col_ids[1] = {0}; + ASSERT_OK(builder.AppendSelected(pool, batch_one, 3, row_ids, /*num_cols=*/1, + first_col_ids)); + ASSERT_OK(builder.AppendSelected(pool, batch_two, 3, row_ids, /*num_cols=*/1, + first_col_ids)); + ExecBatch built = builder.Flush(); + ASSERT_EQ(first_col_only, built); + ASSERT_NE(0, pool->bytes_allocated()); + } + { + ExecBatchBuilder builder; + uint16_t row_ids[3] = {0, 1, 2}; + // If we don't specify col_ids and num_cols is 1 it is implicitly the first col + ASSERT_OK(builder.AppendSelected(pool, batch_one, 3, row_ids, /*num_cols=*/1)); + ASSERT_OK(builder.AppendSelected(pool, batch_two, 3, row_ids, /*num_cols=*/1)); + ExecBatch built = builder.Flush(); + ASSERT_EQ(first_col_only, built); + ASSERT_NE(0, pool->bytes_allocated()); + } + { + ExecBatchBuilder builder; + uint16_t row_ids[3] = {0, 1, 2}; + int last_col_ids[1] = {1}; + ASSERT_OK(builder.AppendSelected(pool, batch_one, 3, row_ids, /*num_cols=*/1, + last_col_ids)); + ASSERT_OK(builder.AppendSelected(pool, batch_two, 3, row_ids, /*num_cols=*/1, + last_col_ids)); + ExecBatch built = builder.Flush(); + ASSERT_EQ(last_col_only, built); + ASSERT_NE(0, pool->bytes_allocated()); + } + ASSERT_EQ(0, pool->bytes_allocated()); +} + +TEST(ExecBatchBuilder, AppendNulls) { + std::unique_ptr owned_pool = MemoryPool::CreateDefault(); + MemoryPool* pool = owned_pool.get(); + ExecBatch batch_one = + ExecBatchFromJSON({int64(), boolean()}, "[[1, true], [2, false], [null, null]]"); + ExecBatch combined = ExecBatchFromJSON( + {int64(), boolean()}, + "[[1, true], [2, false], [null, null], [null, null], [null, null]]"); + ExecBatch just_nulls = + ExecBatchFromJSON({int64(), boolean()}, "[[null, null], [null, null]]"); + { + ExecBatchBuilder builder; + uint16_t row_ids[3] = {0, 1, 2}; + ASSERT_OK(builder.AppendSelected(pool, batch_one, 3, row_ids, /*num_cols=*/2)); + ASSERT_OK(builder.AppendNulls(pool, {int64(), boolean()}, 2)); + ExecBatch built = builder.Flush(); + ASSERT_EQ(combined, built); + ASSERT_NE(0, pool->bytes_allocated()); + } + { + ExecBatchBuilder builder; + ASSERT_OK(builder.AppendNulls(pool, {int64(), boolean()}, 2)); + ExecBatch built = builder.Flush(); + ASSERT_EQ(just_nulls, built); + ASSERT_NE(0, pool->bytes_allocated()); + } + ASSERT_EQ(0, pool->bytes_allocated()); +} + +TEST(ExecBatchBuilder, AppendNullsBeyondLimit) { + std::unique_ptr owned_pool = MemoryPool::CreateDefault(); + int num_rows_max = ExecBatchBuilder::num_rows_max(); + MemoryPool* pool = owned_pool.get(); + { + ExecBatchBuilder builder; + ASSERT_OK(builder.AppendNulls(pool, {int64(), boolean()}, 10)); + ASSERT_RAISES(CapacityError, + builder.AppendNulls(pool, {int64(), boolean()}, num_rows_max + 1 - 10)); + ExecBatch built = builder.Flush(); + ASSERT_EQ(10, built.length); + ASSERT_NE(0, pool->bytes_allocated()); + } + ASSERT_EQ(0, pool->bytes_allocated()); +} + +TEST(ExecBatchBuilder, AppendValuesBeyondLimit) { + std::unique_ptr owned_pool = MemoryPool::CreateDefault(); + MemoryPool* pool = owned_pool.get(); + int num_rows_max = ExecBatchBuilder::num_rows_max(); + std::shared_ptr values = ConstantArrayGenerator::Int32(num_rows_max + 1); + std::shared_ptr trimmed_values = ConstantArrayGenerator::Int32(10); + ExecBatch batch({values}, num_rows_max + 1); + ExecBatch trimmed_batch({trimmed_values}, 10); + std::vector first_set_row_ids(10); + std::iota(first_set_row_ids.begin(), first_set_row_ids.end(), 0); + std::vector second_set_row_ids(num_rows_max + 1 - 10); + std::iota(second_set_row_ids.begin(), second_set_row_ids.end(), 10); + { + ExecBatchBuilder builder; + ASSERT_OK(builder.AppendSelected(pool, batch, 10, first_set_row_ids.data(), + /*num_cols=*/1)); + ASSERT_RAISES(CapacityError, + builder.AppendSelected(pool, batch, num_rows_max + 1 - 10, + second_set_row_ids.data(), + /*num_cols=*/1)); + ExecBatch built = builder.Flush(); + ASSERT_EQ(trimmed_batch, built); + ASSERT_NE(0, pool->bytes_allocated()); + } + ASSERT_EQ(0, pool->bytes_allocated()); +} + +} // namespace compute +} // namespace arrow