[BEAM-5191] Support for BigQuery clustering (#8945)
Force-pushed eb18664 to 5acc4bf.
R: @juancho088 @reuvenlax @alexvanboxel @chamikaramj, who were reviewers on #7061. Also cc @wscheep, who authored #7061, which is still the bulk of the code here.
Run JavaPortabilityApi PreCommit
```java
.withTestServices(fakeBqServices)
.withMethod(BigQueryIO.Write.Method.FILE_LOADS)
.withSchema(schema)
.enableClustering()
```
Notably, this test fails if enableClustering() is not called because the default TableDestinationCoderV2 is used and clustering information is dropped before the table is created. This is exactly the behavior we want for backwards compatibility.
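For intuition about why the default coder silently loses clustering, here is a minimal, hypothetical sketch (plain stand-ins, not Beam's actual TableDestination or coder classes): a V2-style round trip simply never serializes the clustering field, so it decodes as null.

```java
// Hypothetical stand-ins for TableDestination and its coders; the real Beam
// coders serialize to byte streams, but the dropped-field behavior is the same.
public class CoderCompatSketch {
  static class Dest {
    final String table;
    final String clustering; // null when no clustering is set

    Dest(String table, String clustering) {
      this.table = table;
      this.clustering = clustering;
    }
  }

  // V2-style round trip: the clustering field is never encoded, so it
  // always comes back null after decoding.
  static Dest roundTripV2(Dest d) {
    return new Dest(d.table, null);
  }

  // V3-style round trip: clustering survives encode/decode.
  static Dest roundTripV3(Dest d) {
    return new Dest(d.table, d.clustering);
  }

  public static void main(String[] args) {
    Dest in = new Dest("project:dataset.table", "station_number");
    System.out.println(roundTripV2(in).clustering); // null: silently dropped
    System.out.println(roundTripV3(in).clustering); // station_number
  }
}
```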
```java
Coder<DestinationT> getDestinationCoderWithDefault(CoderRegistry registry)
    throws CannotProvideCoderException {
  return inner.getDestinationCoderWithDefault(registry);
Coder<DestinationT> destinationCoder = getDestinationCoder();
```
This seems to be a behavior change for the non-clustering case? Previously we returned inner.getDestinationCoderWithDefault(registry); now we return TableDestinationCoderV2.of().
DynamicDestinations#getDestinationCoderWithDefault is commented as:
// Gets the destination coder. If the user does not provide one, try to find one in the coder
// registry. If no coder can be found, throws CannotProvideCoderException.
This code is written with potentially multiple layers of delegation, and I think the correct behavior here is to return the first non-delegated implementation of getDestinationCoder() that appears as we move down the delegation chain.
I would argue that the existing behavior is incorrect. Currently, if an implementing class defines a custom return value for getDestinationCoder, that value is ignored when you call getDestinationCoderWithDefault. My expectation is that getDestinationCoderWithDefault would always return the same value as getDestinationCoder except in the null case, in which getDestinationCoderWithDefault would then attempt to look up a coder in the registry.
So the change here is intended to fix broken behavior.
It's possible that a user has written a custom class that extends DelegatingDynamicDestinations and relies on the incorrect behavior, but it feels unlikely to me.
For the scope of the coders provided here, I don't believe this change affects behavior (the method was already returning TableDestinationCoderV2 in all cases).
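The intended semantics can be sketched as follows. This is a hypothetical simplification, not Beam's actual code: Coder and the registry are replaced by plain Strings and a Supplier so the sketch stays self-contained and runnable.

```java
// Hypothetical sketch of the delegation semantics argued for above; Coder and
// the coder registry are replaced by Strings and a Supplier to keep it runnable.
import java.util.function.Supplier;

public class DelegationSketch {
  abstract static class Destinations {
    // May return null when the implementation provides no custom coder.
    abstract String getDestinationCoder();

    // The fixed behavior: honor getDestinationCoder() first, and fall back
    // to the registry only when every layer returned null.
    String getDestinationCoderWithDefault(Supplier<String> registryLookup) {
      String coder = getDestinationCoder();
      return coder != null ? coder : registryLookup.get();
    }
  }

  static class Delegating extends Destinations {
    final Destinations inner;

    Delegating(Destinations inner) {
      this.inner = inner;
    }

    // Delegation happens in getDestinationCoder, not in ...WithDefault, so a
    // custom coder anywhere down the chain is honored.
    @Override
    String getDestinationCoder() {
      return inner.getDestinationCoder();
    }
  }

  static Destinations fixed(final String coder) {
    return new Destinations() {
      @Override
      String getDestinationCoder() {
        return coder;
      }
    };
  }

  public static void main(String[] args) {
    // A custom coder in the inner layer wins over the registry default.
    System.out.println(new Delegating(fixed("TableDestinationCoderV3"))
        .getDestinationCoderWithDefault(() -> "fromRegistry"));
    // No custom coder anywhere: fall back to the registry.
    System.out.println(new Delegating(fixed(null))
        .getDestinationCoderWithDefault(() -> "fromRegistry"));
  }
}
```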
It may actually be better to remove this override altogether. getDestinationCoderWithDefault is defined as package-private and probably shouldn't be overridden. Instead, we let getDestinationCoder handle delegation, and the base implementation of getDestinationCoderWithDefault will make the appropriate getDestinationCoder call before falling back to the registry.
I've removed the override getDestinationCoderWithDefault completely.
Looks like that caused some regressions where the destination coder couldn't be found from the type. I'm looking into it.
The override turns out to be necessary because the DynamicDestinations instance itself is eventually passed into extractFromTypeParameters, and the inner class potentially has richer type information than the delegating class.
I've put the override method back in place, including the modification of first checking whether getDestinationCoder() returns a non-null value.
```java
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

p.run().waitUntilFinish();
```
Thanks for adding these ITs :)
I believe these ITs are the work of @wscheep in the previous PR, and I am going to borrow liberally from these examples to add the dynamic destinations ITs.
```java
p.apply(BigQueryIO.readTableRows().from(options.getInput()))
    .apply(ParDo.of(new KeepStationNumberAndConvertDate()))
    .apply(
        BigQueryIO.writeTableRows()
```
Can we also add one with dynamic destinations and clustering?
Added. It doesn't look like the ITs are run by Jenkins by default, and I can't find a command to kick off the BigQueryIO integration tests. Do you have any advice about how we can get results for these?
Post-commit test suite should capture these. Just triggered it.
```java
 * option, since {@link TableDestinationCoderV3} will not be able to read state written with a
 * previous version.
 */
public Write<T> enableClustering() {
```
Having these two methods seems to make the API pretty brittle. How about having a single withClustering() method that optionally takes a Clustering object? In the dynamic destinations case, the Clustering object can be skipped/null and the method will behave like enableClustering().
Implemented. enableClustering() is now withClustering() and it sets clustering to a default Clustering instance with no fields set, which we use as a flag for enabling clustering on dynamic destinations.
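The shape of that API can be sketched like this. This is a hypothetical simplification: the real BigQueryIO.Write is an immutable AutoValue builder, but the flag semantics are the same.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of the single withClustering() API described above;
// a mutable builder stands in for Beam's AutoValue-based BigQueryIO.Write.
public class WithClusteringSketch {
  static class Clustering {
    final List<String> fields;

    Clustering(List<String> fields) {
      this.fields = fields;
    }
  }

  static class Write {
    Clustering clustering; // null means clustering is disabled

    // The no-arg form plays the role of the old enableClustering(): an empty
    // Clustering instance acts as the "enabled for dynamic destinations" flag.
    Write withClustering() {
      return withClustering(new Clustering(Collections.emptyList()));
    }

    Write withClustering(Clustering c) {
      this.clustering = c;
      return this;
    }

    boolean clusteringEnabled() {
      return clustering != null;
    }
  }

  public static void main(String[] args) {
    System.out.println(new Write().withClustering().clusteringEnabled()); // true
    System.out.println(new Write().clusteringEnabled()); // false
  }
}
```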
Force-pushed bb77061 to 328d14b.

Run Java PostCommit
chamikaramj left a comment:
Thanks. Looks great.
Just a couple of comments.
(Resolved comment on ...d-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java)
```java
}

@Test
public void testClusteringStreamingInserts() throws Exception {
```
Probably also add a test that confirms that we fail if withClustering() is set without withTimePartitioning().
Added:

```java
@Test(expected = IllegalArgumentException.class)
public void testClusteringThrowsWithoutPartitioning() throws Exception {
```
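The precondition that test exercises can be sketched as follows. This is a hypothetical stand-in: in Beam the check lives in BigQueryIO.Write's validation, not in a standalone class.

```java
// Hypothetical sketch of the precondition the test above exercises: BigQuery
// clustering requires a partitioned table, so reject clustering without
// time partitioning up front.
public class ClusteringValidationSketch {
  static void validate(Object timePartitioning, Object clustering) {
    if (clustering != null && timePartitioning == null) {
      throw new IllegalArgumentException(
          "withClustering() is set but withTimePartitioning() is not");
    }
  }

  public static void main(String[] args) {
    validate(new Object(), new Object()); // ok: both set
    validate(null, null); // ok: neither set
    try {
      validate(null, new Object()); // clustering without partitioning
    } catch (IllegalArgumentException expected) {
      System.out.println("rejected: " + expected.getMessage());
    }
  }
}
```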
FYI: I'm OOO till 7/9, so please add another reviewer if you need to get this merged early. LGTM from my side, other than the two comments I mentioned and the integration tests passing.
Run Java PostCommit
Addressed those comments, integration tests passed, and I've now squashed and rebased on master.

I'm also OOO next week, so if any further changes need to be made, I'll be back on 7/15. We have several weeks before the branch cut date for 2.15, so I don't see any point in rushing. Thanks for the thoughtful review, @chamikaramj.
Run Java PostCommit

Run Java PostCommit
```java
 * A {@link Coder} for {@link TableDestination} that includes time partitioning and clustering
 * information. Users must opt in to this version of the coder by setting one of the clustering
 * options on {@link BigQueryIO.Write}, otherwise {@link TableDestinationCoderV2} will be used and
 * clustering information will be discarded.
```
If a user forgets to set a clustering option but creates a TableDestination with clustering information, will we warn them or just silently discard it?
There is no warning at this point. For the case of loading to an existing table, the operation will throw exceptions anyway if the clustering is mismatched. Perhaps there are compile-time cases we could warn about. I will have to think more about whether we want to block on that when I'm back in the office next week.
A "good enough" solution might be to always print a warning when clustering is not enabled. We will want to wordsmith the warning so it's not too frightening, and so it's clear that it only applies when trying to specify clustering.
Thanks. I agree that a compile-time (or even run-time) failure is better, but a warning should be OK if that is not possible for some reason. I'll wait till this is addressed before merging.
Run Java PostCommit

Run Java PreCommit

Run JavaPortabilityApi PreCommit
```java
+ " to use TableDestinationCoderV2. Set withClustering() on BigQueryIO.write() and, "
+ " if you provided a custom DynamicDestinations instance, override"
+ " getDestinationCoder() to return TableDestinationCoderV3.",
dynamicDestinations);
```
We cannot know at compile-time whether a custom table function or DynamicDestinations instance will produce any destination with clustering enabled, so we have to check the produced destinations at runtime.
This check will cause a runtime failure for StreamingInserts if a clustered destination is produced without the relevant configuration to use the newer destination coder.
```java
+ " to use TableDestinationCoderV2. Set withClustering() on BigQueryIO.write() and, "
+ " if you provided a custom DynamicDestinations instance, override"
+ " getDestinationCoder() to return TableDestinationCoderV3.",
dynamicDestinations);
```
This check will cause a runtime failure for BatchLoads if a clustered destination is produced without the relevant configuration to use the newer destination coder.
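The shape of this guard can be sketched as follows. This is a hypothetical simplification of the runtime check, using booleans in place of inspecting the actual destination and coder objects.

```java
// Hypothetical sketch of the runtime guard described above: when a produced
// destination carries clustering but the configured coder would drop it,
// fail fast instead of silently losing the clustering fields.
public class CoderGuardSketch {
  static void checkDestination(boolean destinationHasClustering, boolean usingCoderV3) {
    if (destinationHasClustering && !usingCoderV3) {
      throw new IllegalArgumentException(
          "Destination specifies clustering but the write is configured to use "
              + "TableDestinationCoderV2, which would drop it. Set withClustering() on "
              + "BigQueryIO.write() and, for a custom DynamicDestinations instance, "
              + "override getDestinationCoder() to return TableDestinationCoderV3.");
    }
  }

  public static void main(String[] args) {
    checkDestination(false, false); // fine: no clustering involved
    checkDestination(true, true); // fine: the V3 coder preserves clustering
    try {
      checkDestination(true, false); // clustering would be silently dropped
    } catch (IllegalArgumentException expected) {
      System.out.println("failed fast as intended");
    }
  }
}
```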
R: @chamikaramj I've added runtime checks for both the streaming-insert and batch-load cases: the transform will fail with an exception that includes instructions on how to configure clustering if a clustered destination is produced without the right coder in place. Does this look ready to merge?
LGTM. Thanks, Jeff.
Run Java PostCommit

Run Dataflow ValidatesRunner

Run Java PostCommit

Run Java PostCommit

Run Dataflow ValidatesRunner
@chamikaramj All the tests here are finally passing again after transient issues. |
|
Thanks. Merging. |
This takes the commits from #7061, rebases on master, and adds an enableClustering method to allow users to opt in to the updated coder when using dynamic destinations. This follows the pattern of #6914, where we added a new version of MetadataCoder, documenting how a user could opt in and why they might want to. This should solve the coder compatibility concerns that were the blockers to merging #7061.