Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@ if(ARROW_COMPUTE)
compute/api_vector.cc
compute/cast.cc
compute/exec.cc
compute/exec/aggregate.cc
compute/exec/aggregate_node.cc
compute/exec/bloom_filter.cc
compute/exec/exec_plan.cc
Expand All @@ -389,8 +390,6 @@ if(ARROW_COMPUTE)
compute/exec/hash_join_dict.cc
compute/exec/hash_join_node.cc
compute/exec/ir_consumer.cc
compute/exec/key_compare.cc
compute/exec/key_encode.cc
compute/exec/key_hash.cc
compute/exec/key_map.cc
compute/exec/order_by_impl.cc
Expand Down Expand Up @@ -442,17 +441,21 @@ if(ARROW_COMPUTE)
compute/kernels/vector_nested.cc
compute/kernels/vector_replace.cc
compute/kernels/vector_selection.cc
compute/kernels/vector_sort.cc)
compute/kernels/vector_sort.cc
compute/row/encode_internal.cc
compute/row/compare_internal.cc
compute/row/grouper.cc
compute/row/row_internal.cc)

append_avx2_src(compute/kernels/aggregate_basic_avx2.cc)
append_avx512_src(compute/kernels/aggregate_basic_avx512.cc)

append_avx2_src(compute/exec/bloom_filter_avx2.cc)
append_avx2_src(compute/exec/key_compare_avx2.cc)
append_avx2_src(compute/exec/key_encode_avx2.cc)
append_avx2_src(compute/exec/key_hash_avx2.cc)
append_avx2_src(compute/exec/key_map_avx2.cc)
append_avx2_src(compute/exec/util_avx2.cc)
append_avx2_src(compute/row/compare_internal_avx2.cc)
append_avx2_src(compute/row/encode_internal_avx2.cc)

list(APPEND ARROW_TESTING_SRCS compute/exec/test_util.cc)
endif()
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/compute/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,4 @@ add_arrow_benchmark(function_benchmark PREFIX "arrow-compute")
add_subdirectory(kernels)

add_subdirectory(exec)
add_subdirectory(row)
6 changes: 6 additions & 0 deletions cpp/src/arrow/compute/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,9 @@
/// @}

#include "arrow/compute/exec/options.h" // IWYU pragma: export

/// \defgroup execnode-row Utilities for working with data in a row-major format
/// @{
/// @}

