Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions docs/docs/branching.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,94 @@ Creating, querying and writing to branches and tags are supported in the Iceberg
- [Spark Branch Writes](spark-writes.md#writing-to-branches)
- [Flink Reads](flink-queries.md#reading-branches-and-tags-with-SQL)
- [Flink Branch Writes](flink-writes.md#branch-writes)


## Schema selection with branches and tags

It is important to understand that the schema tracked for a table is valid across all branches.
When working with branches, the table's schema is used as that's the schema being validated when writing data to a branch.
On the other hands, querying a tag uses the snapshot's schema, which is the schema id that snapshot pointed to when the snapshot was created.

The below examples show which schema is being used when working with branches.

Create a table and insert some data:

```sql
CREATE TABLE db.table (id bigint, data string, col float);
INSERT INTO db.table values (1, 'a', 1.0), (2, 'b', 2.0), (3, 'c', 3.0);
SELECT * FROM db.table;
1 a 1.0
2 b 2.0
3 c 3.0
```

Create a branch `test_branch` that points to the current snapshot and read data from the branch:

```sql
ALTER TABLE db.table CREATE BRANCH test_branch;

SELECT * FROM db.table.branch_test_branch;
1 a 1.0
2 b 2.0
3 c 3.0
```

Modify the table's schema by dropping the `col` column and adding a new column named `new_col`:

```sql
ALTER TABLE db.table drop column float;

ALTER TABLE db.table add column new_col date;

INSERT INTO db.table values (4, 'd', date('2024-04-04')), (5, 'e', date('2024-05-05'));

SELECT * FROM db.table;
1 a NULL
2 b NULL
3 c NULL
4 d 2024-04-04
5 e 2024-05-05
```

Querying the head of the branch using one of the below statements will return data using the **table's schema**:

```sql
SELECT * FROM db.table.branch_test_branch;
1 a NULL
2 b NULL
3 c NULL

SELECT * FROM db.table VERSION AS OF 'test_branch';
1 a NULL
2 b NULL
3 c NULL
```

Performing a time travel query using the snapshot id uses the **snapshot's schema**:

```sql

SELECT * FROM db.table.refs;
test_branch BRANCH 8109744798576441359 NULL NULL NULL
main BRANCH 6910357365743665710 NULL NULL NULL


SELECT * FROM db.table VERSION AS OF 8109744798576441359;
1 a 1.0
2 b 2.0
3 c 3.0
```

When writing to the branch, the **table's schema** is used for validation:

```sql

INSERT INTO db.table.branch_test_branch values (6, 'e', date('2024-06-06')), (7, 'g', date('2024-07-07'));

SELECT * FROM db.table.branch_test_branch;
6 e 2024-06-06
7 g 2024-07-07
1 a NULL
2 b NULL
3 c NULL
```
23 changes: 22 additions & 1 deletion docs/docs/spark-queries.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ The `VERSION AS OF` clause can contain a long snapshot ID or a string branch or
If this is not desired, rename the tag or branch with a well-defined prefix such as 'snapshot-1'.


```sql
```sql
-- time travel to October 26, 1986 at 01:21:00
SELECT * FROM prod.db.table TIMESTAMP AS OF '1986-10-26 01:21:00';

Expand Down Expand Up @@ -124,6 +124,27 @@ SELECT * FROM prod.db.table.`tag_historical-snapshot`;

Note that the identifier with branch or tag may not be used in combination with `VERSION AS OF`.


#### Schema selection in time travel queries

The different time travel queries mentioned in the previous section can use either the snapshot's schema or the table's schema:

```sql
-- time travel to October 26, 1986 at 01:21:00 -> uses the snapshot's schema
SELECT * FROM prod.db.table TIMESTAMP AS OF '1986-10-26 01:21:00';
Comment thread
nastra marked this conversation as resolved.

-- time travel to snapshot with id 10963874102873L -> uses the snapshot's schema
SELECT * FROM prod.db.table VERSION AS OF 10963874102873;

-- time travel to the head of audit-branch -> uses the table's schema
SELECT * FROM prod.db.table VERSION AS OF 'audit-branch';
SELECT * FROM prod.db.table.`branch_audit-branch`;

-- time travel to the snapshot referenced by the tag historical-snapshot -> uses the snapshot's schema
SELECT * FROM prod.db.table VERSION AS OF 'historical-snapshot';
SELECT * FROM prod.db.table.`tag_historical-snapshot`;
```

#### DataFrame

To select a specific table snapshot or the snapshot at some time in the DataFrame API, Iceberg supports four Spark read options:
Expand Down
4 changes: 4 additions & 0 deletions docs/docs/spark-writes.md
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@ Note WAP branch and branch identifier cannot both be specified.
Also, the branch must exist before performing the write.
The operation does **not** create the branch if it does not exist.
For more information on branches please refer to [branches](branching.md).

!!! info
Comment thread
nastra marked this conversation as resolved.
Note: When writing to a branch, the current schema of the table will be used for validation.


```sql
-- INSERT (1,' a') (2, 'b') into the audit branch.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.assertj.core.api.Assumptions.assumeThat;

import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -348,6 +349,64 @@ public void testBranchReference() {
assertEquals("Snapshot at specific branch reference name", expected, fromDF);
}

@TestTemplate
public void readAndWriteWithBranchAfterSchemaChange() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test looks correct to me. Thanks, @nastra!

Table table = validationCatalog.loadTable(tableIdent);
String branchName = "test_branch";
table.manageSnapshots().createBranch(branchName, table.currentSnapshot().snapshotId()).commit();

List<Object[]> expected =
Arrays.asList(row(1L, "a", 1.0f), row(2L, "b", 2.0f), row(3L, "c", Float.NaN));
assertThat(sql("SELECT * FROM %s", tableName)).containsExactlyElementsOf(expected);

// change schema on the table and add more data
sql("ALTER TABLE %s DROP COLUMN float", tableName);
sql("ALTER TABLE %s ADD COLUMN new_col date", tableName);
sql(
"INSERT INTO %s VALUES (4, 'd', date('2024-04-04')), (5, 'e', date('2024-05-05'))",
tableName);

// time-travel query using snapshot id should return the snapshot's schema
long branchSnapshotId = table.refs().get(branchName).snapshotId();
assertThat(sql("SELECT * FROM %s VERSION AS OF %s", tableName, branchSnapshotId))
.containsExactlyElementsOf(expected);

// querying the head of the branch should return the table's schema
assertThat(sql("SELECT * FROM %s VERSION AS OF '%s'", tableName, branchName))
.containsExactly(row(1L, "a", null), row(2L, "b", null), row(3L, "c", null));

if (!"spark_catalog".equals(catalogName)) {
// querying the head of the branch using 'branch_' should return the table's schema
assertThat(sql("SELECT * FROM %s.branch_%s", tableName, branchName))
.containsExactly(row(1L, "a", null), row(2L, "b", null), row(3L, "c", null));
}

// writing to a branch uses the table's schema
sql(
"INSERT INTO %s.branch_%s VALUES (6L, 'f', cast('2023-06-06' as date)), (7L, 'g', cast('2023-07-07' as date))",
tableName, branchName);

// querying the head of the branch returns the table's schema
assertThat(sql("SELECT * FROM %s VERSION AS OF '%s'", tableName, branchName))
.containsExactlyInAnyOrder(
row(1L, "a", null),
row(2L, "b", null),
row(3L, "c", null),
row(6L, "f", java.sql.Date.valueOf("2023-06-06")),
row(7L, "g", java.sql.Date.valueOf("2023-07-07")));

// using DataFrameReader with the 'branch' option should return the table's schema
Dataset<Row> df =
spark.read().format("iceberg").option(SparkReadOptions.BRANCH, branchName).load(tableName);
assertThat(rowsToJava(df.collectAsList()))
.containsExactlyInAnyOrder(
row(1L, "a", null),
row(2L, "b", null),
row(3L, "c", null),
row(6L, "f", java.sql.Date.valueOf("2023-06-06")),
row(7L, "g", java.sql.Date.valueOf("2023-07-07")));
}

@TestTemplate
public void testUnknownReferenceAsOf() {
assertThatThrownBy(() -> sql("SELECT * FROM %s VERSION AS OF 'test_unknown'", tableName))
Expand Down