Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion deploy/docker/venice/schemas/keySchemaRecord.avsc
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
"doc": "SampleTableKey",
"fields": [
{
"name": "name",
"type": "string"
},
{
"name": "age",
"type": "int"
}
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ static Map<String, String> addKeysAsOption(Map<String, String> options, RelDataT
.filter(name -> name.startsWith(KEY_PREFIX))
.collect(Collectors.joining(";"));
if (!keyString.isEmpty()) {
newOptions.put(KEY_OPTION, keyString.replaceAll("\\s+", ""));
newOptions.put(KEY_PREFIX_OPTION, KEY_PREFIX);
newOptions.put(KEY_TYPE_OPTION, "RECORD");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,10 @@ default String sql() {
/**
 * Generates the SQL for this object in the given dialect, with one statement per line.
 *
 * @param dialect the SQL dialect used to render identifiers, quoting, etc.
 * @return the rendered SQL, newline-separated at statement boundaries
 */
default String sql(SqlDialect dialect) {
  SqlWriter w = new SqlPrettyWriter(SqlWriterConfig.of().withDialect(dialect));
  implement(w);
  // This logic is intended to split DDL/DML statements into multiple lines.
  // Flink SQL options (e.g. key.fields) can contain ";" characters as a delimiter,
  // so only a ";" preceded by whitespace is treated as a statement terminator
  // (a bare "\\s*;\\s*" would split inside quoted option values like 'a;b').
  // TODO: make this logic more robust, it will still fail for non-trimmed strings containing ";" characters
  return w.toSqlString().getSql().replaceAll("\\n", " ").replaceAll("\\s+;\\s*", ";\n").trim();
}

/** Generate SQL for a given dialect */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ public class VeniceSchemaValidationTest extends JdbcTestBase {

@Test
public void testVeniceTableCreationAndSchema() throws Exception {
sql("create or replace materialized view \"VENICE\".\"test-store$insert-partial\" (\"KEY_name\", \"KEY_age\", \"intField\") "
    + "as select \"stringField\" AS \"KEY_name\", \"KEY\" AS \"KEY_age\", \"intField\" from \"VENICE\".\"test-store-primitive\"");

// Validate the table was created with expected schema
Map<String, String> expectedColumns = Map.of(
"KEY_name", "VARCHAR",
"KEY_age", "INTEGER",
"intField", "INTEGER"
);
validateTableSchema(List.of("VENICE", "test-store$insert-partial"), expectedColumns);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
!set outputformat mysql
!use k8s

create or replace materialized view "VENICE"."test-store$insert-partial" ("KEY_name", "KEY_age", "intField") as select "stringField" AS "KEY_name", "KEY" AS "KEY_age", "intField" from "VENICE"."test-store-primitive";
(0 rows modified)

!update

+------------+----------+------------+------------+
| columnName | typeName | columnSize | isNullable |
+------------+----------+------------+------------+
| KEY_name | VARCHAR | null | YES |
| KEY_age | INTEGER | 10 | NO |
| intField | INTEGER | 10 | YES |
+------------+----------+------------+------------+
(3 rows)

!describe "VENICE"."test-store$insert-partial"

Expand All @@ -21,7 +22,7 @@ drop materialized view "VENICE"."test-store$insert-partial";

!update

insert into "VENICE"."test-store" ("KEY_name", "KEY_age", "intField") select "stringField" AS "KEY_name", "KEY" AS "KEY_age", "intField" from "VENICE"."test-store-primitive";
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
Expand All @@ -34,8 +35,8 @@ spec:
- CREATE DATABASE IF NOT EXISTS `VENICE` WITH ()
- CREATE TABLE IF NOT EXISTS `VENICE`.`test-store-primitive` (`KEY` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY', 'key.fields-prefix'='', 'key.type'='PRIMITIVE', 'partial-update-mode'='true', 'storeName'='test-store-primitive', 'value.fields-include'='EXCEPT_KEY')
- CREATE DATABASE IF NOT EXISTS `VENICE` WITH ()
- CREATE TABLE IF NOT EXISTS `VENICE`.`test-store` (`KEY_name` VARCHAR, `KEY_age` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_name;KEY_age', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY')
- INSERT INTO `VENICE`.`test-store` (`KEY_name`, `KEY_age`, `intField`) SELECT `stringField` AS `KEY_name`, `KEY` AS `KEY_age`, `intField` FROM `VENICE`.`test-store-primitive`
jarURI: file:///opt/hoptimator-flink-runner.jar
parallelism: 1
upgradeMode: stateless
Expand Down
Loading