
[BEAM-5191] Support for BigQuery clustering #7061

Closed

wscheep wants to merge 1 commit into apache:master from wscheep:bq_clustering

Conversation

@wscheep wscheep commented Nov 16, 2018

Implemented BigQuery clustering: https://cloud.google.com/bigquery/docs/clustered-tables.
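For context, a clustered BigQuery table stores its data sorted by an ordered list of up to four columns, given in the table definition as a small JSON object. A minimal sketch of that shape (plain Java, no Beam dependency; the column names are illustrative):

```java
import java.util.List;
import java.util.stream.Collectors;

public class ClusteringJsonSketch {

    // Builds the JSON fragment BigQuery expects for a clustering spec,
    // e.g. {"fields":["country","user_id"]}. Real code would use a JSON
    // library; plain string assembly keeps this sketch dependency-free.
    static String clusteringToJson(List<String> fields) {
        return "{\"fields\":["
                + fields.stream()
                        .map(f -> "\"" + f + "\"")
                        .collect(Collectors.joining(","))
                + "]}";
    }

    public static void main(String[] args) {
        // prints {"fields":["country","user_id"]}
        System.out.println(clusteringToJson(List.of("country", "user_id")));
    }
}
```

This is roughly the payload that any JSON serialization of the clustering configuration has to produce so the value can travel through a ValueProvider.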

As this is related to BigQuery TimePartitioning, I based my implementation on this commit:
b0e03a3

As far as I know, there are no integration tests covering time partitioning, so I did not add tests for clustering. If needed I can write some if someone points me in the right direction.

This is my first feature PR, so I'm eager to get some proper feedback.
@jkff, @reuvenlax as you committed & authored time partitioning, can you have a look?

Thanks,
Wout


Follow this checklist to help us incorporate your contribution quickly and easily:

  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

It will help us expedite review of your Pull Request if you tag someone (e.g. @username) to look at it.

Post-Commit Tests Status (on master branch)

[Build-status badge matrix (Go, Java, Python SDKs across the Apex, Dataflow, Flink, Gearpump, Samza, and Spark runners) omitted]

@mxm
Contributor

mxm commented Nov 20, 2018

Run JavaPortabilityApi PreCommit

@chamikaramj chamikaramj self-requested a review November 28, 2018 18:27
@chamikaramj
Contributor

cc: @reuvenlax

@robertwb
Contributor

@chamikaramj @reuvenlax any update on this?

Contributor

@chamikaramj chamikaramj left a comment

Thanks. Added some comments.

Please add an integration test for the new feature. I know that we did not add this for time-based partitioning but we recently added some BQ integration tests and you can probably follow that (it's great if you can add a test for time partitioning as well :) )
https://github.com/apache/beam/blob/master/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java
https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadIT.java

}
}

static class ClusteringToJson implements SerializableFunction<Clustering, String> {
Contributor

Please document why this is needed.

Author

It wasn't, removed it.

return withJsonClustering(NestedValueProvider.of(clustering, new ClusteringToJson()));
}

public Write<T> withJsonClustering(ValueProvider<String> clustering) {
Contributor

Any idea why we would need 'withJsonClustering' ? (I understand that you are following time partitioning here).

Author

You're right, I don't need to expose it. Removed it.

}
if (getJsonClustering() != null) {
  checkArgument(
      getJsonTimePartitioning() != null,
Contributor

So that combinations (getJsonClustering and getTimePartitioning) and (getClustering and getJsonTimePartitioning) are not accepted ?

Author

It should be covered now, if I'm not mistaken.

dynamicDestinations =
    new ConstantTimePartitioningDestinations(
        dynamicDestinations, getJsonTimePartitioning());
if (getJsonClustering() != null) {
Contributor

How about adding a single class instead of forking here ?

Author

done.

}
}

static class ConstantClusteringDestinations<T> extends ConstantTimePartitioningDestinations<T> {
Contributor

Possibly cleaner rename and update existing class instead of sub-classing here.

Author

done

@wscheep
Author

wscheep commented Jan 28, 2019

@chamikaramj Thanks for the review. Sorry it took a while; I lost track of it. I made some changes and added an integration test covering time partitioning and clustering.

@lgajowy
Contributor

lgajowy commented Jan 28, 2019

@chamikaramj @wscheep Created BigQueryTimePartitioningIT dataset for the test in the apache-beam-testing project.

@wscheep wscheep force-pushed the bq_clustering branch 2 times, most recently from 08cb56b to 5c3320d on January 28, 2019
@wscheep
Author

wscheep commented Jan 28, 2019

Run Java PreCommit

@chamikaramj
Contributor

Run Java PostCommit

@chamikaramj
Contributor

Run Dataflow ValidatesRunner

@chamikaramj
Contributor

Thanks. LGTM.

I'll merge after post-commit tests pass.

@wscheep wscheep force-pushed the bq_clustering branch 2 times, most recently from 685df30 to 3f728c9 on February 3, 2019
@wscheep
Author

wscheep commented Feb 3, 2019

Run Java PreCommit

@wscheep
Author

wscheep commented Feb 3, 2019

Run Java PostCommit

@chamikaramj
Contributor

Sorry, can you please resolve the merge conflict?

.setCoder(
    KvCoder.of(
-       VoidCoder.of(), KvCoder.of(TableDestinationCoderV2.of(), StringUtf8Coder.of())))
+       VoidCoder.of(), KvCoder.of(TableDestinationCoderV3.of(), StringUtf8Coder.of())))
Contributor

I think this will break some Dataflow users who expect to be able to update their pipelines, and can't if these coders change. @chamikaramj what do you think here?

Contributor

I'm a bit confused by this comment (and probably missing something). Isn't this similar to the TableDestinationCoder to TableDestinationCoderV2 change we did in the following commit to preserve update compatibility?

b0e03a3#diff-b2706c94bc268b2bc2b78820ab23b0fe

Author

I'm also not sure; I based this on the commit referenced above. Why doesn't this preserve update compatibility?

Contributor

@reuvenlax in retrospect, looking at both TableDestinationCoder and TableDestinationCoderV2, I see that they are completely incompatible and can understand requiring the V2 coder. Am I correct that instead adding another Nullable field (clustering) to TableDestinationCoderV2 will not break upgradeability? I think coder compatibility on upgrades is not a well understood topic.

Contributor

Coder compatibility is unfortunately not well defined. The best assumption today is that any change to a coder is incompatible (Beam needs to define a better story around updating coders).

I don't think a Nullable field helps here. The nullable coder will still expect something there, and there will be nothing there.

We need to either:

  1. Decide that this is important enough that we are willing to break update.
  2. Make the use of the new coder somehow contingent on the user using clustering.
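
The incompatibility Reuven describes can be shown with a toy pair of coders (plain Java streams, not the real Beam TableDestination coders): bytes written by the old format carry no presence byte for the new nullable field, so the new decoder runs off the end of the stream.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;

public class CoderCompat {

    // "Old" coder: encodes only the table spec.
    static byte[] encodeOld(String table) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeUTF(table);
        return bos.toByteArray();
    }

    // "New" coder: the same string plus a presence byte and an optional
    // clustering value. Decoding old bytes hits EOF at the presence byte.
    static String[] decodeNew(byte[] bytes) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        String table = in.readUTF();
        boolean hasClustering = in.readBoolean(); // old bytes end before this
        String clustering = hasClustering ? in.readUTF() : null;
        return new String[] {table, clustering};
    }

    public static void main(String[] args) throws IOException {
        byte[] oldBytes = encodeOld("project:dataset.table");
        try {
            decodeNew(oldBytes);
        } catch (EOFException e) {
            // In-flight state encoded with the old coder cannot be read by
            // the new one, which is exactly what breaks pipeline update.
            System.out.println("old bytes are not decodable by the new coder");
        }
    }
}
```

A nullable field does not help for the same reason: the presence marker itself is a byte the old format never wrote.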

Contributor

How can we make that decision?

Also, what did we do previously? Did we make changes that unavoidably break update compatibility?

Contributor

@wscheep is approach (2) described by Reuven viable here? I.e., can we use the new coder only when the user uses BQ clustering?

Contributor

For this particular instance, I think we can safely continue to use TableDestinationCoderV2 since none of the subsequent processing steps in this method reference the clustering configuration. The TableDestinations we code here are used to configure a copy job, which does not need to know about clustering.

/**
* A {@link Coder} for {@link TableDestination} that includes time partitioning information. This is
* a new coder (instead of extending the old {@link TableDestinationCoder}) for compatibility
* reasons. The old coder is kept around for the same compatibility reasons.
Contributor

I'm not sure I understand the logic. We are replacing the old coder with the new coder, which has the same compatibility issues. Keeping the old coder around in the codebase doesn't really change much.

Contributor

I think the biggest confusion started when the V2 coder was created. @wscheep thought this was the pattern for updating coders. If you can confirm that adding the field (as Nullable) would not break upgrades, the V3 coder can be removed and the field added to the V2.

@wscheep
Author

wscheep commented Feb 6, 2019

Run Python PreCommit

@reuvenlax
Contributor

Sorry for the delay here, I was out of the office for a few days.

I'm working on a way to do this compatibly. I'll update you once I have something.

@juancho088

How is this PR going? Will it be completed soon?

@jklukas
Contributor

jklukas commented May 20, 2019

Getting this PR to completion would be very useful, as table clustering is no longer in beta and likely to become more popular.

tempTables
    .apply("ReifyRenameInput", new ReifyAsIterable<>())
-   .setCoder(IterableCoder.of(KvCoder.of(TableDestinationCoderV2.of(), StringUtf8Coder.of())))
+   .setCoder(IterableCoder.of(KvCoder.of(TableDestinationCoderV3.of(), StringUtf8Coder.of())))
Contributor

Same here. The coded value is only passed to WriteRename which creates a copy job and never references the clustering configuration.

@jklukas
Contributor

jklukas commented Jun 26, 2019

Since this PR seems to have stalled out, I've taken the commits here and posted a new PR #8945 that is rebased on master and addresses the coder evolution issue by providing an interface for users to opt in to the new coder.
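
The opt-in route taken in #8945 can be sketched as follows (hypothetical names, plain Java, not the actual Beam API): the new coder is selected only when the user has configured clustering, so existing pipelines keep the old byte format and stay update-compatible.

```java
public class CoderSelection {

    enum CoderVersion { V2, V3 }

    // Pick the destination coder based on whether clustering is configured.
    // Pipelines that never opt in keep the V2 byte format, so an in-place
    // update sees no coder change; only clustering users get the new coder.
    static CoderVersion destinationCoderFor(boolean clusteringEnabled) {
        return clusteringEnabled ? CoderVersion.V3 : CoderVersion.V2;
    }

    public static void main(String[] args) {
        System.out.println(destinationCoderFor(false)); // V2
        System.out.println(destinationCoderFor(true));  // V3
    }
}
```

This trades a small API surface (the opt-in) for never changing the wire format underneath an already-running pipeline.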

@jklukas
Contributor

jklukas commented Jul 22, 2019

The contents of this PR were incorporated into #8945 which is now merged, so this issue can be closed. This feature should be available in the 2.15 release.

@chamikaramj
Contributor

Closing since this was included in #8945

9 participants