From 15aed54e670f5993f6a88d1dfa7590ba88eff65f Mon Sep 17 00:00:00 2001 From: Joseph Grogan Date: Mon, 23 Mar 2026 14:09:17 -0400 Subject: [PATCH] Fix issue with ';' delimiter when specifying multiple table keys --- deploy/docker/venice/schemas/keySchemaRecord.avsc | 6 +++++- .../com/linkedin/hoptimator/k8s/K8sConnector.java | 2 +- .../hoptimator/util/planner/ScriptImplementor.java | 5 ++++- .../venice/VeniceSchemaValidationTest.java | 7 ++++--- .../src/test/resources/venice-ddl-insert-partial.id | 13 +++++++------ 5 files changed, 21 insertions(+), 12 deletions(-) diff --git a/deploy/docker/venice/schemas/keySchemaRecord.avsc b/deploy/docker/venice/schemas/keySchemaRecord.avsc index 4a0f80f0..a98a520f 100644 --- a/deploy/docker/venice/schemas/keySchemaRecord.avsc +++ b/deploy/docker/venice/schemas/keySchemaRecord.avsc @@ -4,7 +4,11 @@ "doc": "SampleTableKey", "fields": [ { - "name": "id", + "name": "name", + "type": "string" + }, + { + "name": "age", "type": "int" } ] diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sConnector.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sConnector.java index 1bacadaa..3edcf87e 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sConnector.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sConnector.java @@ -117,7 +117,7 @@ static Map addKeysAsOption(Map options, RelDataT .filter(name -> name.startsWith(KEY_PREFIX)) .collect(Collectors.joining(";")); if (!keyString.isEmpty()) { - newOptions.put(KEY_OPTION, keyString); + newOptions.put(KEY_OPTION, keyString.replaceAll("\\s+", "")); newOptions.put(KEY_PREFIX_OPTION, KEY_PREFIX); newOptions.put(KEY_TYPE_OPTION, "RECORD"); } diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java index 45c73881..33ecd031 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java @@ -148,7 +148,10 @@ default String sql() { default String sql(SqlDialect dialect) { SqlWriter w = new SqlPrettyWriter(SqlWriterConfig.of().withDialect(dialect)); implement(w); - return w.toSqlString().getSql().replaceAll("\\n", " ").replaceAll("\\s*;\\s*", ";\n").trim(); + // This logic is intended to split DDL/DML statements into multiple lines. + // Flink SQL options (e.g. key.fields) can contain ";" characters as a delimiter. + // TODO: make this logic more robust, it will still fail for non-trimmed strings containing ";" characters + return w.toSqlString().getSql().replaceAll("\\n", " ").replaceAll("\\s+;\\s*", ";\n").trim(); } /** Generate SQL for a given dialect */ diff --git a/hoptimator-venice/src/test/java/com/linkedin/hoptimator/venice/VeniceSchemaValidationTest.java b/hoptimator-venice/src/test/java/com/linkedin/hoptimator/venice/VeniceSchemaValidationTest.java index 0d8b8553..2aa9bcb5 100644 --- a/hoptimator-venice/src/test/java/com/linkedin/hoptimator/venice/VeniceSchemaValidationTest.java +++ b/hoptimator-venice/src/test/java/com/linkedin/hoptimator/venice/VeniceSchemaValidationTest.java @@ -16,12 +16,13 @@ public class VeniceSchemaValidationTest extends JdbcTestBase { @Test public void testVeniceTableCreationAndSchema() throws Exception { - sql("create or replace materialized view \"VENICE\".\"test-store$insert-partial\" (\"KEY_id\", \"intField\") " - + "as select \"KEY\", \"intField\" from \"VENICE\".\"test-store-primitive\""); + sql("create or replace materialized view \"VENICE\".\"test-store$insert-partial\" (\"KEY_name\", \"KEY_age\", \"intField\") " + + "as select \"stringField\" AS \"KEY_name\", \"KEY\" AS \"KEY_age\", \"intField\" from \"VENICE\".\"test-store-primitive\""); // Validate the table was created with expected schema Map expectedColumns = Map.of( - "KEY_id", "INTEGER", + "KEY_name", "VARCHAR", + "KEY_age", "INTEGER", "intField", "INTEGER" ); validateTableSchema(List.of("VENICE", "test-store$insert-partial"), expectedColumns); diff --git a/hoptimator-venice/src/test/resources/venice-ddl-insert-partial.id b/hoptimator-venice/src/test/resources/venice-ddl-insert-partial.id index 3f07ef60..f4d4c05c 100644 --- a/hoptimator-venice/src/test/resources/venice-ddl-insert-partial.id +++ b/hoptimator-venice/src/test/resources/venice-ddl-insert-partial.id @@ -1,7 +1,7 @@ !set outputformat mysql !use k8s -create or replace materialized view "VENICE"."test-store$insert-partial" ("KEY_id", "intField") as select "KEY", "intField" from "VENICE"."test-store-primitive"; +create or replace materialized view "VENICE"."test-store$insert-partial" ("KEY_name", "KEY_age", "intField") as select "stringField" AS "KEY_name", "KEY" AS "KEY_age", "intField" from "VENICE"."test-store-primitive"; (0 rows modified) !update @@ -9,10 +9,11 @@ create or replace materialized view "VENICE"."test-store$insert-partial" ("KEY_i +------------+----------+------------+------------+ | columnName | typeName | columnSize | isNullable | +------------+----------+------------+------------+ -| KEY_id | INTEGER | 10 | NO | +| KEY_name | VARCHAR | null | YES | +| KEY_age | INTEGER | 10 | NO | | intField | INTEGER | 10 | YES | +------------+----------+------------+------------+ -(2 rows) +(3 rows) !describe "VENICE"."test-store$insert-partial" @@ -21,7 +22,7 @@ drop materialized view "VENICE"."test-store$insert-partial"; !update -insert into "VENICE"."test-store" ("KEY_id", "intField") select "KEY", "intField" from "VENICE"."test-store-primitive"; +insert into "VENICE"."test-store" ("KEY_name", "KEY_age", "intField") select "stringField" AS "KEY_name", "KEY" AS "KEY_age", "intField" from "VENICE"."test-store-primitive"; apiVersion: flink.apache.org/v1beta1 kind: FlinkSessionJob metadata: @@ -34,8 +35,8 @@ spec: - CREATE DATABASE IF NOT EXISTS `VENICE` WITH () - CREATE TABLE IF NOT EXISTS `VENICE`.`test-store-primitive` (`KEY` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY', 'key.fields-prefix'='', 'key.type'='PRIMITIVE', 'partial-update-mode'='true', 'storeName'='test-store-primitive', 'value.fields-include'='EXCEPT_KEY') - CREATE DATABASE IF NOT EXISTS `VENICE` WITH () - - CREATE TABLE IF NOT EXISTS `VENICE`.`test-store` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY') - - INSERT INTO `VENICE`.`test-store` (`KEY_id`, `intField`) SELECT `KEY` AS `KEY_id`, `intField` FROM `VENICE`.`test-store-primitive` + - CREATE TABLE IF NOT EXISTS `VENICE`.`test-store` (`KEY_name` VARCHAR, `KEY_age` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_name;KEY_age', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY') + - INSERT INTO `VENICE`.`test-store` (`KEY_name`, `KEY_age`, `intField`) SELECT `stringField` AS `KEY_name`, `KEY` AS `KEY_age`, `intField` FROM `VENICE`.`test-store-primitive` jarURI: file:///opt/hoptimator-flink-runner.jar parallelism: 1 upgradeMode: stateless