#include "arrow/compute/row/grouper.h" // IWYU pragma: export
85 changes: 0 additions & 85 deletions cpp/src/arrow/compute/api_aggregate.h
Original file line number Diff line number Diff line change
Expand Up @@ -395,84 +395,6 @@ Result<Datum> Index(const Datum& value, const IndexOptions& options,

namespace internal {

/// Internal use only: streaming group identifier.
/// Consumes batches of keys and yields batches of the group ids.
///
/// This is an abstract interface: instances are obtained through Make(), and the
/// static MakeGroupings()/ApplyGroupings() helpers operate on already-computed ids.
class ARROW_EXPORT Grouper {
public:
virtual ~Grouper() = default;

/// Construct a Grouper which receives the specified key types
///
/// \param[in] descrs descriptors of the key columns the Grouper will be fed
/// \param[in] ctx execution context to use; defaults to default_exec_context()
/// \return the new Grouper, or an error Status
static Result<std::unique_ptr<Grouper>> Make(const std::vector<ValueDescr>& descrs,
ExecContext* ctx = default_exec_context());

/// Consume a batch of keys, producing the corresponding group ids as an integer array.
/// Currently only uint32 indices will be produced, eventually the bit width will only
/// be as wide as necessary.
///
/// \param[in] batch the key columns for one batch of rows
/// \return a Datum holding the group ids for `batch`
virtual Result<Datum> Consume(const ExecBatch& batch) = 0;

/// Get current unique keys. May be called multiple times.
///
/// \return an ExecBatch of the unique keys seen so far
virtual Result<ExecBatch> GetUniques() = 0;

/// Get the current number of groups.
virtual uint32_t num_groups() const = 0;

/// \brief Assemble lists of indices of identical elements.
///
/// \param[in] ids An unsigned, all-valid integral array which will be
/// used as grouping criteria.
/// \param[in] num_groups An upper bound for the elements of ids
/// \param[in] ctx execution context to use; defaults to default_exec_context()
/// \return A num_groups-long ListArray where the slot at i contains a
/// list of indices where i appears in ids.
///
/// MakeGroupings([
/// 2,
/// 2,
/// 5,
/// 5,
/// 2,
/// 3
/// ], 8) == [
/// [],
/// [],
/// [0, 1, 4],
/// [5],
/// [],
/// [2, 3],
/// [],
/// []
/// ]
static Result<std::shared_ptr<ListArray>> MakeGroupings(
const UInt32Array& ids, uint32_t num_groups,
ExecContext* ctx = default_exec_context());

/// \brief Produce a ListArray whose slots are selections of `array` which correspond to
/// the provided groupings.
///
/// \param[in] groupings lists of indices into `array`, one list per output slot
/// (the shape produced by MakeGroupings in the example above)
/// \param[in] array the values to select from
/// \param[in] ctx execution context to use; defaults to default_exec_context()
/// \return a ListArray whose slot i holds the elements of `array` selected by
/// slot i of `groupings`
///
/// For example,
/// ApplyGroupings([
/// [],
/// [],
/// [0, 1, 4],
/// [5],
/// [],
/// [2, 3],
/// [],
/// []
/// ], [2, 2, 5, 5, 2, 3]) == [
/// [],
/// [],
/// [2, 2, 2],
/// [3],
/// [],
/// [5, 5],
/// [],
/// []
/// ]
static Result<std::shared_ptr<ListArray>> ApplyGroupings(
const ListArray& groupings, const Array& array,
ExecContext* ctx = default_exec_context());
};

/// \brief Configure a grouped aggregation
struct ARROW_EXPORT Aggregate {
/// the name of the aggregation function
Expand All @@ -482,13 +404,6 @@ struct ARROW_EXPORT Aggregate {
const FunctionOptions* options;
};

/// Internal use only: helper function for testing HashAggregateKernels.
/// This will be replaced by streaming execution operators.
///
/// \param[in] arguments the values to aggregate; aggregates[i] is applied to
/// arguments[i], so both must have the same size
/// \param[in] keys the columns to group by
/// \param[in] aggregates the aggregations to run, identified by function name
/// \param[in] use_threads if true, batches are processed on the CPU thread pool and
/// the per-thread results are merged afterwards; otherwise execution is serial
/// \param[in] ctx execution context to use; defaults to default_exec_context()
/// \return a struct array whose first fields are the aggregation results (each named
/// after its aggregate function) followed by the unique keys (named "key_0",
/// "key_1", ...)
ARROW_EXPORT
Result<Datum> GroupBy(const std::vector<Datum>& arguments, const std::vector<Datum>& keys,
const std::vector<Aggregate>& aggregates, bool use_threads = false,
ExecContext* ctx = default_exec_context());

} // namespace internal
} // namespace compute
} // namespace arrow
239 changes: 239 additions & 0 deletions cpp/src/arrow/compute/exec/aggregate.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/compute/exec/aggregate.h"

#include <mutex>

#include "arrow/compute/exec_internal.h"
#include "arrow/compute/registry.h"
#include "arrow/compute/row/grouper.h"
#include "arrow/util/task_group.h"

