Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -174,14 +174,29 @@ default boolean supportsMetadataFilterPushDown() {

/**
* Provides a list of metadata filters in conjunctive form. A source can pick filters and return
* the accepted and remaining filters. Same contract as {@link
* SupportsFilterPushDown#applyFilters(List)}, but for metadata columns.
* the accepted and remaining filters.
*
* <p><b>Identity contract:</b> The source MUST return the exact same {@link ResolvedExpression}
* instances it received — no copies, no wrappers, no rebuilds. The planner uses instance
* identity (not {@code equals}) to correlate returned expressions back to their original
* positions. Typical implementations partition the input list:
*
* <pre>{@code
* return MetadataFilterResult.of(
* metadataFilters.subList(0, accepted),
* metadataFilters.subList(accepted, metadataFilters.size()));
* }</pre>
*
* <p>The provided filters reference metadata key names (from {@link #listReadableMetadata()}),
* not SQL column aliases. For example, a column declared as {@code msg_offset BIGINT METADATA
* FROM 'offset'} will have its predicate expressed as {@code offset >= 1000}, not {@code
* msg_offset >= 1000}. The planner handles the alias-to-key translation before calling this
* method.
*
* <p>Acceptance must be decided per predicate, independent of which other predicates are in the
* list. On compiled-plan restore only the previously-accepted subset is re-presented, so a
* source that accepts one predicate only when another is also present would fail to re-accept
* it.
*/
default MetadataFilterResult applyMetadataFilters(List<ResolvedExpression> metadataFilters) {
return MetadataFilterResult.of(Collections.emptyList(), metadataFilters);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@

package org.apache.flink.table.planner.plan.rules.logical;

import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan;
import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase;
import org.apache.flink.table.planner.plan.schema.TableSourceTable;
import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;

Expand All @@ -32,6 +30,10 @@
import org.apache.calcite.rex.RexProgramBuilder;
import org.apache.calcite.tools.RelBuilder;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import scala.Tuple2;

/**
Expand Down Expand Up @@ -63,8 +65,7 @@ public boolean matches(RelOptRuleCall call) {

FlinkLogicalTableSourceScan scan = call.rel(1);
TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
// we can not push filter twice
return canPushdownFilter(tableSourceTable);
return canPushdownFilter(tableSourceTable) || canPushdownMetadataFilter(tableSourceTable);
}

@Override
Expand All @@ -79,7 +80,7 @@ private void pushFilterIntoScan(
RelOptRuleCall call,
Calc calc,
FlinkLogicalTableSourceScan scan,
FlinkPreparingTableBase relOptTable) {
TableSourceTable tableSourceTable) {

RexProgram originProgram = calc.getProgram();

Expand All @@ -94,33 +95,29 @@ private void pushFilterIntoScan(
RexNode[] convertiblePredicates = extractedPredicates._1;
RexNode[] unconvertedPredicates = extractedPredicates._2;
if (convertiblePredicates.length == 0) {
// no condition can be translated to expression
return;
}

Tuple2<SupportsFilterPushDown.Result, TableSourceTable> pushdownResultWithScan =
resolveFiltersAndCreateTableSourceTable(
convertiblePredicates,
relOptTable.unwrap(TableSourceTable.class),
scan,
relBuilder);
FilterClassificationResult result =
classifyAndPushFilters(convertiblePredicates, tableSourceTable, scan, relBuilder);
if (result == null) {
return;
}

SupportsFilterPushDown.Result result = pushdownResultWithScan._1;
TableSourceTable tableSourceTable = pushdownResultWithScan._2;
List<RexNode> allRemainingRexNodes = new ArrayList<>(result.remainingPredicates);
allRemainingRexNodes.addAll(Arrays.asList(unconvertedPredicates));

FlinkLogicalTableSourceScan newScan =
FlinkLogicalTableSourceScan.create(
scan.getCluster(), scan.getHints(), tableSourceTable);
scan.getCluster(), scan.getHints(), result.updatedTable);

// build new calc program
RexProgramBuilder programBuilder =
RexProgramBuilder.forProgram(originProgram, call.builder().getRexBuilder(), true);
programBuilder.clearCondition();

if (!result.getRemainingFilters().isEmpty() || unconvertedPredicates.length != 0) {
RexNode remainingCondition =
createRemainingCondition(
relBuilder, result.getRemainingFilters(), unconvertedPredicates);
if (!allRemainingRexNodes.isEmpty()) {
RexNode remainingCondition = relBuilder.and(allRemainingRexNodes);
RexNode simplifiedRemainingCondition =
FlinkRexUtil.simplify(
relBuilder.getRexBuilder(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -113,6 +114,82 @@ protected RexNode createRemainingCondition(
return new Tuple2<>(result, newTableSourceTable);
}

/**
 * Sorts the convertible predicates into three buckets and pushes what can be pushed.
 *
 * <ul>
 *   <li>Predicates that reference only metadata columns go through the metadata push-down path
 *       when the source supports it; otherwise they are kept as remaining predicates.
 *   <li>Predicates that mix metadata and non-metadata columns are always kept as remaining
 *       predicates.
 *   <li>All other predicates go through the physical push-down path when the source supports
 *       it; otherwise they are kept as remaining predicates.
 * </ul>
 *
 * @param convertiblePredicates predicates that could be translated to expressions
 * @param tableSourceTable the table the scan currently reads from
 * @param scan the scan whose row type the predicates' input refs are positioned against
 * @param relBuilder builder used to turn remaining physical filters back into rex nodes
 * @return the table after push-down together with every predicate that still has to be
 *     evaluated, or {@code null} when neither the physical nor the metadata path has anything
 *     to push
 */
protected FilterClassificationResult classifyAndPushFilters(
        RexNode[] convertiblePredicates,
        TableSourceTable tableSourceTable,
        TableScan scan,
        RelBuilder relBuilder) {

    final boolean physicalPushable = canPushdownFilter(tableSourceTable);
    final boolean metadataPushable = canPushdownMetadataFilter(tableSourceTable);
    final Set<Integer> metadataIndices = metadataColumnIndices(tableSourceTable, scan);

    final List<RexNode> remaining = new ArrayList<>();
    final List<RexNode> physical = new ArrayList<>();
    final List<RexNode> metadata = new ArrayList<>();

    for (RexNode candidate : convertiblePredicates) {
        if (referencesOnlyMetadataColumns(candidate, metadataIndices)) {
            // Pure metadata predicate: push it only if the source can take it.
            if (metadataPushable) {
                metadata.add(candidate);
            } else {
                remaining.add(candidate);
            }
            continue;
        }
        if (referencesAnyMetadataColumns(candidate, metadataIndices)) {
            // Mixed physical/metadata predicate: never pushed.
            remaining.add(candidate);
            continue;
        }
        physical.add(candidate);
    }

    final boolean pushPhysical = physicalPushable && !physical.isEmpty();
    if (!pushPhysical && metadata.isEmpty()) {
        // Nothing would change; the caller treats null as "do not transform".
        return null;
    }

    TableSourceTable table = tableSourceTable;

    if (pushPhysical) {
        Tuple2<SupportsFilterPushDown.Result, TableSourceTable> pushed =
                resolveFiltersAndCreateTableSourceTable(
                        physical.toArray(new RexNode[0]), table, scan, relBuilder);
        table = pushed._2;
        remaining.addAll(
                convertExpressionToRexNode(pushed._1.getRemainingFilters(), relBuilder));
    } else {
        remaining.addAll(physical);
    }

    if (!metadata.isEmpty()) {
        // The metadata path builds on the table produced by the physical path, if any.
        MetadataPushDownOutcome outcome =
                resolveMetadataFiltersAndCreateTableSourceTable(
                        metadata.toArray(new RexNode[0]), table, scan);
        table = outcome.newTableSourceTable;
        remaining.addAll(outcome.remainingInputRexNodes);
    }

    return new FilterClassificationResult(table, remaining);
}

/**
 * Result of classifying and pushing filters through physical and metadata paths.
 *
 * <p>Plain immutable holder: both fields are final and are read directly by callers. The
 * predicate list is stored by reference, without a defensive copy.
 */
protected static final class FilterClassificationResult {
/** Table to build the new scan from, after any push-down has been applied. */
final TableSourceTable updatedTable;
/** Predicates that were not pushed and still have to be evaluated above the scan. */
final List<RexNode> remainingPredicates;

/**
 * @param updatedTable table resulting from the push-down
 * @param remainingPredicates predicates left over after the push-down
 */
FilterClassificationResult(
TableSourceTable updatedTable, List<RexNode> remainingPredicates) {
this.updatedTable = updatedTable;
this.remainingPredicates = remainingPredicates;
}
}

/** Whether filter push-down is possible and not already assigned. */
protected boolean canPushdownFilter(TableSourceTable tableSourceTable) {
return tableSourceTable != null
Expand Down Expand Up @@ -143,27 +220,52 @@ protected boolean canPushdownMetadataFilter(TableSourceTable tableSourceTable) {
* <p>A predicate like {@code OR(physical_pred, metadata_pred)} returns false because it
* references both physical and metadata columns. Mixed predicates remain as runtime filters.
*/
protected boolean referencesOnlyMetadataColumns(RexNode predicate, int physicalColumnCount) {
protected boolean referencesOnlyMetadataColumns(
RexNode predicate, Set<Integer> metadataColumnIndices) {
boolean[] saw = classifyColumnReferences(predicate, metadataColumnIndices);
return saw[1] && !saw[0];
}

/**
 * Tells whether the predicate touches at least one metadata column. Physical columns may be
 * referenced alongside; they do not affect the answer.
 */
protected boolean referencesAnyMetadataColumns(
        RexNode predicate, Set<Integer> metadataColumnIndices) {
    // Slot 1 of the classification result is the "saw a metadata column" flag.
    return classifyColumnReferences(predicate, metadataColumnIndices)[1];
}

private boolean[] classifyColumnReferences(
RexNode predicate, Set<Integer> metadataColumnIndices) {
boolean[] saw = new boolean[2]; // [0] = sawPhysical, [1] = sawMetadata
predicate.accept(
new RexVisitorImpl<Void>(true) {
@Override
public Void visitInputRef(RexInputRef inputRef) {
if (inputRef.getIndex() >= physicalColumnCount) {
if (metadataColumnIndices.contains(inputRef.getIndex())) {
saw[1] = true;
} else {
saw[0] = true;
}
return null;
}
});
return saw[1] && !saw[0];
return saw;
}

/** Number of physical columns in the scan's schema. */
protected int getPhysicalColumnCount(TableSourceTable tableSourceTable) {
ResolvedSchema schema = tableSourceTable.contextResolvedTable().getResolvedSchema();
return (int) schema.getColumns().stream().filter(Column::isPhysical).count();
/**
 * Computes the positions of metadata columns in the scan's current row type, which may already
 * have been narrowed by a projection. Predicate {@link RexInputRef}s are positioned against
 * that same row type, so the physical/metadata split has to be taken from it and not from the
 * full table schema.
 *
 * @param tableSourceTable table used to look up which column names are metadata columns
 * @param scan scan whose row type supplies the field order
 * @return zero-based field positions whose names are metadata columns
 */
private Set<Integer> metadataColumnIndices(TableSourceTable tableSourceTable, TableScan scan) {
    final Map<String, String> metadataKeyByColumn = buildColumnToMetadataKeyMap(tableSourceTable);
    final Set<Integer> positions = new HashSet<>();
    int position = 0;
    for (String fieldName : scan.getRowType().getFieldNames()) {
        if (metadataKeyByColumn.containsKey(fieldName)) {
            positions.add(position);
        }
        position++;
    }
    return positions;
}

/** Maps SQL column names to metadata keys for metadata columns. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,63 +95,17 @@ private void pushFilterIntoScan(
return;
}

boolean supportsPhysicalFilter = canPushdownFilter(tableSourceTable);
boolean supportsMetadataFilter = canPushdownMetadataFilter(tableSourceTable);
int physicalColumnCount = getPhysicalColumnCount(tableSourceTable);

List<RexNode> allRemainingRexNodes = new ArrayList<>();
TableSourceTable currentTable = tableSourceTable;

// Unpushable metadata predicates stay as a runtime Calc, not the physical path —
// physical routing produces a FilterPushDownSpec that crashes compiled-plan restore
// once ProjectPushDownSpec narrows the scan row type.
List<RexNode> physicalPredicates = new ArrayList<>();
List<RexNode> metadataPredicates = new ArrayList<>();
for (RexNode predicate : convertiblePredicates) {
if (referencesOnlyMetadataColumns(predicate, physicalColumnCount)) {
if (supportsMetadataFilter) {
metadataPredicates.add(predicate);
} else {
allRemainingRexNodes.add(predicate);
}
} else {
physicalPredicates.add(predicate);
}
}

// Avoid re-firing on shapes we can't transform — saves wasted Hep iterations.
boolean nothingToPushPhysically = physicalPredicates.isEmpty() || !supportsPhysicalFilter;
if (nothingToPushPhysically && metadataPredicates.isEmpty()) {
FilterClassificationResult result =
classifyAndPushFilters(convertiblePredicates, tableSourceTable, scan, relBuilder);
if (result == null) {
return;
}

if (!physicalPredicates.isEmpty() && supportsPhysicalFilter) {
Tuple2<SupportsFilterPushDown.Result, TableSourceTable> physicalResult =
resolveFiltersAndCreateTableSourceTable(
physicalPredicates.toArray(new RexNode[0]),
currentTable,
scan,
relBuilder);
currentTable = physicalResult._2;
List<RexNode> physicalRemaining =
convertExpressionToRexNode(physicalResult._1.getRemainingFilters(), relBuilder);
allRemainingRexNodes.addAll(physicalRemaining);
} else {
allRemainingRexNodes.addAll(physicalPredicates);
}

if (!metadataPredicates.isEmpty()) {
MetadataPushDownOutcome metadataResult =
resolveMetadataFiltersAndCreateTableSourceTable(
metadataPredicates.toArray(new RexNode[0]), currentTable, scan);
currentTable = metadataResult.newTableSourceTable;
allRemainingRexNodes.addAll(metadataResult.remainingInputRexNodes);
}

List<RexNode> allRemainingRexNodes = new ArrayList<>(result.remainingPredicates);
allRemainingRexNodes.addAll(Arrays.asList(unconvertedPredicates));

LogicalTableScan newScan =
LogicalTableScan.create(scan.getCluster(), currentTable, scan.getHints());
LogicalTableScan.create(scan.getCluster(), result.updatedTable, scan.getHints());

if (allRemainingRexNodes.isEmpty()) {
call.transformTo(newScan);
Expand Down
Loading