Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 36 additions & 2 deletions core/src/main/java/org/apache/iceberg/SchemaUpdate.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.mapping.MappingUtil;
Expand Down Expand Up @@ -437,7 +438,8 @@ private void internalMove(String name, Move move) {
@Override
public Schema apply() {
Schema newSchema =
applyChanges(schema, deletes, updates, adds, moves, identifierFieldNames, caseSensitive);
applyChanges(
schema, deletes, updates, adds, moves, identifierFieldNames, caseSensitive, base, ops);

return newSchema;
}
Expand Down Expand Up @@ -508,7 +510,9 @@ private static Schema applyChanges(
Multimap<Integer, Types.NestedField> adds,
Multimap<Integer, Move> moves,
Set<String> identifierFieldNames,
boolean caseSensitive) {
boolean caseSensitive,
TableMetadata base,
TableOperations ops) {
// validate existing identifier fields are not deleted
Map<Integer, Integer> idToParent = TypeUtil.indexParents(schema.asStruct());

Expand All @@ -533,6 +537,36 @@ private static Schema applyChanges(
}
}

Map<Integer, List<Integer>> specToDeletes = Maps.newHashMap();
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a mapping from a spec to the requested fields to delete, where the fields are referenced as part of that spec.
It's a reverse mapping which makes surfacing the error details for the case where it's an active partition spec easier. Probably needs a better name though

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe put this in a separate checkNotDeletingColumnsInActiveSpecs method.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We occurred a similar issue. There might be another option which simplifies this schema update logic, namely:

  1. Prevent dropping any source column if it's used in any spec in table's specById
  2. Introduce the RemoveUnusedSpec in https://github.com/apache/iceberg/pull/3462/files

In that way, SchemaUpdate only needs to check specById instead of reading all the manifest files.

To actually drop an unused partition source field, user should make sure all the data files wrote by that spec are rewrote or removed, then call RemoveUnusedSpec to drop that spec.

Copy link
Copy Markdown
Contributor Author

@amogh-jahagirdar amogh-jahagirdar Jul 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's a better option @advancedxy , I was having a slack discussion with someone on removing unused partition specs and unused schemas and thinking the same thing about this PR. I think there's a legitimate need for those operations and we could leverage that need here.

With this approach, this means to drop a column which used to be referenced in a partition spec a user must first call RemovedUnusedPartitionSpec to remove any partition specs (this procedure would have to go through the manifests) and only then can they perform the column drop. This seems better because then we don't have to have potential manifest reads on the schema evolution path which I can see being problematic for folks.

One aspect is on ordering of these. There are two ways of going about this:

1.) Prevent dropping of columns which are part of the specs and get in the RemovedUnusedPartitionSpec procedure after that. The downside of this approach is that there are cases where a user could not drop a partition column even though they should be able to. Here's a trivial case: Imagine a user just creates the table and they don't write any data. Then they realize they want to drop a partition column, but the procedure will fail unexpectedly whereas before it would work. The benefit of this approach though is we will prevent users from shooting themselves in the foot generally as seen by the reported issues for this.

2.) First get in the RemovedUnusedPartitionSpec procedure and then prevent dropping if it's part of a spec. The downside of this is, it may take some more time in to get the whole API in? Maybe not much more time since it looks mostly there, but I'd have to check. Another downside is until the procedure is in, users may end up in bad states. The benefit of this approach is that users have a way for dropping historical partition columns that are unreachable prior to the behavior change.

Right now in my mind approach 1 seems better even though there are some behavior changes, it seems net better for users.

cc @Fokko @RussellSpitzer @aokolnychyi @rdblue @nastra in case they had any comments.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One aspect is on ordering of these. There are two ways of going about this:

I don't think the ordering of getting these two functionality into the master really matter that much? As long as they are both merged before the next release(assuming we are saying Iceberg 1.7 release). I imagine customer should use official released versions.

Here's a trivial case: Imagine a user just creates the table and they don't write any data. Then they realize they want to drop a partition column, but the procedure will fail unexpectedly whereas before it would work.

If preventing dropping active partition source field and RemoveUnusedPartitionSpec are both landed. It's still possible to drop the just created table but with some additional but necessary steps:

  1. first remove the unwanted partition field, which will create a new PartitionSpec
  2. Call RemoveUnusedPartitionSpec, which should be able to remove the previous wrongly partition spec
  3. remove the wrongly partition source field.

First get in the RemovedUnusedPartitionSpec procedure and then prevent dropping if it's part of a spec. The downside of this is, it may take some more time in to get the whole API in?

I think we can parallelize these two PRs if others all agree that's in the right direction. BTW, I can help to work on https://github.com/apache/iceberg/pull/3462/files to get it merged in case @RussellSpitzer is busy and cannot work on that recently.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@advancedxy That's reasonable, we can do the 2 independently. Cool, discussed offline with @RussellSpitzer feel free to go for it, for carrying forward the removing historical partitions PR!

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update, filed #10755, please take a look.

if (base != null && base.currentSnapshot() != null) {
for (int fieldIdToDelete : deletes) {
for (PartitionSpec spec : base.specs()) {
if (spec.fields().stream()
.anyMatch(partitionField -> partitionField.sourceId() == fieldIdToDelete)) {
List<Integer> deletesForSpec =
specToDeletes.computeIfAbsent(spec.specId(), k -> Lists.newArrayList());
deletesForSpec.add(fieldIdToDelete);
specToDeletes.put(spec.specId(), deletesForSpec);
}
}
}

if (!specToDeletes.isEmpty()) {
List<ManifestFile> manifests =
ManifestLists.read(
ops.io().newInputFile(base.currentSnapshot().manifestListLocation()));
Optional<ManifestFile> manifestReferencingActivePartition =
manifests.stream()
.filter(manifest -> specToDeletes.containsKey(manifest.partitionSpecId()))
.findAny();
Preconditions.checkArgument(
!manifestReferencingActivePartition.isPresent(),
"Cannot delete field %s as it is used by an active partition spec %s",
specToDeletes.get(manifestReferencingActivePartition.get().partitionSpecId()).get(0),
manifestReferencingActivePartition.get().partitionSpecId());
}
}

// apply schema changes
Types.StructType struct =
TypeUtil.visit(schema, new ApplyChanges(deletes, updates, adds, moves))
Expand Down