Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/docs/spark-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ Both catalogs are configured using properties nested under the catalog name. Com
| spark.sql.catalog._catalog-name_.cache.expiration-interval-ms | `30000` (30 seconds) | Duration after which cached catalog entries are expired; only effective if `cache-enabled` is `true`. `-1` disables cache expiration and `0` disables caching entirely, irrespective of `cache-enabled`. Default is `30000` (30 seconds) |
| spark.sql.catalog._catalog-name_.table-default._propertyKey_ | | Default Iceberg table property value for property key _propertyKey_, which will be set on tables created by this catalog if not overridden |
| spark.sql.catalog._catalog-name_.table-override._propertyKey_ | | Enforced Iceberg table property value for property key _propertyKey_, which cannot be overridden by user |
| spark.sql.catalog._catalog-name_.use-nullable-query-schema | `true` or `false` | Whether to mark all fields as nullable when creating a table using CTAS or RTAS. If set to `true`, all fields will be marked as nullable. If set to `false`, the fields' nullability from the query schema will be preserved. The default value is `true`. Available in Spark 3.5 and above. |

Additional properties can be found in common [catalog configuration](configuration.md#catalog-properties).

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,25 @@
import org.apache.iceberg.spark.procedures.SparkProcedures;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
import org.apache.iceberg.spark.source.HasIcebergCatalog;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
import org.apache.spark.sql.connector.iceberg.catalog.Procedure;
import org.apache.spark.sql.connector.iceberg.catalog.ProcedureCatalog;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

abstract class BaseCatalog
implements StagingTableCatalog,
ProcedureCatalog,
SupportsNamespaces,
HasIcebergCatalog,
SupportsFunctions {
// Catalog option key that controls whether CTAS/RTAS use a nullable query schema.
private static final String USE_NULLABLE_QUERY_SCHEMA_CTAS_RTAS = "use-nullable-query-schema";
// Fallback value passed to the option lookup in initialize().
private static final boolean USE_NULLABLE_QUERY_SCHEMA_CTAS_RTAS_DEFAULT = true;

// Current setting for this catalog; starts at the default and is reassigned in initialize().
private boolean useNullableQuerySchema = USE_NULLABLE_QUERY_SCHEMA_CTAS_RTAS_DEFAULT;

@Override
public Procedure loadProcedure(Identifier ident) throws NoSuchProcedureException {
Expand Down Expand Up @@ -66,6 +72,20 @@ public boolean isExistingNamespace(String[] namespace) {
return namespaceExists(namespace);
}

/**
 * Resolves the {@code use-nullable-query-schema} option from the catalog options and stores it
 * on this catalog.
 *
 * @param name catalog name; not used by this base implementation
 * @param options catalog options to look the flag up in;
 *     {@code USE_NULLABLE_QUERY_SCHEMA_CTAS_RTAS_DEFAULT} is passed as the fallback value
 */
@Override
public void initialize(String name, CaseInsensitiveStringMap options) {
  boolean configuredValue =
      PropertyUtil.propertyAsBoolean(
          options,
          USE_NULLABLE_QUERY_SCHEMA_CTAS_RTAS,
          USE_NULLABLE_QUERY_SCHEMA_CTAS_RTAS_DEFAULT);

  this.useNullableQuerySchema = configuredValue;
}

/**
 * Returns the nullable-query-schema flag currently held by this catalog.
 *
 * @return the value of the {@code useNullableQuerySchema} field
 */
@Override
public boolean useNullableQuerySchema() {
  return this.useNullableQuerySchema;
}

/**
 * Checks whether the given namespace is the single-level {@code system} namespace.
 *
 * @param namespace namespace levels to inspect
 * @return {@code true} only when there is exactly one level and it equals "system", ignoring
 *     case
 */
private static boolean isSystemNamespace(String[] namespace) {
  // anything other than exactly one level cannot be the system namespace
  if (namespace.length != 1) {
    return false;
  }

  String onlyLevel = namespace[0];
  return onlyLevel.equalsIgnoreCase("system");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,8 @@ public void renameView(Identifier fromIdentifier, Identifier toIdentifier)

@Override
public final void initialize(String name, CaseInsensitiveStringMap options) {
super.initialize(name, options);

this.cacheEnabled =
PropertyUtil.propertyAsBoolean(
options, CatalogProperties.CACHE_ENABLED, CatalogProperties.CACHE_ENABLED_DEFAULT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,8 @@ public void renameTable(Identifier from, Identifier to)

@Override
public final void initialize(String name, CaseInsensitiveStringMap options) {
super.initialize(name, options);

if (options.containsKey(CatalogUtil.ICEBERG_CATALOG_TYPE)
&& options
.get(CatalogUtil.ICEBERG_CATALOG_TYPE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,13 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.util.concurrent.ThreadLocalRandom;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.Table;
Expand All @@ -33,6 +38,47 @@
import org.junit.jupiter.api.TestTemplate;

public class TestSparkCatalogOperations extends CatalogTestBase {
// Picked at random once when this class is initialized. The catalog configs built in
// parameters() and the expected schemas in the CTAS/RTAS tests both read this value.
private static boolean useNullableQuerySchema = ThreadLocalRandom.current().nextBoolean();

/**
 * Builds the catalog parameter rows for this test class: one row each for the Hive, Hadoop and
 * Spark catalog configs. Every row carries the same {@code use-nullable-query-schema} value.
 *
 * @return rows of {catalog name, implementation, config map}
 */
@Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
protected static Object[][] parameters() {
  String nullableQuerySchemaFlag = Boolean.toString(useNullableQuerySchema);

  Object[] hiveRow = {
    SparkCatalogConfig.HIVE.catalogName(),
    SparkCatalogConfig.HIVE.implementation(),
    ImmutableMap.of(
        "type", "hive",
        "default-namespace", "default",
        "use-nullable-query-schema", nullableQuerySchemaFlag)
  };

  Object[] hadoopRow = {
    SparkCatalogConfig.HADOOP.catalogName(),
    SparkCatalogConfig.HADOOP.implementation(),
    ImmutableMap.of(
        "type", "hadoop",
        "cache-enabled", "false",
        "use-nullable-query-schema", nullableQuerySchemaFlag)
  };

  Object[] sparkRow = {
    SparkCatalogConfig.SPARK.catalogName(),
    SparkCatalogConfig.SPARK.implementation(),
    ImmutableMap.of(
        "type", "hive",
        "default-namespace", "default",
        "parquet-enabled", "true",
        // Spark will delete tables using v1, leaving the cache out of sync
        "cache-enabled", "false",
        "use-nullable-query-schema", nullableQuerySchemaFlag)
  };

  return new Object[][] {hiveRow, hadoopRow, sparkRow};
}

@BeforeEach
public void createTable() {
Expand Down Expand Up @@ -86,4 +132,60 @@ public void testInvalidateTable() {
sql("REFRESH TABLE %s", tableName);
sql("SELECT count(1) FROM %s", tableName);
}

/**
 * Creates a table with CTAS from the test table and checks that the nullability of {@code id} in
 * the resulting schema follows the {@code use-nullable-query-schema} setting: optional when the
 * flag is on, required when it is off. {@code data} is expected to be optional either way.
 *
 * <p>NOTE(review): the expectation for the "off" case presumably relies on the source table
 * declaring {@code id} as NOT NULL; the source table's definition is not visible here.
 */
@TestTemplate
public void testCTASUseNullableQuerySchema() {
  sql("INSERT INTO %s VALUES(1, 'abc'), (2, null)", tableName);

  String ctasTableName = tableName("ctas_table");

  try {
    sql("CREATE TABLE %s USING iceberg AS SELECT * FROM %s", ctasTableName, tableName);

    org.apache.iceberg.Table ctasTable =
        validationCatalog.loadTable(TableIdentifier.parse("default.ctas_table"));

    Schema expectedSchema =
        new Schema(
            useNullableQuerySchema
                ? Types.NestedField.optional(1, "id", Types.LongType.get())
                : Types.NestedField.required(1, "id", Types.LongType.get()),
            Types.NestedField.optional(2, "data", Types.StringType.get()));

    assertThat(ctasTable.schema().asStruct())
        .as("Should have expected schema")
        .isEqualTo(expectedSchema.asStruct());
  } finally {
    // Drop the table even when the CTAS or an assertion fails, so it does not leak into the
    // remaining parameterized runs.
    sql("DROP TABLE IF EXISTS %s", ctasTableName);
  }
}

/**
 * Replaces an existing table with RTAS from the test table and checks that the nullability of
 * {@code id} in the resulting schema follows the {@code use-nullable-query-schema} setting:
 * optional when the flag is on, required when it is off. Also checks that the replaced table
 * returns the same rows as the source table.
 *
 * <p>NOTE(review): the expectation for the "off" case presumably relies on the source table
 * declaring {@code id} as NOT NULL; the source table's definition is not visible here.
 */
@TestTemplate
public void testRTASUseNullableQuerySchema() {
  sql("INSERT INTO %s VALUES(1, 'abc'), (2, null)", tableName);

  String rtasTableName = tableName("rtas_table");

  try {
    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", rtasTableName);

    sql("REPLACE TABLE %s USING iceberg AS SELECT * FROM %s", rtasTableName, tableName);

    org.apache.iceberg.Table rtasTable =
        validationCatalog.loadTable(TableIdentifier.parse("default.rtas_table"));

    Schema expectedSchema =
        new Schema(
            useNullableQuerySchema
                ? Types.NestedField.optional(1, "id", Types.LongType.get())
                : Types.NestedField.required(1, "id", Types.LongType.get()),
            Types.NestedField.optional(2, "data", Types.StringType.get()));

    assertThat(rtasTable.schema().asStruct())
        .as("Should have expected schema")
        .isEqualTo(expectedSchema.asStruct());

    assertEquals(
        "Should have rows matching the source table",
        sql("SELECT * FROM %s ORDER BY id", tableName),
        sql("SELECT * FROM %s ORDER BY id", rtasTableName));
  } finally {
    // Drop the table even when a statement or assertion fails; otherwise the CREATE TABLE above
    // would fail in the next parameterized run because the table already exists.
    sql("DROP TABLE IF EXISTS %s", rtasTableName);
  }
}
}