diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java
index 0d1c65f0f2f96a..10e993bd458c5d 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java
@@ -174,14 +174,29 @@ default boolean supportsMetadataFilterPushDown() {
/**
* Provides a list of metadata filters in conjunctive form. A source can pick filters and return
- * the accepted and remaining filters. Same contract as {@link
- * SupportsFilterPushDown#applyFilters(List)}, but for metadata columns.
+ * the accepted and remaining filters.
+ *
+ * <p>Identity contract: The source MUST return the exact same {@link ResolvedExpression}
+ * instances it received — no copies, no wrappers, no rebuilds. The planner uses instance
+ * identity (not {@code equals}) to correlate returned expressions back to their original
+ * positions. Typical implementations partition the input list:
+ *
+ * <pre>{@code
+ * return MetadataFilterResult.of(
+ *         metadataFilters.subList(0, accepted),
+ *         metadataFilters.subList(accepted, metadataFilters.size()));
+ * }</pre>
*
* The provided filters reference metadata key names (from {@link #listReadableMetadata()}),
* not SQL column aliases. For example, a column declared as {@code msg_offset BIGINT METADATA
* FROM 'offset'} will have its predicate expressed as {@code offset >= 1000}, not {@code
* msg_offset >= 1000}. The planner handles the alias-to-key translation before calling this
* method.
+ *
+ * <p>Acceptance must be decided per predicate, independent of which other predicates are in the
+ * list. On compiled-plan restore only the previously-accepted subset is re-presented, so a
+ * source that accepts one predicate only when another is also present would fail to re-accept
+ * it.
*/
default MetadataFilterResult applyMetadataFilters(List metadataFilters) {
return MetadataFilterResult.of(Collections.emptyList(), metadataFilters);
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceScanRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceScanRule.java
index 4dc970af831830..56f9848f7decd5 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceScanRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceScanRule.java
@@ -18,10 +18,8 @@
package org.apache.flink.table.planner.plan.rules.logical;
-import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan;
-import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase;
import org.apache.flink.table.planner.plan.schema.TableSourceTable;
import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
@@ -32,6 +30,10 @@
import org.apache.calcite.rex.RexProgramBuilder;
import org.apache.calcite.tools.RelBuilder;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
import scala.Tuple2;
/**
@@ -63,8 +65,7 @@ public boolean matches(RelOptRuleCall call) {
FlinkLogicalTableSourceScan scan = call.rel(1);
TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
- // we can not push filter twice
- return canPushdownFilter(tableSourceTable);
+ return canPushdownFilter(tableSourceTable) || canPushdownMetadataFilter(tableSourceTable);
}
@Override
@@ -79,7 +80,7 @@ private void pushFilterIntoScan(
RelOptRuleCall call,
Calc calc,
FlinkLogicalTableSourceScan scan,
- FlinkPreparingTableBase relOptTable) {
+ TableSourceTable tableSourceTable) {
RexProgram originProgram = calc.getProgram();
@@ -94,33 +95,29 @@ private void pushFilterIntoScan(
RexNode[] convertiblePredicates = extractedPredicates._1;
RexNode[] unconvertedPredicates = extractedPredicates._2;
if (convertiblePredicates.length == 0) {
- // no condition can be translated to expression
return;
}
- Tuple2 pushdownResultWithScan =
- resolveFiltersAndCreateTableSourceTable(
- convertiblePredicates,
- relOptTable.unwrap(TableSourceTable.class),
- scan,
- relBuilder);
+ FilterClassificationResult result =
+ classifyAndPushFilters(convertiblePredicates, tableSourceTable, scan, relBuilder);
+ if (result == null) {
+ return;
+ }
- SupportsFilterPushDown.Result result = pushdownResultWithScan._1;
- TableSourceTable tableSourceTable = pushdownResultWithScan._2;
+ List allRemainingRexNodes = new ArrayList<>(result.remainingPredicates);
+ allRemainingRexNodes.addAll(Arrays.asList(unconvertedPredicates));
FlinkLogicalTableSourceScan newScan =
FlinkLogicalTableSourceScan.create(
- scan.getCluster(), scan.getHints(), tableSourceTable);
+ scan.getCluster(), scan.getHints(), result.updatedTable);
// build new calc program
RexProgramBuilder programBuilder =
RexProgramBuilder.forProgram(originProgram, call.builder().getRexBuilder(), true);
programBuilder.clearCondition();
- if (!result.getRemainingFilters().isEmpty() || unconvertedPredicates.length != 0) {
- RexNode remainingCondition =
- createRemainingCondition(
- relBuilder, result.getRemainingFilters(), unconvertedPredicates);
+ if (!allRemainingRexNodes.isEmpty()) {
+ RexNode remainingCondition = relBuilder.and(allRemainingRexNodes);
RexNode simplifiedRemainingCondition =
FlinkRexUtil.simplify(
relBuilder.getRexBuilder(),
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java
index ef8bc092f98351..223c967dc1c71a 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java
@@ -49,6 +49,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -113,6 +114,82 @@ protected RexNode createRemainingCondition(
return new Tuple2<>(result, newTableSourceTable);
}
+    /**
+     * Splits the convertible predicates by the kind of column they reference and pushes each
+     * group through the matching push-down path.
+     *
+     * <ul>
+     *   <li>Predicates referencing only metadata columns go to the metadata path when metadata
+     *       filter push-down is possible; otherwise they are kept as remaining predicates.
+     *   <li>Predicates referencing both metadata and physical columns are always kept as
+     *       remaining predicates.
+     *   <li>All other predicates go to the physical path when filter push-down is possible;
+     *       otherwise they are kept as remaining predicates.
+     * </ul>
+     *
+     * @param convertiblePredicates predicates to classify and, where possible, push down
+     * @param tableSourceTable table that the push-down paths start from
+     * @param scan scan whose row type decides which input references are metadata columns
+     * @param relBuilder builder handed to the physical push-down and to the conversion of
+     *     remaining filter expressions back into {@link RexNode}s
+     * @return the table after push-down plus the predicates that were not pushed, or {@code
+     *     null} when neither path has anything to push
+     */
+    protected FilterClassificationResult classifyAndPushFilters(
+            RexNode[] convertiblePredicates,
+            TableSourceTable tableSourceTable,
+            TableScan scan,
+            RelBuilder relBuilder) {
+
+        boolean supportsPhysicalFilter = canPushdownFilter(tableSourceTable);
+        boolean supportsMetadataFilter = canPushdownMetadataFilter(tableSourceTable);
+        Set<Integer> metadataColumnIndices = metadataColumnIndices(tableSourceTable, scan);
+
+        List<RexNode> allRemainingRexNodes = new ArrayList<>();
+        List<RexNode> physicalPredicates = new ArrayList<>();
+        List<RexNode> metadataPredicates = new ArrayList<>();
+        for (RexNode predicate : convertiblePredicates) {
+            if (referencesOnlyMetadataColumns(predicate, metadataColumnIndices)) {
+                if (supportsMetadataFilter) {
+                    metadataPredicates.add(predicate);
+                } else {
+                    // Not pushable as metadata: keep it as a remaining predicate rather than
+                    // routing it through the physical path.
+                    allRemainingRexNodes.add(predicate);
+                }
+            } else if (referencesAnyMetadataColumns(predicate, metadataColumnIndices)) {
+                // Mixed physical/metadata predicate: pushed through neither path.
+                allRemainingRexNodes.add(predicate);
+            } else {
+                physicalPredicates.add(predicate);
+            }
+        }
+
+        boolean pushPhysical = supportsPhysicalFilter && !physicalPredicates.isEmpty();
+        if (!pushPhysical && metadataPredicates.isEmpty()) {
+            // Neither path has work to do; callers return without transforming the plan.
+            return null;
+        }
+
+        TableSourceTable currentTable = tableSourceTable;
+
+        if (pushPhysical) {
+            Tuple2<SupportsFilterPushDown.Result, TableSourceTable> physicalResult =
+                    resolveFiltersAndCreateTableSourceTable(
+                            physicalPredicates.toArray(new RexNode[0]),
+                            currentTable,
+                            scan,
+                            relBuilder);
+            currentTable = physicalResult._2;
+            List<RexNode> physicalRemaining =
+                    convertExpressionToRexNode(
+                            physicalResult._1.getRemainingFilters(), relBuilder);
+            allRemainingRexNodes.addAll(physicalRemaining);
+        } else {
+            allRemainingRexNodes.addAll(physicalPredicates);
+        }
+
+        if (!metadataPredicates.isEmpty()) {
+            // The metadata path starts from the table produced by the physical path, if any.
+            MetadataPushDownOutcome metadataResult =
+                    resolveMetadataFiltersAndCreateTableSourceTable(
+                            metadataPredicates.toArray(new RexNode[0]), currentTable, scan);
+            currentTable = metadataResult.newTableSourceTable;
+            allRemainingRexNodes.addAll(metadataResult.remainingInputRexNodes);
+        }
+
+        return new FilterClassificationResult(currentTable, allRemainingRexNodes);
+    }
+
+    /**
+     * Result of classifying and pushing filters through physical and metadata paths: the table
+     * after push-down and the predicates that were not pushed.
+     */
+    protected static final class FilterClassificationResult {
+        /** Table after the push-down paths were applied. */
+        final TableSourceTable updatedTable;
+
+        /** Predicates that were not pushed down. */
+        final List<RexNode> remainingPredicates;
+
+        FilterClassificationResult(
+                TableSourceTable updatedTable, List<RexNode> remainingPredicates) {
+            this.updatedTable = updatedTable;
+            this.remainingPredicates = remainingPredicates;
+        }
+    }
+
/** Whether filter push-down is possible and not already assigned. */
protected boolean canPushdownFilter(TableSourceTable tableSourceTable) {
return tableSourceTable != null
@@ -143,13 +220,27 @@ protected boolean canPushdownMetadataFilter(TableSourceTable tableSourceTable) {
* A predicate like {@code OR(physical_pred, metadata_pred)} returns false because it
* references both physical and metadata columns. Mixed predicates remain as runtime filters.
*/
- protected boolean referencesOnlyMetadataColumns(RexNode predicate, int physicalColumnCount) {
+    protected boolean referencesOnlyMetadataColumns(
+            RexNode predicate, Set<Integer> metadataColumnIndices) {
+        // saw[0] = a physical column was referenced, saw[1] = a metadata column was referenced
+        boolean[] saw = classifyColumnReferences(predicate, metadataColumnIndices);
+        return saw[1] && !saw[0];
+    }
+
+    /**
+     * True if predicate references at least one metadata column (may also reference physical).
+     *
+     * @param predicate predicate whose input references are inspected
+     * @param metadataColumnIndices input-reference indices that count as metadata columns
+     */
+    protected boolean referencesAnyMetadataColumns(
+            RexNode predicate, Set<Integer> metadataColumnIndices) {
+        // saw[1] is set as soon as any referenced index is a metadata column index
+        boolean[] saw = classifyColumnReferences(predicate, metadataColumnIndices);
+        return saw[1];
+    }
+
+ private boolean[] classifyColumnReferences(
+ RexNode predicate, Set metadataColumnIndices) {
boolean[] saw = new boolean[2]; // [0] = sawPhysical, [1] = sawMetadata
predicate.accept(
new RexVisitorImpl(true) {
@Override
public Void visitInputRef(RexInputRef inputRef) {
- if (inputRef.getIndex() >= physicalColumnCount) {
+ if (metadataColumnIndices.contains(inputRef.getIndex())) {
saw[1] = true;
} else {
saw[0] = true;
@@ -157,13 +248,24 @@ public Void visitInputRef(RexInputRef inputRef) {
return null;
}
});
- return saw[1] && !saw[0];
+ return saw;
}
- /** Number of physical columns in the scan's schema. */
- protected int getPhysicalColumnCount(TableSourceTable tableSourceTable) {
- ResolvedSchema schema = tableSourceTable.contextResolvedTable().getResolvedSchema();
- return (int) schema.getColumns().stream().filter(Column::isPhysical).count();
+    /**
+     * Indices of metadata columns within the scan's current (possibly projection-narrowed) row
+     * type. Predicate {@link RexInputRef}s are positioned against this same row type, so the
+     * physical/metadata distinction must be derived from it rather than from the full table
+     * schema.
+     *
+     * @param tableSourceTable table used to look up which column names are metadata columns
+     * @param scan scan whose row-type field order defines the returned indices
+     * @return positions in the scan's row type whose field name is a metadata column
+     */
+    private Set<Integer> metadataColumnIndices(TableSourceTable tableSourceTable, TableScan scan) {
+        // Only the column names (map keys) matter here; the metadata keys are not needed.
+        Set<String> metadataColumnNames = buildColumnToMetadataKeyMap(tableSourceTable).keySet();
+        List<String> fieldNames = scan.getRowType().getFieldNames();
+        Set<Integer> indices = new HashSet<>();
+        for (int i = 0; i < fieldNames.size(); i++) {
+            if (metadataColumnNames.contains(fieldNames.get(i))) {
+                indices.add(i);
+            }
+        }
+        return indices;
+    }
/** Maps SQL column names to metadata keys for metadata columns. */
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
index 4e896a30701ca6..ba502728d2d701 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
@@ -95,63 +95,17 @@ private void pushFilterIntoScan(
return;
}
- boolean supportsPhysicalFilter = canPushdownFilter(tableSourceTable);
- boolean supportsMetadataFilter = canPushdownMetadataFilter(tableSourceTable);
- int physicalColumnCount = getPhysicalColumnCount(tableSourceTable);
-
- List allRemainingRexNodes = new ArrayList<>();
- TableSourceTable currentTable = tableSourceTable;
-
- // Unpushable metadata predicates stay as a runtime Calc, not the physical path —
- // physical routing produces a FilterPushDownSpec that crashes compiled-plan restore
- // once ProjectPushDownSpec narrows the scan row type.
- List physicalPredicates = new ArrayList<>();
- List metadataPredicates = new ArrayList<>();
- for (RexNode predicate : convertiblePredicates) {
- if (referencesOnlyMetadataColumns(predicate, physicalColumnCount)) {
- if (supportsMetadataFilter) {
- metadataPredicates.add(predicate);
- } else {
- allRemainingRexNodes.add(predicate);
- }
- } else {
- physicalPredicates.add(predicate);
- }
- }
-
- // Avoid re-firing on shapes we can't transform — saves wasted Hep iterations.
- boolean nothingToPushPhysically = physicalPredicates.isEmpty() || !supportsPhysicalFilter;
- if (nothingToPushPhysically && metadataPredicates.isEmpty()) {
+ FilterClassificationResult result =
+ classifyAndPushFilters(convertiblePredicates, tableSourceTable, scan, relBuilder);
+ if (result == null) {
return;
}
- if (!physicalPredicates.isEmpty() && supportsPhysicalFilter) {
- Tuple2 physicalResult =
- resolveFiltersAndCreateTableSourceTable(
- physicalPredicates.toArray(new RexNode[0]),
- currentTable,
- scan,
- relBuilder);
- currentTable = physicalResult._2;
- List physicalRemaining =
- convertExpressionToRexNode(physicalResult._1.getRemainingFilters(), relBuilder);
- allRemainingRexNodes.addAll(physicalRemaining);
- } else {
- allRemainingRexNodes.addAll(physicalPredicates);
- }
-
- if (!metadataPredicates.isEmpty()) {
- MetadataPushDownOutcome metadataResult =
- resolveMetadataFiltersAndCreateTableSourceTable(
- metadataPredicates.toArray(new RexNode[0]), currentTable, scan);
- currentTable = metadataResult.newTableSourceTable;
- allRemainingRexNodes.addAll(metadataResult.remainingInputRexNodes);
- }
-
+ List allRemainingRexNodes = new ArrayList<>(result.remainingPredicates);
allRemainingRexNodes.addAll(Arrays.asList(unconvertedPredicates));
LogicalTableScan newScan =
- LogicalTableScan.create(scan.getCluster(), currentTable, scan.getHints());
+ LogicalTableScan.create(scan.getCluster(), result.updatedTable, scan.getHints());
if (allRemainingRexNodes.isEmpty()) {
call.transformTo(newScan);
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterResultShapesITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterResultShapesITCase.java
index 47a987d485c4f0..5f079ab20c1798 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterResultShapesITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterResultShapesITCase.java
@@ -29,6 +29,7 @@
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.SourceProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata.MetadataFilterResult;
import org.apache.flink.table.data.RowData;
@@ -44,6 +45,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
@@ -190,6 +192,77 @@ void testBestEffortOverlap() throws Exception {
assertThat(explain).contains("where=[AND(>(m0, 0), >(m1, 0))]");
}
+    /**
+     * A single OR that mixes physical and metadata columns, such as {@code id < 3 OR m0 > 0},
+     * cannot be split by AND-decomposition. It must not be routed into the physical {@code
+     * filter=[...]} path: {@code FilterPushDownSpec} stores RexInputRef indices without a row
+     * type, so metadata column indices break during compiled-plan restore once {@code
+     * ProjectPushDownSpec} narrows the scan type.
+     *
+     * <p>The mixed predicate must stay as a runtime Calc ({@code where=[...]}).
+     */
+    @Test
+    void testMixedOrPredicateStaysAsRuntimeFilter() throws Exception {
+        // id < 3: rows 1,2. m0 > 0: rows 1,2,5. Union: rows 1,2,5.
+        final String query = "SELECT id FROM T_MIXED_OR WHERE id < 3 OR m0 > 0 ORDER BY id";
+
+        tableEnv.createTable(
+                "T_MIXED_OR",
+                TableFactoryHarness.newBuilder()
+                        .schema(
+                                Schema.newBuilder()
+                                        .column("id", INT())
+                                        .columnByMetadata("m0", INT())
+                                        .columnByMetadata("m1", INT())
+                                        .build())
+                        .source(new MixedFilterTrackingSource())
+                        .build());
+
+        // Plan shape: the OR shows up in neither push-down digest.
+        final String plan = tableEnv.explainSql(query);
+        assertThat(plan)
+                .as("Mixed OR must not leak into physical filter path")
+                .doesNotContain("filter=[OR(");
+        assertThat(plan)
+                .as("Mixed OR must not leak into metadata filter path")
+                .doesNotContain("metadataFilter=[OR(");
+
+        // Result check against row data:
+        // (1,5,5), (2,5,-1), (3,-1,5), (4,-1,-1), (5,7,9), (6,0,0)
+        assertThat(collectIds(query)).containsExactly(1, 2, 5);
+    }
+
+    /**
+     * When AND connects a pure physical predicate and a mixed OR, the physical predicate is
+     * classified for the physical push-down path while the mixed OR stays as a runtime Calc.
+     */
+    @Test
+    void testMixedOrWithSeparatePhysicalPredicate() throws Exception {
+        Schema schema =
+                Schema.newBuilder()
+                        .column("id", INT())
+                        .columnByMetadata("m0", INT())
+                        .columnByMetadata("m1", INT())
+                        .build();
+
+        MixedFilterTrackingSource source = new MixedFilterTrackingSource();
+        TableDescriptor descriptor =
+                TableFactoryHarness.newBuilder().schema(schema).source(source).build();
+        tableEnv.createTable("T_MIXED_OR2", descriptor);
+
+        // id > 2 (physical, pushable) AND (id < 5 OR m0 > 0) (mixed, must stay as runtime)
+        String sql = "SELECT id FROM T_MIXED_OR2 WHERE id > 2 AND (id < 5 OR m0 > 0) ORDER BY id";
+
+        String explain = tableEnv.explainSql(sql);
+        // The mixed OR must NOT appear in filter= or metadataFilter=. Both digests are checked
+        // separately: "metadataFilter=[" does not contain the lower-case "filter=[" substring.
+        assertThat(explain)
+                .as("Mixed OR must not leak into physical filter path")
+                .doesNotContain("filter=[OR(");
+        assertThat(explain)
+                .as("Mixed OR must not leak into metadata filter path")
+                .doesNotContain("metadataFilter=[OR(");
+
+        // Row data: (1,5,5), (2,5,-1), (3,-1,5), (4,-1,-1), (5,7,9), (6,0,0)
+        // id>2: rows 3,4,5,6. AND (id<5 OR m0>0): row3(yes,no→yes), row4(yes,no→yes),
+        // row5(no,yes→yes), row6(no,no→no). Result: 3,4,5
+        assertThat(collectIds(sql)).containsExactly(3, 4, 5);
+    }
+
+
// -----------------------------------------------------------------------------------------
// Helpers
// -----------------------------------------------------------------------------------------
@@ -302,4 +375,64 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
return SourceProvider.of(source);
}
}
+
+    /**
+     * Source that implements both physical and metadata filter push-down. Every metadata filter
+     * it is offered is reported as accepted; every physical filter is recorded and reported as
+     * remaining. The source emits all rows, so the runtime Calc is the load-bearing filter for
+     * remaining predicates.
+     */
+    static class MixedFilterTrackingSource extends TableFactoryHarness.ScanSourceBase
+            implements SupportsReadingMetadata, SupportsFilterPushDown {
+
+        /** Physical filters offered through {@link #applyFilters(List)}; none are accepted. */
+        private final List<ResolvedExpression> offeredPhysicalFilters = new ArrayList<>();
+
+        /** Data type passed to {@link #applyReadableMetadata(List, DataType)}, if it was called. */
+        private DataType producedDataType;
+
+        MixedFilterTrackingSource() {
+            super(true);
+        }
+
+        @Override
+        public Map<String, DataType> listReadableMetadata() {
+            // LinkedHashMap keeps m0 before m1.
+            Map<String, DataType> metadata = new LinkedHashMap<>();
+            metadata.put("m0", INT());
+            metadata.put("m1", INT());
+            return metadata;
+        }
+
+        @Override
+        public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
+            this.producedDataType = producedDataType;
+        }
+
+        @Override
+        public boolean supportsMetadataFilterPushDown() {
+            return true;
+        }
+
+        @Override
+        public MetadataFilterResult applyMetadataFilters(
+                List<ResolvedExpression> metadataFilters) {
+            // Accepted = all offered filters, remaining = none; the same instances are returned.
+            return MetadataFilterResult.of(metadataFilters, Collections.emptyList());
+        }
+
+        @Override
+        public Result applyFilters(List<ResolvedExpression> filters) {
+            offeredPhysicalFilters.addAll(filters);
+            // Accepted = none, remaining = all offered filters.
+            return Result.of(Collections.emptyList(), filters);
+        }
+
+        @Override
+        public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
+            // Fall back to the physical row type when applyReadableMetadata was never called.
+            DataType emitted =
+                    producedDataType != null
+                            ? producedDataType
+                            : getFactoryContext().getPhysicalRowDataType();
+            DynamicTableSource.DataStructureConverter converter =
+                    runtimeProviderContext.createDataStructureConverter(emitted);
+            GeneratorFunction<Long, RowData> generator =
+                    index -> (RowData) converter.toInternal(ROWS.get(index.intValue()));
+            DataGeneratorSource<RowData> source =
+                    new DataGeneratorSource<>(
+                            generator, ROWS.size(), TypeInformation.of(RowData.class));
+            return SourceProvider.of(source);
+        }
+    }
}