From eb03d9945fb341ff6d0a824f81b2cf457f08ccd1 Mon Sep 17 00:00:00 2001 From: Josh McKenzie Date: Tue, 10 Mar 2026 16:49:02 -0400 Subject: [PATCH] CASSANALYTICS-128: Add flag to allow bulk write to indexed tables Patch by Josh McKenzie; reviewed by Jyothsna Konisa and Shailaja Koppu for CASSANALYTICS-128 --- CHANGES.txt | 1 + .../bulkwriter/AbstractBulkWriterContext.java | 3 +- .../spark/bulkwriter/BulkSparkConf.java | 2 + .../spark/bulkwriter/TableSchema.java | 19 ++++++++- .../spark/bulkwriter/WriterOptions.java | 8 ++++ .../spark/bulkwriter/TableSchemaTest.java | 11 +++++ .../bulkwriter/TableSchemaTestCommon.java | 42 +++++++++++++++++-- 7 files changed, 80 insertions(+), 6 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index e987053d0..b975c7f88 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -3,6 +3,7 @@ * Fixing CdcTests.testMockedCdc broken due to incorrect position update in BufferingCommitLogReader (CASSANALYTICS-127) * Commitlog reading not progressing in CDC due to incorrect CommitLogReader.isFullyRead (CASSANALYTICS-124) * Incorrect hash code calculation in PartitionUpdateWrapper.Digest (CASSANALYTICS-125) + * Add flag to allow bulk write to indexed tables (CASSANALYTICS-128) * Assign data file start offset based on BTI index (CASSANALYTICS-121) * Quote identifiers option must be set to true if ttl has mixed case column name (CASSANALYTICS-120) diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/AbstractBulkWriterContext.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/AbstractBulkWriterContext.java index 6acbb2fcc..b6eff3c80 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/AbstractBulkWriterContext.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/AbstractBulkWriterContext.java @@ -321,7 +321,8 @@ protected TableSchema initializeTableSchema(@NotNull BulkSparkConf conf, conf.getTTLOptions(), conf.getTimestampOptions(), lowestCassandraVersion, - job().qualifiedTableName().quoteIdentifiers()); + job().qualifiedTableName().quoteIdentifiers(), + conf.skipSecondaryIndexCheck); } @NotNull diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java index 3f52cd74d..68f5636b3 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java @@ -127,6 +127,7 @@ public class BulkSparkConf implements Serializable public final int commitThreadsPerInstance; public final double importCoordinatorTimeoutMultiplier; public boolean quoteIdentifiers; + public final boolean skipSecondaryIndexCheck; protected final String keystorePassword; protected final String keystorePath; protected final String keystoreBase64Encoded; @@ -207,6 +208,7 @@ public BulkSparkConf(SparkConf conf, Map options, @Nullable Logg this.ttl = MapUtils.getOrDefault(options, WriterOptions.TTL.name(), null); this.timestamp = MapUtils.getOrDefault(options, WriterOptions.TIMESTAMP.name(), null); this.quoteIdentifiers = MapUtils.getBoolean(options, WriterOptions.QUOTE_IDENTIFIERS.name(), false, "quote identifiers"); + this.skipSecondaryIndexCheck = MapUtils.getBoolean(options, WriterOptions.SKIP_SECONDARY_INDEX_CHECK.name(), false, "skip secondary index check"); int storageClientConcurrency = MapUtils.getInt(options, WriterOptions.STORAGE_CLIENT_CONCURRENCY.name(), DEFAULT_STORAGE_CLIENT_CONCURRENCY, "storage client concurrency"); long storageClientKeepAliveSeconds = MapUtils.getLong(options, WriterOptions.STORAGE_CLIENT_THREAD_KEEP_ALIVE_SECONDS.name(), diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java index 8f0bbcef9..b198af984 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java @@ -69,7 +69,8 @@ public TableSchema(StructType dfSchema, TTLOption ttlOption, TimestampOption timestampOption, String lowestCassandraVersion, - boolean quoteIdentifiers) + boolean quoteIdentifiers, + boolean skipSecondaryIndexCheck) { this.writeMode = writeMode; this.ttlOption = ttlOption; @@ -79,7 +80,21 @@ public TableSchema(StructType dfSchema, this.quoteIdentifiers = quoteIdentifiers; validateDataFrameCompatibility(dfSchema, tableInfo); - validateNoSecondaryIndexes(tableInfo); + // If a table has indexes on it, some external process (application, DB, etc.) is responsible for rebuilding + // indexes on the table after the bulk write completes; cassandra does this as part of the SSTable import + // process today. 2i and SAI have different ergonomics here regarding if stale data is served during index build; + // ultimately we want the bulk writer to also write native SAI index files alongside sstables but until + // then, this is allowable and fine for users who Know What They're Doing. + if (!skipSecondaryIndexCheck) + { + validateNoSecondaryIndexes(tableInfo); + } + else if (tableInfo.hasSecondaryIndex()) + { + LOGGER.warn("Bulk writing to tables with SecondaryIndexes will have an asynchronous index rebuild " + + "take place automatically after writing. Reads against the index during this time " + + "window will produce inconsistent or stale results until index rebuild is complete."); + } validateUserAddedColumns(lowestCassandraVersion, quoteIdentifiers, ttlOption, timestampOption); this.createStatement = getCreateStatement(tableInfo); diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java index 6574e2782..445ab206b 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java @@ -132,4 +132,12 @@ public enum WriterOptions implements WriterOption * - a failure otherwise */ JOB_TIMEOUT_SECONDS, + /** + * Option to bypass the secondary index validation check during bulk write job setup. + * By default, bulk writes to tables with secondary indexes are rejected. + * Setting this option to {@code true} allows bulk writes to proceed on tables that have secondary indexes, + * with the understanding that the secondary indexes will NOT be updated by the bulk write and must be + * rebuilt separately after the job completes. + */ + SKIP_SECONDARY_INDEX_CHECK, } diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java index 266caed58..138bbffe9 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java @@ -270,6 +270,17 @@ public void testSecondaryIndexIsUnsupported() throws Exception .hasMessage("Bulkwriter doesn't support secondary indexes"); } + @ParameterizedTest + @MethodSource("org.apache.cassandra.bridge.VersionRunner#supportedVersions") + public void testSecondaryIndexAllowedWithSkipCheck(String cassandraVersion) + { + TableSchema schema = getValidSchemaBuilder(cassandraVersion) + .withHasSecondaryIndex() + .withSkipSecondaryIndexCheck() + .build(); + assertThat(schema).isNotNull(); + } + @ParameterizedTest @MethodSource("org.apache.cassandra.bridge.VersionRunner#supportedVersions") public void testMixedCaseTTLColumnNameWithoutQuoteIdentifiersFails(String cassandraVersion) diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java index cff2e6f45..4122d94dd 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java @@ -191,6 +191,8 @@ public static class MockTableSchemaBuilder private TTLOption ttlOption = TTLOption.forever(); private TimestampOption timestampOption = TimestampOption.now(); private boolean quoteIdentifiers = false; + private boolean skipSecondaryIndexCheck = false; + private boolean hasSecondaryIndex = false; public MockTableSchemaBuilder(CassandraBridge bridge) { @@ -270,6 +272,18 @@ public MockTableSchemaBuilder withQuotedIdentifiers() return this; } + public MockTableSchemaBuilder withSkipSecondaryIndexCheck() + { + this.skipSecondaryIndexCheck = true; + return this; + } + + public MockTableSchemaBuilder withHasSecondaryIndex() + { + this.hasSecondaryIndex = true; + return this; + } + private ImmutableMap addColumnToCqlColumns(ImmutableMap currentColumns, String columnName, String cqlType) @@ -315,8 +329,16 @@ public TableSchema build() partitionKeyColumnTypes, primaryKeyColumnNames, cassandraVersion, - quoteIdentifiers); - return new TableSchema(dataFrameSchema, tableInfoProvider, writeMode, ttlOption, timestampOption, cassandraVersion, quoteIdentifiers); + quoteIdentifiers, + hasSecondaryIndex); + return new TableSchema(dataFrameSchema, + tableInfoProvider, + writeMode, + ttlOption, + timestampOption, + cassandraVersion, + quoteIdentifiers, + skipSecondaryIndexCheck); } } @@ -333,6 +355,7 @@ public static class MockTableInfoProvider implements TableInfoProvider Map columns; private final String cassandraVersion; private final boolean quoteIdentifiers; + private final boolean hasSecondaryIndex; public MockTableInfoProvider(CassandraBridge bridge, ImmutableMap cqlColumns, @@ -341,6 +364,18 @@ public MockTableInfoProvider(CassandraBridge bridge, String[] primaryKeyColumnNames, String cassandraVersion, boolean quoteIdentifiers) + { + this(bridge, cqlColumns, partitionKeyColumns, partitionKeyColumnTypes, primaryKeyColumnNames, cassandraVersion, quoteIdentifiers, false); + } + + public MockTableInfoProvider(CassandraBridge bridge, + ImmutableMap cqlColumns, + String[] partitionKeyColumns, + ColumnType[] partitionKeyColumnTypes, + String[] primaryKeyColumnNames, + String cassandraVersion, + boolean quoteIdentifiers, + boolean hasSecondaryIndex) { this.bridge = bridge; this.cqlColumns = cqlColumns; @@ -350,6 +385,7 @@ public MockTableInfoProvider(CassandraBridge bridge, columns = cqlColumns; this.cassandraVersion = cassandraVersion.replaceAll("(\\w+-)*cassandra-", ""); this.quoteIdentifiers = quoteIdentifiers; + this.hasSecondaryIndex = hasSecondaryIndex; this.uniqueTableName = TEST_TABLE_PREFIX + TEST_TABLE_ID.getAndIncrement(); } @@ -439,7 +475,7 @@ public String getKeyspaceName() @Override public boolean hasSecondaryIndex() { - return false; + return hasSecondaryIndex; } @Override