namespace arrow {
namespace compute {
namespace internal {

/// \brief Look up one hash aggregate kernel per requested aggregation.
///
/// aggregates[i] is matched against in_descrs[i]; each kernel is dispatched on the
/// pair (argument descriptor, uint32 array of group ids).
///
/// \param[in] ctx execution context whose function registry is searched
/// \param[in] aggregates the aggregations to resolve, identified by function name
/// \param[in] in_descrs descriptors of the aggregated arguments, one per aggregate
/// \return the kernels in the same order as `aggregates`. Returns Invalid if the two
/// vectors differ in length or if a named function is not a hash aggregate function;
/// errors from the registry lookup or from kernel dispatch are propagated unchanged.
Result<std::vector<const HashAggregateKernel*>> GetKernels(
    ExecContext* ctx, const std::vector<Aggregate>& aggregates,
    const std::vector<ValueDescr>& in_descrs) {
  if (aggregates.size() != in_descrs.size()) {
    return Status::Invalid(aggregates.size(), " aggregate functions were specified but ",
                           in_descrs.size(), " arguments were provided.");
  }

  std::vector<const HashAggregateKernel*> kernels(in_descrs.size());

  for (size_t i = 0; i < aggregates.size(); ++i) {
    ARROW_ASSIGN_OR_RAISE(auto function,
                          ctx->func_registry()->GetFunction(aggregates[i].function));

    // The static_cast below is only sound for kernels that belong to a hash aggregate
    // function. Any other function kind that happens to accept two arguments would
    // otherwise dispatch successfully and be reinterpreted as a HashAggregateKernel.
    if (function->kind() != Function::HASH_AGGREGATE) {
      return Status::Invalid("The provided function (", aggregates[i].function,
                             ") is not a hash aggregate function");
    }

    ARROW_ASSIGN_OR_RAISE(
        const Kernel* kernel,
        function->DispatchExact({in_descrs[i], ValueDescr::Array(uint32())}));
    kernels[i] = static_cast<const HashAggregateKernel*>(kernel);
  }
  return kernels;
}

/// \brief Create a fresh KernelState for every hash aggregate kernel.
///
/// When an aggregate carries no explicit options, the default options of the function
/// registered under its name are used if that function can be found; otherwise the
/// kernel is initialized with null options.
///
/// \param[in] kernels the kernels to initialize, parallel to `aggregates`
/// \param[in] ctx execution context (provides the function registry)
/// \param[in] aggregates the aggregations, supplying function names and options
/// \param[in] in_descrs descriptors of the aggregated arguments, one per aggregate
/// \return one state per kernel, or the first error reported by a kernel's init
Result<std::vector<std::unique_ptr<KernelState>>> InitKernels(
    const std::vector<const HashAggregateKernel*>& kernels, ExecContext* ctx,
    const std::vector<Aggregate>& aggregates, const std::vector<ValueDescr>& in_descrs) {
  std::vector<std::unique_ptr<KernelState>> states(kernels.size());

  for (size_t idx = 0; idx < aggregates.size(); ++idx) {
    const Aggregate& aggregate = aggregates[idx];

    const FunctionOptions* effective_options = aggregate.options;
    if (effective_options == nullptr) {
      // No options given: fall back to the function's defaults when it is registered.
      auto lookup = ctx->func_registry()->GetFunction(aggregate.function);
      if (lookup.ok()) {
        effective_options = lookup.ValueOrDie()->default_options();
      }
    }

    // Every kernel sees its argument plus the uint32 group id column.
    const std::vector<ValueDescr> kernel_inputs{in_descrs[idx],
                                                ValueDescr::Array(uint32())};

    KernelContext kernel_ctx{ctx};
    ARROW_ASSIGN_OR_RAISE(
        states[idx],
        kernels[idx]->init(&kernel_ctx,
                           KernelInitArgs{kernels[idx], kernel_inputs, effective_options}));
  }

  return std::move(states);
}

/// \brief Compute the output field of every hash aggregate kernel.
///
/// Each kernel's output type is resolved against (argument descriptor, uint32 group
/// ids) with the kernel's state attached to the context; the resulting field is named
/// after the aggregate's function.
///
/// \param[in] aggregates the aggregations, supplying the field names
/// \param[in] kernels the kernels whose signatures are resolved
/// \param[in] states kernel states, parallel to `kernels`
/// \param[in] ctx execution context
/// \param[in] descrs descriptors of the aggregated arguments, one per kernel
/// \return one field per argument descriptor, or the first resolution error
Result<FieldVector> ResolveKernels(
    const std::vector<Aggregate>& aggregates,
    const std::vector<const HashAggregateKernel*>& kernels,
    const std::vector<std::unique_ptr<KernelState>>& states, ExecContext* ctx,
    const std::vector<ValueDescr>& descrs) {
  FieldVector out_fields(descrs.size());

  for (size_t idx = 0; idx < kernels.size(); ++idx) {
    KernelContext kernel_ctx{ctx};
    kernel_ctx.SetState(states[idx].get());

    const std::vector<ValueDescr> kernel_inputs{descrs[idx], ValueDescr::Array(uint32())};
    const auto& out_type = kernels[idx]->signature->out_type();

    ARROW_ASSIGN_OR_RAISE(auto resolved, out_type.Resolve(&kernel_ctx, kernel_inputs));
    out_fields[idx] = field(aggregates[idx].function, std::move(resolved.type));
  }
  return out_fields;
}

/// \brief Group `arguments` by `keys` and apply one hash aggregate per argument.
///
/// Keys and arguments are split into batches of ctx->exec_chunksize(). Each batch is
/// submitted to a task group (threaded or serial); a task maps its keys to group ids
/// with a Grouper and feeds (argument, group ids) to every kernel. Every executing
/// thread works on its own Grouper and its own set of kernel states; once all tasks
/// have finished, those per-thread results are merged into slot 0 and finalized.
///
/// \param[in] arguments values to aggregate; may be empty, in which case no kernels
/// are set up and only the unique keys are produced
/// \param[in] keys the columns to group by
/// \param[in] aggregates the aggregations to run, matched to `arguments` by position
/// \param[in] use_threads run tasks on the CPU thread pool instead of serially
/// \param[in] ctx execution context
/// \return a struct array: aggregate outputs first (named after their function), then
/// the unique keys (named "key_<n>"); or the first error encountered
///
/// NOTE(review): this function uses std::thread::id, std::this_thread and
/// std::unordered_map while the file only includes <mutex> directly; <thread> and
/// <unordered_map> presumably arrive transitively. Consider including them explicitly.
Result<Datum> GroupBy(const std::vector<Datum>& arguments, const std::vector<Datum>& keys,
const std::vector<Aggregate>& aggregates, bool use_threads,
ExecContext* ctx) {
// Threaded execution uses the global CPU thread pool; otherwise tasks run serially.
auto task_group =
use_threads
? arrow::internal::TaskGroup::MakeThreaded(arrow::internal::GetCpuThreadPool())
: arrow::internal::TaskGroup::MakeSerial();

std::vector<const HashAggregateKernel*> kernels;
// states[t][k] is the state of kernel k for thread slot t.
std::vector<std::vector<std::unique_ptr<KernelState>>> states;
FieldVector out_fields;

using arrow::compute::detail::ExecBatchIterator;
// Stays null when there are no arguments (keys-only grouping).
std::unique_ptr<ExecBatchIterator> argument_batch_iterator;

if (!arguments.empty()) {
ARROW_ASSIGN_OR_RAISE(ExecBatch args_batch, ExecBatch::Make(arguments));

// Construct and initialize HashAggregateKernels
auto argument_descrs = args_batch.GetDescriptors();

ARROW_ASSIGN_OR_RAISE(kernels, GetKernels(ctx, aggregates, argument_descrs));

// One independent set of kernel states per slot of task group parallelism.
states.resize(task_group->parallelism());
for (auto& state : states) {
ARROW_ASSIGN_OR_RAISE(state,
InitKernels(kernels, ctx, aggregates, argument_descrs));
}

// Output types are resolved once, using the states of slot 0.
ARROW_ASSIGN_OR_RAISE(
out_fields, ResolveKernels(aggregates, kernels, states[0], ctx, argument_descrs));

ARROW_ASSIGN_OR_RAISE(
argument_batch_iterator,
ExecBatchIterator::Make(args_batch.values, ctx->exec_chunksize()));
}

// Construct Groupers
ARROW_ASSIGN_OR_RAISE(ExecBatch keys_batch, ExecBatch::Make(keys));
auto key_descrs = keys_batch.GetDescriptors();

// One Grouper per slot of task group parallelism, mirroring `states`.
std::vector<std::unique_ptr<Grouper>> groupers(task_group->parallelism());
for (auto& grouper : groupers) {
ARROW_ASSIGN_OR_RAISE(grouper, Grouper::Make(key_descrs, ctx));
}

// `mutex` guards `thread_ids`, which assigns each executing thread a dense index
// (in order of first appearance) used to pick its Grouper and kernel states.
std::mutex mutex;
std::unordered_map<std::thread::id, size_t> thread_ids;

// Key fields come after the aggregate fields and are named key_0, key_1, ...
// The key types are moved out of key_descrs, which is not used afterwards.
int i = 0;
for (ValueDescr& key_descr : key_descrs) {
out_fields.push_back(field("key_" + std::to_string(i++), std::move(key_descr.type)));
}

ARROW_ASSIGN_OR_RAISE(
auto key_batch_iterator,
ExecBatchIterator::Make(keys_batch.values, ctx->exec_chunksize()));

// start "streaming" execution
// The loop advances the argument iterator (when present) and the key iterator in
// lockstep and stops as soon as either one is exhausted.
ExecBatch key_batch, argument_batch;
while ((argument_batch_iterator == NULLPTR ||
argument_batch_iterator->Next(&argument_batch)) &&
key_batch_iterator->Next(&key_batch)) {
if (key_batch.length == 0) continue;

// The two batches are captured by value because the loop overwrites them on the
// next iteration; everything else is captured by reference.
task_group->Append([&, key_batch, argument_batch] {
size_t thread_index;
{
std::unique_lock<std::mutex> lock(mutex);
// emplace() leaves an existing entry untouched, so a thread seen before gets its
// previous index back; a new thread is given the next unused index.
auto it = thread_ids.emplace(std::this_thread::get_id(), thread_ids.size()).first;
thread_index = it->second;
DCHECK_LT(static_cast<int>(thread_index), task_group->parallelism());
}

auto grouper = groupers[thread_index].get();

// compute a batch of group ids
ARROW_ASSIGN_OR_RAISE(Datum id_batch, grouper->Consume(key_batch));

// consume group ids with HashAggregateKernels
// NOTE(review): this `i` shadows the `int i` declared in the enclosing function.
for (size_t i = 0; i < kernels.size(); ++i) {
KernelContext batch_ctx{ctx};
batch_ctx.SetState(states[thread_index][i].get());
ARROW_ASSIGN_OR_RAISE(auto batch, ExecBatch::Make({argument_batch[i], id_batch}));
// Resize the kernel state to the grouper's current group count before consuming.
RETURN_NOT_OK(kernels[i]->resize(&batch_ctx, grouper->num_groups()));
RETURN_NOT_OK(kernels[i]->consume(&batch_ctx, batch));
}

return Status::OK();
});
}

RETURN_NOT_OK(task_group->Finish());

// Merge if necessary
// Only slots that were actually used by a thread (indices < thread_ids.size()) hold
// data. Slot 0's grouper consumes the unique keys of each other slot; the ids it
// returns (`transposition`) are handed to each kernel's merge together with that
// slot's state.
for (size_t thread_index = 1; thread_index < thread_ids.size(); ++thread_index) {
ARROW_ASSIGN_OR_RAISE(ExecBatch other_keys, groupers[thread_index]->GetUniques());
ARROW_ASSIGN_OR_RAISE(Datum transposition, groupers[0]->Consume(other_keys));
groupers[thread_index].reset();

for (size_t idx = 0; idx < kernels.size(); ++idx) {
KernelContext batch_ctx{ctx};
batch_ctx.SetState(states[0][idx].get());

RETURN_NOT_OK(kernels[idx]->resize(&batch_ctx, groupers[0]->num_groups()));
RETURN_NOT_OK(kernels[idx]->merge(&batch_ctx, std::move(*states[thread_index][idx]),
*transposition.array()));
states[thread_index][idx].reset();
}
}

// Finalize output
// Column order matches out_fields: one column per kernel, then one per key.
ArrayDataVector out_data(arguments.size() + keys.size());
auto it = out_data.begin();

for (size_t idx = 0; idx < kernels.size(); ++idx) {
KernelContext batch_ctx{ctx};
batch_ctx.SetState(states[0][idx].get());
Datum out;
RETURN_NOT_OK(kernels[idx]->finalize(&batch_ctx, &out));
*it++ = out.array();
}

ARROW_ASSIGN_OR_RAISE(ExecBatch out_keys, groupers[0]->GetUniques());
for (const auto& key : out_keys.values) {
*it++ = key.array();
}

// The struct's length is taken from its first column; the struct itself has no
// validity bitmap and a null count of 0.
int64_t length = out_data[0]->length;
return ArrayData::Make(struct_(std::move(out_fields)), length,
{/*null_bitmap=*/nullptr}, std::move(out_data),
/*null_count=*/0);
}

} // namespace internal
} // namespace compute
} // namespace arrow
Loading