Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions include/paimon/predicate/leaf_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ enum class FieldType;
/// Leaf node of a `Predicate` tree. Compares a field with literals.
class PAIMON_EXPORT LeafPredicate : virtual public Predicate {
public:
/// The field's position in the schema this predicate is currently bound to.
/// At construction the value reflects the schema the caller supplied to
/// `PredicateBuilder`; predicates obtained from `InternalReadContext::GetPredicate()`
/// have already been projected onto the read schema (see
/// `InternalReadContext::GetPredicate()` for the projection semantics).
int32_t FieldIndex() const {
return field_index_;
}
Expand Down
23 changes: 19 additions & 4 deletions include/paimon/predicate/predicate_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,28 @@ enum class FieldType;
///
/// PredicateBuilder provides static factory methods to create various types of predicates
/// that can be used for filtering data in Paimon tables.
///
/// The `field_index` parameter accepted by every factory method is the position of the
/// field in the schema the caller is working with — typically the latest table schema.
/// When the resulting predicate is later attached to an `InternalReadContext`,
/// `InternalReadContext::Create` projects each leaf onto the read schema (mirrors
/// paimon Java's `PredicateProjectionConverter`): leaf field indices are rewritten via
/// the table-schema → read-schema position mapping (keyed by the stable paimon field
/// id, so it survives column renames), and leaves / OR branches whose fields are not
/// in the projection are dropped. Callers therefore do not need to track projection
/// state themselves. `field_name` is informational (used for debug / display) and
/// does not participate in projection lookup.
class PAIMON_EXPORT PredicateBuilder {
public:
PredicateBuilder() = delete;
~PredicateBuilder() = delete;

/// Create an equality predicate (field == literal).
///
/// @param field_index The index of the field in read schema (0-based).
/// @param field_name The name of the field.
/// @param field_index The position of the field in the schema the caller is working
/// with (0-based); projected onto the read schema by
/// `InternalReadContext::Create`. See class doc for details.
/// @param field_name The name of the field (informational; see class doc).
/// @param field_type The data type of the field.
/// @param literal The literal value to compare against.
/// @return A shared pointer to the created Predicate object.
Expand Down Expand Up @@ -99,8 +112,10 @@ class PAIMON_EXPORT PredicateBuilder {
///
/// Tests whether the field value falls within the specified range (inclusive on both ends).
///
/// @param field_index The index of the field in read schema (0-based).
/// @param field_name The name of the field.
/// @param field_index The position of the field in the schema the caller is working
/// with (0-based); projected onto the read schema by
/// `InternalReadContext::Create`. See class doc for details.
/// @param field_name The name of the field (informational; see class doc).
/// @param field_type The data type of the field.
/// @param included_lower_bound The lower bound of the range (inclusive).
/// @param included_upper_bound The upper bound of the range (inclusive).
Expand Down
7 changes: 7 additions & 0 deletions include/paimon/read_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,13 @@ class PAIMON_EXPORT ReadContextBuilder {
/// It can significantly improve performance by reducing the amount of data
/// that needs to be read and processed.
///
/// The caller should construct the predicate against the latest table schema.
/// `InternalReadContext::Create` projects each leaf onto the read schema
/// (mirroring paimon Java's `PredicateProjectionConverter`): leaf field indices
/// are rewritten to positions in the read schema, and AND children / OR branches
/// whose fields are not in the projection are pruned. The predicate therefore
/// does not need to be projection-aware.
///
/// @param predicate Shared pointer to the predicate for data filtering.
/// @return Reference to this builder for method chaining.
ReadContextBuilder& SetPredicate(const std::shared_ptr<Predicate>& predicate);
Expand Down
2 changes: 0 additions & 2 deletions src/paimon/common/predicate/predicate_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@
namespace paimon {
enum class FieldType;

// TODO(xinyu.lxy): predicate field_index use index in read schema now, but java paimon use index
// in file schema
std::shared_ptr<Predicate> PredicateBuilder::Equal(int32_t field_index,
const std::string& field_name,
const FieldType& field_type,
Expand Down
148 changes: 135 additions & 13 deletions src/paimon/core/operation/internal_read_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,124 @@

#include "paimon/core/operation/internal_read_context.h"

#include <optional>
#include <utility>

#include "paimon/common/predicate/compound_predicate_impl.h"
#include "paimon/common/predicate/leaf_predicate_impl.h"
#include "paimon/common/predicate/predicate_validator.h"
#include "paimon/common/table/special_fields.h"
#include "paimon/common/types/data_field.h"
#include "paimon/core/schema/arrow_schema_validator.h"
#include "paimon/predicate/function.h"
#include "paimon/status.h"

namespace arrow {
class Schema;
} // namespace arrow

namespace paimon {
namespace {
// Build a map from a field's position in `table_schema` (the latest table schema, the
// index space upstream predicates are typically constructed against) to its position
// in `read_data_fields` (the projected read schema). Field identity is the stable
// field id, so the mapping survives column renames within the same schema. Read-only
// special fields (RowId / SequenceNumber / etc.) have no analogue in the table
// schema and are skipped — user-supplied predicates do not reference them.
std::map<int32_t, int32_t> BuildLatestToReadIdxMapping(
const TableSchema& table_schema, const std::vector<DataField>& read_data_fields) {
std::map<int32_t, int32_t> id_to_latest_idx;
const auto& table_fields = table_schema.Fields();
for (size_t latest_idx = 0; latest_idx < table_fields.size(); latest_idx++) {
id_to_latest_idx[table_fields[latest_idx].Id()] = static_cast<int32_t>(latest_idx);
}
std::map<int32_t, int32_t> mapping;
for (size_t read_idx = 0; read_idx < read_data_fields.size(); read_idx++) {
auto iter = id_to_latest_idx.find(read_data_fields[read_idx].Id());
if (iter != id_to_latest_idx.end()) {
mapping[iter->second] = static_cast<int32_t>(read_idx);
}
}
return mapping;
}

// Project `predicate` onto the read schema, rewriting each leaf's field index via
// `latest_to_read_idx` (predicate's source index space → read schema position).
//
// Inclusive semantics, matching paimon Java's PredicateProjectionConverter:
// - Leaf whose field is not in the read schema: dropped (returns nullopt).
// - AND: drop non-projectable children, keep the rest. Safe because if `A AND B`
// holds for a row, A holds too; the projected predicate is a superset
// (necessary, not sufficient).
// - OR: every child must be projectable. If any child is dropped, drop the
// whole OR — otherwise a row that only satisfied the dropped branch would
// falsely pass.
// - Predicates without a field index (e.g. full-text / vector search) flow
// through unchanged.
Result<std::optional<std::shared_ptr<Predicate>>> ProjectPredicate(
const std::map<int32_t, int32_t>& latest_to_read_idx,
const std::shared_ptr<Predicate>& predicate) {
if (auto leaf = std::dynamic_pointer_cast<LeafPredicateImpl>(predicate)) {
auto iter = latest_to_read_idx.find(leaf->FieldIndex());
if (iter == latest_to_read_idx.end()) {
return std::optional<std::shared_ptr<Predicate>>{};
}
if (iter->second == leaf->FieldIndex()) {
return std::optional<std::shared_ptr<Predicate>>{predicate};
}
return std::optional<std::shared_ptr<Predicate>>{
std::static_pointer_cast<Predicate>(leaf->NewLeafPredicate(iter->second))};
}
if (auto compound = std::dynamic_pointer_cast<CompoundPredicateImpl>(predicate)) {
const bool is_and = compound->GetFunction().GetType() == Function::Type::AND;
std::vector<std::shared_ptr<Predicate>> projected_children;
projected_children.reserve(compound->Children().size());
bool any_changed = false;
for (const auto& child : compound->Children()) {
PAIMON_ASSIGN_OR_RAISE(std::optional<std::shared_ptr<Predicate>> projected_child,
ProjectPredicate(latest_to_read_idx, child));
if (!projected_child.has_value()) {
if (!is_and) {
return std::optional<std::shared_ptr<Predicate>>{};
}
any_changed = true;
continue;
}
if (projected_child.value() != child) {
any_changed = true;
}
projected_children.push_back(std::move(projected_child.value()));
}
if (projected_children.empty()) {
return std::optional<std::shared_ptr<Predicate>>{};
}
if (projected_children.size() == 1) {
return std::optional<std::shared_ptr<Predicate>>{std::move(projected_children[0])};
}
if (!any_changed) {
return std::optional<std::shared_ptr<Predicate>>{predicate};
}
return std::optional<std::shared_ptr<Predicate>>{std::static_pointer_cast<Predicate>(
compound->NewCompoundPredicate(projected_children))};
}
return std::optional<std::shared_ptr<Predicate>>{predicate};
}

Result<std::shared_ptr<Predicate>> ProjectAndValidatePredicate(
const arrow::Schema& read_schema, const std::map<int32_t, int32_t>& latest_to_read_idx,
const std::shared_ptr<Predicate>& predicate) {
PAIMON_ASSIGN_OR_RAISE(std::optional<std::shared_ptr<Predicate>> projected,
ProjectPredicate(latest_to_read_idx, predicate));
if (!projected.has_value()) {
return std::shared_ptr<Predicate>{};
}
PAIMON_RETURN_NOT_OK(PredicateValidator::ValidatePredicateWithSchema(
read_schema, projected.value(), /*validate_field_idx=*/true));
PAIMON_RETURN_NOT_OK(PredicateValidator::ValidatePredicateWithLiterals(projected.value()));
return projected.value();
}
} // namespace

Result<std::unique_ptr<InternalReadContext>> InternalReadContext::Create(
const std::shared_ptr<ReadContext>& context, const std::shared_ptr<TableSchema>& table_schema,
const std::map<std::string, std::string>& options) {
Expand Down Expand Up @@ -95,34 +200,51 @@ Result<std::unique_ptr<InternalReadContext>> InternalReadContext::Create(
auto read_schema = DataField::ConvertDataFieldsToArrowSchema(read_data_fields);
// validate read schema to avoid redundant fields
PAIMON_RETURN_NOT_OK(ArrowSchemaValidator::ValidateSchemaWithFieldId(*read_schema));
// validate predicate
// Project the upstream predicate onto `read_schema`. Predicates carry field indices
// pointing into the latest table schema; rewrite them to positions in `read_schema_`
// so downstream readers can apply them directly.
std::shared_ptr<Predicate> projected_predicate;
if (context->GetPredicate()) {
PAIMON_RETURN_NOT_OK(PredicateValidator::ValidatePredicateWithSchema(
*read_schema, context->GetPredicate(), /*validate_field_idx=*/true));
PAIMON_RETURN_NOT_OK(
PredicateValidator::ValidatePredicateWithLiterals(context->GetPredicate()));
auto latest_to_read_idx = BuildLatestToReadIdxMapping(*table_schema, read_data_fields);
PAIMON_ASSIGN_OR_RAISE(
projected_predicate,
ProjectAndValidatePredicate(*read_schema, latest_to_read_idx, context->GetPredicate()));
}

return std::unique_ptr<InternalReadContext>(
new InternalReadContext(context, table_schema, read_schema, core_options));
return std::unique_ptr<InternalReadContext>(new InternalReadContext(
context, table_schema, read_schema, core_options, std::move(projected_predicate)));
}

InternalReadContext::InternalReadContext(const std::shared_ptr<ReadContext>& read_context,
const std::shared_ptr<TableSchema>& table_schema,
const std::shared_ptr<arrow::Schema>& read_schema,
const CoreOptions& options)
const CoreOptions& options,
std::shared_ptr<Predicate> projected_predicate)
: read_context_(read_context),
table_schema_(table_schema),
read_schema_(read_schema),
options_(options) {}
options_(options),
projected_predicate_(std::move(projected_predicate)) {}

Result<std::shared_ptr<InternalReadContext>> InternalReadContext::CreateWithSchema(
const std::shared_ptr<InternalReadContext>& original,
const std::shared_ptr<arrow::Schema>& new_read_schema) {
// Create a new InternalReadContext sharing all properties except read_schema.
// The new read_schema is the minimal column set for COUNT(*).
return std::shared_ptr<InternalReadContext>(new InternalReadContext(
original->read_context_, original->table_schema_, new_read_schema, original->options_));
// The wrapped read_context still holds the caller-supplied predicate (in the latest
// table schema's index space). Re-project it against `new_read_schema` directly,
// rather than re-projecting the already-projected predicate sitting on `original`.
std::shared_ptr<Predicate> projected_predicate;
if (original->read_context_->GetPredicate()) {
PAIMON_ASSIGN_OR_RAISE(std::vector<DataField> new_read_data_fields,
DataField::ConvertArrowSchemaToDataFields(new_read_schema));
auto latest_to_read_idx =
BuildLatestToReadIdxMapping(*original->table_schema_, new_read_data_fields);
PAIMON_ASSIGN_OR_RAISE(projected_predicate, ProjectAndValidatePredicate(
*new_read_schema, latest_to_read_idx,
original->read_context_->GetPredicate()));
}
return std::shared_ptr<InternalReadContext>(
new InternalReadContext(original->read_context_, original->table_schema_, new_read_schema,
original->options_, std::move(projected_predicate)));
}

} // namespace paimon
13 changes: 11 additions & 2 deletions src/paimon/core/operation/internal_read_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,16 @@ class InternalReadContext {
const std::vector<std::string>& GetPrimaryKeys() const {
return table_schema_->PrimaryKeys();
}
// Returns the predicate projected onto `read_schema_`. Upstream constructs predicates
// against the latest table schema, so when the query projects a subset of columns
// the leaf field indices no longer match the projected schema. The projection is
// done once at context construction (mirrors paimon Java's
// `PredicateProjectionConverter`): leaf indices are rewritten via the table-schema
// → read-schema mapping, AND children whose fields are absent from the read schema
// are dropped (inclusive), and OR is dropped wholesale if any of its children is not
// projectable. May be nullptr if the entire predicate is non-projectable.
const std::shared_ptr<Predicate>& GetPredicate() const {
return read_context_->GetPredicate();
return projected_predicate_;
}
bool EnablePredicateFilter() const {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The class comment in predicate_builder.h clearly states that field_index is “the index of the field in the read schema (0-based)”. If we need to change its behavior, could you please also review the comments carefully and update them all accordingly?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the current change based on the assumption that the field_idx provided in the user predicate is not meaningful, since it will be rebuilt in InternalReadContext anyway?

return read_context_->EnablePredicateFilter();
Expand Down Expand Up @@ -110,12 +118,13 @@ class InternalReadContext {
InternalReadContext(const std::shared_ptr<ReadContext>& read_context,
const std::shared_ptr<TableSchema>& table_schema,
const std::shared_ptr<arrow::Schema>& read_schema,
const CoreOptions& options);
const CoreOptions& options, std::shared_ptr<Predicate> projected_predicate);

std::shared_ptr<ReadContext> read_context_;
std::shared_ptr<TableSchema> table_schema_;
std::shared_ptr<arrow::Schema> read_schema_;
CoreOptions options_;
std::shared_ptr<Predicate> projected_predicate_;
};

} // namespace paimon
Loading
Loading