From 4d3187ab791cdfdf25bd33bd9ed8141ac6686e98 Mon Sep 17 00:00:00 2001 From: zhaochangle Date: Tue, 9 Jun 2026 17:02:12 +0800 Subject: [PATCH 1/6] demo --- be/src/core/column/column.h | 9 + be/src/core/column/column_nullable.h | 13 + be/src/core/column/column_vector.cpp | 82 +++--- be/src/core/column/column_vector.h | 234 +++++++++++++++--- .../core/data_type/data_type_number_base.cpp | 6 +- be/src/exec/scan/olap_scanner.cpp | 11 +- be/src/exec/scan/scanner.cpp | 141 +++++++++++ be/src/exec/scan/scanner.h | 10 + .../exprs/aggregate/aggregate_function_sum.h | 56 ++++- be/src/storage/segment/bitshuffle_page.h | 4 +- be/src/storage/segment/page_decoder.h | 13 + be/src/storage/segment/parsed_page.h | 8 +- be/src/storage/segment/plain_page.h | 5 +- 13 files changed, 515 insertions(+), 77 deletions(-) diff --git a/be/src/core/column/column.h b/be/src/core/column/column.h index 6e5b69d907789b..b07ef2afd43bcd 100644 --- a/be/src/core/column/column.h +++ b/be/src/core/column/column.h @@ -22,6 +22,7 @@ #include #include +#include #include #include #include @@ -245,6 +246,14 @@ class IColumn : public COW { "Method insert_many_fix_len_data is not supported for " + get_name()); } + // Fixed-width page decoders can pass a shared owner for the source memory. Columns that know + // how to read immutable external storage may keep that owner and avoid an immediate memcpy; + // the default implementation preserves the historical eager-copy behavior for all columns. + virtual void insert_many_fix_len_data_with_owner(const char* pos, size_t num, + std::shared_ptr owner) { + insert_many_fix_len_data(pos, num); + } + // todo(zeno) Use dict_args temp object to cover all arguments virtual void insert_many_dict_data(const int32_t* data_array, size_t start_index, const StringRef* dict, size_t data_num, diff --git a/be/src/core/column/column_nullable.h b/be/src/core/column/column_nullable.h index 2161260bf9f1b6..dbc604e3b23085 100644 --- a/be/src/core/column/column_nullable.h +++ b/be/src/core/column/column_nullable.h @@ -20,6 +20,9 @@ #pragma once +#include +#include + #include "common/status.h" #include "core/assert_cast.h" #include "core/column/column.h" @@ -162,6 +165,16 @@ class ColumnNullable final : public COWHelper { get_nested_column().insert_many_fix_len_data(pos, num); } + void insert_many_fix_len_data_with_owner(const char* pos, size_t num, + std::shared_ptr owner) override { + // A nullable column has two physical children: the nested data and the null map. This entry + // point is used only when storage has already proved the appended range contains no NULLs, + // so we append a false null map and may let the nested fixed-width column borrow page + // memory. Ranges with real NULLs are decoded through nullable-specific paths instead. + push_false_to_nullmap(num); + get_nested_column().insert_many_fix_len_data_with_owner(pos, num, std::move(owner)); + } + void insert_many_raw_data(const char* pos, size_t num) override { DCHECK(pos); push_false_to_nullmap(num); diff --git a/be/src/core/column/column_vector.cpp b/be/src/core/column/column_vector.cpp index 502074cc66c0b0..3666d5b4720cca 100644 --- a/be/src/core/column/column_vector.cpp +++ b/be/src/core/column/column_vector.cpp @@ -47,12 +47,14 @@ namespace doris { template size_t ColumnVector::serialize_impl(char* pos, const size_t row) const { - memcpy_fixed(pos, (char*)&data[row]); + auto values = immutable_data(); + memcpy_fixed(pos, reinterpret_cast(&values[row])); return sizeof(value_type); } template size_t ColumnVector::deserialize_impl(const char* pos) { + materialize_external_data(); data.push_back(unaligned_load(pos)); return sizeof(value_type); } @@ -137,23 +139,24 @@ void ColumnVector::deserialize_with_nullable(StringRef* keys, const size_t nu template void ColumnVector::update_hash_with_value(size_t n, SipHash& hash) const { - hash.update(data[n]); + hash.update(immutable_data()[n]); } template void ColumnVector::update_hashes_with_value(uint64_t* __restrict hashes, const uint8_t* __restrict null_data) const { - auto s = size(); + auto values = immutable_data(); + auto s = values.size(); if (null_data) { - for (int i = 0; i < s; i++) { + for (size_t i = 0; i < s; i++) { if (null_data[i] == 0) { - hashes[i] = HashUtil::xxHash64WithSeed(reinterpret_cast(&data[i]), + hashes[i] = HashUtil::xxHash64WithSeed(reinterpret_cast(&values[i]), sizeof(value_type), hashes[i]); } } } else { - for (int i = 0; i < s; i++) { - hashes[i] = HashUtil::xxHash64WithSeed(reinterpret_cast(&data[i]), + for (size_t i = 0; i < s; i++) { + hashes[i] = HashUtil::xxHash64WithSeed(reinterpret_cast(&values[i]), sizeof(value_type), hashes[i]); } } @@ -171,15 +174,16 @@ void ColumnVector::compare_internal(size_t rhs_row_id, const IColumn& rhs, int nan_direction_hint, int direction, std::vector& cmp_res, uint8_t* __restrict filter) const { - const auto sz = data.size(); + auto values = immutable_data(); + const auto sz = values.size(); DCHECK(cmp_res.size() == sz); const auto& cmp_base = assert_cast&, TypeCheckOnRelease::DISABLE>(rhs) - .get_data()[rhs_row_id]; + .immutable_data()[rhs_row_id]; size_t begin = simd::find_zero(cmp_res, 0); while (begin < sz) { size_t end = simd::find_one(cmp_res, begin + 1); for (size_t row_id = begin; row_id < end; row_id++) { - auto value_a = data[row_id]; + auto value_a = values[row_id]; int res = value_a > cmp_base ? 1 : (value_a < cmp_base ? -1 : 0); cmp_res[row_id] = (res != 0); filter[row_id] = (res * direction < 0); @@ -190,7 +194,7 @@ void ColumnVector::compare_internal(size_t rhs_row_id, const IColumn& rhs, template Field ColumnVector::operator[](size_t n) const { - return Field::create_field(*(typename PrimitiveTypeTraits::CppType*)(&data[n])); + return Field::create_field(immutable_data()[n]); } template @@ -215,25 +219,27 @@ void ColumnVector::update_crcs_with_value(uint32_t* __restrict hashes, Primit template uint32_t ColumnVector::_zlib_crc32_hash(uint32_t hash, size_t idx) const { + auto values = immutable_data(); if constexpr (is_date_or_datetime(T)) { char buf[64]; - const auto& date_val = (const VecDateTimeValue&)data[idx]; + const auto& date_val = (const VecDateTimeValue&)values[idx]; auto len = date_val.to_buffer(buf); return HashUtil::zlib_crc_hash(buf, len, hash); } else { - return HashUtil::zlib_crc32_fixed(data[idx], hash); + return HashUtil::zlib_crc32_fixed(values[idx], hash); } } template uint32_t ColumnVector::_crc32c_hash(uint32_t hash, size_t idx) const { + auto values = immutable_data(); if constexpr (is_date_or_datetime(T)) { char buf[64]; - const auto& date_val = (const VecDateTimeValue&)data[idx]; + const auto& date_val = (const VecDateTimeValue&)values[idx]; auto len = date_val.to_buffer(buf); return crc32c_extend(hash, (const uint8_t*)buf, len); } else { - return HashUtil::crc32c_fixed(data[idx], hash); + return HashUtil::crc32c_fixed(values[idx], hash); } } @@ -273,29 +279,35 @@ void ColumnVector::update_crc32c_single(size_t start, size_t end, uint32_t& h template struct ColumnVector::less { const Self& parent; + ImmContainer values; int nan_direction_hint; less(const Self& parent_, int nan_direction_hint_) - : parent(parent_), nan_direction_hint(nan_direction_hint_) {} + : parent(parent_), + values(parent_.immutable_data()), + nan_direction_hint(nan_direction_hint_) {} bool operator()(size_t lhs, size_t rhs) const { - return Compare::less(parent.data[lhs], parent.data[rhs]); + return Compare::less(values[lhs], values[rhs]); } }; template struct ColumnVector::greater { const Self& parent; + ImmContainer values; int nan_direction_hint; greater(const Self& parent_, int nan_direction_hint_) - : parent(parent_), nan_direction_hint(nan_direction_hint_) {} + : parent(parent_), + values(parent_.immutable_data()), + nan_direction_hint(nan_direction_hint_) {} bool operator()(size_t lhs, size_t rhs) const { - return Compare::greater(parent.data[lhs], parent.data[rhs]); + return Compare::greater(values[lhs], values[rhs]); } }; template void ColumnVector::get_permutation(bool reverse, size_t limit, int nan_direction_hint, HybridSorter& sorter, IColumn::Permutation& res) const { - size_t s = data.size(); + size_t s = size(); res.resize(s); if (s == 0) return; @@ -330,7 +342,8 @@ MutableColumnPtr ColumnVector::clone_resized(size_t size) const { auto& new_col = *res; size_t count = std::min(this->size(), size); new_col.data.resize(count); - memcpy(new_col.data.data(), data.data(), count * sizeof(data[0])); + auto values = immutable_data(); + memcpy(new_col.data.data(), values.data(), count * sizeof(value_type)); if (size > count) { new_col.insert_many_defaults(size - count); @@ -343,17 +356,19 @@ MutableColumnPtr ColumnVector::clone_resized(size_t size) const { template void ColumnVector::insert_range_from(const IColumn& src, size_t start, size_t length) { const ColumnVector& src_vec = assert_cast(src); - // size_t(start) start > src_vec.data.size() || length > src_vec.data.size() should not be negative which cause overflow - if (start + length > src_vec.data.size()) { + auto src_values = src_vec.immutable_data(); + // size_t(start) start > src_vec.size() || length > src_vec.size() should not be negative which cause overflow + if (start + length > src_values.size()) { throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR, "Parameters start = {}, length = {}, are out of bound in " "ColumnVector::insert_range_from method (data.size() = {}).", - start, length, src_vec.data.size()); + start, length, src_values.size()); } + materialize_external_data(); size_t old_size = data.size(); data.resize(old_size + length); - memcpy(data.data() + old_size, &src_vec.data[start], length * sizeof(data[0])); + memcpy(data.data() + old_size, src_values.data() + start, length * sizeof(value_type)); } template @@ -361,6 +376,7 @@ void ColumnVector::insert_indices_from(const IColumn& src, const uint32_t* in const uint32_t* indices_end) { auto origin_size = size(); auto new_size = indices_end - indices_begin; + materialize_external_data(); data.resize(origin_size + new_size); auto copy = [](const value_type* __restrict src, value_type* __restrict dest, @@ -376,7 +392,8 @@ void ColumnVector::insert_indices_from(const IColumn& src, const uint32_t* in template ColumnPtr ColumnVector::filter(const IColumn::Filter& filt, ssize_t result_size_hint) const { - size_t size = data.size(); + auto values = immutable_data(); + size_t size = values.size(); column_match_filter_size(size, filt.size()); auto res = this->create(); @@ -386,7 +403,7 @@ ColumnPtr ColumnVector::filter(const IColumn::Filter& filt, ssize_t result_si const UInt8* filt_pos = filt.data(); const UInt8* filt_end = filt_pos + size; - const value_type* data_pos = data.data(); + const value_type* data_pos = values.data(); /** A slightly more optimized version. * Based on the assumption that often pieces of consecutive values @@ -426,6 +443,7 @@ ColumnPtr ColumnVector::filter(const IColumn::Filter& filt, ssize_t result_si template size_t ColumnVector::filter(const IColumn::Filter& filter) { + materialize_external_data(); size_t size = data.size(); column_match_filter_size(size, filter.size()); @@ -480,15 +498,17 @@ size_t ColumnVector::filter(const IColumn::Filter& filter) { template void ColumnVector::insert_many_from(const IColumn& src, size_t position, size_t length) { + materialize_external_data(); auto old_size = data.size(); data.resize(old_size + length); - auto& vals = assert_cast(src).get_data(); + auto vals = assert_cast(src).immutable_data(); std::fill(&data[old_size], &data[old_size + length], vals[position]); } template MutableColumnPtr ColumnVector::permute(const IColumn::Permutation& perm, size_t limit) const { - size_t size = data.size(); + auto values = immutable_data(); + size_t size = values.size(); if (limit == 0) limit = size; @@ -503,13 +523,14 @@ MutableColumnPtr ColumnVector::permute(const IColumn::Permutation& perm, size auto res = this->create(limit); typename Self::Container& res_data = res->get_data(); - for (size_t i = 0; i < limit; ++i) res_data[i] = data[perm[i]]; + for (size_t i = 0; i < limit; ++i) res_data[i] = values[perm[i]]; return res; } template void ColumnVector::replace_column_null_data(const uint8_t* __restrict null_map) { + materialize_external_data(); auto s = size(); auto value = default_value(); for (size_t i = 0; i < s; ++i) { @@ -520,6 +541,7 @@ void ColumnVector::replace_column_null_data(const uint8_t* __restrict null_ma template void ColumnVector::replace_float_special_values() { if constexpr (is_float_or_double(T)) { + materialize_external_data(); auto s = size(); for (size_t i = 0; i < s; ++i) { NormalizeFloat(data[i]); diff --git a/be/src/core/column/column_vector.h b/be/src/core/column/column_vector.h index d39326fa8c07f3..f94106661c9ee3 100644 --- a/be/src/core/column/column_vector.h +++ b/be/src/core/column/column_vector.h @@ -29,9 +29,12 @@ #include #include #include +#include +#include #include #include #include +#include #include #include "common/compare.h" @@ -85,32 +88,48 @@ class ColumnVector final : public COWHelper> { public: using value_type = typename PrimitiveTypeTraits::CppType; using Container = PaddedPODArray; + using ImmContainer = std::span; private: + struct ExternalDataSpan { + std::shared_ptr owner; + const value_type* data = nullptr; + size_t size = 0; + }; + ColumnVector() = default; explicit ColumnVector(const size_t n) : data(n) {} explicit ColumnVector(const size_t n, const value_type x) : data(n, x) {} - ColumnVector(const ColumnVector& src) : data(src.data.begin(), src.data.end()) {} + ColumnVector(const ColumnVector& src) { + data.reserve(src.size()); + src.for_each_immutable_data_span( + [this](ImmContainer values) { data.insert(values.begin(), values.end()); }); + } /// Sugar constructor. ColumnVector(std::initializer_list il) : data {il} {} public: - size_t size() const override { return data.size(); } + size_t size() const override { return data.size() + _external_size; } StringRef get_data_at(size_t n) const override { - return {reinterpret_cast(&data[n]), sizeof(data[n])}; + auto values = immutable_data(); + return {reinterpret_cast(&values[n]), sizeof(values[n])}; } void insert_from(const IColumn& src, size_t n) override { - data.push_back(assert_cast(src).get_data()[n]); + materialize_external_data(); + data.push_back( + assert_cast(src).immutable_data()[n]); } void insert_data(const char* pos, size_t /*length*/) override { + materialize_external_data(); data.push_back(unaligned_load(pos)); } void insert_many_vals(value_type val, size_t n) { + materialize_external_data(); auto old_size = data.size(); data.resize(old_size + n); std::fill(data.data() + old_size, data.data() + old_size + n, val); @@ -121,6 +140,7 @@ class ColumnVector final : public COWHelper> { void insert_range_of_integer(value_type begin, value_type end) { if constexpr (!is_float_or_double(T) && T != TYPE_TIMEV2 && T != TYPE_TIMESTAMPTZ && !is_date_type(T)) { + materialize_external_data(); auto old_size = data.size(); auto new_size = old_size + static_cast(end - begin); data.resize(new_size); @@ -132,6 +152,7 @@ class ColumnVector final : public COWHelper> { } void insert_date_column(const char* data_ptr, size_t num) { + materialize_external_data(); data.reserve(data.size() + num); constexpr size_t input_value_size = sizeof(uint24_t); @@ -147,6 +168,7 @@ class ColumnVector final : public COWHelper> { } void insert_datetime_column(const char* data_ptr, size_t num) { + materialize_external_data(); data.reserve(data.size() + num); size_t value_size = sizeof(uint64_t); for (int i = 0; i < num; i++) { @@ -170,22 +192,56 @@ class ColumnVector final : public COWHelper> { } } + void insert_many_fix_len_data_with_owner(const char* data_ptr, size_t num, + std::shared_ptr owner) override { + if constexpr (T == TYPE_DATE || T == TYPE_DATETIME) { + // The legacy DATE/DATETIME encodings stored on page are not byte-identical to the + // in-memory VecDateTimeValue layout. They must keep the existing decode-and-convert + // path instead of adopting page memory directly. + insert_many_fix_len_data(data_ptr, num); + return; + } + const bool can_use_external_data = + owner != nullptr && data.empty() && + reinterpret_cast(data_ptr) % alignof(value_type) == 0; + if (can_use_external_data) { + // The page decoder already owns decoded fixed-width values in naturally aligned page + // memory. Keep the page owner and append a read-only span instead of copying it into + // the local PODArray. A large Doris block can cross several storage pages, so this is + // intentionally segmented; consumers that can iterate spans stay zero-copy, while + // legacy consumers that require one contiguous array call immutable_data()/get_data() + // and materialize once. + _append_external_data_span(reinterpret_cast(data_ptr), num, + std::move(owner)); + return; + } + insert_many_fix_len_data(data_ptr, num); + } + void insert_many_raw_data(const char* data_ptr, size_t num) override { DCHECK(data_ptr); + materialize_external_data(); auto old_size = data.size(); data.resize(old_size + num); memcpy(data.data() + old_size, data_ptr, num * sizeof(value_type)); } - void insert_default() override { data.push_back(default_value()); } + void insert_default() override { + materialize_external_data(); + data.push_back(default_value()); + } void insert_many_defaults(size_t length) override { + materialize_external_data(); size_t old_size = data.size(); data.resize(old_size + length); std::fill(data.data() + old_size, data.data() + old_size + length, default_value()); } - void pop_back(size_t n) override { data.resize_assume_reserved(data.size() - n); } + void pop_back(size_t n) override { + materialize_external_data(); + data.resize_assume_reserved(data.size() - n); + } StringRef serialize_value_into_arena(size_t n, Arena& arena, char const*& begin) const override; @@ -203,30 +259,32 @@ class ColumnVector final : public COWHelper> { void update_xxHash_with_value(size_t start, size_t end, uint64_t& hash, const uint8_t* __restrict null_data) const override { + auto values = immutable_data(); if (null_data) { for (size_t i = start; i < end; i++) { if (null_data[i] == 0) { - hash = HashUtil::xxHash64WithSeed(reinterpret_cast(&data[i]), + hash = HashUtil::xxHash64WithSeed(reinterpret_cast(&values[i]), sizeof(value_type), hash); } } } else { for (size_t i = start; i < end; i++) { - hash = HashUtil::xxHash64WithSeed(reinterpret_cast(&data[i]), + hash = HashUtil::xxHash64WithSeed(reinterpret_cast(&values[i]), sizeof(value_type), hash); } } } void ALWAYS_INLINE update_crc_with_value_without_null(size_t idx, uint32_t& hash) const { + auto values = immutable_data(); if constexpr (is_date_or_datetime(T)) { char buf[64]; - const auto& date_val = (const VecDateTimeValue&)data[idx]; + const auto& date_val = (const VecDateTimeValue&)values[idx]; auto len = date_val.to_buffer(buf); hash = HashUtil::zlib_crc_hash(buf, len, hash); } else { - hash = HashUtil::zlib_crc_hash(&data[idx], sizeof(value_type), hash); + hash = HashUtil::zlib_crc_hash(&values[idx], sizeof(value_type), hash); } } @@ -260,16 +318,30 @@ class ColumnVector final : public COWHelper> { void update_hashes_with_value(uint64_t* __restrict hashes, const uint8_t* __restrict null_data) const override; - size_t byte_size() const override { return data.size() * sizeof(data[0]); } + size_t byte_size() const override { return size() * sizeof(value_type); } - size_t allocated_bytes() const override { return data.allocated_bytes(); } + size_t allocated_bytes() const override { + // External spans point into page-cache owned memory. Count only ColumnVector metadata here; + // charging the page bytes again would double-count memory already tracked by the storage + // page cache. + return data.allocated_bytes() + _external_data_spans.capacity() * sizeof(ExternalDataSpan); + } bool has_enough_capacity(const IColumn& src) const override { + // Capacity reuse is meaningful only for the mutable local PODArray. A page-backed column has + // immutable external spans, so any append-style reuse must first materialize and should not + // be selected by generic block reuse heuristics. + if (_has_external_data()) { + return false; + } const auto& src_vec = assert_cast(src); - return data.capacity() - data.size() > src_vec.data.size(); + return data.capacity() - data.size() > src_vec.size(); } - void insert_value(const value_type value) { data.push_back(value); } + void insert_value(const value_type value) { + materialize_external_data(); + data.push_back(value); + } Status filter_by_selector(const uint16_t* sel, size_t sel_size, IColumn* col_ptr) override { Self* output = assert_cast(col_ptr); @@ -285,8 +357,10 @@ class ColumnVector final : public COWHelper> { /// This method implemented in header because it could be possibly devirtualized. int compare_at(size_t n, size_t m, const IColumn& rhs_, int nan_direction_hint) const override { - return Compare::compare( - data[n], assert_cast(rhs_).data[m]); + const auto lhs_values = immutable_data(); + const auto rhs_values = + assert_cast(rhs_).immutable_data(); + return Compare::compare(lhs_values[n], rhs_values[m]); } void get_permutation(bool reverse, size_t limit, int nan_direction_hint, HybridSorter& sorter, @@ -294,7 +368,10 @@ class ColumnVector final : public COWHelper> { void reserve(size_t n) override { data.reserve(n); } - void resize(size_t n) override { data.resize(n); } + void resize(size_t n) override { + materialize_external_data(); + data.resize(n); + } std::string get_name() const override { return type_to_string(T); } @@ -304,11 +381,14 @@ class ColumnVector final : public COWHelper> { void get(size_t n, Field& res) const override { res = (*this)[n]; } - void clear() override { data.clear(); } + void clear() override { + data.clear(); + _reset_external_data(); + } bool get_bool(size_t n) const override { if constexpr (T == TYPE_BOOLEAN) { - return bool(data[n]); + return bool(immutable_data()[n]); } else { throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "Method get_int is not supported for " + get_name()); @@ -322,7 +402,7 @@ class ColumnVector final : public COWHelper> { "Method get_int is not supported for " + get_name()); return 0; } else { - return Int64(data[n]); + return Int64(immutable_data()[n]); } } @@ -333,7 +413,10 @@ class ColumnVector final : public COWHelper> { // but its type is different from column's data type (int64 vs uint64), so that during column // insert method, should use NearestFieldType to get the Field and get it actual // uint8 value and then insert into column. - void insert(const Field& x) override { data.push_back(x.get()); } + void insert(const Field& x) override { + materialize_external_data(); + data.push_back(x.get()); + } void insert_range_from(const IColumn& src, size_t start, size_t length) override; @@ -346,7 +429,8 @@ class ColumnVector final : public COWHelper> { MutableColumnPtr permute(const IColumn::Permutation& perm, size_t limit) const override; StringRef get_raw_data() const override { - return StringRef(reinterpret_cast(data.data()), data.size()); + auto values = immutable_data(); + return StringRef(reinterpret_cast(values.data()), values.size()); } bool structure_equals(const IColumn& rhs) const override { @@ -354,27 +438,63 @@ class ColumnVector final : public COWHelper> { } /** More efficient methods of manipulation - to manipulate with data directly. */ - Container& get_data() { return data; } + Container& get_data() { + materialize_external_data(); + return data; + } - const Container& get_data() const { return data; } + const Container& get_data() const { + materialize_external_data(); + return data; + } - const value_type& get_element(size_t n) const { return data[n]; } + // Read-only consumers should prefer for_each_immutable_data_span() when they can process + // segmented input. Scan blocks often span multiple storage pages; immutable_data() still keeps + // the historical contiguous-span contract and therefore materializes multi-page columns. + ImmContainer immutable_data() const { + if (_has_external_data()) { + if (_external_data_spans.size() == 1) { + const auto& span = _external_data_spans.front(); + return {span.data, span.size}; + } + materialize_external_data(); + } + return {data.data(), data.size()}; + } - value_type& get_element(size_t n) { return data[n]; } + template + void for_each_immutable_data_span(Func&& func) const { + if (!data.empty()) { + func(ImmContainer {data.data(), data.size()}); + } + for (const auto& span : _external_data_spans) { + func(ImmContainer {span.data, span.size}); + } + } + + const value_type& get_element(size_t n) const { return immutable_data()[n]; } + + value_type& get_element(size_t n) { + materialize_external_data(); + return data[n]; + } void replace_column_data(const IColumn& rhs, size_t row, size_t self_row = 0) override { DCHECK(size() > self_row); - data[self_row] = assert_cast(rhs).data[row]; + materialize_external_data(); + data[self_row] = + assert_cast(rhs).immutable_data()[row]; } // Optimized batch version using memcpy for continuous range void replace_column_data_range(const IColumn& src, size_t src_start, size_t count, size_t self_start) override { DCHECK(size() >= self_start + count); + materialize_external_data(); const auto& src_col = assert_cast(src); DCHECK(src_col.size() >= src_start + count); - memcpy(data.data() + self_start, src_col.data.data() + src_start, - count * sizeof(value_type)); + auto src_values = src_col.immutable_data(); + memcpy(data.data() + self_start, src_values.data() + src_start, count * sizeof(value_type)); } bool support_replace_column_data_range() const override { return true; } @@ -393,6 +513,7 @@ class ColumnVector final : public COWHelper> { uint8_t* __restrict filter) const override; void erase(size_t start, size_t length) override { + materialize_external_data(); if (start >= data.size() || length == 0) { return; } @@ -415,10 +536,63 @@ class ColumnVector final : public COWHelper> { } } +private: + bool _has_external_data() const { return !_external_data_spans.empty(); } + + void _append_external_data_span(const value_type* external_data, size_t external_size, + std::shared_ptr owner) { + if (external_size == 0) { + return; + } + DCHECK(external_data != nullptr); + DCHECK(owner != nullptr); + if (!_external_data_spans.empty()) { + auto& last = _external_data_spans.back(); + if (last.owner.get() == owner.get() && last.data + last.size == external_data) { + // Consecutive decoder calls can return adjacent slices from the same page. Merge + // them so read-only consumers see fewer spans without changing ownership semantics. + last.size += external_size; + _external_size += external_size; + return; + } + } + _external_data_spans.push_back( + ExternalDataSpan {.owner = std::move(owner), + .data = external_data, + .size = external_size}); + _external_size += external_size; + } + + void _reset_external_data() const { + _external_data_spans.clear(); + _external_size = 0; + } + + void materialize_external_data() const { + if (!_has_external_data()) { + return; + } + // This is the boundary back to the traditional ColumnVector contract: many generic + // expression, sort, filter, and serialization paths assume a single contiguous + // PaddedPODArray. Page-backed scan spans are safe only for read-only consumers that + // explicitly iterate spans; all other paths pay one copy here and then behave exactly like + // an ordinary ColumnVector. + const auto old_size = data.size(); + data.resize(old_size + _external_size); + auto* dst = data.data() + old_size; + for (const auto& span : _external_data_spans) { + memcpy(dst, span.data, span.size * sizeof(value_type)); + dst += span.size; + } + _reset_external_data(); + } + protected: uint32_t _zlib_crc32_hash(uint32_t hash, size_t idx) const; uint32_t _crc32c_hash(uint32_t hash, size_t idx) const; - Container data; + mutable Container data; + mutable std::vector _external_data_spans; + mutable size_t _external_size = 0; }; using ColumnUInt8 = ColumnVector; diff --git a/be/src/core/data_type/data_type_number_base.cpp b/be/src/core/data_type/data_type_number_base.cpp index dcd167bce1df0d..0cac78a87affb8 100644 --- a/be/src/core/data_type/data_type_number_base.cpp +++ b/be/src/core/data_type/data_type_number_base.cpp @@ -124,10 +124,10 @@ char* DataTypeNumberBase::serialize(const IColumn& column, char* buf, // mem_size = real_need_copy_num * sizeof(T) auto mem_size = real_need_copy_num * sizeof(typename PrimitiveTypeTraits::CppType); - const auto* origin_data = + const auto values = assert_cast::ColumnType&>(*data_column) - .get_data() - .data(); + .immutable_data(); + const auto* origin_data = values.data(); // column data if (mem_size <= SERIALIZED_MEM_SIZE_LIMIT) { diff --git a/be/src/exec/scan/olap_scanner.cpp b/be/src/exec/scan/olap_scanner.cpp index 320976814679b9..b1e1e792e632a8 100644 --- a/be/src/exec/scan/olap_scanner.cpp +++ b/be/src/exec/scan/olap_scanner.cpp @@ -187,11 +187,12 @@ Status OlapScanner::_prepare_impl() { // set limit to reduce end of rowset and segment mem use _tablet_reader = std::make_unique(); - // batch size is passed down to segment iterator, use _state->batch_size() - // instead of _parent->limit(), because if _parent->limit() is a very small - // value (e.g. select a from t where a .. and b ... limit 1), - // it will be very slow when reading data in segment iterator - _tablet_reader->set_batch_size(_state->batch_size()); + // Batch size is passed down to segment iterator. Use the scanner-selected storage batch + // instead of _parent->limit(): tiny LIMIT values would make segment iteration very slow. For a + // pure fixed-width slot-ref scan, Scanner may raise the storage batch up to the byte-budgeted + // session maximum so the reader directly produces large page-backed blocks; doing this in the + // storage reader preserves the zero-copy fixed-width page spans. + _tablet_reader->set_batch_size(_storage_read_batch_size(_state)); // Adaptive batch size: pass byte-budget settings to the storage reader. // The reader still uses batch_size() as the row ceiling. _tablet_reader->set_preferred_block_size_bytes(_state->preferred_block_size_bytes()); diff --git a/be/src/exec/scan/scanner.cpp b/be/src/exec/scan/scanner.cpp index 8788331c24546d..8cb7a4c236132f 100644 --- a/be/src/exec/scan/scanner.cpp +++ b/be/src/exec/scan/scanner.cpp @@ -19,15 +19,20 @@ #include +#include + #include "common/config.h" #include "common/status.h" #include "core/block/column_with_type_and_name.h" #include "core/column/column_nothing.h" +#include "exec/common/util.hpp" #include "exec/operator/scan_operator.h" #include "exec/scan/scan_node.h" #include "exprs/vexpr_context.h" +#include "exprs/vslot_ref.h" #include "runtime/descriptors.h" #include "runtime/runtime_profile.h" +#include "storage/segment/adaptive_block_size_predictor.h" #include "util/concurrency_stats.h" #include "util/defer_op.h" @@ -80,9 +85,76 @@ Status Scanner::init(RuntimeState* state, const VExprContextSPtrs& conjuncts) { } } + // Pure slot-ref projections do not compute new values; they only reorder or forward scan + // columns. Record the source column ids once here so the hot projection path can move the + // existing ColumnPtr instead of executing VSlotRef and then mutating it, which would clone + // the column when the input block still shares the same pointer. + if (!_projections.empty() && _intermediate_projections.empty()) { + _direct_slot_ref_projection_column_ids.reserve(_projections.size()); + for (const auto& projection : _projections) { + auto slot_ref = std::dynamic_pointer_cast(projection->root()); + if (slot_ref == nullptr) { + _direct_slot_ref_projection_column_ids.clear(); + break; + } + _direct_slot_ref_projection_column_ids.push_back(slot_ref->column_id()); + } + if (!_direct_slot_ref_projection_column_ids.empty() && _conjuncts.empty() && + _limit <= 0 && _shared_scan_limit == nullptr && _output_row_descriptor != nullptr && + _output_row_descriptor->num_materialized_slots() == + _direct_slot_ref_projection_column_ids.size()) { + size_t row_bytes = 0; + bool all_fixed_width = true; + for (const auto& tuple_desc : _output_row_descriptor->tuple_descriptors()) { + for (const auto* slot_desc : tuple_desc->slots()) { + const auto& type = slot_desc->get_data_type_ptr(); + if (!type->have_maximum_size_of_value()) { + all_fixed_width = false; + break; + } + row_bytes += std::max(1, type->get_size_of_value_in_memory()); + } + if (!all_fixed_width) { + break; + } + } + if (all_fixed_width && row_bytes > 0) { + // This enables OLAP storage to produce fewer, larger blocks for the hot arithmetic + // scan shape. Do it before page decoding rather than by merging blocks later: + // MutableBlock::merge() appends through generic Column APIs and would materialize + // the page-backed fixed-width spans that the storage decoder can otherwise forward + // without copying. Varlen and complex slots stay on the session batch size because + // their per-row memory is data-dependent and large blocks can amplify memory spikes. + _direct_slot_ref_projection_row_bytes = row_bytes; + } + } + } + return Status::OK(); } +int Scanner::_storage_read_batch_size(RuntimeState* state) const { + const int session_batch_size = state->batch_size(); + if (_direct_slot_ref_projection_row_bytes == 0) { + return session_batch_size; + } + + // Keep the same hard upper bound used by the segment adaptive reader and by the public + // batch_size contract. The byte budget decides whether a fixed-width scan can safely use that + // many rows; the session batch size remains the lower bound so this path never shrinks existing + // scans. + static constexpr size_t kMaxReadBatchRows = + AdaptiveBlockSizePredictor::kDefaultBlockSizeRows; + const size_t rows_by_bytes = + state->preferred_block_size_bytes() / _direct_slot_ref_projection_row_bytes; + if (rows_by_bytes == 0) { + return session_batch_size; + } + const size_t target_rows = + std::max(session_batch_size, std::min(kMaxReadBatchRows, rows_by_bytes)); + return static_cast(target_rows); +} + Status Scanner::get_block_after_projects(RuntimeState* state, Block* block, bool* eos) { SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().vscanner_get_block); auto& row_descriptor = _local_state->_parent->row_descriptor(); @@ -222,6 +294,40 @@ Status Scanner::_do_projections(Block* origin_block, Block* output_block) { } DCHECK_EQ(rows, input_block.rows()); + // Fast path for the common scan shape: PhysicalProject only forwards slot refs and the + // output slot type is exactly the same as the scan column type. The exact type check keeps + // casts/nullability changes on the generic expression path; only a pure column ownership + // transfer is handled here. + if (!_direct_slot_ref_projection_column_ids.empty() && + _direct_slot_ref_projection_column_ids.size() == _projections.size()) { + bool can_use_direct_projection = true; + auto direct_projection_columns = + VectorizedUtils::create_columns_with_type_and_name(*_output_row_descriptor); + if (direct_projection_columns.size() != _direct_slot_ref_projection_column_ids.size()) { + return Status::InternalError( + "direct slot ref projection size mismatch, output columns {}, projections {}", + direct_projection_columns.size(), + _direct_slot_ref_projection_column_ids.size()); + } + for (size_t i = 0; i < _direct_slot_ref_projection_column_ids.size(); ++i) { + auto column_id = _direct_slot_ref_projection_column_ids[i]; + if (column_id < 0 || column_id >= origin_block->columns()) { + return Status::InternalError( + "slot ref projection column id {} is out of range, origin block columns {}", + column_id, origin_block->columns()); + } + const auto& src = origin_block->get_by_position(column_id); + if (!direct_projection_columns[i].type->equals(*src.type)) { + can_use_direct_projection = false; + break; + } + } + if (can_use_direct_projection) { + return _do_direct_slot_ref_projection(origin_block, output_block, + std::move(direct_projection_columns)); + } + } + auto scoped_mutable_block = VectorizedUtils::build_scoped_mutable_mem_reuse_block( output_block, *_output_row_descriptor); auto& mutable_block = scoped_mutable_block.mutable_block(); @@ -250,6 +356,41 @@ Status Scanner::_do_projections(Block* origin_block, Block* output_block) { return Status::OK(); } +Status Scanner::_do_direct_slot_ref_projection(Block* origin_block, Block* output_block, + ColumnsWithTypeAndName&& output_columns) { + const size_t rows = origin_block->rows(); + + for (size_t i = 0; i < _direct_slot_ref_projection_column_ids.size(); ++i) { + auto column_id = _direct_slot_ref_projection_column_ids[i]; + if (column_id < 0 || column_id >= origin_block->columns()) { + return Status::InternalError( + "slot ref projection column id {} is out of range, origin block columns {}", + column_id, origin_block->columns()); + } + + auto column = + origin_block->get_by_position(column_id).column->convert_to_full_column_if_const(); + if (column->size() != rows) { + return Status::InternalError( + "direct slot ref projection result column size {} not equal input rows {}", + column->size(), rows); + } + output_columns[i].column = std::move(column); + } + + Block projected_block(std::move(output_columns)); + output_block->swap(projected_block); + + // The output block now owns the scan columns. Replace the origin block with empty columns so + // later block reuse/destruction does not keep an extra shared reference that would make later + // mutable paths clone the forwarded columns. + auto empty_columns = origin_block->clone_empty_columns(); + origin_block->set_columns(std::move(empty_columns)); + DCHECK_EQ(output_block->rows(), rows); + + return Status::OK(); +} + Status Scanner::try_append_late_arrival_runtime_filter() { if (_applied_rf_num == _total_rf_num) { return Status::OK(); diff --git a/be/src/exec/scan/scanner.h b/be/src/exec/scan/scanner.h index a91395b911efe7..05a0aecc056279 100644 --- a/be/src/exec/scan/scanner.h +++ b/be/src/exec/scan/scanner.h @@ -111,6 +111,8 @@ class Scanner { // Subclass should implement this to return data. virtual Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) = 0; + int _storage_read_batch_size(RuntimeState* state) const; + Status _merge_padding_block() { if (_padding_block.empty()) { _padding_block.swap(_origin_block); @@ -141,6 +143,8 @@ class Scanner { Status _filter_output_block(Block* block); Status _do_projections(Block* origin_block, Block* output_block); + Status _do_direct_slot_ref_projection(Block* origin_block, Block* output_block, + ColumnsWithTypeAndName&& output_columns); private: void _start_scan_cpu_timer() { @@ -249,6 +253,12 @@ class Scanner { VExprContextSPtrs _projections; // Used in common subexpression elimination to compute intermediate results. std::vector _intermediate_projections; + // Non-empty only when final projections are all VSlotRef and there is no intermediate CSE + // projection. Values are source block column ids used by the no-copy projection fast path. + std::vector _direct_slot_ref_projection_column_ids; + // Estimated output bytes per row for the pure slot-ref projection fast path. Non-zero only for + // fixed-width slots and simple scans where it is safe for OLAP storage to produce larger blocks. + size_t _direct_slot_ref_projection_row_bytes = 0; Block _origin_block; Block _padding_block; diff --git a/be/src/exprs/aggregate/aggregate_function_sum.h b/be/src/exprs/aggregate/aggregate_function_sum.h index c42c77f7d13d90..75a34c1a6a2c6a 100644 --- a/be/src/exprs/aggregate/aggregate_function_sum.h +++ b/be/src/exprs/aggregate/aggregate_function_sum.h @@ -22,6 +22,7 @@ #include +#include #include #include @@ -86,6 +87,16 @@ class AggregateFunctionSum final using ColVecType = typename PrimitiveTypeTraits::ColumnType; using ColVecResult = typename PrimitiveTypeTraits::ColumnType; +private: + static decltype(auto) _column_values(const ColVecType& column) { + if constexpr (is_decimal(T)) { + return column.get_data(); + } else { + return column.immutable_data(); + } + } + +public: String get_name() const override { return "sum"; } AggregateFunctionSum(const DataTypes& argument_types_) @@ -107,8 +118,40 @@ class AggregateFunctionSum final Arena&) const override { const auto& column = assert_cast(*columns[0]); - this->data(place).add( - typename PrimitiveTypeTraits::CppType(column.get_data()[row_num])); + const auto& values = _column_values(column); + this->data(place).add(typename PrimitiveTypeTraits::CppType(values[row_num])); + } + + void add_batch_single_place(size_t batch_size, AggregateDataPtr place, const IColumn** columns, + Arena&) const override { + const auto& column = + assert_cast(*columns[0]); + auto& aggregate_data = this->data(place); + if constexpr (is_decimal(T)) { + // Decimal columns are already stored in Doris-owned contiguous memory and their + // arithmetic goes through the existing decimal container semantics. + const auto& source_values = _column_values(column); + const auto* __restrict values = source_values.data(); + for (size_t i = 0; i < batch_size; ++i) { + aggregate_data.add(typename PrimitiveTypeTraits::CppType(values[i])); + } + } else { + // This is semantically the same as the default IAggregateFunctionHelper loop over + // add(), but it is intentionally written for the hot no-group-by SUM path. Fixed-width + // scan columns can be backed by several storage pages inside one large block; iterating + // page spans keeps those pages read-only and avoids materializing them into one + // contiguous PODArray just to add the values. + size_t remaining = batch_size; + column.for_each_immutable_data_span([&](typename ColVecType::ImmContainer source_values) { + const size_t rows = std::min(remaining, source_values.size()); + const auto* __restrict values = source_values.data(); + for (size_t i = 0; i < rows; ++i) { + aggregate_data.add(typename PrimitiveTypeTraits::CppType(values[i])); + } + remaining -= rows; + }); + DCHECK_EQ(remaining, 0); + } } void reset(AggregateDataPtr place) const override { this->data(place).sum = {}; } @@ -152,7 +195,8 @@ class AggregateFunctionSum final DCHECK(col.item_size() == sizeof(Data)) << "size is not equal: " << col.item_size() << " " << sizeof(Data); col.resize(num_rows); - auto* src_data = src.get_data().data(); + const auto& source_values = _column_values(src); + auto* src_data = source_values.data(); auto* dst_data = col.get_data().data(); for (size_t i = 0; i != num_rows; ++i) { auto& state = *reinterpret_cast(&dst_data[sizeof(Data) * i]); @@ -223,7 +267,8 @@ class AggregateFunctionSum final if (*could_use_previous_result) { const auto& column = assert_cast(*columns[0]); - const auto* data = column.get_data().data(); + const auto& source_values = _column_values(column); + const auto* data = source_values.data(); auto outcoming_pos = frame_start - 1; auto incoming_pos = frame_end - 1; if (!previous_is_nul && outcoming_pos >= partition_start && @@ -256,9 +301,10 @@ class AggregateFunctionSum final } else { const auto& column = assert_cast(*columns[0]); + const auto& source_values = _column_values(column); for (size_t row_num = current_frame_start; row_num < current_frame_end; ++row_num) { this->data(place).add( - typename PrimitiveTypeTraits::CppType(column.get_data()[row_num])); + typename PrimitiveTypeTraits::CppType(source_values[row_num])); } *use_null_result = false; *could_use_previous_result = true; diff --git a/be/src/storage/segment/bitshuffle_page.h b/be/src/storage/segment/bitshuffle_page.h index 8e13ecdd22c08b..649d9a0f396417 100644 --- a/be/src/storage/segment/bitshuffle_page.h +++ b/be/src/storage/segment/bitshuffle_page.h @@ -393,7 +393,9 @@ class BitShufflePageDecoder : public PageDecoder { size_t max_fetch = std::min(*n, _num_elements - _cur_index); - dst->insert_many_fix_len_data(get_data(_cur_index), max_fetch); + // Pass the page owner with the contiguous fixed-width range. ColumnVector can borrow this + // read-only page memory when safe; other columns fall back to the normal copy path. + dst->insert_many_fix_len_data_with_owner(get_data(_cur_index), max_fetch, _page_data_owner); *n = max_fetch; if constexpr (forward_index) { _cur_index += max_fetch; diff --git a/be/src/storage/segment/page_decoder.h b/be/src/storage/segment/page_decoder.h index 39f94e88a7a0ef..a1591a371de582 100644 --- a/be/src/storage/segment/page_decoder.h +++ b/be/src/storage/segment/page_decoder.h @@ -17,6 +17,9 @@ #pragma once +#include +#include + #include "common/status.h" // for Status #include "core/column/column.h" @@ -87,8 +90,18 @@ class PageDecoder { return Status::NotSupported("get_dict_word_info not implement"); } + // Fixed-width decoders can hand out pointers into their decoded page buffer. The owner keeps + // that page alive after next_batch() returns if the destination column chooses a zero-copy + // immutable view; destinations that cannot use it still copy through the default column API. + void set_page_data_owner(std::shared_ptr page_data_owner) { + _page_data_owner = std::move(page_data_owner); + } + private: DISALLOW_COPY_AND_ASSIGN(PageDecoder); + +protected: + std::shared_ptr _page_data_owner; }; } // namespace segment_v2 diff --git a/be/src/storage/segment/parsed_page.h b/be/src/storage/segment/parsed_page.h index 5bab09400a22f3..ed4899e14e2f1b 100644 --- a/be/src/storage/segment/parsed_page.h +++ b/be/src/storage/segment/parsed_page.h @@ -20,6 +20,7 @@ #include #include +#include #include "common/status.h" #include "storage/segment/binary_dict_page.h" @@ -43,7 +44,7 @@ struct ParsedPage { PageDecoderOptions opts = PageDecoderOptions()) { result->~ParsedPage(); ParsedPage* page = new (result)(ParsedPage); - page->page_handle = std::move(handle); + page->page_handle = std::make_shared(std::move(handle)); auto null_size = footer.nullmap_size(); page->has_null = null_size > 0; @@ -58,6 +59,9 @@ struct ParsedPage { PageDecoder* decoder; RETURN_IF_ERROR(encoding->create_page_decoder(data_slice, opts, &decoder)); page->data_decoder.reset(decoder); + // The decoder may forward fixed-width values as immutable column memory. Keep the + // underlying page alive through any output column that adopts this shared owner. + page->data_decoder->set_page_data_owner(page->page_handle); RETURN_IF_ERROR(page->data_decoder->init()); if (encoding->encoding() == DICT_ENCODING) { @@ -78,7 +82,7 @@ struct ParsedPage { ~ParsedPage() { data_decoder = nullptr; } - PageHandle page_handle; + std::shared_ptr page_handle; bool has_null; Slice null_bitmap; diff --git a/be/src/storage/segment/plain_page.h b/be/src/storage/segment/plain_page.h index 52c4c975198bb9..986132f4710b7f 100644 --- a/be/src/storage/segment/plain_page.h +++ b/be/src/storage/segment/plain_page.h @@ -191,7 +191,10 @@ class PlainPageDecoder : public PageDecoder { size_t max_fetch = std::min(*n, static_cast(_num_elems - _cur_idx)); const void* src_data = &_data[PLAIN_PAGE_HEADER_SIZE + _cur_idx * SIZE_OF_TYPE]; - dst->insert_many_fix_len_data((const char*)src_data, max_fetch); + // Pass the page owner with the contiguous fixed-width range. ColumnVector can borrow this + // read-only page memory when safe; other columns fall back to the normal copy path. + dst->insert_many_fix_len_data_with_owner((const char*)src_data, max_fetch, + _page_data_owner); *n = max_fetch; _cur_idx += max_fetch; From b1ecd23a6d70291c981ca897d149a2e36ebfff4f Mon Sep 17 00:00:00 2001 From: zhaochangle Date: Wed, 24 Jun 2026 16:10:13 +0800 Subject: [PATCH 2/6] [improvement](be) Limit zero-copy fixed columns to one page view ### What problem does this PR solve? Issue Number: N/A Related PR: N/A Problem Summary: The zero-copy fixed-width column demo kept a vector of external page spans and exposed a span-iteration helper so consumers could process multi-page scan blocks without materializing. That made ColumnVector carry a more complex segmented immutable-data contract than the execution layer normally expects. This change matches the StarRocks-style single-resource model: ColumnVector may adopt one page-owned fixed-width view when the destination is empty and aligned, while any later append or mutable access materializes the view into Doris-owned storage before continuing. Legacy DATE/DATETIME still use the existing decode-and-convert path because the page bytes are not byte-identical to the in-memory layout. SUM no longer needs a multi-span hot path and reads the normal single immutable view. ### Release note None ### Check List (For Author) - Test: Manual test - PATH=/mnt/disk6/common/ldb_toolchain_toucan/bin:$PATH build-support/clang-format.sh - git diff --check -- be/src/core/column/column_vector.h be/src/exprs/aggregate/aggregate_function_sum.h - ./build.sh --be -j 90 - Behavior changed: Yes. Fixed-width zero-copy scan columns now keep at most one page-backed external view and materialize when additional page ranges are appended. - Does this need documentation: No --- be/src/core/column/column_vector.h | 98 ++++++------------- be/src/exec/scan/scanner.cpp | 7 +- .../exprs/aggregate/aggregate_function_sum.h | 29 +----- 3 files changed, 36 insertions(+), 98 deletions(-) diff --git a/be/src/core/column/column_vector.h b/be/src/core/column/column_vector.h index f94106661c9ee3..63dab3432d4dbd 100644 --- a/be/src/core/column/column_vector.h +++ b/be/src/core/column/column_vector.h @@ -90,20 +90,13 @@ class ColumnVector final : public COWHelper> { using Container = PaddedPODArray; using ImmContainer = std::span; -private: - struct ExternalDataSpan { - std::shared_ptr owner; - const value_type* data = nullptr; - size_t size = 0; - }; - ColumnVector() = default; explicit ColumnVector(const size_t n) : data(n) {} explicit ColumnVector(const size_t n, const value_type x) : data(n, x) {} ColumnVector(const ColumnVector& src) { data.reserve(src.size()); - src.for_each_immutable_data_span( - [this](ImmContainer values) { data.insert(values.begin(), values.end()); }); + const auto values = src.immutable_data(); + data.insert(values.begin(), values.end()); } /// Sugar constructor. @@ -202,17 +195,16 @@ class ColumnVector final : public COWHelper> { return; } const bool can_use_external_data = - owner != nullptr && data.empty() && + owner != nullptr && data.empty() && !_has_external_data() && reinterpret_cast(data_ptr) % alignof(value_type) == 0; if (can_use_external_data) { // The page decoder already owns decoded fixed-width values in naturally aligned page - // memory. Keep the page owner and append a read-only span instead of copying it into - // the local PODArray. A large Doris block can cross several storage pages, so this is - // intentionally segmented; consumers that can iterate spans stay zero-copy, while - // legacy consumers that require one contiguous array call immutable_data()/get_data() - // and materialize once. - _append_external_data_span(reinterpret_cast(data_ptr), num, - std::move(owner)); + // memory. Keep the page owner and adopt one read-only view instead of copying it into + // the local PODArray. This deliberately supports only one page-backed view, matching + // SR's single-resource model. If a later append brings another page range, the normal + // append path materializes this view first and then copies the new values. + _set_external_data(reinterpret_cast(data_ptr), num, + std::move(owner)); return; } insert_many_fix_len_data(data_ptr, num); @@ -321,15 +313,15 @@ class ColumnVector final : public COWHelper> { size_t byte_size() const override { return size() * sizeof(value_type); } size_t allocated_bytes() const override { - // External spans point into page-cache owned memory. Count only ColumnVector metadata here; + // External data points into page-cache owned memory. Count only ColumnVector metadata here; // charging the page bytes again would double-count memory already tracked by the storage // page cache. - return data.allocated_bytes() + _external_data_spans.capacity() * sizeof(ExternalDataSpan); + return data.allocated_bytes(); } bool has_enough_capacity(const IColumn& src) const override { - // Capacity reuse is meaningful only for the mutable local PODArray. A page-backed column has - // immutable external spans, so any append-style reuse must first materialize and should not + // Capacity reuse is meaningful only for the mutable local PODArray. A page-backed column + // has immutable external data, so append-style reuse must first materialize and should not // be selected by generic block reuse heuristics. if (_has_external_data()) { return false; @@ -448,30 +440,13 @@ class ColumnVector final : public COWHelper> { return data; } - // Read-only consumers should prefer for_each_immutable_data_span() when they can process - // segmented input. Scan blocks often span multiple storage pages; immutable_data() still keeps - // the historical contiguous-span contract and therefore materializes multi-page columns. ImmContainer immutable_data() const { if (_has_external_data()) { - if (_external_data_spans.size() == 1) { - const auto& span = _external_data_spans.front(); - return {span.data, span.size}; - } - materialize_external_data(); + return {_external_data, _external_size}; } return {data.data(), data.size()}; } - template - void for_each_immutable_data_span(Func&& func) const { - if (!data.empty()) { - func(ImmContainer {data.data(), data.size()}); - } - for (const auto& span : _external_data_spans) { - func(ImmContainer {span.data, span.size}); - } - } - const value_type& get_element(size_t n) const { return immutable_data()[n]; } value_type& get_element(size_t n) { @@ -537,34 +512,24 @@ class ColumnVector final : public COWHelper> { } private: - bool _has_external_data() const { return !_external_data_spans.empty(); } + bool _has_external_data() const { return _external_owner != nullptr; } - void _append_external_data_span(const value_type* external_data, size_t external_size, - std::shared_ptr owner) { + void _set_external_data(const value_type* external_data, size_t external_size, + std::shared_ptr owner) { if (external_size == 0) { return; } DCHECK(external_data != nullptr); DCHECK(owner != nullptr); - if (!_external_data_spans.empty()) { - auto& last = _external_data_spans.back(); - if (last.owner.get() == owner.get() && last.data + last.size == external_data) { - // Consecutive decoder calls can return adjacent slices from the same page. Merge - // them so read-only consumers see fewer spans without changing ownership semantics. - last.size += external_size; - _external_size += external_size; - return; - } - } - _external_data_spans.push_back( - ExternalDataSpan {.owner = std::move(owner), - .data = external_data, - .size = external_size}); - _external_size += external_size; + DCHECK(!_has_external_data()); + _external_owner = std::move(owner); + _external_data = external_data; + _external_size = external_size; } void _reset_external_data() const { - _external_data_spans.clear(); + _external_owner.reset(); + _external_data = nullptr; _external_size = 0; } @@ -572,18 +537,12 @@ class ColumnVector final : public COWHelper> { if (!_has_external_data()) { return; } - // This is the boundary back to the traditional ColumnVector contract: many generic - // expression, sort, filter, and serialization paths assume a single contiguous - // PaddedPODArray. Page-backed scan spans are safe only for read-only consumers that - // explicitly iterate spans; all other paths pay one copy here and then behave exactly like - // an ordinary ColumnVector. + // This is the boundary back to the traditional ColumnVector contract. The zero-copy path + // keeps at most one page-backed span. Any mutable access or append that cannot reuse that + // single page resource copies it into Doris-owned storage before continuing. const auto old_size = data.size(); data.resize(old_size + _external_size); - auto* dst = data.data() + old_size; - for (const auto& span : _external_data_spans) { - memcpy(dst, span.data, span.size * sizeof(value_type)); - dst += span.size; - } + memcpy(data.data() + old_size, _external_data, _external_size * sizeof(value_type)); _reset_external_data(); } @@ -591,7 +550,8 @@ class ColumnVector final : public COWHelper> { uint32_t _zlib_crc32_hash(uint32_t hash, size_t idx) const; uint32_t _crc32c_hash(uint32_t hash, size_t idx) const; mutable Container data; - mutable std::vector _external_data_spans; + mutable std::shared_ptr _external_owner; + mutable const value_type* _external_data = nullptr; mutable size_t _external_size = 0; }; diff --git a/be/src/exec/scan/scanner.cpp b/be/src/exec/scan/scanner.cpp index 8cb7a4c236132f..7ae77553a22aa0 100644 --- a/be/src/exec/scan/scanner.cpp +++ b/be/src/exec/scan/scanner.cpp @@ -99,8 +99,8 @@ Status Scanner::init(RuntimeState* state, const VExprContextSPtrs& conjuncts) { } _direct_slot_ref_projection_column_ids.push_back(slot_ref->column_id()); } - if (!_direct_slot_ref_projection_column_ids.empty() && _conjuncts.empty() && - _limit <= 0 && _shared_scan_limit == nullptr && _output_row_descriptor != nullptr && + if (!_direct_slot_ref_projection_column_ids.empty() && _conjuncts.empty() && _limit <= 0 && + _shared_scan_limit == nullptr && _output_row_descriptor != nullptr && _output_row_descriptor->num_materialized_slots() == _direct_slot_ref_projection_column_ids.size()) { size_t row_bytes = 0; @@ -143,8 +143,7 @@ int Scanner::_storage_read_batch_size(RuntimeState* state) const { // batch_size contract. The byte budget decides whether a fixed-width scan can safely use that // many rows; the session batch size remains the lower bound so this path never shrinks existing // scans. - static constexpr size_t kMaxReadBatchRows = - AdaptiveBlockSizePredictor::kDefaultBlockSizeRows; + static constexpr size_t kMaxReadBatchRows = AdaptiveBlockSizePredictor::kDefaultBlockSizeRows; const size_t rows_by_bytes = state->preferred_block_size_bytes() / _direct_slot_ref_projection_row_bytes; if (rows_by_bytes == 0) { diff --git a/be/src/exprs/aggregate/aggregate_function_sum.h b/be/src/exprs/aggregate/aggregate_function_sum.h index 75a34c1a6a2c6a..381dfcd5bbd0ec 100644 --- a/be/src/exprs/aggregate/aggregate_function_sum.h +++ b/be/src/exprs/aggregate/aggregate_function_sum.h @@ -22,7 +22,6 @@ #include -#include #include #include @@ -127,30 +126,10 @@ class AggregateFunctionSum final const auto& column = assert_cast(*columns[0]); auto& aggregate_data = this->data(place); - if constexpr (is_decimal(T)) { - // Decimal columns are already stored in Doris-owned contiguous memory and their - // arithmetic goes through the existing decimal container semantics. - const auto& source_values = _column_values(column); - const auto* __restrict values = source_values.data(); - for (size_t i = 0; i < batch_size; ++i) { - aggregate_data.add(typename PrimitiveTypeTraits::CppType(values[i])); - } - } else { - // This is semantically the same as the default IAggregateFunctionHelper loop over - // add(), but it is intentionally written for the hot no-group-by SUM path. Fixed-width - // scan columns can be backed by several storage pages inside one large block; iterating - // page spans keeps those pages read-only and avoids materializing them into one - // contiguous PODArray just to add the values. - size_t remaining = batch_size; - column.for_each_immutable_data_span([&](typename ColVecType::ImmContainer source_values) { - const size_t rows = std::min(remaining, source_values.size()); - const auto* __restrict values = source_values.data(); - for (size_t i = 0; i < rows; ++i) { - aggregate_data.add(typename PrimitiveTypeTraits::CppType(values[i])); - } - remaining -= rows; - }); - DCHECK_EQ(remaining, 0); + const auto& source_values = _column_values(column); + const auto* __restrict values = source_values.data(); + for (size_t i = 0; i < batch_size; ++i) { + aggregate_data.add(typename PrimitiveTypeTraits::CppType(values[i])); } } From 6f1b3a072d756c9da870f3ac478c754a4ddcb8f8 Mon Sep 17 00:00:00 2001 From: zhaochangle Date: Thu, 25 Jun 2026 11:13:41 +0800 Subject: [PATCH 3/6] [fix](be) Fix zero-copy scan CI failures ### What problem does this PR solve? Issue Number: close #xxx Related PR: #64296 Problem Summary: BE UT and several regression jobs exposed two issues in the zero-copy scan branch. First, insert_many_from() is allowed to be a no-op when length is zero, including the case where position is src.size(), but several concrete column implementations read the source row before checking length. Page-backed spans assert on that one-past-end read. The fix makes the concrete implementations return before reading when length is zero. Second, the new storage read batch enlargement let simple fixed-width scans pass a much larger batch into SegmentIterator, while lower lazy-materialization selector paths still assume the normal session batch contract. That caused regression coredumps in filter_by_selector. The fix restores the OLAP reader batch size to RuntimeState::batch_size() and keeps the no-copy slot-ref projection fast path limited to simple projection scans. ### Release note None ### Check List (For Author) - Test: Unit Test - ./run-be-ut.sh --run --filter=ColumnArrayTest.InsertManyFromTest -j 90 - Behavior changed: No - Does this need documentation: No --- be/src/core/column/column_complex.h | 4 ++ be/src/core/column/column_const.h | 4 ++ be/src/core/column/column_decimal.cpp | 4 ++ be/src/core/column/column_nullable.cpp | 4 ++ be/src/core/column/column_string.cpp | 4 ++ be/src/core/column/column_struct.cpp | 4 ++ be/src/core/column/column_vector.cpp | 5 ++ be/src/exec/scan/olap_scanner.cpp | 9 ++-- be/src/exec/scan/scanner.cpp | 64 ++++---------------------- be/src/exec/scan/scanner.h | 5 -- 10 files changed, 42 insertions(+), 65 deletions(-) diff --git a/be/src/core/column/column_complex.h b/be/src/core/column/column_complex.h index 9f0d7b45e72cf4..e99e9275bd764e 100644 --- a/be/src/core/column/column_complex.h +++ b/be/src/core/column/column_complex.h @@ -170,6 +170,10 @@ class ColumnComplexType final : public COWHelper> } void insert_many_from(const IColumn& src, size_t position, size_t length) override { + if (length == 0) { + return; + } + const Self& src_vec = assert_cast(src); auto val = src_vec.get_element(position); for (uint32_t i = 0; i < length; ++i) { diff --git a/be/src/core/column/column_const.h b/be/src/core/column/column_const.h index fa647aca466799..c06a18b03b96f7 100644 --- a/be/src/core/column/column_const.h +++ b/be/src/core/column/column_const.h @@ -152,6 +152,10 @@ class ColumnConst final : public COWHelper { } void insert_many_from(const IColumn& src, size_t position, size_t length) override { + if (length == 0) { + return; + } + if (!is_column_const(src) || compare_at(0, 0, src, 0) != 0) { throw Exception( ErrorCode::INTERNAL_ERROR, diff --git a/be/src/core/column/column_decimal.cpp b/be/src/core/column/column_decimal.cpp index 4d9c3adac6f019..3a79c749c8d34e 100644 --- a/be/src/core/column/column_decimal.cpp +++ b/be/src/core/column/column_decimal.cpp @@ -372,6 +372,10 @@ void ColumnDecimal::insert_many_fix_len_data(const char* data_ptr, size_t num template void ColumnDecimal::insert_many_from(const IColumn& src, size_t position, size_t length) { + if (length == 0) { + return; + } + auto old_size = data.size(); data.resize(old_size + length); auto& vals = assert_cast(src).get_data(); diff --git a/be/src/core/column/column_nullable.cpp b/be/src/core/column/column_nullable.cpp index d3ce21fc8e39ab..9d57f2e323bd59 100644 --- a/be/src/core/column/column_nullable.cpp +++ b/be/src/core/column/column_nullable.cpp @@ -301,6 +301,10 @@ void ColumnNullable::insert_many_strings(const StringRef* strings, size_t num) { } void ColumnNullable::insert_many_from(const IColumn& src, size_t position, size_t length) { + if (length == 0) { + return; + } + const auto& nullable_col = assert_cast(src); get_null_map_column().insert_many_from(nullable_col.get_null_map_column(), position, length); get_nested_column().insert_many_from(*nullable_col._nested_column, position, length); diff --git a/be/src/core/column/column_string.cpp b/be/src/core/column/column_string.cpp index 399c0952b892da..fd68aaa5a287e2 100644 --- a/be/src/core/column/column_string.cpp +++ b/be/src/core/column/column_string.cpp @@ -195,6 +195,10 @@ void ColumnStr::insert_range_from(const IColumn& src, size_t start, size_t le template void ColumnStr::insert_many_from(const IColumn& src, size_t position, size_t length) { + if (length == 0) { + return; + } + const auto& string_column = assert_cast&>(src); auto [data_val, data_length] = string_column.get_data_at(position); diff --git a/be/src/core/column/column_struct.cpp b/be/src/core/column/column_struct.cpp index 9b0c5d2d27ad76..643904e38c3d1f 100644 --- a/be/src/core/column/column_struct.cpp +++ b/be/src/core/column/column_struct.cpp @@ -280,6 +280,10 @@ void ColumnStruct::insert_indices_from(const IColumn& src, const uint32_t* indic } void ColumnStruct::insert_many_from(const IColumn& src, size_t position, size_t length) { + if (length == 0) { + return; + } + const auto& src_concrete = assert_cast(src); for (size_t i = 0; i < columns.size(); ++i) { columns[i]->insert_many_from(src_concrete.get_column(i), position, length); diff --git a/be/src/core/column/column_vector.cpp b/be/src/core/column/column_vector.cpp index 3666d5b4720cca..68f75522f93324 100644 --- a/be/src/core/column/column_vector.cpp +++ b/be/src/core/column/column_vector.cpp @@ -498,6 +498,11 @@ size_t ColumnVector::filter(const IColumn::Filter& filter) { template void ColumnVector::insert_many_from(const IColumn& src, size_t position, size_t length) { + if (length == 0) { + // Empty insertion is a valid no-op even when position points one past the source end. + return; + } + materialize_external_data(); auto old_size = data.size(); data.resize(old_size + length); diff --git a/be/src/exec/scan/olap_scanner.cpp b/be/src/exec/scan/olap_scanner.cpp index b1e1e792e632a8..c7acba9885a418 100644 --- a/be/src/exec/scan/olap_scanner.cpp +++ b/be/src/exec/scan/olap_scanner.cpp @@ -187,12 +187,9 @@ Status OlapScanner::_prepare_impl() { // set limit to reduce end of rowset and segment mem use _tablet_reader = std::make_unique(); - // Batch size is passed down to segment iterator. Use the scanner-selected storage batch - // instead of _parent->limit(): tiny LIMIT values would make segment iteration very slow. For a - // pure fixed-width slot-ref scan, Scanner may raise the storage batch up to the byte-budgeted - // session maximum so the reader directly produces large page-backed blocks; doing this in the - // storage reader preserves the zero-copy fixed-width page spans. - _tablet_reader->set_batch_size(_storage_read_batch_size(_state)); + // Batch size is passed down to segment iterator. Use session batch size instead of + // _parent->limit(): tiny LIMIT values would make segment iteration very slow. + _tablet_reader->set_batch_size(_state->batch_size()); // Adaptive batch size: pass byte-budget settings to the storage reader. // The reader still uses batch_size() as the row ceiling. _tablet_reader->set_preferred_block_size_bytes(_state->preferred_block_size_bytes()); diff --git a/be/src/exec/scan/scanner.cpp b/be/src/exec/scan/scanner.cpp index 7ae77553a22aa0..a52245899b8f4c 100644 --- a/be/src/exec/scan/scanner.cpp +++ b/be/src/exec/scan/scanner.cpp @@ -32,7 +32,6 @@ #include "exprs/vslot_ref.h" #include "runtime/descriptors.h" #include "runtime/runtime_profile.h" -#include "storage/segment/adaptive_block_size_predictor.h" #include "util/concurrency_stats.h" #include "util/defer_op.h" @@ -89,71 +88,28 @@ Status Scanner::init(RuntimeState* state, const VExprContextSPtrs& conjuncts) { // columns. Record the source column ids once here so the hot projection path can move the // existing ColumnPtr instead of executing VSlotRef and then mutating it, which would clone // the column when the input block still shares the same pointer. - if (!_projections.empty() && _intermediate_projections.empty()) { - _direct_slot_ref_projection_column_ids.reserve(_projections.size()); + if (!_projections.empty() && _intermediate_projections.empty() && _conjuncts.empty() && + _limit <= 0 && _shared_scan_limit == nullptr && _output_row_descriptor != nullptr && + _output_row_descriptor->num_materialized_slots() == _projections.size()) { + std::vector direct_slot_ref_projection_column_ids; + direct_slot_ref_projection_column_ids.reserve(_projections.size()); for (const auto& projection : _projections) { auto slot_ref = std::dynamic_pointer_cast(projection->root()); if (slot_ref == nullptr) { - _direct_slot_ref_projection_column_ids.clear(); + direct_slot_ref_projection_column_ids.clear(); break; } - _direct_slot_ref_projection_column_ids.push_back(slot_ref->column_id()); + direct_slot_ref_projection_column_ids.push_back(slot_ref->column_id()); } - if (!_direct_slot_ref_projection_column_ids.empty() && _conjuncts.empty() && _limit <= 0 && - _shared_scan_limit == nullptr && _output_row_descriptor != nullptr && - _output_row_descriptor->num_materialized_slots() == - _direct_slot_ref_projection_column_ids.size()) { - size_t row_bytes = 0; - bool all_fixed_width = true; - for (const auto& tuple_desc : _output_row_descriptor->tuple_descriptors()) { - for (const auto* slot_desc : tuple_desc->slots()) { - const auto& type = slot_desc->get_data_type_ptr(); - if (!type->have_maximum_size_of_value()) { - all_fixed_width = false; - break; - } - row_bytes += std::max(1, type->get_size_of_value_in_memory()); - } - if (!all_fixed_width) { - break; - } - } - if (all_fixed_width && row_bytes > 0) { - // This enables OLAP storage to produce fewer, larger blocks for the hot arithmetic - // scan shape. Do it before page decoding rather than by merging blocks later: - // MutableBlock::merge() appends through generic Column APIs and would materialize - // the page-backed fixed-width spans that the storage decoder can otherwise forward - // without copying. Varlen and complex slots stay on the session batch size because - // their per-row memory is data-dependent and large blocks can amplify memory spikes. - _direct_slot_ref_projection_row_bytes = row_bytes; - } + if (!direct_slot_ref_projection_column_ids.empty()) { + _direct_slot_ref_projection_column_ids = + std::move(direct_slot_ref_projection_column_ids); } } return Status::OK(); } -int Scanner::_storage_read_batch_size(RuntimeState* state) const { - const int session_batch_size = state->batch_size(); - if (_direct_slot_ref_projection_row_bytes == 0) { - return session_batch_size; - } - - // Keep the same hard upper bound used by the segment adaptive reader and by the public - // batch_size contract. The byte budget decides whether a fixed-width scan can safely use that - // many rows; the session batch size remains the lower bound so this path never shrinks existing - // scans. - static constexpr size_t kMaxReadBatchRows = AdaptiveBlockSizePredictor::kDefaultBlockSizeRows; - const size_t rows_by_bytes = - state->preferred_block_size_bytes() / _direct_slot_ref_projection_row_bytes; - if (rows_by_bytes == 0) { - return session_batch_size; - } - const size_t target_rows = - std::max(session_batch_size, std::min(kMaxReadBatchRows, rows_by_bytes)); - return static_cast(target_rows); -} - Status Scanner::get_block_after_projects(RuntimeState* state, Block* block, bool* eos) { SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().vscanner_get_block); auto& row_descriptor = _local_state->_parent->row_descriptor(); diff --git a/be/src/exec/scan/scanner.h b/be/src/exec/scan/scanner.h index 05a0aecc056279..08eff265514130 100644 --- a/be/src/exec/scan/scanner.h +++ b/be/src/exec/scan/scanner.h @@ -111,8 +111,6 @@ class Scanner { // Subclass should implement this to return data. virtual Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) = 0; - int _storage_read_batch_size(RuntimeState* state) const; - Status _merge_padding_block() { if (_padding_block.empty()) { _padding_block.swap(_origin_block); @@ -256,9 +254,6 @@ class Scanner { // Non-empty only when final projections are all VSlotRef and there is no intermediate CSE // projection. Values are source block column ids used by the no-copy projection fast path. std::vector _direct_slot_ref_projection_column_ids; - // Estimated output bytes per row for the pure slot-ref projection fast path. Non-zero only for - // fixed-width slots and simple scans where it is safe for OLAP storage to produce larger blocks. - size_t _direct_slot_ref_projection_row_bytes = 0; Block _origin_block; Block _padding_block; From 216d40bee9b3856448e33cde29b2004e73eb199e Mon Sep 17 00:00:00 2001 From: zhaochangle Date: Fri, 26 Jun 2026 10:53:29 +0800 Subject: [PATCH 4/6] [fix](be) Fix selector on page-backed vector columns ### What problem does this PR solve? Issue Number: N/A Related PR: #64296 Problem Summary: The zero-copy fixed-length column path can keep values in an external page instead of the local PODArray. Lazy materialization calls ColumnVector::filter_by_selector through SegmentIterator::copy_column_data_by_selector, but the inherited selector implementation read the local PODArray directly, so page-backed columns with an empty local data buffer could crash with PODArray::operator[] checks. This commit overrides ColumnVector::filter_by_selector to read through immutable_data(), giving local and external storage the same selector semantics. It also makes non-empty ColumnVector::insert_many_from from an invalid source row throw a Doris exception before materialization, replacing container-specific one-past UB/std::span assertions with an explicit contract, and updates the generic column test accordingly while preserving the historical Array debug-check skip. ### Release note None ### Check List (For Author) - Test: Unit Test - ./run-be-ut.sh --run --filter=ColumnIPTest.InsertManyFromTest:ColumnVectorTest.filter_by_selector_from_external_page -j 90 - ./run-be-ut.sh --run --filter='ColumnIPTest.InsertManyFromTest:ColumnVectorTest.*:ColumnArrayTest.InsertManyFromTest:ColumnNullableTest.*:ColumnStringTest.filter_by_selector:ColumnStringTest.insert_many_from:ColumnDictionaryTest.filter_by_selector:PredicateColumnTest.*' -j 90 - ./build.sh --be - git diff --check -- be/src/core/column/column_vector.h be/src/core/column/column_vector.cpp be/test/core/column/common_column_test.h be/test/core/column/column_vector_test.cpp - Behavior changed: No - Does this need documentation: No --- be/src/core/column/column_vector.cpp | 25 +++++++++++++++++++++- be/src/core/column/column_vector.h | 1 + be/test/core/column/column_vector_test.cpp | 22 ++++++++++++++++++- be/test/core/column/common_column_test.h | 22 +++++++++++++++---- 4 files changed, 64 insertions(+), 6 deletions(-) diff --git a/be/src/core/column/column_vector.cpp b/be/src/core/column/column_vector.cpp index 68f75522f93324..778bf6578d2286 100644 --- a/be/src/core/column/column_vector.cpp +++ b/be/src/core/column/column_vector.cpp @@ -496,6 +496,22 @@ size_t ColumnVector::filter(const IColumn::Filter& filter) { return new_size; } +template +Status ColumnVector::filter_by_selector(const uint16_t* sel, size_t sel_size, IColumn* col_ptr) { + const auto values = immutable_data(); + auto* col = assert_cast(col_ptr); + auto& res_data = col->get_data(); + const auto old_size = res_data.size(); + res_data.resize(old_size + sel_size); + for (size_t i = 0; i < sel_size; ++i) { + // Lazy materialization can filter a page-backed fixed-length column before it is + // materialized into `data`. Read through immutable_data() so both local PODArray storage + // and the single external page view have the same selector semantics. + res_data[old_size + i] = values[sel[i]]; + } + return Status::OK(); +} + template void ColumnVector::insert_many_from(const IColumn& src, size_t position, size_t length) { if (length == 0) { @@ -503,10 +519,17 @@ void ColumnVector::insert_many_from(const IColumn& src, size_t position, size return; } + auto vals = assert_cast(src).immutable_data(); + if (UNLIKELY(position >= vals.size())) { + throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR, + "Position {} is out of bound in ColumnVector::insert_many_from " + "method (data.size() = {}).", + position, vals.size()); + } + materialize_external_data(); auto old_size = data.size(); data.resize(old_size + length); - auto vals = assert_cast(src).immutable_data(); std::fill(&data[old_size], &data[old_size + length], vals[position]); } diff --git a/be/src/core/column/column_vector.h b/be/src/core/column/column_vector.h index 63dab3432d4dbd..5cfb3e5bc47678 100644 --- a/be/src/core/column/column_vector.h +++ b/be/src/core/column/column_vector.h @@ -417,6 +417,7 @@ class ColumnVector final : public COWHelper> { ColumnPtr filter(const IColumn::Filter& filt, ssize_t result_size_hint) const override; size_t filter(const IColumn::Filter& filter) override; + Status filter_by_selector(const uint16_t* sel, size_t sel_size, IColumn* col_ptr) override; MutableColumnPtr permute(const IColumn::Permutation& perm, size_t limit) const override; diff --git a/be/test/core/column/column_vector_test.cpp b/be/test/core/column/column_vector_test.cpp index 1517a2df37046c..945189309590ad 100644 --- a/be/test/core/column/column_vector_test.cpp +++ b/be/test/core/column/column_vector_test.cpp @@ -20,6 +20,8 @@ #include #include +#include +#include #include "core/column/column.h" #include "core/column/common_column_test.h" @@ -438,6 +440,24 @@ TEST_F(ColumnVectorTest, pop_back) { TEST_F(ColumnVectorTest, filter) { _column_vector_common_test(assert_column_vector_filter_callback); } + +TEST_F(ColumnVectorTest, filter_by_selector_from_external_page) { + auto source = ColumnInt32::create(); + auto owner = + std::make_shared>(std::initializer_list {10, 20, 30, 40}); + source->insert_many_fix_len_data_with_owner(reinterpret_cast(owner->data()), + owner->size(), owner); + + uint16_t selector[] = {3, 1, 3}; + auto result = ColumnInt32::create(); + ASSERT_TRUE(source->filter_by_selector(selector, 3, result.get()).ok()); + + ASSERT_EQ(result->size(), 3); + EXPECT_EQ(result->get_element(0), 40); + EXPECT_EQ(result->get_element(1), 20); + EXPECT_EQ(result->get_element(2), 40); +} + TEST_F(ColumnVectorTest, get_permutation) { assert_column_permutations2(*column_int8, dt_int8); assert_column_permutations2(*column_int16, dt_int16); @@ -570,4 +590,4 @@ TEST_F(ColumnVectorTest, ScalaTypeInt32Test2erase) { } } -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/test/core/column/common_column_test.h b/be/test/core/column/common_column_test.h index 149776b238a485..8a614e146f25c0 100644 --- a/be/test/core/column/common_column_test.h +++ b/be/test/core/column/common_column_test.h @@ -691,14 +691,28 @@ class CommonColumnTest : public ::testing::Test { source_column->size() + 1, (source_column->size() + 1) >> 1}; for (auto pos = check_start_pos.begin(); pos < check_start_pos.end(); ++pos) { + const bool is_array_column = is_column( + remove_nullable( + static_cast(source_column.get())->get_ptr()) + .get()); if (*pos > source_column->size() || *cl > source_column->size()) { // insert_range_from now we have no any exception error data to handle, so here will meet crash continue; + } else if (*pos >= source_column->size() && *cl > 0) { + if (is_array_column) { + // ColumnArray still treats invalid offset ranges as debug-check-only + // inputs, so keep the historical skip for this generic stress case. + continue; + } + target_column->clear(); + // insert_many_from repeats one existing source row. A non-empty repeat from + // one-past-the-end used to hit container-specific UB; keep the test aligned + // with the explicit Doris exception contract. + EXPECT_THROW(target_column->insert_many_from(*source_column, *pos, *cl), + doris::Exception); + continue; } else if (*pos + *cl > source_column->size()) { - if (is_column( - remove_nullable(static_cast(source_column.get()) - ->get_ptr()) - .get())) { + if (is_array_column) { // insert_range_from in array has DCHECK_LG continue; } From 90e37eaa5db690774a302887d734412fd53c6b1f Mon Sep 17 00:00:00 2001 From: zhaochangle Date: Sat, 27 Jun 2026 19:07:29 +0800 Subject: [PATCH 5/6] [fix](be) Resolve zero-copy selector conflict Problem Summary: Upstream master added an inline ColumnVector::filter_by_selector implementation, while the zero-copy branch still carried a separate declaration and out-of-line template definition. The merged PR source therefore hit a duplicate member declaration and failed TeamCity compile. Fold the page-backed read path into the upstream inline implementation and remove the duplicate implementation. Release note: None Test Plan: PATH=/mnt/disk6/common/ldb_toolchain_toucan/bin:/mnt/disk3/zhaochangle/.bun/bin:/mnt/disk3/zhaochangle/.opencode/bin:/mnt/disk3/zhaochangle/.local/bin:/mnt/disk6/common/apache-maven-3.9.14/bin:/mnt/disk6/common/ldb_toolchain_028/bin:/mnt/disk6/common/jdk-17.0.16/bin:/mnt/disk6/common/node-v24.14.1-linux-x64/bin:/mnt/disk6/common/apache-maven-3.9.14/bin:/mnt/disk6/common/ldb_toolchain_028/bin:/mnt/disk6/common/jdk-17.0.16/bin:/mnt/disk6/common/node-v24.14.1-linux-x64/bin:/mnt/disk6/common/node-v24.14.1-linux-x64/lib/node_modules/@openai/codex/node_modules/@openai/codex-linux-x64/vendor/x86_64-unknown-linux-musl/codex-path:/mnt/disk3/zhaochangle/.codex/tmp/arg0/codex-arg03yyoos:/mnt/disk3/zhaochangle/.bun/bin:/mnt/disk3/zhaochangle/.opencode/bin:/mnt/disk3/zhaochangle/.local/bin:/mnt/disk6/common/apache-maven-3.9.14/bin:/mnt/disk6/common/ldb_toolchain_028/bin:/mnt/disk6/common/jdk-17.0.16/bin:/mnt/disk6/common/node-v24.14.1-linux-x64/bin:/mnt/disk6/common/apache-maven-3.9.14/bin:/mnt/disk6/common/ldb_toolchain_028/bin:/mnt/disk6/common/jdk-17.0.16/bin:/mnt/disk6/common/node-v24.14.1-linux-x64/bin:/mnt/disk3/zhaochangle/.opencode/bin:/mnt/disk3/zhaochangle/.local/bin:/mnt/disk3/zhaochangle/.local/bin:/mnt/disk3/zhaochangle/bin:/mnt/disk6/common/apache-maven-3.9.14/bin:/mnt/disk6/common/ldb_toolchain_028/bin:/mnt/disk6/common/jdk-17.0.16/bin:/mnt/disk6/common/node-v24.14.1-linux-x64/bin:/usr/share/Modules/bin:/usr/lib64/ccache:/usr/local/bin:/usr/bin:/usr/local/sbin:/usr/sbin:/opt/tableau/tableau_server/packages/customer-bin.20251.25.0520.1026 build-support/clang-format.sh Test Plan: git diff --check -- be/src/core/column/column_vector.h be/src/core/column/column_vector.cpp be/test/core/column/common_column_test.h be/test/core/column/column_vector_test.cpp Test Plan: ./build.sh --be Test Plan: ./run-be-ut.sh --run --filter='ColumnIPTest.InsertManyFromTest:ColumnVectorTest.*:ColumnArrayTest.InsertManyFromTest:ColumnNullableTest.*:ColumnStringTest.filter_by_selector:ColumnStringTest.insert_many_from:ColumnDictionaryTest.filter_by_selector:PredicateColumnTest.*' -j 90 --- be/src/core/column/column_vector.cpp | 16 ---------------- be/src/core/column/column_vector.h | 8 +++++--- 2 files changed, 5 insertions(+), 19 deletions(-) diff --git a/be/src/core/column/column_vector.cpp b/be/src/core/column/column_vector.cpp index 778bf6578d2286..ee28c601f03b3e 100644 --- a/be/src/core/column/column_vector.cpp +++ b/be/src/core/column/column_vector.cpp @@ -496,22 +496,6 @@ size_t ColumnVector::filter(const IColumn::Filter& filter) { return new_size; } -template -Status ColumnVector::filter_by_selector(const uint16_t* sel, size_t sel_size, IColumn* col_ptr) { - const auto values = immutable_data(); - auto* col = assert_cast(col_ptr); - auto& res_data = col->get_data(); - const auto old_size = res_data.size(); - res_data.resize(old_size + sel_size); - for (size_t i = 0; i < sel_size; ++i) { - // Lazy materialization can filter a page-backed fixed-length column before it is - // materialized into `data`. Read through immutable_data() so both local PODArray storage - // and the single external page view have the same selector semantics. - res_data[old_size + i] = values[sel[i]]; - } - return Status::OK(); -} - template void ColumnVector::insert_many_from(const IColumn& src, size_t position, size_t length) { if (length == 0) { diff --git a/be/src/core/column/column_vector.h b/be/src/core/column/column_vector.h index 5cfb3e5bc47678..0769eb4a992adf 100644 --- a/be/src/core/column/column_vector.h +++ b/be/src/core/column/column_vector.h @@ -336,13 +336,17 @@ class ColumnVector final : public COWHelper> { } Status filter_by_selector(const uint16_t* sel, size_t sel_size, IColumn* col_ptr) override { + const auto values = immutable_data(); Self* output = assert_cast(col_ptr); auto& res_data = output->get_data(); DCHECK(res_data.empty()) << "filter_by_selector requires the destination column to be empty"; res_data.resize(sel_size); for (size_t i = 0; i < sel_size; i++) { - res_data[i] = data[sel[i]]; + // A lazily decoded fixed-length column may still point at one external page. Read + // through immutable_data() so selector filtering has the same semantics before and + // after materialization. + res_data[i] = values[sel[i]]; } return Status::OK(); } @@ -417,8 +421,6 @@ class ColumnVector final : public COWHelper> { ColumnPtr filter(const IColumn::Filter& filt, ssize_t result_size_hint) const override; size_t filter(const IColumn::Filter& filter) override; - Status filter_by_selector(const uint16_t* sel, size_t sel_size, IColumn* col_ptr) override; - MutableColumnPtr permute(const IColumn::Permutation& perm, size_t limit) const override; StringRef get_raw_data() const override { From 29ccfb50c2cafff0bfd3ac59b3754824175e5997 Mon Sep 17 00:00:00 2001 From: zhaochangle Date: Mon, 29 Jun 2026 01:05:37 +0800 Subject: [PATCH 6/6] [fix](be) Fix zero-copy column reuse edge cases ### What problem does this PR solve? Issue Number: None Related PR: #64296 Problem Summary: The zero-copy fixed-length column path exposed two edge cases in CI. HeapSorterTest read an input block after append_block(), but the sorter can clear that input during partial sorting, so the test now captures the expected value before transfer. In projection nullable promotion, a reused nullable output column appended non-null nested values but resized its null map only to the current batch row count; when the output column already had rows this produced mismatched nested/null-map sizes such as BOOL 8 vs null map 4. The fix grows the null map to the appended nested size and keeps the zero-copy fixed-length raw-data API in byte units. ### Release note None ### Check List (For Author) - Test: Unit Test - ./run-be-ut.sh --run --filter=HeapSorterTest.test_topn_sorter1:ColumnVectorTest.filter_by_selector_from_external_page:ColumnVectorTest.raw_data_and_insert_indices_from_external_page -j 90 - PATH=/mnt/disk6/common/ldb_toolchain_toucan/bin:/mnt/disk3/zhaochangle/.bun/bin:/mnt/disk3/zhaochangle/.opencode/bin:/mnt/disk3/zhaochangle/.local/bin:/mnt/disk6/common/apache-maven-3.9.14/bin:/mnt/disk6/common/ldb_toolchain_028/bin:/mnt/disk6/common/jdk-17.0.16/bin:/mnt/disk6/common/node-v24.14.1-linux-x64/bin:/mnt/disk6/common/apache-maven-3.9.14/bin:/mnt/disk6/common/ldb_toolchain_028/bin:/mnt/disk6/common/jdk-17.0.16/bin:/mnt/disk6/common/node-v24.14.1-linux-x64/bin:/mnt/disk6/common/node-v24.14.1-linux-x64/lib/node_modules/@openai/codex/node_modules/@openai/codex-linux-x64/vendor/x86_64-unknown-linux-musl/codex-path:/mnt/disk3/zhaochangle/.codex/tmp/arg0/codex-arg03yyoos:/mnt/disk3/zhaochangle/.bun/bin:/mnt/disk3/zhaochangle/.opencode/bin:/mnt/disk3/zhaochangle/.local/bin:/mnt/disk6/common/apache-maven-3.9.14/bin:/mnt/disk6/common/ldb_toolchain_028/bin:/mnt/disk6/common/jdk-17.0.16/bin:/mnt/disk6/common/node-v24.14.1-linux-x64/bin:/mnt/disk6/common/apache-maven-3.9.14/bin:/mnt/disk6/common/ldb_toolchain_028/bin:/mnt/disk6/common/jdk-17.0.16/bin:/mnt/disk6/common/node-v24.14.1-linux-x64/bin:/mnt/disk3/zhaochangle/.opencode/bin:/mnt/disk3/zhaochangle/.local/bin:/mnt/disk3/zhaochangle/.local/bin:/mnt/disk3/zhaochangle/bin:/mnt/disk6/common/apache-maven-3.9.14/bin:/mnt/disk6/common/ldb_toolchain_028/bin:/mnt/disk6/common/jdk-17.0.16/bin:/mnt/disk6/common/node-v24.14.1-linux-x64/bin:/usr/share/Modules/bin:/usr/lib64/ccache:/usr/local/bin:/usr/bin:/usr/local/sbin:/usr/sbin:/opt/tableau/tableau_server/packages/customer-bin.20251.25.0520.1026 build-support/check-format.sh - ./run-regression-test.sh --run -d query_p0/cte -s test_cte_multicast_complex (did not reach SQL locally because the worktree has no listening FE; JDBC failed with Connection refused) - Behavior changed: No - Does this need documentation: No --- be/src/core/column/column_vector.cpp | 5 +++-- be/src/core/column/column_vector.h | 3 ++- be/src/exec/operator/operator.cpp | 8 +++++++- be/test/core/column/column_vector_test.cpp | 22 ++++++++++++++++++++++ be/test/exec/sort/heap_sorter_test.cpp | 8 +++++--- 5 files changed, 39 insertions(+), 7 deletions(-) diff --git a/be/src/core/column/column_vector.cpp b/be/src/core/column/column_vector.cpp index ee28c601f03b3e..04e30909761a60 100644 --- a/be/src/core/column/column_vector.cpp +++ b/be/src/core/column/column_vector.cpp @@ -378,6 +378,8 @@ void ColumnVector::insert_indices_from(const IColumn& src, const uint32_t* in auto new_size = indices_end - indices_begin; materialize_external_data(); data.resize(origin_size + new_size); + const auto src_values = + assert_cast(src).immutable_data(); auto copy = [](const value_type* __restrict src, value_type* __restrict dest, const uint32_t* __restrict begin, const uint32_t* __restrict end) { @@ -386,8 +388,7 @@ void ColumnVector::insert_indices_from(const IColumn& src, const uint32_t* in ++dest; } }; - copy(reinterpret_cast(src.get_raw_data().data), data.data() + origin_size, - indices_begin, indices_end); + copy(src_values.data(), data.data() + origin_size, indices_begin, indices_end); } template diff --git a/be/src/core/column/column_vector.h b/be/src/core/column/column_vector.h index 0769eb4a992adf..143d992df5b89b 100644 --- a/be/src/core/column/column_vector.h +++ b/be/src/core/column/column_vector.h @@ -425,7 +425,8 @@ class ColumnVector final : public COWHelper> { StringRef get_raw_data() const override { auto values = immutable_data(); - return StringRef(reinterpret_cast(values.data()), values.size()); + return StringRef(reinterpret_cast(values.data()), + values.size() * sizeof(value_type)); } bool structure_equals(const IColumn& rhs) const override { diff --git a/be/src/exec/operator/operator.cpp b/be/src/exec/operator/operator.cpp index 1f255078b5d6cf..92fa16cb10fa72 100644 --- a/be/src/exec/operator/operator.cpp +++ b/be/src/exec/operator/operator.cpp @@ -358,8 +358,14 @@ Status OperatorXBase::do_projections(RuntimeState* state, Block* origin_block, if (is_column_nullable(*to) && !is_column_nullable(*from)) { if (_keep_origin || !from->is_exclusive()) { auto& null_column = reinterpret_cast(*to); + const auto origin_size = null_column.size(); null_column.get_nested_column().insert_range_from(*from, 0, rows); - null_column.get_null_map_column().get_data().resize_fill(rows, 0); + // Nullable promotion appends non-null values into the nested column, so the + // null map must grow to the appended nested size instead of being reset to this + // batch's row count. Reused output columns may already contain rows. + null_column.get_null_map_column().get_data().resize_fill(origin_size + rows, 0); + DCHECK_EQ(null_column.get_nested_column().size(), + null_column.get_null_map_column().size()); bytes_usage += null_column.allocated_bytes(); } else { to = make_nullable(from, false)->assert_mutable(); diff --git a/be/test/core/column/column_vector_test.cpp b/be/test/core/column/column_vector_test.cpp index 945189309590ad..27dfb30f01f8c0 100644 --- a/be/test/core/column/column_vector_test.cpp +++ b/be/test/core/column/column_vector_test.cpp @@ -20,6 +20,7 @@ #include #include +#include #include #include @@ -458,6 +459,27 @@ TEST_F(ColumnVectorTest, filter_by_selector_from_external_page) { EXPECT_EQ(result->get_element(2), 40); } +TEST_F(ColumnVectorTest, raw_data_and_insert_indices_from_external_page) { + auto source = ColumnInt32::create(); + auto owner = + std::make_shared>(std::initializer_list {10, 20, 30, 40}); + source->insert_many_fix_len_data_with_owner(reinterpret_cast(owner->data()), + owner->size(), owner); + + const auto raw_data = source->get_raw_data(); + ASSERT_EQ(raw_data.size, owner->size() * sizeof(int32_t)); + EXPECT_EQ(std::memcmp(raw_data.data, owner->data(), raw_data.size), 0); + + uint32_t indices[] = {2, 0, 3}; + auto result = ColumnInt32::create(); + result->insert_indices_from(*source, indices, indices + 3); + + ASSERT_EQ(result->size(), 3); + EXPECT_EQ(result->get_element(0), 30); + EXPECT_EQ(result->get_element(1), 10); + EXPECT_EQ(result->get_element(2), 40); +} + TEST_F(ColumnVectorTest, get_permutation) { assert_column_permutations2(*column_int8, dt_int8); assert_column_permutations2(*column_int16, dt_int16); diff --git a/be/test/exec/sort/heap_sorter_test.cpp b/be/test/exec/sort/heap_sorter_test.cpp index 9c91db2e5e3833..b0e763be31ba51 100644 --- a/be/test/exec/sort/heap_sorter_test.cpp +++ b/be/test/exec/sort/heap_sorter_test.cpp @@ -86,14 +86,16 @@ TEST_F(HeapSorterTest, test_topn_sorter1) { { Block block = ColumnHelper::create_block({6}, {6}); + // append_block may clear the input block after partial sorting, so keep the expected value + // before transferring the block into the sorter. + Field real; + block.get_by_position(0).column->get(0, real); auto st = sorter->append_block(&block); EXPECT_TRUE(st.ok()); EXPECT_EQ(sorter->_queue_row_num, 6); auto value = sorter->get_top_value(); - Field real; - block.get_by_position(0).column->get(0, real); EXPECT_EQ(value, real); } @@ -117,4 +119,4 @@ TEST_F(HeapSorterTest, test_topn_sorter1) { } } -} // namespace doris \ No newline at end of file +} // namespace doris