From 51177349b901e7d6ac4f07024f1d40c5ba95a7b7 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Thu, 9 Oct 2025 10:48:36 -0400 Subject: [PATCH 01/15] Revert "CombinePerKey with gbek (Java) (#36408)" This reverts commit b5a0495e55b0f9b4156b76aa6816344b4f088e7b. --- .../trigger_files/beam_PostCommit_Java.json | 2 +- .../beam_PostCommit_Java_DataflowV1.json | 2 +- .../beam_PostCommit_Java_DataflowV2.json | 2 +- .../apache/beam/sdk/transforms/Combine.java | 19 --------- .../util/construction/CombineTranslation.java | 17 +------- .../beam/sdk/transforms/GroupByKeyIT.java | 42 +------------------ 6 files changed, 6 insertions(+), 78 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Java.json b/.github/trigger_files/beam_PostCommit_Java.json index 1bd74515152c..8784d0786c02 100644 --- a/.github/trigger_files/beam_PostCommit_Java.json +++ b/.github/trigger_files/beam_PostCommit_Java.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 4 + "modification": 2 } \ No newline at end of file diff --git a/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json b/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json index 5e7fbb916f4b..42fb8f985ba1 100644 --- a/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json +++ b/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json @@ -3,6 +3,6 @@ "https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder", "https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners", "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 4, + "modification": 2, "https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface" } diff --git a/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json b/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json index 73012c45df18..3717f48ee492 100644 --- a/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json +++ b/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json @@ -1,4 +1,4 @@ { - "modification": 6, + "modification": 4, "https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface" } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java index e138b32c58fe..f1a964fa5a61 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java @@ -41,10 +41,6 @@ import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.Combine.AccumulatingCombineFn; -import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.Combine.Globally; -import org.apache.beam.sdk.transforms.Combine.PerKey; import org.apache.beam.sdk.transforms.CombineFnBase.AbstractGlobalCombineFn; import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn; import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext; @@ -1503,7 +1499,6 @@ public static class PerKey private final DisplayData.ItemSpec> fnDisplayData; private final boolean fewKeys; private final List> sideInputs; - private boolean shouldSkipReplacement; private PerKey( GlobalCombineFn fn, @@ -1513,7 +1508,6 @@ private PerKey( this.fnDisplayData = fnDisplayData; this.fewKeys = fewKeys; this.sideInputs = ImmutableList.of(); - this.shouldSkipReplacement = false; } private PerKey( @@ -1525,7 +1519,6 @@ private PerKey( this.fnDisplayData = fnDisplayData; this.fewKeys = fewKeys; this.sideInputs = sideInputs; - this.shouldSkipReplacement = false; } @Override @@ -1599,11 +1592,6 @@ public List> getSideInputs() { return sideInputs; } - /** Returns whether a runner should skip replacing this transform. For runner use only */ - public boolean shouldSkipReplacement() { - return this.shouldSkipReplacement; - } - /** * Returns the side inputs of this {@link Combine}, tagged with the tag of the {@link * PCollectionView}. The values of the returned map will be equal to the result of {@link @@ -1616,13 +1604,6 @@ public Map, PValue> getAdditionalInputs() { @Override public PCollection> expand(PCollection> input) { - PipelineOptions options = input.getPipeline().getOptions(); - String gbekOveride = options.getGbek(); - if (gbekOveride != null && !gbekOveride.trim().isEmpty()) { - // Don't replace this transform if we're using GBEK since the runner may insert - // its own GBK which doesn't perform encryption. - this.shouldSkipReplacement = true; - } return input .apply(fewKeys ? GroupByKey.createWithFewKeys() : GroupByKey.create()) .apply( diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CombineTranslation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CombineTranslation.java index 73a3ed84d820..1a1913d87f39 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CombineTranslation.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CombineTranslation.java @@ -61,25 +61,12 @@ public String getUrn() { return PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN; } - @Override - public String getUrn(Combine.PerKey transform) { - if (transform.shouldSkipReplacement()) { - return "beam:transform:combine_per_key_wrapper:v1"; - } - return PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN; - } - @Override public FunctionSpec translate( AppliedPTransform> transform, SdkComponents components) throws IOException { - Combine.PerKey underlyingCombine = transform.getTransform(); - if (underlyingCombine.shouldSkipReplacement()) { - // Can use null for spec for generic composite. - return null; - } - if (underlyingCombine.getSideInputs().isEmpty()) { - GlobalCombineFn combineFn = underlyingCombine.getFn(); + if (transform.getTransform().getSideInputs().isEmpty()) { + GlobalCombineFn combineFn = transform.getTransform().getFn(); Coder accumulatorCoder = extractAccumulatorCoder(combineFn, (AppliedPTransform) transform); return FunctionSpec.newBuilder() diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java index 1c8168a42a03..141d6dae64b2 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java @@ -151,7 +151,7 @@ public void testGroupByKeyWithInvalidGcpSecretOption() throws Exception { // Redistribute depends on GBK under the hood and can have runner-specific implementations @Test - public void testRedistributeWithValidGcpSecretOption() throws Exception { + public void redistributeWithValidGcpSecretOption() throws Exception { if (gcpSecretVersionName == null) { // Skip test if we couldn't set up secret manager return; @@ -192,44 +192,4 @@ public void testRedistributeWithInvalidGcpSecretOption() throws Exception { thrown.expect(RuntimeException.class); p.run(); } - - // Combine.PerKey depends on GBK under the hood, but can be overriden by a runner. This can - // fail unless it is handled specially, so we should test it specifically - @Test - public void testCombinePerKeyWithValidGcpSecretOption() throws Exception { - if (gcpSecretVersionName == null) { - // Skip test if we couldn't set up secret manager - return; - } - PipelineOptions options = TestPipeline.testingPipelineOptions(); - options.setGbek(String.format("type:gcpsecret;version_name:%s", gcpSecretVersionName)); - Pipeline p = Pipeline.create(options); - - List> ungroupedPairs = - Arrays.asList( - KV.of("k1", 3), KV.of("k2", 66), KV.of("k1", 4), KV.of("k2", -33), KV.of("k3", 0)); - List> sums = Arrays.asList(KV.of("k1", 7), KV.of("k2", 33), KV.of("k3", 0)); - PCollection> input = - p.apply( - Create.of(ungroupedPairs) - .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))); - PCollection> output = input.apply(Combine.perKey(Sum.ofIntegers())); - PAssert.that(output).containsInAnyOrder(sums); - - p.run(); - } - - @Test - public void testCombinePerKeyWithInvalidGcpSecretOption() throws Exception { - if (gcpSecretVersionName == null) { - // Skip test if we couldn't set up secret manager - return; - } - PipelineOptions options = TestPipeline.testingPipelineOptions(); - options.setGbek("type:gcpsecret;version_name:bad_path/versions/latest"); - Pipeline p = Pipeline.create(options); - p.apply(Create.of(KV.of("k1", 1))).apply(Combine.perKey(Sum.ofIntegers())); - thrown.expect(RuntimeException.class); - p.run(); - } } From 8325d1bf6e313118aa18ba939fc5842e197592b3 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Thu, 9 Oct 2025 10:49:44 -0400 Subject: [PATCH 02/15] Revert "Use consistent encoding for GBEK across languages (#36431)" This reverts commit e410e34b0671b00548e7ea4c504732806863e92e. --- .../org/apache/beam/sdk/options/PipelineOptions.java | 11 +++-------- .../beam/sdk/transforms/GroupByEncryptedKey.java | 10 ++++------ .../beam/sdk/transforms/GroupByEncryptedKeyTest.java | 7 ++----- .../org/apache/beam/sdk/transforms/GroupByKeyIT.java | 5 +---- .../apache/beam/sdk/transforms/GroupByKeyTest.java | 4 +--- sdks/python/apache_beam/options/pipeline_options.py | 8 ++------ 6 files changed, 13 insertions(+), 32 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java index 989e3a1e3193..62022b219c2a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java @@ -420,7 +420,7 @@ public Long create(PipelineOptions options) { *

Beam will infer the secret type and value based on the secret itself. This guarantees that * any data at rest during the performing a GBK, so this can be used to guarantee that data is not * unencrypted. Runners with this behavior include the Dataflow, Flink, and Spark runners. The - * secret should be a url safe base64 encoded 32 byte value. The option should be structured like: + * option should be structured like: * *


    * --gbek=type:;:
@@ -432,19 +432,14 @@ public Long create(PipelineOptions options) {
    * --gbek=type:GcpSecret;version_name:my_secret/versions/latest"
    * 
* - * All variables should use snake case to allow consistency across languages. For an example of - * generating a properly formatted secret, see - * https://github.com/apache/beam/blob/c8df4da229da49d533491857e1bb4ab5dbf4fd37/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java#L82 + * All variables should use snake case to allow consistency across languages. */ @Description( "When set, will replace all GroupByKey transforms in the pipeline the option. Beam will" + " infer the secret type and value based on the secret itself. This guarantees that" + " any data at rest during the performing a GBK, so this can be used to guarantee" + " that data is not unencrypted. Runners with this behavior include the Dataflow," - + " Flink, and Spark runners. The secret should be a url safe base64 encoded 32 byte" - + " value. For an example of generating a properly formatted secret, see" - + " https://github.com/apache/beam/blob/c8df4da229da49d533491857e1bb4ab5dbf4fd37/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java#L82" - + " When passing in the gbek option, it should be structured like:" + + " Flink, and Spark runners. The option should be structured like:" + " --gbek=type:;:, for example " + " --gbek=type:GcpSecret;version_name:my_secret/versions/latest. All variables " + " should use snake case to allow consistency across languages.") diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java index 1f4b7535d89e..6ed0a31b3b95 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java @@ -39,8 +39,8 @@ * the output. This is useful when the keys contain sensitive data that should not be stored at rest * by the runner. * - *

The transform requires a {@link Secret} which returns a base64 encoded 32 byte secret which - * can be used to generate a {@link SecretKeySpec} object using the HmacSHA256 algorithm. + *

The transform requires a {@link Secret} which returns a 32 byte secret which can be used to + * generate a {@link SecretKeySpec} object using the HmacSHA256 algorithm. * *

Note the following caveats: 1) Runners can implement arbitrary materialization steps, so this * does not guarantee that the whole pipeline will not have unencrypted data at rest by itself. 2) @@ -153,7 +153,7 @@ private static class EncryptMessage extends DoFn, KV public void setup() { try { this.cipher = Cipher.getInstance("AES/GCM/NoPadding"); - this.secretKeySpec = - new SecretKeySpec( - java.util.Base64.getUrlDecoder().decode(this.hmacKey.getSecretBytes()), "AES"); + this.secretKeySpec = new SecretKeySpec(this.hmacKey.getSecretBytes(), "AES"); } catch (Exception ex) { throw new RuntimeException( "Failed to initialize cryptography libraries needed for GroupByEncryptedKey", ex); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java index 3a2fc2f08c04..ba4c50e5a41e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java @@ -58,7 +58,7 @@ public class GroupByEncryptedKeyTest implements Serializable { private static class FakeSecret implements Secret { private final byte[] secret = - "YUt3STJQbXFZRnQycDV0TktDeUJTNXFZV0hoSHNHWmM".getBytes(Charset.defaultCharset()); + "aKwI2PmqYFt2p5tNKCyBS5qYmHhHsGZc".getBytes(Charset.defaultCharset()); @Override public byte[] getSecretBytes() { @@ -123,10 +123,7 @@ public static void setup() throws IOException { byte[] secretBytes = new byte[32]; new SecureRandom().nextBytes(secretBytes); client.addSecretVersion( - secretName, - SecretPayload.newBuilder() - .setData(ByteString.copyFrom(java.util.Base64.getUrlEncoder().encode(secretBytes))) - .build()); + secretName, SecretPayload.newBuilder().setData(ByteString.copyFrom(secretBytes)).build()); } gcpSecret = new GcpSecret(secretName.toString() + "/versions/latest"); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java index 141d6dae64b2..60477a4c242f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java @@ -82,10 +82,7 @@ public static void setup() throws IOException { byte[] secretBytes = new byte[32]; new SecureRandom().nextBytes(secretBytes); client.addSecretVersion( - secretName, - SecretPayload.newBuilder() - .setData(ByteString.copyFrom(java.util.Base64.getUrlEncoder().encode(secretBytes))) - .build()); + secretName, SecretPayload.newBuilder().setData(ByteString.copyFrom(secretBytes)).build()); } gcpSecretVersionName = secretName.toString() + "/versions/latest"; } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java index d9a3e3ed20d4..326da99f1a81 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java @@ -153,9 +153,7 @@ public static void setup() throws IOException { new SecureRandom().nextBytes(secretBytes); client.addSecretVersion( secretName, - SecretPayload.newBuilder() - .setData(ByteString.copyFrom(java.util.Base64.getUrlEncoder().encode(secretBytes))) - .build()); + SecretPayload.newBuilder().setData(ByteString.copyFrom(secretBytes)).build()); } gcpSecretVersionName = secretName.toString() + "/versions/latest"; } diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 3fc5151156f1..2d3b8b49d8d7 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -1726,12 +1726,8 @@ def _add_argparse_args(cls, parser): 'secret itself. This guarantees that any data at rest during the ' 'GBK will be encrypted. Many runners only store data at rest when ' 'performing a GBK, so this can be used to guarantee that data is ' - 'not unencrypted. The secret should be a url safe base64 encoded ' - '32 byte value. To generate a secret in this format, you can use ' - 'Secret.generate_secret_bytes(). For an example of this, see ' - 'https://github.com/apache/beam/blob/c8df4da229da49d533491857e1bb4ab5dbf4fd37/sdks/python/apache_beam/transforms/util_test.py#L356. ' # pylint: disable=line-too-long - 'Runners with this behavior include the Dataflow, ' - 'Flink, and Spark runners. The option should be ' + 'not unencrypted. Runners with this behavior include the ' + 'Dataflow, Flink, and Spark runners. The option should be ' 'structured like: ' '--gbek=type:;:, for example ' '--gbek=type:GcpSecret;version_name:my_secret/versions/latest')) From 4c9d55b94e39d5bd75b219b4e9df6ccf0e0160b9 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Thu, 9 Oct 2025 10:49:49 -0400 Subject: [PATCH 03/15] Revert "Add pipeline option to force GBEK (Java) (#36346)" This reverts commit c8df4da229da49d533491857e1bb4ab5dbf4fd37. --- .../trigger_files/beam_PostCommit_Java.json | 2 +- .../beam_PostCommit_Java_DataflowV1.json | 2 +- .../beam_PostCommit_Java_DataflowV2.json | 6 +- .../dataflow/internal/DataflowGroupByKey.java | 52 +---- .../beam/checkstyle/suppressions.xml | 2 - .../beam/sdk/options/PipelineOptions.java | 35 ---- .../sdk/transforms/GroupByEncryptedKey.java | 34 +--- .../beam/sdk/transforms/GroupByKey.java | 34 ---- .../org/apache/beam/sdk/util/GcpSecret.java | 9 - .../java/org/apache/beam/sdk/util/Secret.java | 49 ----- .../construction/GroupByKeyTranslation.java | 14 -- .../construction/PTransformTranslation.java | 2 - .../beam/sdk/transforms/GroupByKeyIT.java | 192 ------------------ .../beam/sdk/transforms/GroupByKeyTest.java | 106 ---------- .../org/apache/beam/sdk/util/SecretTest.java | 67 ------ 15 files changed, 11 insertions(+), 595 deletions(-) delete mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java delete mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/util/SecretTest.java diff --git a/.github/trigger_files/beam_PostCommit_Java.json b/.github/trigger_files/beam_PostCommit_Java.json index 8784d0786c02..920c8d132e4a 100644 --- a/.github/trigger_files/beam_PostCommit_Java.json +++ b/.github/trigger_files/beam_PostCommit_Java.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 2 + "modification": 1 } \ No newline at end of file diff --git a/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json b/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json index 42fb8f985ba1..bba1872a33e8 100644 --- a/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json +++ b/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json @@ -3,6 +3,6 @@ "https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder", "https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners", "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 2, + "modification": 1, "https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface" } diff --git a/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json b/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json index 3717f48ee492..78b2bdb93e2b 100644 --- a/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json +++ b/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json @@ -1,4 +1,8 @@ { - "modification": 4, + "https://github.com/apache/beam/pull/36138": "Cleanly separating v1 worker and v2 sdk harness container image handling", + "https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder", + "https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners", + "comment": "Modify this file in a trivial way to cause this test suite to run", + "modification": 3, "https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface" } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowGroupByKey.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowGroupByKey.java index 10030aa892a2..89135641689e 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowGroupByKey.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowGroupByKey.java @@ -25,13 +25,10 @@ import org.apache.beam.sdk.coders.Coder.NonDeterministicException; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.AppliedPTransform; -import org.apache.beam.sdk.transforms.GroupByEncryptedKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; -import org.apache.beam.sdk.util.Secret; import org.apache.beam.sdk.util.construction.PTransformTranslation; import org.apache.beam.sdk.util.construction.SdkComponents; import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar; @@ -39,7 +36,6 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.WindowingStrategy; -import org.checkerframework.checker.nullness.qual.Nullable; /** * Specialized implementation of {@code GroupByKey} for translating Redistribute transform into @@ -50,13 +46,9 @@ public class DataflowGroupByKey // Plumbed from Redistribute transform. private final boolean allowDuplicates; - private boolean insideGBEK; - private boolean surroundsGBEK; private DataflowGroupByKey(boolean allowDuplicates) { this.allowDuplicates = allowDuplicates; - this.insideGBEK = false; - this.surroundsGBEK = false; } /** @@ -87,22 +79,6 @@ public boolean allowDuplicates() { return allowDuplicates; } - /** - * For Beam internal use only. Tells runner that this is an inner GBK inside of a - * GroupByEncryptedKey - */ - public void setInsideGBEK() { - this.insideGBEK = true; - } - - /** - * For Beam internal use only. Tells runner that this is a GBK wrapped around of a - * GroupByEncryptedKey - */ - public boolean surroundsGBEK() { - return this.surroundsGBEK; - } - ///////////////////////////////////////////////////////////////////////////// public static void applicableTo(PCollection input) { @@ -141,20 +117,6 @@ public PCollection>> expand(PCollection> input) { "the keyCoder of a DataflowGroupByKey must be deterministic", e); } - PipelineOptions options = input.getPipeline().getOptions(); - String gbekOveride = options.getGbek(); - if (!this.insideGBEK && gbekOveride != null && !gbekOveride.trim().isEmpty()) { - this.surroundsGBEK = true; - Secret hmacSecret = Secret.parseSecretOption(gbekOveride); - DataflowGroupByKey> gbk = DataflowGroupByKey.create(); - if (this.allowDuplicates) { - gbk = DataflowGroupByKey.createWithAllowDuplicates(); - } - gbk.setInsideGBEK(); - GroupByEncryptedKey gbek = GroupByEncryptedKey.createWithCustomGbk(hmacSecret, gbk); - return input.apply(gbek); - } - // This primitive operation groups by the combination of key and window, // merging windows as needed, using the windows assigned to the // key/value input elements and the window merge operation of the @@ -209,22 +171,10 @@ public String getUrn() { return PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN; } - @Override - public String getUrn(DataflowGroupByKey transform) { - if (transform.surroundsGBEK()) { - return PTransformTranslation.GROUP_BY_KEY_WRAPPER_TRANSFORM_URN; - } - return PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN; - } - @Override @SuppressWarnings("nullness") - public RunnerApi.@Nullable FunctionSpec translate( + public RunnerApi.FunctionSpec translate( AppliedPTransform> transform, SdkComponents components) { - if (transform.getTransform().surroundsGBEK()) { - // Can use null for spec for empty composite. - return null; - } return RunnerApi.FunctionSpec.newBuilder().setUrn(getUrn(transform.getTransform())).build(); } } diff --git a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml index 52e8467b1624..c103ab7f5b1d 100644 --- a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml +++ b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml @@ -58,8 +58,6 @@ - - diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java index 62022b219c2a..2eba8c6ef68d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java @@ -37,7 +37,6 @@ import org.apache.beam.sdk.util.ReleaseInfo; import org.apache.beam.sdk.util.common.ReflectHelpers; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; -import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.DateTimeUtils; import org.joda.time.DateTimeZone; import org.joda.time.format.DateTimeFormat; @@ -414,40 +413,6 @@ public Long create(PipelineOptions options) { void setUserAgent(String userAgent); - /** - * A string defining whether GroupByKey transforms should be replaced by GroupByEncryptedKey - * - *

Beam will infer the secret type and value based on the secret itself. This guarantees that - * any data at rest during the performing a GBK, so this can be used to guarantee that data is not - * unencrypted. Runners with this behavior include the Dataflow, Flink, and Spark runners. The - * option should be structured like: - * - *


-   * --gbek=type:;:
-   * 
- * - * for example: - * - *

-   * --gbek=type:GcpSecret;version_name:my_secret/versions/latest"
-   * 
- * - * All variables should use snake case to allow consistency across languages. - */ - @Description( - "When set, will replace all GroupByKey transforms in the pipeline the option. Beam will" - + " infer the secret type and value based on the secret itself. This guarantees that" - + " any data at rest during the performing a GBK, so this can be used to guarantee" - + " that data is not unencrypted. Runners with this behavior include the Dataflow," - + " Flink, and Spark runners. The option should be structured like:" - + " --gbek=type:;:, for example " - + " --gbek=type:GcpSecret;version_name:my_secret/versions/latest. All variables " - + " should use snake case to allow consistency across languages.") - @Nullable - String getGbek(); - - void setGbek(String gbek); - /** * Returns a user agent string constructed from {@link ReleaseInfo#getName()} and {@link * ReleaseInfo#getVersion()}, in the format {@code [name]/[version]}. diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java index 6ed0a31b3b95..e927efad44af 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java @@ -53,19 +53,9 @@ public class GroupByEncryptedKey extends PTransform>, PCollection>>> { private final Secret hmacKey; - private final PTransform< - PCollection>>, - PCollection>>>> - gbk; - private GroupByEncryptedKey( - Secret hmacKey, - PTransform< - PCollection>>, - PCollection>>>> - gbk) { + private GroupByEncryptedKey(Secret hmacKey) { this.hmacKey = hmacKey; - this.gbk = gbk; } /** @@ -77,25 +67,7 @@ private GroupByEncryptedKey( * @return A {@link GroupByEncryptedKey} transform. */ public static GroupByEncryptedKey create(Secret hmacKey) { - return new GroupByEncryptedKey<>(hmacKey, GroupByKey.create()); - } - - /** - * Creates a {@link GroupByEncryptedKey} transform with a custom GBK in the middle. - * - * @param hmacKey The {@link Secret} key to use for encryption. - * @param gbk The custom GBK transform to use in the middle of the GBEK. - * @param The type of the keys in the input PCollection. - * @param The type of the values in the input PCollection. - * @return A {@link GroupByEncryptedKey} transform. - */ - public static GroupByEncryptedKey createWithCustomGbk( - Secret hmacKey, - PTransform< - PCollection>>, - PCollection>>>> - gbk) { - return new GroupByEncryptedKey<>(hmacKey, gbk); + return new GroupByEncryptedKey<>(hmacKey); } @Override @@ -121,7 +93,7 @@ public PCollection>> expand(PCollection> input) { .apply( "EncryptMessage", ParDo.of(new EncryptMessage<>(this.hmacKey, keyCoder, valueCoder))) - .apply(this.gbk); + .apply(GroupByKey.create()); return grouped .apply("DecryptMessage", ParDo.of(new DecryptMessage<>(this.hmacKey, keyCoder, valueCoder))) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java index 95ff73f55e74..d0b320a87654 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java @@ -32,7 +32,6 @@ import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.Secret; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; @@ -116,13 +115,9 @@ public class GroupByKey extends PTransform>, PCollection>>> { private final boolean fewKeys; - private boolean insideGBEK; - private boolean surroundsGBEK; private GroupByKey(boolean fewKeys) { this.fewKeys = fewKeys; - this.insideGBEK = false; - surroundsGBEK = false; } /** @@ -153,21 +148,6 @@ public boolean fewKeys() { return fewKeys; } - /** - * For Beam internal use only. Tells runner that this is an inner GBK inside a GroupByEncryptedKey - */ - public void setInsideGBEK() { - this.insideGBEK = true; - } - - /** - * For Beam internal use only. Tells runner that this is a GBK wrapped around of a - * GroupByEncryptedKey - */ - public boolean surroundsGBEK() { - return this.surroundsGBEK; - } - ///////////////////////////////////////////////////////////////////////////// public static void applicableTo(PCollection input) { @@ -264,20 +244,6 @@ public PCollection>> expand(PCollection> input) { throw new IllegalStateException("the keyCoder of a GroupByKey must be deterministic", e); } - PipelineOptions options = input.getPipeline().getOptions(); - String gbekOveride = options.getGbek(); - if (!this.insideGBEK && gbekOveride != null && !gbekOveride.trim().isEmpty()) { - this.surroundsGBEK = true; - Secret hmacSecret = Secret.parseSecretOption(gbekOveride); - GroupByKey> gbk = GroupByKey.create(); - if (this.fewKeys) { - gbk = GroupByKey.createWithFewKeys(); - } - gbk.setInsideGBEK(); - GroupByEncryptedKey gbek = GroupByEncryptedKey.createWithCustomGbk(hmacSecret, gbk); - return input.apply(gbek); - } - // This primitive operation groups by the combination of key and window, // merging windows as needed, using the windows assigned to the // key/value input elements and the window merge operation of the diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java index 8effae7f61cf..80bc3a54535e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java @@ -55,13 +55,4 @@ public byte[] getSecretBytes() { throw new RuntimeException("Failed to retrieve secret bytes", e); } } - - /** - * Returns the version name of the secret. - * - * @return The version name as a String. - */ - public String getVersionName() { - return versionName; - } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java index a75e01c9543f..fe476ef6cb1d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java @@ -18,11 +18,6 @@ package org.apache.beam.sdk.util; import java.io.Serializable; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; /** * A secret management interface used for handling sensitive data. @@ -38,48 +33,4 @@ public interface Secret extends Serializable { * @return The secret as a byte array. */ byte[] getSecretBytes(); - - static Secret parseSecretOption(String secretOption) { - Map paramMap = new HashMap<>(); - for (String param : secretOption.split(";", -1)) { - String[] parts = param.split(":", 2); - if (parts.length == 2) { - paramMap.put(parts[0], parts[1]); - } - } - - if (!paramMap.containsKey("type")) { - throw new RuntimeException("Secret string must contain a valid type parameter"); - } - - String secretType = paramMap.get("type"); - paramMap.remove("type"); - - if (secretType == null) { - throw new RuntimeException("Secret string must contain a valid value for type parameter"); - } - - switch (secretType.toLowerCase()) { - case "gcpsecret": - Set gcpSecretParams = new HashSet<>(Arrays.asList("version_name")); - for (String paramName : paramMap.keySet()) { - if (!gcpSecretParams.contains(paramName)) { - throw new RuntimeException( - String.format( - "Invalid secret parameter %s, GcpSecret only supports the following parameters: %s", - paramName, gcpSecretParams)); - } - } - String versionName = paramMap.get("version_name"); - if (versionName == null) { - throw new RuntimeException( - "version_name must contain a valid value for versionName parameter"); - } - return new GcpSecret(versionName); - default: - throw new RuntimeException( - String.format( - "Invalid secret type %s, currently only GcpSecret is supported", secretType)); - } - } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/GroupByKeyTranslation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/GroupByKeyTranslation.java index 569c3cbe2989..d08a48d0e5e6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/GroupByKeyTranslation.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/GroupByKeyTranslation.java @@ -25,7 +25,6 @@ import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; -import org.checkerframework.checker.nullness.qual.Nullable; /** * Utility methods for translating a {@link GroupByKey} to and from {@link RunnerApi} @@ -45,21 +44,8 @@ public String getUrn() { } @Override - public String getUrn(GroupByKey transform) { - if (transform.surroundsGBEK()) { - return PTransformTranslation.GROUP_BY_KEY_WRAPPER_TRANSFORM_URN; - } - return PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN; - } - - @Override - @Nullable public FunctionSpec translate( AppliedPTransform> transform, SdkComponents components) { - if (transform.getTransform().surroundsGBEK()) { - // Can use null for spec for empty composite. - return null; - } return FunctionSpec.newBuilder().setUrn(getUrn(transform.getTransform())).build(); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java index 3e38aad1ad4b..e4f00c706254 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java @@ -90,8 +90,6 @@ public class PTransformTranslation { public static final String PAR_DO_TRANSFORM_URN = "beam:transform:pardo:v1"; public static final String FLATTEN_TRANSFORM_URN = "beam:transform:flatten:v1"; public static final String GROUP_BY_KEY_TRANSFORM_URN = "beam:transform:group_by_key:v1"; - public static final String GROUP_BY_KEY_WRAPPER_TRANSFORM_URN = - "beam:transform:group_by_key_wrapper:v1"; public static final String IMPULSE_TRANSFORM_URN = "beam:transform:impulse:v1"; public static final String ASSIGN_WINDOWS_TRANSFORM_URN = "beam:transform:window_into:v1"; public static final String TEST_STREAM_TRANSFORM_URN = "beam:transform:teststream:v1"; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java deleted file mode 100644 index 60477a4c242f..000000000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java +++ /dev/null @@ -1,192 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms; - -import com.google.cloud.secretmanager.v1.ProjectName; -import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; -import com.google.cloud.secretmanager.v1.SecretName; -import com.google.cloud.secretmanager.v1.SecretPayload; -import com.google.protobuf.ByteString; -import java.io.IOException; -import java.security.SecureRandom; -import java.util.Arrays; -import java.util.List; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** Integration test for GroupByKey transforms and some other transforms which use GBK. */ -@RunWith(JUnit4.class) -public class GroupByKeyIT { - @Rule public ExpectedException thrown = ExpectedException.none(); - - private static final String PROJECT_ID = "apache-beam-testing"; - private static final String SECRET_ID = "gbek-test"; - private static String gcpSecretVersionName; - private static String secretId; - - @BeforeClass - public static void setup() throws IOException { - secretId = String.format("%s-%d", SECRET_ID, new SecureRandom().nextInt(10000)); - SecretManagerServiceClient client; - try { - client = SecretManagerServiceClient.create(); - } catch (IOException e) { - gcpSecretVersionName = null; - return; - } - ProjectName projectName = ProjectName.of(PROJECT_ID); - SecretName secretName = SecretName.of(PROJECT_ID, secretId); - - try { - client.getSecret(secretName); - } catch (Exception e) { - com.google.cloud.secretmanager.v1.Secret secret = - com.google.cloud.secretmanager.v1.Secret.newBuilder() - .setReplication( - com.google.cloud.secretmanager.v1.Replication.newBuilder() - .setAutomatic( - com.google.cloud.secretmanager.v1.Replication.Automatic.newBuilder() - .build()) - .build()) - .build(); - client.createSecret(projectName, secretId, secret); - byte[] secretBytes = new byte[32]; - new SecureRandom().nextBytes(secretBytes); - client.addSecretVersion( - secretName, SecretPayload.newBuilder().setData(ByteString.copyFrom(secretBytes)).build()); - } - gcpSecretVersionName = secretName.toString() + "/versions/latest"; - } - - @AfterClass - public static void tearDown() throws IOException { - if (gcpSecretVersionName != null) { - SecretManagerServiceClient client = SecretManagerServiceClient.create(); - SecretName secretName = SecretName.of(PROJECT_ID, secretId); - client.deleteSecret(secretName); - } - } - - @Test - public void testGroupByKeyWithValidGcpSecretOption() throws Exception { - if (gcpSecretVersionName == null) { - // Skip test if we couldn't set up secret manager - return; - } - PipelineOptions options = TestPipeline.testingPipelineOptions(); - options.setGbek(String.format("type:gcpsecret;version_name:%s", gcpSecretVersionName)); - Pipeline p = Pipeline.create(options); - List> ungroupedPairs = - Arrays.asList( - KV.of("k1", 3), - KV.of("k5", Integer.MAX_VALUE), - KV.of("k5", Integer.MIN_VALUE), - KV.of("k2", 66), - KV.of("k1", 4), - KV.of("k2", -33), - KV.of("k3", 0)); - - PCollection> input = - p.apply( - Create.of(ungroupedPairs) - .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))); - - PCollection>> output = input.apply(GroupByKey.create()); - - PAssert.that(output) - .containsInAnyOrder( - KV.of("k1", Arrays.asList(3, 4)), - KV.of("k5", Arrays.asList(Integer.MAX_VALUE, Integer.MIN_VALUE)), - KV.of("k2", Arrays.asList(66, -33)), - KV.of("k3", Arrays.asList(0))); - - p.run(); - } - - @Test - public void testGroupByKeyWithInvalidGcpSecretOption() throws Exception { - if (gcpSecretVersionName == null) { - // Skip test if we couldn't set up secret manager - return; - } - PipelineOptions options = TestPipeline.testingPipelineOptions(); - options.setGbek("type:gcpsecret;version_name:bad_path/versions/latest"); - Pipeline p = Pipeline.create(options); - p.apply(Create.of(KV.of("k1", 1))).apply(GroupByKey.create()); - thrown.expect(RuntimeException.class); - p.run(); - } - - // Redistribute depends on GBK under the hood and can have runner-specific implementations - @Test - public void redistributeWithValidGcpSecretOption() throws Exception { - if (gcpSecretVersionName == null) { - // Skip test if we couldn't set up secret manager - return; - } - PipelineOptions options = TestPipeline.testingPipelineOptions(); - options.setGbek(String.format("type:gcpsecret;version_name:%s", gcpSecretVersionName)); - Pipeline p = Pipeline.create(options); - - List> ungroupedPairs = - Arrays.asList( - KV.of("k1", 3), - KV.of("k5", Integer.MAX_VALUE), - KV.of("k5", Integer.MIN_VALUE), - KV.of("k2", 66), - KV.of("k1", 4), - KV.of("k2", -33), - KV.of("k3", 0)); - PCollection> input = - p.apply( - Create.of(ungroupedPairs) - .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))); - PCollection> output = input.apply(Redistribute.byKey()); - PAssert.that(output).containsInAnyOrder(ungroupedPairs); - - p.run(); - } - - @Test - public void testRedistributeWithInvalidGcpSecretOption() throws Exception { - if (gcpSecretVersionName == null) { - // Skip test if we couldn't set up secret manager - return; - } - PipelineOptions options = TestPipeline.testingPipelineOptions(); - options.setGbek("type:gcpsecret;version_name:bad_path/versions/latest"); - Pipeline p = Pipeline.create(options); - p.apply(Create.of(KV.of("k1", 1))).apply(Redistribute.byKey()); - thrown.expect(RuntimeException.class); - p.run(); - } -} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java index 326da99f1a81..5464838ad4db 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java @@ -26,18 +26,12 @@ import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; import static org.junit.Assert.assertThrows; -import com.google.cloud.secretmanager.v1.ProjectName; -import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; -import com.google.cloud.secretmanager.v1.SecretName; -import com.google.cloud.secretmanager.v1.SecretPayload; -import com.google.protobuf.ByteString; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; -import java.security.SecureRandom; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -96,9 +90,7 @@ import org.hamcrest.Matcher; import org.joda.time.Duration; import org.joda.time.Instant; -import org.junit.AfterClass; import org.junit.Assert; -import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -117,55 +109,6 @@ public class GroupByKeyTest implements Serializable { /** Shared test base class with setup/teardown helpers. */ public abstract static class SharedTestBase { @Rule public transient TestPipeline p = TestPipeline.create(); - - private static final String PROJECT_ID = "apache-beam-testing"; - private static final String SECRET_ID = "gbek-test"; - public static String gcpSecretVersionName; - private static String secretId; - - @BeforeClass - public static void setup() throws IOException { - secretId = String.format("%s-%d", SECRET_ID, new SecureRandom().nextInt(10000)); - SecretManagerServiceClient client; - try { - client = SecretManagerServiceClient.create(); - } catch (IOException e) { - gcpSecretVersionName = null; - return; - } - ProjectName projectName = ProjectName.of(PROJECT_ID); - SecretName secretName = SecretName.of(PROJECT_ID, secretId); - - try { - client.getSecret(secretName); - } catch (Exception e) { - com.google.cloud.secretmanager.v1.Secret secret = - com.google.cloud.secretmanager.v1.Secret.newBuilder() - .setReplication( - com.google.cloud.secretmanager.v1.Replication.newBuilder() - .setAutomatic( - com.google.cloud.secretmanager.v1.Replication.Automatic.newBuilder() - .build()) - .build()) - .build(); - client.createSecret(projectName, secretId, secret); - byte[] secretBytes = new byte[32]; - new SecureRandom().nextBytes(secretBytes); - client.addSecretVersion( - secretName, - SecretPayload.newBuilder().setData(ByteString.copyFrom(secretBytes)).build()); - } - gcpSecretVersionName = secretName.toString() + "/versions/latest"; - } - - @AfterClass - public static void tearDown() throws IOException { - if (gcpSecretVersionName != null) { - SecretManagerServiceClient client = SecretManagerServiceClient.create(); - SecretName secretName = SecretName.of(PROJECT_ID, secretId); - client.deleteSecret(secretName); - } - } } /** Tests validating basic {@link GroupByKey} scenarios. */ @@ -671,55 +614,6 @@ public void testLargeKeys10MB() throws Exception { public void testLargeKeys100MB() throws Exception { runLargeKeysTest(p, 100 << 20); } - - @Test - @Category(NeedsRunner.class) - public void testGroupByKeyWithValidGcpSecretOption() { - if (gcpSecretVersionName == null) { - // Skip test if we couldn't set up secret manager - return; - } - List> ungroupedPairs = - Arrays.asList( - KV.of("k1", 3), - KV.of("k5", Integer.MAX_VALUE), - KV.of("k5", Integer.MIN_VALUE), - KV.of("k2", 66), - KV.of("k1", 4), - KV.of("k2", -33), - KV.of("k3", 0)); - - PCollection> input = - p.apply( - Create.of(ungroupedPairs) - .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))); - - p.getOptions().setGbek(String.format("type:gcpsecret;version_name:%s", gcpSecretVersionName)); - PCollection>> output = input.apply(GroupByKey.create()); - - SerializableFunction>>, Void> checker = - containsKvs( - kv("k1", 3, 4), - kv("k5", Integer.MIN_VALUE, Integer.MAX_VALUE), - kv("k2", 66, -33), - kv("k3", 0)); - PAssert.that(output).satisfies(checker); - PAssert.that(output).inWindow(GlobalWindow.INSTANCE).satisfies(checker); - - p.run(); - } - - @Test - @Category(NeedsRunner.class) - public void testGroupByKeyWithInvalidGcpSecretOption() { - if (gcpSecretVersionName == null) { - // Skip test if we couldn't set up secret manager - return; - } - p.getOptions().setGbek("type:gcpsecret;version_name:bad_path/versions/latest"); - p.apply(Create.of(KV.of("k1", 1))).apply(GroupByKey.create()); - assertThrows(RuntimeException.class, () -> p.run()); - } } /** Tests validating GroupByKey behaviors with windowing. */ diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SecretTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SecretTest.java deleted file mode 100644 index dd4b125d73fe..000000000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SecretTest.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThrows; -import static org.junit.Assert.assertTrue; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** Tests for {@link org.apache.beam.sdk.util.Secret}. */ -@RunWith(JUnit4.class) -public class SecretTest { - - @Test - public void testParseSecretOptionWithValidGcpSecret() { - String secretOption = "type:gcpsecret;version_name:my_secret/versions/latest"; - Secret secret = Secret.parseSecretOption(secretOption); - assertTrue(secret instanceof GcpSecret); - assertEquals("my_secret/versions/latest", ((GcpSecret) secret).getVersionName()); - } - - @Test - public void testParseSecretOptionWithMissingType() { - String secretOption = "version_name:my_secret/versions/latest"; - Exception exception = - assertThrows(RuntimeException.class, () -> Secret.parseSecretOption(secretOption)); - assertEquals("Secret string must contain a valid type parameter", exception.getMessage()); - } - - @Test - public void testParseSecretOptionWithUnsupportedType() { - String secretOption = "type:unsupported;version_name:my_secret/versions/latest"; - Exception exception = - assertThrows(RuntimeException.class, () -> Secret.parseSecretOption(secretOption)); - assertEquals( - "Invalid secret type unsupported, currently only GcpSecret is supported", - exception.getMessage()); - } - - @Test - public void testParseSecretOptionWithInvalidGcpSecretParameter() { - String secretOption = "type:gcpsecret;invalid_param:some_value"; - Exception exception = - assertThrows(RuntimeException.class, () -> Secret.parseSecretOption(secretOption)); - assertEquals( - "Invalid secret parameter invalid_param, GcpSecret only supports the following parameters: [version_name]", - exception.getMessage()); - } -} From d2fab46bb177cee9066d7b5e02ac1029250fe584 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Thu, 9 Oct 2025 10:50:44 -0400 Subject: [PATCH 04/15] Debug what caused spark job to fail --- .../beam_PostCommit_Java_PVR_Spark3_Streaming.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/trigger_files/beam_PostCommit_Java_PVR_Spark3_Streaming.json b/.github/trigger_files/beam_PostCommit_Java_PVR_Spark3_Streaming.json index e0266d62f2e0..f1ba03a243ee 100644 --- a/.github/trigger_files/beam_PostCommit_Java_PVR_Spark3_Streaming.json +++ b/.github/trigger_files/beam_PostCommit_Java_PVR_Spark3_Streaming.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 4 + "modification": 5 } From 8a27d9c83983ffcefe87334250103eddc18875a0 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Thu, 9 Oct 2025 11:57:09 -0400 Subject: [PATCH 05/15] disable cache --- settings.gradle.kts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/settings.gradle.kts b/settings.gradle.kts index 72c5194ec93d..562d9aba804d 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -56,11 +56,11 @@ buildCache { url = uri("https://beam-cache.apache.org/cache/") isAllowUntrustedServer = false credentials { - username = System.getenv("GRADLE_ENTERPRISE_CACHE_USERNAME") - password = System.getenv("GRADLE_ENTERPRISE_CACHE_PASSWORD") + username = System.getenv("GRADLE_ENTERPRISE_CACHE_USERNAME_DISABLED") + password = System.getenv("GRADLE_ENTERPRISE_CACHE_PASSWORD_DISABLED") } - isEnabled = !System.getenv("GRADLE_ENTERPRISE_CACHE_USERNAME").isNullOrBlank() - isPush = isCi && !System.getenv("GRADLE_ENTERPRISE_CACHE_USERNAME").isNullOrBlank() + isEnabled = !System.getenv("GRADLE_ENTERPRISE_CACHE_USERNAME_DISABLED").isNullOrBlank() + isPush = isCi && !System.getenv("GRADLE_ENTERPRISE_CACHE_USERNAME_DISABLED").isNullOrBlank() } } From c4b2f19dd917b4319d8a9cd4af6c38baa727b0fe Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Thu, 9 Oct 2025 14:21:45 -0400 Subject: [PATCH 06/15] Reapply "Add pipeline option to force GBEK (Java) (#36346)" This reverts commit 4c9d55b94e39d5bd75b219b4e9df6ccf0e0160b9. --- .../trigger_files/beam_PostCommit_Java.json | 2 +- .../beam_PostCommit_Java_DataflowV1.json | 2 +- .../beam_PostCommit_Java_DataflowV2.json | 6 +- .../dataflow/internal/DataflowGroupByKey.java | 52 ++++- .../beam/checkstyle/suppressions.xml | 2 + .../beam/sdk/options/PipelineOptions.java | 35 ++++ .../sdk/transforms/GroupByEncryptedKey.java | 34 +++- .../beam/sdk/transforms/GroupByKey.java | 34 ++++ .../org/apache/beam/sdk/util/GcpSecret.java | 9 + .../java/org/apache/beam/sdk/util/Secret.java | 49 +++++ .../construction/GroupByKeyTranslation.java | 14 ++ .../construction/PTransformTranslation.java | 2 + .../beam/sdk/transforms/GroupByKeyIT.java | 192 ++++++++++++++++++ .../beam/sdk/transforms/GroupByKeyTest.java | 106 ++++++++++ .../org/apache/beam/sdk/util/SecretTest.java | 67 ++++++ 15 files changed, 595 insertions(+), 11 deletions(-) create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/util/SecretTest.java diff --git a/.github/trigger_files/beam_PostCommit_Java.json b/.github/trigger_files/beam_PostCommit_Java.json index 920c8d132e4a..8784d0786c02 100644 --- a/.github/trigger_files/beam_PostCommit_Java.json +++ b/.github/trigger_files/beam_PostCommit_Java.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 1 + "modification": 2 } \ No newline at end of file diff --git a/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json b/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json index bba1872a33e8..42fb8f985ba1 100644 --- a/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json +++ b/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json @@ -3,6 +3,6 @@ "https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder", "https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners", "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 1, + "modification": 2, "https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface" } diff --git a/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json b/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json index 78b2bdb93e2b..3717f48ee492 100644 --- a/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json +++ b/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json @@ -1,8 +1,4 @@ { - "https://github.com/apache/beam/pull/36138": "Cleanly separating v1 worker and v2 sdk harness container image handling", - "https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder", - "https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners", - "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 3, + "modification": 4, "https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface" } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowGroupByKey.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowGroupByKey.java index 89135641689e..10030aa892a2 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowGroupByKey.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowGroupByKey.java @@ -25,10 +25,13 @@ import org.apache.beam.sdk.coders.Coder.NonDeterministicException; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.transforms.GroupByEncryptedKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.util.Secret; import org.apache.beam.sdk.util.construction.PTransformTranslation; import org.apache.beam.sdk.util.construction.SdkComponents; import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar; @@ -36,6 +39,7 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.WindowingStrategy; +import org.checkerframework.checker.nullness.qual.Nullable; /** * Specialized implementation of {@code GroupByKey} for translating Redistribute transform into @@ -46,9 +50,13 @@ public class DataflowGroupByKey // Plumbed from Redistribute transform. private final boolean allowDuplicates; + private boolean insideGBEK; + private boolean surroundsGBEK; private DataflowGroupByKey(boolean allowDuplicates) { this.allowDuplicates = allowDuplicates; + this.insideGBEK = false; + this.surroundsGBEK = false; } /** @@ -79,6 +87,22 @@ public boolean allowDuplicates() { return allowDuplicates; } + /** + * For Beam internal use only. Tells runner that this is an inner GBK inside of a + * GroupByEncryptedKey + */ + public void setInsideGBEK() { + this.insideGBEK = true; + } + + /** + * For Beam internal use only. Tells runner that this is a GBK wrapped around of a + * GroupByEncryptedKey + */ + public boolean surroundsGBEK() { + return this.surroundsGBEK; + } + ///////////////////////////////////////////////////////////////////////////// public static void applicableTo(PCollection input) { @@ -117,6 +141,20 @@ public PCollection>> expand(PCollection> input) { "the keyCoder of a DataflowGroupByKey must be deterministic", e); } + PipelineOptions options = input.getPipeline().getOptions(); + String gbekOveride = options.getGbek(); + if (!this.insideGBEK && gbekOveride != null && !gbekOveride.trim().isEmpty()) { + this.surroundsGBEK = true; + Secret hmacSecret = Secret.parseSecretOption(gbekOveride); + DataflowGroupByKey> gbk = DataflowGroupByKey.create(); + if (this.allowDuplicates) { + gbk = DataflowGroupByKey.createWithAllowDuplicates(); + } + gbk.setInsideGBEK(); + GroupByEncryptedKey gbek = GroupByEncryptedKey.createWithCustomGbk(hmacSecret, gbk); + return input.apply(gbek); + } + // This primitive operation groups by the combination of key and window, // merging windows as needed, using the windows assigned to the // key/value input elements and the window merge operation of the @@ -171,10 +209,22 @@ public String getUrn() { return PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN; } + @Override + public String getUrn(DataflowGroupByKey transform) { + if (transform.surroundsGBEK()) { + return PTransformTranslation.GROUP_BY_KEY_WRAPPER_TRANSFORM_URN; + } + return PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN; + } + @Override @SuppressWarnings("nullness") - public RunnerApi.FunctionSpec translate( + public RunnerApi.@Nullable FunctionSpec translate( AppliedPTransform> transform, SdkComponents components) { + if (transform.getTransform().surroundsGBEK()) { + // Can use null for spec for empty composite. + return null; + } return RunnerApi.FunctionSpec.newBuilder().setUrn(getUrn(transform.getTransform())).build(); } } diff --git a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml index c103ab7f5b1d..52e8467b1624 100644 --- a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml +++ b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml @@ -58,6 +58,8 @@ + + diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java index 2eba8c6ef68d..62022b219c2a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java @@ -37,6 +37,7 @@ import org.apache.beam.sdk.util.ReleaseInfo; import org.apache.beam.sdk.util.common.ReflectHelpers; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; +import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.DateTimeUtils; import org.joda.time.DateTimeZone; import org.joda.time.format.DateTimeFormat; @@ -413,6 +414,40 @@ public Long create(PipelineOptions options) { void setUserAgent(String userAgent); + /** + * A string defining whether GroupByKey transforms should be replaced by GroupByEncryptedKey + * + *

Beam will infer the secret type and value based on the secret itself. This guarantees that + * any data at rest during the performing a GBK, so this can be used to guarantee that data is not + * unencrypted. Runners with this behavior include the Dataflow, Flink, and Spark runners. The + * option should be structured like: + * + *


+   * --gbek=type:;:
+   * 
+ * + * for example: + * + *

+   * --gbek=type:GcpSecret;version_name:my_secret/versions/latest"
+   * 
+ * + * All variables should use snake case to allow consistency across languages. + */ + @Description( + "When set, will replace all GroupByKey transforms in the pipeline the option. Beam will" + + " infer the secret type and value based on the secret itself. This guarantees that" + + " any data at rest during the performing a GBK, so this can be used to guarantee" + + " that data is not unencrypted. Runners with this behavior include the Dataflow," + + " Flink, and Spark runners. The option should be structured like:" + + " --gbek=type:;:, for example " + + " --gbek=type:GcpSecret;version_name:my_secret/versions/latest. All variables " + + " should use snake case to allow consistency across languages.") + @Nullable + String getGbek(); + + void setGbek(String gbek); + /** * Returns a user agent string constructed from {@link ReleaseInfo#getName()} and {@link * ReleaseInfo#getVersion()}, in the format {@code [name]/[version]}. diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java index e927efad44af..6ed0a31b3b95 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java @@ -53,9 +53,19 @@ public class GroupByEncryptedKey extends PTransform>, PCollection>>> { private final Secret hmacKey; + private final PTransform< + PCollection>>, + PCollection>>>> + gbk; - private GroupByEncryptedKey(Secret hmacKey) { + private GroupByEncryptedKey( + Secret hmacKey, + PTransform< + PCollection>>, + PCollection>>>> + gbk) { this.hmacKey = hmacKey; + this.gbk = gbk; } /** @@ -67,7 +77,25 @@ private GroupByEncryptedKey(Secret hmacKey) { * @return A {@link GroupByEncryptedKey} transform. */ public static GroupByEncryptedKey create(Secret hmacKey) { - return new GroupByEncryptedKey<>(hmacKey); + return new GroupByEncryptedKey<>(hmacKey, GroupByKey.create()); + } + + /** + * Creates a {@link GroupByEncryptedKey} transform with a custom GBK in the middle. + * + * @param hmacKey The {@link Secret} key to use for encryption. + * @param gbk The custom GBK transform to use in the middle of the GBEK. + * @param The type of the keys in the input PCollection. + * @param The type of the values in the input PCollection. + * @return A {@link GroupByEncryptedKey} transform. + */ + public static GroupByEncryptedKey createWithCustomGbk( + Secret hmacKey, + PTransform< + PCollection>>, + PCollection>>>> + gbk) { + return new GroupByEncryptedKey<>(hmacKey, gbk); } @Override @@ -93,7 +121,7 @@ public PCollection>> expand(PCollection> input) { .apply( "EncryptMessage", ParDo.of(new EncryptMessage<>(this.hmacKey, keyCoder, valueCoder))) - .apply(GroupByKey.create()); + .apply(this.gbk); return grouped .apply("DecryptMessage", ParDo.of(new DecryptMessage<>(this.hmacKey, keyCoder, valueCoder))) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java index d0b320a87654..95ff73f55e74 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java @@ -32,6 +32,7 @@ import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.Secret; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; @@ -115,9 +116,13 @@ public class GroupByKey extends PTransform>, PCollection>>> { private final boolean fewKeys; + private boolean insideGBEK; + private boolean surroundsGBEK; private GroupByKey(boolean fewKeys) { this.fewKeys = fewKeys; + this.insideGBEK = false; + surroundsGBEK = false; } /** @@ -148,6 +153,21 @@ public boolean fewKeys() { return fewKeys; } + /** + * For Beam internal use only. Tells runner that this is an inner GBK inside a GroupByEncryptedKey + */ + public void setInsideGBEK() { + this.insideGBEK = true; + } + + /** + * For Beam internal use only. Tells runner that this is a GBK wrapped around of a + * GroupByEncryptedKey + */ + public boolean surroundsGBEK() { + return this.surroundsGBEK; + } + ///////////////////////////////////////////////////////////////////////////// public static void applicableTo(PCollection input) { @@ -244,6 +264,20 @@ public PCollection>> expand(PCollection> input) { throw new IllegalStateException("the keyCoder of a GroupByKey must be deterministic", e); } + PipelineOptions options = input.getPipeline().getOptions(); + String gbekOveride = options.getGbek(); + if (!this.insideGBEK && gbekOveride != null && !gbekOveride.trim().isEmpty()) { + this.surroundsGBEK = true; + Secret hmacSecret = Secret.parseSecretOption(gbekOveride); + GroupByKey> gbk = GroupByKey.create(); + if (this.fewKeys) { + gbk = GroupByKey.createWithFewKeys(); + } + gbk.setInsideGBEK(); + GroupByEncryptedKey gbek = GroupByEncryptedKey.createWithCustomGbk(hmacSecret, gbk); + return input.apply(gbek); + } + // This primitive operation groups by the combination of key and window, // merging windows as needed, using the windows assigned to the // key/value input elements and the window merge operation of the diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java index 80bc3a54535e..8effae7f61cf 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java @@ -55,4 +55,13 @@ public byte[] getSecretBytes() { throw new RuntimeException("Failed to retrieve secret bytes", e); } } + + /** + * Returns the version name of the secret. + * + * @return The version name as a String. + */ + public String getVersionName() { + return versionName; + } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java index fe476ef6cb1d..a75e01c9543f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java @@ -18,6 +18,11 @@ package org.apache.beam.sdk.util; import java.io.Serializable; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; /** * A secret management interface used for handling sensitive data. @@ -33,4 +38,48 @@ public interface Secret extends Serializable { * @return The secret as a byte array. */ byte[] getSecretBytes(); + + static Secret parseSecretOption(String secretOption) { + Map paramMap = new HashMap<>(); + for (String param : secretOption.split(";", -1)) { + String[] parts = param.split(":", 2); + if (parts.length == 2) { + paramMap.put(parts[0], parts[1]); + } + } + + if (!paramMap.containsKey("type")) { + throw new RuntimeException("Secret string must contain a valid type parameter"); + } + + String secretType = paramMap.get("type"); + paramMap.remove("type"); + + if (secretType == null) { + throw new RuntimeException("Secret string must contain a valid value for type parameter"); + } + + switch (secretType.toLowerCase()) { + case "gcpsecret": + Set gcpSecretParams = new HashSet<>(Arrays.asList("version_name")); + for (String paramName : paramMap.keySet()) { + if (!gcpSecretParams.contains(paramName)) { + throw new RuntimeException( + String.format( + "Invalid secret parameter %s, GcpSecret only supports the following parameters: %s", + paramName, gcpSecretParams)); + } + } + String versionName = paramMap.get("version_name"); + if (versionName == null) { + throw new RuntimeException( + "version_name must contain a valid value for versionName parameter"); + } + return new GcpSecret(versionName); + default: + throw new RuntimeException( + String.format( + "Invalid secret type %s, currently only GcpSecret is supported", secretType)); + } + } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/GroupByKeyTranslation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/GroupByKeyTranslation.java index d08a48d0e5e6..569c3cbe2989 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/GroupByKeyTranslation.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/GroupByKeyTranslation.java @@ -25,6 +25,7 @@ import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; +import org.checkerframework.checker.nullness.qual.Nullable; /** * Utility methods for translating a {@link GroupByKey} to and from {@link RunnerApi} @@ -44,8 +45,21 @@ public String getUrn() { } @Override + public String getUrn(GroupByKey transform) { + if (transform.surroundsGBEK()) { + return PTransformTranslation.GROUP_BY_KEY_WRAPPER_TRANSFORM_URN; + } + return PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN; + } + + @Override + @Nullable public FunctionSpec translate( AppliedPTransform> transform, SdkComponents components) { + if (transform.getTransform().surroundsGBEK()) { + // Can use null for spec for empty composite. + return null; + } return FunctionSpec.newBuilder().setUrn(getUrn(transform.getTransform())).build(); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java index e4f00c706254..3e38aad1ad4b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java @@ -90,6 +90,8 @@ public class PTransformTranslation { public static final String PAR_DO_TRANSFORM_URN = "beam:transform:pardo:v1"; public static final String FLATTEN_TRANSFORM_URN = "beam:transform:flatten:v1"; public static final String GROUP_BY_KEY_TRANSFORM_URN = "beam:transform:group_by_key:v1"; + public static final String GROUP_BY_KEY_WRAPPER_TRANSFORM_URN = + "beam:transform:group_by_key_wrapper:v1"; public static final String IMPULSE_TRANSFORM_URN = "beam:transform:impulse:v1"; public static final String ASSIGN_WINDOWS_TRANSFORM_URN = "beam:transform:window_into:v1"; public static final String TEST_STREAM_TRANSFORM_URN = "beam:transform:teststream:v1"; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java new file mode 100644 index 000000000000..60477a4c242f --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import com.google.cloud.secretmanager.v1.ProjectName; +import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; +import com.google.cloud.secretmanager.v1.SecretName; +import com.google.cloud.secretmanager.v1.SecretPayload; +import com.google.protobuf.ByteString; +import java.io.IOException; +import java.security.SecureRandom; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Integration test for GroupByKey transforms and some other transforms which use GBK. */ +@RunWith(JUnit4.class) +public class GroupByKeyIT { + @Rule public ExpectedException thrown = ExpectedException.none(); + + private static final String PROJECT_ID = "apache-beam-testing"; + private static final String SECRET_ID = "gbek-test"; + private static String gcpSecretVersionName; + private static String secretId; + + @BeforeClass + public static void setup() throws IOException { + secretId = String.format("%s-%d", SECRET_ID, new SecureRandom().nextInt(10000)); + SecretManagerServiceClient client; + try { + client = SecretManagerServiceClient.create(); + } catch (IOException e) { + gcpSecretVersionName = null; + return; + } + ProjectName projectName = ProjectName.of(PROJECT_ID); + SecretName secretName = SecretName.of(PROJECT_ID, secretId); + + try { + client.getSecret(secretName); + } catch (Exception e) { + com.google.cloud.secretmanager.v1.Secret secret = + com.google.cloud.secretmanager.v1.Secret.newBuilder() + .setReplication( + com.google.cloud.secretmanager.v1.Replication.newBuilder() + .setAutomatic( + com.google.cloud.secretmanager.v1.Replication.Automatic.newBuilder() + .build()) + .build()) + .build(); + client.createSecret(projectName, secretId, secret); + byte[] secretBytes = new byte[32]; + new SecureRandom().nextBytes(secretBytes); + client.addSecretVersion( + secretName, SecretPayload.newBuilder().setData(ByteString.copyFrom(secretBytes)).build()); + } + gcpSecretVersionName = secretName.toString() + "/versions/latest"; + } + + @AfterClass + public static void tearDown() throws IOException { + if (gcpSecretVersionName != null) { + SecretManagerServiceClient client = SecretManagerServiceClient.create(); + SecretName secretName = SecretName.of(PROJECT_ID, secretId); + client.deleteSecret(secretName); + } + } + + @Test + public void testGroupByKeyWithValidGcpSecretOption() throws Exception { + if (gcpSecretVersionName == null) { + // Skip test if we couldn't set up secret manager + return; + } + PipelineOptions options = TestPipeline.testingPipelineOptions(); + options.setGbek(String.format("type:gcpsecret;version_name:%s", gcpSecretVersionName)); + Pipeline p = Pipeline.create(options); + List> ungroupedPairs = + Arrays.asList( + KV.of("k1", 3), + KV.of("k5", Integer.MAX_VALUE), + KV.of("k5", Integer.MIN_VALUE), + KV.of("k2", 66), + KV.of("k1", 4), + KV.of("k2", -33), + KV.of("k3", 0)); + + PCollection> input = + p.apply( + Create.of(ungroupedPairs) + .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))); + + PCollection>> output = input.apply(GroupByKey.create()); + + PAssert.that(output) + .containsInAnyOrder( + KV.of("k1", Arrays.asList(3, 4)), + KV.of("k5", Arrays.asList(Integer.MAX_VALUE, Integer.MIN_VALUE)), + KV.of("k2", Arrays.asList(66, -33)), + KV.of("k3", Arrays.asList(0))); + + p.run(); + } + + @Test + public void testGroupByKeyWithInvalidGcpSecretOption() throws Exception { + if (gcpSecretVersionName == null) { + // Skip test if we couldn't set up secret manager + return; + } + PipelineOptions options = TestPipeline.testingPipelineOptions(); + options.setGbek("type:gcpsecret;version_name:bad_path/versions/latest"); + Pipeline p = Pipeline.create(options); + p.apply(Create.of(KV.of("k1", 1))).apply(GroupByKey.create()); + thrown.expect(RuntimeException.class); + p.run(); + } + + // Redistribute depends on GBK under the hood and can have runner-specific implementations + @Test + public void redistributeWithValidGcpSecretOption() throws Exception { + if (gcpSecretVersionName == null) { + // Skip test if we couldn't set up secret manager + return; + } + PipelineOptions options = TestPipeline.testingPipelineOptions(); + options.setGbek(String.format("type:gcpsecret;version_name:%s", gcpSecretVersionName)); + Pipeline p = Pipeline.create(options); + + List> ungroupedPairs = + Arrays.asList( + KV.of("k1", 3), + KV.of("k5", Integer.MAX_VALUE), + KV.of("k5", Integer.MIN_VALUE), + KV.of("k2", 66), + KV.of("k1", 4), + KV.of("k2", -33), + KV.of("k3", 0)); + PCollection> input = + p.apply( + Create.of(ungroupedPairs) + .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))); + PCollection> output = input.apply(Redistribute.byKey()); + PAssert.that(output).containsInAnyOrder(ungroupedPairs); + + p.run(); + } + + @Test + public void testRedistributeWithInvalidGcpSecretOption() throws Exception { + if (gcpSecretVersionName == null) { + // Skip test if we couldn't set up secret manager + return; + } + PipelineOptions options = TestPipeline.testingPipelineOptions(); + options.setGbek("type:gcpsecret;version_name:bad_path/versions/latest"); + Pipeline p = Pipeline.create(options); + p.apply(Create.of(KV.of("k1", 1))).apply(Redistribute.byKey()); + thrown.expect(RuntimeException.class); + p.run(); + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java index 5464838ad4db..326da99f1a81 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java @@ -26,12 +26,18 @@ import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; import static org.junit.Assert.assertThrows; +import com.google.cloud.secretmanager.v1.ProjectName; +import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; +import com.google.cloud.secretmanager.v1.SecretName; +import com.google.cloud.secretmanager.v1.SecretPayload; +import com.google.protobuf.ByteString; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; +import java.security.SecureRandom; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -90,7 +96,9 @@ import org.hamcrest.Matcher; import org.joda.time.Duration; import org.joda.time.Instant; +import org.junit.AfterClass; import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -109,6 +117,55 @@ public class GroupByKeyTest implements Serializable { /** Shared test base class with setup/teardown helpers. */ public abstract static class SharedTestBase { @Rule public transient TestPipeline p = TestPipeline.create(); + + private static final String PROJECT_ID = "apache-beam-testing"; + private static final String SECRET_ID = "gbek-test"; + public static String gcpSecretVersionName; + private static String secretId; + + @BeforeClass + public static void setup() throws IOException { + secretId = String.format("%s-%d", SECRET_ID, new SecureRandom().nextInt(10000)); + SecretManagerServiceClient client; + try { + client = SecretManagerServiceClient.create(); + } catch (IOException e) { + gcpSecretVersionName = null; + return; + } + ProjectName projectName = ProjectName.of(PROJECT_ID); + SecretName secretName = SecretName.of(PROJECT_ID, secretId); + + try { + client.getSecret(secretName); + } catch (Exception e) { + com.google.cloud.secretmanager.v1.Secret secret = + com.google.cloud.secretmanager.v1.Secret.newBuilder() + .setReplication( + com.google.cloud.secretmanager.v1.Replication.newBuilder() + .setAutomatic( + com.google.cloud.secretmanager.v1.Replication.Automatic.newBuilder() + .build()) + .build()) + .build(); + client.createSecret(projectName, secretId, secret); + byte[] secretBytes = new byte[32]; + new SecureRandom().nextBytes(secretBytes); + client.addSecretVersion( + secretName, + SecretPayload.newBuilder().setData(ByteString.copyFrom(secretBytes)).build()); + } + gcpSecretVersionName = secretName.toString() + "/versions/latest"; + } + + @AfterClass + public static void tearDown() throws IOException { + if (gcpSecretVersionName != null) { + SecretManagerServiceClient client = SecretManagerServiceClient.create(); + SecretName secretName = SecretName.of(PROJECT_ID, secretId); + client.deleteSecret(secretName); + } + } } /** Tests validating basic {@link GroupByKey} scenarios. */ @@ -614,6 +671,55 @@ public void testLargeKeys10MB() throws Exception { public void testLargeKeys100MB() throws Exception { runLargeKeysTest(p, 100 << 20); } + + @Test + @Category(NeedsRunner.class) + public void testGroupByKeyWithValidGcpSecretOption() { + if (gcpSecretVersionName == null) { + // Skip test if we couldn't set up secret manager + return; + } + List> ungroupedPairs = + Arrays.asList( + KV.of("k1", 3), + KV.of("k5", Integer.MAX_VALUE), + KV.of("k5", Integer.MIN_VALUE), + KV.of("k2", 66), + KV.of("k1", 4), + KV.of("k2", -33), + KV.of("k3", 0)); + + PCollection> input = + p.apply( + Create.of(ungroupedPairs) + .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))); + + p.getOptions().setGbek(String.format("type:gcpsecret;version_name:%s", gcpSecretVersionName)); + PCollection>> output = input.apply(GroupByKey.create()); + + SerializableFunction>>, Void> checker = + containsKvs( + kv("k1", 3, 4), + kv("k5", Integer.MIN_VALUE, Integer.MAX_VALUE), + kv("k2", 66, -33), + kv("k3", 0)); + PAssert.that(output).satisfies(checker); + PAssert.that(output).inWindow(GlobalWindow.INSTANCE).satisfies(checker); + + p.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testGroupByKeyWithInvalidGcpSecretOption() { + if (gcpSecretVersionName == null) { + // Skip test if we couldn't set up secret manager + return; + } + p.getOptions().setGbek("type:gcpsecret;version_name:bad_path/versions/latest"); + p.apply(Create.of(KV.of("k1", 1))).apply(GroupByKey.create()); + assertThrows(RuntimeException.class, () -> p.run()); + } } /** Tests validating GroupByKey behaviors with windowing. */ diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SecretTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SecretTest.java new file mode 100644 index 000000000000..dd4b125d73fe --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SecretTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link org.apache.beam.sdk.util.Secret}. */ +@RunWith(JUnit4.class) +public class SecretTest { + + @Test + public void testParseSecretOptionWithValidGcpSecret() { + String secretOption = "type:gcpsecret;version_name:my_secret/versions/latest"; + Secret secret = Secret.parseSecretOption(secretOption); + assertTrue(secret instanceof GcpSecret); + assertEquals("my_secret/versions/latest", ((GcpSecret) secret).getVersionName()); + } + + @Test + public void testParseSecretOptionWithMissingType() { + String secretOption = "version_name:my_secret/versions/latest"; + Exception exception = + assertThrows(RuntimeException.class, () -> Secret.parseSecretOption(secretOption)); + assertEquals("Secret string must contain a valid type parameter", exception.getMessage()); + } + + @Test + public void testParseSecretOptionWithUnsupportedType() { + String secretOption = "type:unsupported;version_name:my_secret/versions/latest"; + Exception exception = + assertThrows(RuntimeException.class, () -> Secret.parseSecretOption(secretOption)); + assertEquals( + "Invalid secret type unsupported, currently only GcpSecret is supported", + exception.getMessage()); + } + + @Test + public void testParseSecretOptionWithInvalidGcpSecretParameter() { + String secretOption = "type:gcpsecret;invalid_param:some_value"; + Exception exception = + assertThrows(RuntimeException.class, () -> Secret.parseSecretOption(secretOption)); + assertEquals( + "Invalid secret parameter invalid_param, GcpSecret only supports the following parameters: [version_name]", + exception.getMessage()); + } +} From 867f9d60944d9a28c0bccea98d4ec14e3affbd5b Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Thu, 9 Oct 2025 16:25:48 -0400 Subject: [PATCH 07/15] Shorten dev loop by focusing on failing test --- .../spark/job-server/spark_job_server.gradle | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/runners/spark/job-server/spark_job_server.gradle b/runners/spark/job-server/spark_job_server.gradle index 90109598ed64..3fc3df11c702 100644 --- a/runners/spark/job-server/spark_job_server.gradle +++ b/runners/spark/job-server/spark_job_server.gradle @@ -169,6 +169,38 @@ def portableValidatesRunnerTask(String name, boolean streaming, boolean docker, excludeTestsMatching 'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributeAfterFixedWindowsAndGroupByKey' excludeTestsMatching 'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributeAfterSessionsAndGroupByKey' excludeTestsMatching 'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributeAfterSlidingWindowsAndGroupByKey' + excludeTestsMatching 'org.apache.beam.runners.*' + excludeTestsMatching 'org.apache.beam.sdk.PipelineTest.*' + excludeTestsMatching 'org.apache.beam.sdk.io.*' + excludeTestsMatching 'org.apache.beam.sdk.metrics.*' + excludeTestsMatching 'org.apache.beam.sdk.testing.*' + excludeTestsMatching 'org.apache.beam.sdk.util.*' + excludeTestsMatching 'org.apache.beam.sdk.values.*' + excludeTestsMatching 'org.apache.beam.sdk.transforms.join.*' + excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.*' + excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineFnsTest.*' + excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest*' + excludeTestsMatching 'org.apache.beam.sdk.transforms.CreateTest.*' + excludeTestsMatching 'org.apache.beam.sdk.transforms.FlattenTest.*' + excludeTestsMatching 'org.apache.beam.sdk.transforms.ImpulseTest.*' + excludeTestsMatching 'org.apache.beam.sdk.transforms.KeysTest.*' + excludeTestsMatching 'org.apache.beam.sdk.transforms.KvSwapTest.*' + excludeTestsMatching 'org.apache.beam.sdk.transforms.MapElementsTest.*' + excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.*' + excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoSchemaTest.*' + excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest*' + excludeTestsMatching 'org.apache.beam.sdk.transforms.RedistributeTest.*' + excludeTestsMatching 'org.apache.beam.sdk.transforms.ReifyTimestampsTest.*' + excludeTestsMatching 'org.apache.beam.sdk.transforms.ReshuffleTest.*' + excludeTestsMatching 'org.apache.beam.sdk.transforms.WithTimestampsTest.*' + excludeTestsMatching 'org.apache.beam.sdk.transforms.*.testGroupByKey' + excludeTestsMatching 'org.apache.beam.sdk.transforms.*.testGroupByKeyEmpty' + excludeTestsMatching 'org.apache.beam.sdk.transforms.*.testGroupByKeyWithBadEqualsHashCode' + excludeTestsMatching 'org.apache.beam.sdk.transforms.*.testLargeKeys100KB' + excludeTestsMatching 'org.apache.beam.sdk.transforms.*.testLargeKeys10KB' + excludeTestsMatching 'org.apache.beam.sdk.transforms.*.testLargeKeys10MB' + excludeTestsMatching 'org.apache.beam.sdk.transforms.*.testTimestampCombinerEarliest' + excludeTestsMatching 'org.apache.beam.sdk.transforms.*.testTimestampCombinerLatest' } } else { From 17ecaa7b026895a928fbe57d507295d3953e408f Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Thu, 9 Oct 2025 16:50:56 -0400 Subject: [PATCH 08/15] Revert "Shorten dev loop by focusing on failing test" This reverts commit 867f9d60944d9a28c0bccea98d4ec14e3affbd5b. --- .../spark/job-server/spark_job_server.gradle | 32 ------------------- 1 file changed, 32 deletions(-) diff --git a/runners/spark/job-server/spark_job_server.gradle b/runners/spark/job-server/spark_job_server.gradle index 3fc3df11c702..90109598ed64 100644 --- a/runners/spark/job-server/spark_job_server.gradle +++ b/runners/spark/job-server/spark_job_server.gradle @@ -169,38 +169,6 @@ def portableValidatesRunnerTask(String name, boolean streaming, boolean docker, excludeTestsMatching 'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributeAfterFixedWindowsAndGroupByKey' excludeTestsMatching 'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributeAfterSessionsAndGroupByKey' excludeTestsMatching 'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributeAfterSlidingWindowsAndGroupByKey' - excludeTestsMatching 'org.apache.beam.runners.*' - excludeTestsMatching 'org.apache.beam.sdk.PipelineTest.*' - excludeTestsMatching 'org.apache.beam.sdk.io.*' - excludeTestsMatching 'org.apache.beam.sdk.metrics.*' - excludeTestsMatching 'org.apache.beam.sdk.testing.*' - excludeTestsMatching 'org.apache.beam.sdk.util.*' - excludeTestsMatching 'org.apache.beam.sdk.values.*' - excludeTestsMatching 'org.apache.beam.sdk.transforms.join.*' - excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.*' - excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineFnsTest.*' - excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest*' - excludeTestsMatching 'org.apache.beam.sdk.transforms.CreateTest.*' - excludeTestsMatching 'org.apache.beam.sdk.transforms.FlattenTest.*' - excludeTestsMatching 'org.apache.beam.sdk.transforms.ImpulseTest.*' - excludeTestsMatching 'org.apache.beam.sdk.transforms.KeysTest.*' - excludeTestsMatching 'org.apache.beam.sdk.transforms.KvSwapTest.*' - excludeTestsMatching 'org.apache.beam.sdk.transforms.MapElementsTest.*' - excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.*' - excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoSchemaTest.*' - excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest*' - excludeTestsMatching 'org.apache.beam.sdk.transforms.RedistributeTest.*' - excludeTestsMatching 'org.apache.beam.sdk.transforms.ReifyTimestampsTest.*' - excludeTestsMatching 'org.apache.beam.sdk.transforms.ReshuffleTest.*' - excludeTestsMatching 'org.apache.beam.sdk.transforms.WithTimestampsTest.*' - excludeTestsMatching 'org.apache.beam.sdk.transforms.*.testGroupByKey' - excludeTestsMatching 'org.apache.beam.sdk.transforms.*.testGroupByKeyEmpty' - excludeTestsMatching 'org.apache.beam.sdk.transforms.*.testGroupByKeyWithBadEqualsHashCode' - excludeTestsMatching 'org.apache.beam.sdk.transforms.*.testLargeKeys100KB' - excludeTestsMatching 'org.apache.beam.sdk.transforms.*.testLargeKeys10KB' - excludeTestsMatching 'org.apache.beam.sdk.transforms.*.testLargeKeys10MB' - excludeTestsMatching 'org.apache.beam.sdk.transforms.*.testTimestampCombinerEarliest' - excludeTestsMatching 'org.apache.beam.sdk.transforms.*.testTimestampCombinerLatest' } } else { From e0a9a934ef07d637438729c0e428202ed0bbd529 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Thu, 9 Oct 2025 16:53:21 -0400 Subject: [PATCH 09/15] remove test changes --- .../beam/sdk/transforms/GroupByKeyIT.java | 192 ------------------ .../beam/sdk/transforms/GroupByKeyTest.java | 107 ---------- 2 files changed, 299 deletions(-) delete mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java deleted file mode 100644 index 60477a4c242f..000000000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java +++ /dev/null @@ -1,192 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms; - -import com.google.cloud.secretmanager.v1.ProjectName; -import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; -import com.google.cloud.secretmanager.v1.SecretName; -import com.google.cloud.secretmanager.v1.SecretPayload; -import com.google.protobuf.ByteString; -import java.io.IOException; -import java.security.SecureRandom; -import java.util.Arrays; -import java.util.List; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** Integration test for GroupByKey transforms and some other transforms which use GBK. */ -@RunWith(JUnit4.class) -public class GroupByKeyIT { - @Rule public ExpectedException thrown = ExpectedException.none(); - - private static final String PROJECT_ID = "apache-beam-testing"; - private static final String SECRET_ID = "gbek-test"; - private static String gcpSecretVersionName; - private static String secretId; - - @BeforeClass - public static void setup() throws IOException { - secretId = String.format("%s-%d", SECRET_ID, new SecureRandom().nextInt(10000)); - SecretManagerServiceClient client; - try { - client = SecretManagerServiceClient.create(); - } catch (IOException e) { - gcpSecretVersionName = null; - return; - } - ProjectName projectName = ProjectName.of(PROJECT_ID); - SecretName secretName = SecretName.of(PROJECT_ID, secretId); - - try { - client.getSecret(secretName); - } catch (Exception e) { - com.google.cloud.secretmanager.v1.Secret secret = - com.google.cloud.secretmanager.v1.Secret.newBuilder() - .setReplication( - com.google.cloud.secretmanager.v1.Replication.newBuilder() - .setAutomatic( - com.google.cloud.secretmanager.v1.Replication.Automatic.newBuilder() - .build()) - .build()) - .build(); - client.createSecret(projectName, secretId, secret); - byte[] secretBytes = new byte[32]; - new SecureRandom().nextBytes(secretBytes); - client.addSecretVersion( - secretName, SecretPayload.newBuilder().setData(ByteString.copyFrom(secretBytes)).build()); - } - gcpSecretVersionName = secretName.toString() + "/versions/latest"; - } - - @AfterClass - public static void tearDown() throws IOException { - if (gcpSecretVersionName != null) { - SecretManagerServiceClient client = SecretManagerServiceClient.create(); - SecretName secretName = SecretName.of(PROJECT_ID, secretId); - client.deleteSecret(secretName); - } - } - - @Test - public void testGroupByKeyWithValidGcpSecretOption() throws Exception { - if (gcpSecretVersionName == null) { - // Skip test if we couldn't set up secret manager - return; - } - PipelineOptions options = TestPipeline.testingPipelineOptions(); - options.setGbek(String.format("type:gcpsecret;version_name:%s", gcpSecretVersionName)); - Pipeline p = Pipeline.create(options); - List> ungroupedPairs = - Arrays.asList( - KV.of("k1", 3), - KV.of("k5", Integer.MAX_VALUE), - KV.of("k5", Integer.MIN_VALUE), - KV.of("k2", 66), - KV.of("k1", 4), - KV.of("k2", -33), - KV.of("k3", 0)); - - PCollection> input = - p.apply( - Create.of(ungroupedPairs) - .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))); - - PCollection>> output = input.apply(GroupByKey.create()); - - PAssert.that(output) - .containsInAnyOrder( - KV.of("k1", Arrays.asList(3, 4)), - KV.of("k5", Arrays.asList(Integer.MAX_VALUE, Integer.MIN_VALUE)), - KV.of("k2", Arrays.asList(66, -33)), - KV.of("k3", Arrays.asList(0))); - - p.run(); - } - - @Test - public void testGroupByKeyWithInvalidGcpSecretOption() throws Exception { - if (gcpSecretVersionName == null) { - // Skip test if we couldn't set up secret manager - return; - } - PipelineOptions options = TestPipeline.testingPipelineOptions(); - options.setGbek("type:gcpsecret;version_name:bad_path/versions/latest"); - Pipeline p = Pipeline.create(options); - p.apply(Create.of(KV.of("k1", 1))).apply(GroupByKey.create()); - thrown.expect(RuntimeException.class); - p.run(); - } - - // Redistribute depends on GBK under the hood and can have runner-specific implementations - @Test - public void redistributeWithValidGcpSecretOption() throws Exception { - if (gcpSecretVersionName == null) { - // Skip test if we couldn't set up secret manager - return; - } - PipelineOptions options = TestPipeline.testingPipelineOptions(); - options.setGbek(String.format("type:gcpsecret;version_name:%s", gcpSecretVersionName)); - Pipeline p = Pipeline.create(options); - - List> ungroupedPairs = - Arrays.asList( - KV.of("k1", 3), - KV.of("k5", Integer.MAX_VALUE), - KV.of("k5", Integer.MIN_VALUE), - KV.of("k2", 66), - KV.of("k1", 4), - KV.of("k2", -33), - KV.of("k3", 0)); - PCollection> input = - p.apply( - Create.of(ungroupedPairs) - .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))); - PCollection> output = input.apply(Redistribute.byKey()); - PAssert.that(output).containsInAnyOrder(ungroupedPairs); - - p.run(); - } - - @Test - public void testRedistributeWithInvalidGcpSecretOption() throws Exception { - if (gcpSecretVersionName == null) { - // Skip test if we couldn't set up secret manager - return; - } - PipelineOptions options = TestPipeline.testingPipelineOptions(); - options.setGbek("type:gcpsecret;version_name:bad_path/versions/latest"); - Pipeline p = Pipeline.create(options); - p.apply(Create.of(KV.of("k1", 1))).apply(Redistribute.byKey()); - thrown.expect(RuntimeException.class); - p.run(); - } -} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java index 326da99f1a81..d420debe6455 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java @@ -26,18 +26,12 @@ import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; import static org.junit.Assert.assertThrows; -import com.google.cloud.secretmanager.v1.ProjectName; -import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; -import com.google.cloud.secretmanager.v1.SecretName; -import com.google.cloud.secretmanager.v1.SecretPayload; -import com.google.protobuf.ByteString; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; -import java.security.SecureRandom; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -96,9 +90,7 @@ import org.hamcrest.Matcher; import org.joda.time.Duration; import org.joda.time.Instant; -import org.junit.AfterClass; import org.junit.Assert; -import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -117,55 +109,6 @@ public class GroupByKeyTest implements Serializable { /** Shared test base class with setup/teardown helpers. */ public abstract static class SharedTestBase { @Rule public transient TestPipeline p = TestPipeline.create(); - - private static final String PROJECT_ID = "apache-beam-testing"; - private static final String SECRET_ID = "gbek-test"; - public static String gcpSecretVersionName; - private static String secretId; - - @BeforeClass - public static void setup() throws IOException { - secretId = String.format("%s-%d", SECRET_ID, new SecureRandom().nextInt(10000)); - SecretManagerServiceClient client; - try { - client = SecretManagerServiceClient.create(); - } catch (IOException e) { - gcpSecretVersionName = null; - return; - } - ProjectName projectName = ProjectName.of(PROJECT_ID); - SecretName secretName = SecretName.of(PROJECT_ID, secretId); - - try { - client.getSecret(secretName); - } catch (Exception e) { - com.google.cloud.secretmanager.v1.Secret secret = - com.google.cloud.secretmanager.v1.Secret.newBuilder() - .setReplication( - com.google.cloud.secretmanager.v1.Replication.newBuilder() - .setAutomatic( - com.google.cloud.secretmanager.v1.Replication.Automatic.newBuilder() - .build()) - .build()) - .build(); - client.createSecret(projectName, secretId, secret); - byte[] secretBytes = new byte[32]; - new SecureRandom().nextBytes(secretBytes); - client.addSecretVersion( - secretName, - SecretPayload.newBuilder().setData(ByteString.copyFrom(secretBytes)).build()); - } - gcpSecretVersionName = secretName.toString() + "/versions/latest"; - } - - @AfterClass - public static void tearDown() throws IOException { - if (gcpSecretVersionName != null) { - SecretManagerServiceClient client = SecretManagerServiceClient.create(); - SecretName secretName = SecretName.of(PROJECT_ID, secretId); - client.deleteSecret(secretName); - } - } } /** Tests validating basic {@link GroupByKey} scenarios. */ @@ -672,56 +615,6 @@ public void testLargeKeys100MB() throws Exception { runLargeKeysTest(p, 100 << 20); } - @Test - @Category(NeedsRunner.class) - public void testGroupByKeyWithValidGcpSecretOption() { - if (gcpSecretVersionName == null) { - // Skip test if we couldn't set up secret manager - return; - } - List> ungroupedPairs = - Arrays.asList( - KV.of("k1", 3), - KV.of("k5", Integer.MAX_VALUE), - KV.of("k5", Integer.MIN_VALUE), - KV.of("k2", 66), - KV.of("k1", 4), - KV.of("k2", -33), - KV.of("k3", 0)); - - PCollection> input = - p.apply( - Create.of(ungroupedPairs) - .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))); - - p.getOptions().setGbek(String.format("type:gcpsecret;version_name:%s", gcpSecretVersionName)); - PCollection>> output = input.apply(GroupByKey.create()); - - SerializableFunction>>, Void> checker = - containsKvs( - kv("k1", 3, 4), - kv("k5", Integer.MIN_VALUE, Integer.MAX_VALUE), - kv("k2", 66, -33), - kv("k3", 0)); - PAssert.that(output).satisfies(checker); - PAssert.that(output).inWindow(GlobalWindow.INSTANCE).satisfies(checker); - - p.run(); - } - - @Test - @Category(NeedsRunner.class) - public void testGroupByKeyWithInvalidGcpSecretOption() { - if (gcpSecretVersionName == null) { - // Skip test if we couldn't set up secret manager - return; - } - p.getOptions().setGbek("type:gcpsecret;version_name:bad_path/versions/latest"); - p.apply(Create.of(KV.of("k1", 1))).apply(GroupByKey.create()); - assertThrows(RuntimeException.class, () -> p.run()); - } - } - /** Tests validating GroupByKey behaviors with windowing. */ @RunWith(JUnit4.class) public static class WindowTests extends SharedTestBase { From 12d258bc70f8535d8d361868771a653b6b5f980f Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Thu, 9 Oct 2025 20:07:35 -0400 Subject: [PATCH 10/15] Add back brace --- .../test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java index d420debe6455..5464838ad4db 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java @@ -614,6 +614,7 @@ public void testLargeKeys10MB() throws Exception { public void testLargeKeys100MB() throws Exception { runLargeKeysTest(p, 100 << 20); } + } /** Tests validating GroupByKey behaviors with windowing. */ @RunWith(JUnit4.class) From f970eb29caebb9b66f0f804a51c49270753628b6 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Fri, 10 Oct 2025 09:13:49 -0400 Subject: [PATCH 11/15] Revert "Add back brace" This reverts commit 12d258bc70f8535d8d361868771a653b6b5f980f. --- .../test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java index 5464838ad4db..d420debe6455 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java @@ -614,7 +614,6 @@ public void testLargeKeys10MB() throws Exception { public void testLargeKeys100MB() throws Exception { runLargeKeysTest(p, 100 << 20); } - } /** Tests validating GroupByKey behaviors with windowing. */ @RunWith(JUnit4.class) From 18dffbdb38abe40f8758c5386655b7d2797ecf00 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Fri, 10 Oct 2025 09:13:54 -0400 Subject: [PATCH 12/15] Revert "remove test changes" This reverts commit e0a9a934ef07d637438729c0e428202ed0bbd529. --- .../beam/sdk/transforms/GroupByKeyIT.java | 192 ++++++++++++++++++ .../beam/sdk/transforms/GroupByKeyTest.java | 107 ++++++++++ 2 files changed, 299 insertions(+) create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java new file mode 100644 index 000000000000..60477a4c242f --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import com.google.cloud.secretmanager.v1.ProjectName; +import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; +import com.google.cloud.secretmanager.v1.SecretName; +import com.google.cloud.secretmanager.v1.SecretPayload; +import com.google.protobuf.ByteString; +import java.io.IOException; +import java.security.SecureRandom; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Integration test for GroupByKey transforms and some other transforms which use GBK. */ +@RunWith(JUnit4.class) +public class GroupByKeyIT { + @Rule public ExpectedException thrown = ExpectedException.none(); + + private static final String PROJECT_ID = "apache-beam-testing"; + private static final String SECRET_ID = "gbek-test"; + private static String gcpSecretVersionName; + private static String secretId; + + @BeforeClass + public static void setup() throws IOException { + secretId = String.format("%s-%d", SECRET_ID, new SecureRandom().nextInt(10000)); + SecretManagerServiceClient client; + try { + client = SecretManagerServiceClient.create(); + } catch (IOException e) { + gcpSecretVersionName = null; + return; + } + ProjectName projectName = ProjectName.of(PROJECT_ID); + SecretName secretName = SecretName.of(PROJECT_ID, secretId); + + try { + client.getSecret(secretName); + } catch (Exception e) { + com.google.cloud.secretmanager.v1.Secret secret = + com.google.cloud.secretmanager.v1.Secret.newBuilder() + .setReplication( + com.google.cloud.secretmanager.v1.Replication.newBuilder() + .setAutomatic( + com.google.cloud.secretmanager.v1.Replication.Automatic.newBuilder() + .build()) + .build()) + .build(); + client.createSecret(projectName, secretId, secret); + byte[] secretBytes = new byte[32]; + new SecureRandom().nextBytes(secretBytes); + client.addSecretVersion( + secretName, SecretPayload.newBuilder().setData(ByteString.copyFrom(secretBytes)).build()); + } + gcpSecretVersionName = secretName.toString() + "/versions/latest"; + } + + @AfterClass + public static void tearDown() throws IOException { + if (gcpSecretVersionName != null) { + SecretManagerServiceClient client = SecretManagerServiceClient.create(); + SecretName secretName = SecretName.of(PROJECT_ID, secretId); + client.deleteSecret(secretName); + } + } + + @Test + public void testGroupByKeyWithValidGcpSecretOption() throws Exception { + if (gcpSecretVersionName == null) { + // Skip test if we couldn't set up secret manager + return; + } + PipelineOptions options = TestPipeline.testingPipelineOptions(); + options.setGbek(String.format("type:gcpsecret;version_name:%s", gcpSecretVersionName)); + Pipeline p = Pipeline.create(options); + List> ungroupedPairs = + Arrays.asList( + KV.of("k1", 3), + KV.of("k5", Integer.MAX_VALUE), + KV.of("k5", Integer.MIN_VALUE), + KV.of("k2", 66), + KV.of("k1", 4), + KV.of("k2", -33), + KV.of("k3", 0)); + + PCollection> input = + p.apply( + Create.of(ungroupedPairs) + .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))); + + PCollection>> output = input.apply(GroupByKey.create()); + + PAssert.that(output) + .containsInAnyOrder( + KV.of("k1", Arrays.asList(3, 4)), + KV.of("k5", Arrays.asList(Integer.MAX_VALUE, Integer.MIN_VALUE)), + KV.of("k2", Arrays.asList(66, -33)), + KV.of("k3", Arrays.asList(0))); + + p.run(); + } + + @Test + public void testGroupByKeyWithInvalidGcpSecretOption() throws Exception { + if (gcpSecretVersionName == null) { + // Skip test if we couldn't set up secret manager + return; + } + PipelineOptions options = TestPipeline.testingPipelineOptions(); + options.setGbek("type:gcpsecret;version_name:bad_path/versions/latest"); + Pipeline p = Pipeline.create(options); + p.apply(Create.of(KV.of("k1", 1))).apply(GroupByKey.create()); + thrown.expect(RuntimeException.class); + p.run(); + } + + // Redistribute depends on GBK under the hood and can have runner-specific implementations + @Test + public void redistributeWithValidGcpSecretOption() throws Exception { + if (gcpSecretVersionName == null) { + // Skip test if we couldn't set up secret manager + return; + } + PipelineOptions options = TestPipeline.testingPipelineOptions(); + options.setGbek(String.format("type:gcpsecret;version_name:%s", gcpSecretVersionName)); + Pipeline p = Pipeline.create(options); + + List> ungroupedPairs = + Arrays.asList( + KV.of("k1", 3), + KV.of("k5", Integer.MAX_VALUE), + KV.of("k5", Integer.MIN_VALUE), + KV.of("k2", 66), + KV.of("k1", 4), + KV.of("k2", -33), + KV.of("k3", 0)); + PCollection> input = + p.apply( + Create.of(ungroupedPairs) + .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))); + PCollection> output = input.apply(Redistribute.byKey()); + PAssert.that(output).containsInAnyOrder(ungroupedPairs); + + p.run(); + } + + @Test + public void testRedistributeWithInvalidGcpSecretOption() throws Exception { + if (gcpSecretVersionName == null) { + // Skip test if we couldn't set up secret manager + return; + } + PipelineOptions options = TestPipeline.testingPipelineOptions(); + options.setGbek("type:gcpsecret;version_name:bad_path/versions/latest"); + Pipeline p = Pipeline.create(options); + p.apply(Create.of(KV.of("k1", 1))).apply(Redistribute.byKey()); + thrown.expect(RuntimeException.class); + p.run(); + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java index d420debe6455..326da99f1a81 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java @@ -26,12 +26,18 @@ import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; import static org.junit.Assert.assertThrows; +import com.google.cloud.secretmanager.v1.ProjectName; +import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; +import com.google.cloud.secretmanager.v1.SecretName; +import com.google.cloud.secretmanager.v1.SecretPayload; +import com.google.protobuf.ByteString; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; +import java.security.SecureRandom; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -90,7 +96,9 @@ import org.hamcrest.Matcher; import org.joda.time.Duration; import org.joda.time.Instant; +import org.junit.AfterClass; import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -109,6 +117,55 @@ public class GroupByKeyTest implements Serializable { /** Shared test base class with setup/teardown helpers. */ public abstract static class SharedTestBase { @Rule public transient TestPipeline p = TestPipeline.create(); + + private static final String PROJECT_ID = "apache-beam-testing"; + private static final String SECRET_ID = "gbek-test"; + public static String gcpSecretVersionName; + private static String secretId; + + @BeforeClass + public static void setup() throws IOException { + secretId = String.format("%s-%d", SECRET_ID, new SecureRandom().nextInt(10000)); + SecretManagerServiceClient client; + try { + client = SecretManagerServiceClient.create(); + } catch (IOException e) { + gcpSecretVersionName = null; + return; + } + ProjectName projectName = ProjectName.of(PROJECT_ID); + SecretName secretName = SecretName.of(PROJECT_ID, secretId); + + try { + client.getSecret(secretName); + } catch (Exception e) { + com.google.cloud.secretmanager.v1.Secret secret = + com.google.cloud.secretmanager.v1.Secret.newBuilder() + .setReplication( + com.google.cloud.secretmanager.v1.Replication.newBuilder() + .setAutomatic( + com.google.cloud.secretmanager.v1.Replication.Automatic.newBuilder() + .build()) + .build()) + .build(); + client.createSecret(projectName, secretId, secret); + byte[] secretBytes = new byte[32]; + new SecureRandom().nextBytes(secretBytes); + client.addSecretVersion( + secretName, + SecretPayload.newBuilder().setData(ByteString.copyFrom(secretBytes)).build()); + } + gcpSecretVersionName = secretName.toString() + "/versions/latest"; + } + + @AfterClass + public static void tearDown() throws IOException { + if (gcpSecretVersionName != null) { + SecretManagerServiceClient client = SecretManagerServiceClient.create(); + SecretName secretName = SecretName.of(PROJECT_ID, secretId); + client.deleteSecret(secretName); + } + } } /** Tests validating basic {@link GroupByKey} scenarios. */ @@ -615,6 +672,56 @@ public void testLargeKeys100MB() throws Exception { runLargeKeysTest(p, 100 << 20); } + @Test + @Category(NeedsRunner.class) + public void testGroupByKeyWithValidGcpSecretOption() { + if (gcpSecretVersionName == null) { + // Skip test if we couldn't set up secret manager + return; + } + List> ungroupedPairs = + Arrays.asList( + KV.of("k1", 3), + KV.of("k5", Integer.MAX_VALUE), + KV.of("k5", Integer.MIN_VALUE), + KV.of("k2", 66), + KV.of("k1", 4), + KV.of("k2", -33), + KV.of("k3", 0)); + + PCollection> input = + p.apply( + Create.of(ungroupedPairs) + .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))); + + p.getOptions().setGbek(String.format("type:gcpsecret;version_name:%s", gcpSecretVersionName)); + PCollection>> output = input.apply(GroupByKey.create()); + + SerializableFunction>>, Void> checker = + containsKvs( + kv("k1", 3, 4), + kv("k5", Integer.MIN_VALUE, Integer.MAX_VALUE), + kv("k2", 66, -33), + kv("k3", 0)); + PAssert.that(output).satisfies(checker); + PAssert.that(output).inWindow(GlobalWindow.INSTANCE).satisfies(checker); + + p.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testGroupByKeyWithInvalidGcpSecretOption() { + if (gcpSecretVersionName == null) { + // Skip test if we couldn't set up secret manager + return; + } + p.getOptions().setGbek("type:gcpsecret;version_name:bad_path/versions/latest"); + p.apply(Create.of(KV.of("k1", 1))).apply(GroupByKey.create()); + assertThrows(RuntimeException.class, () -> p.run()); + } + } + /** Tests validating GroupByKey behaviors with windowing. */ @RunWith(JUnit4.class) public static class WindowTests extends SharedTestBase { From 143032ad053797090d8092e011758a4bc3a8eb59 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Fri, 10 Oct 2025 09:14:44 -0400 Subject: [PATCH 13/15] See if just removing the tests is enough, or if setup is causing the problems --- .../beam/sdk/transforms/GroupByKeyTest.java | 98 +++++++++---------- 1 file changed, 49 insertions(+), 49 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java index 326da99f1a81..c7c4d9595311 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java @@ -672,55 +672,55 @@ public void testLargeKeys100MB() throws Exception { runLargeKeysTest(p, 100 << 20); } - @Test - @Category(NeedsRunner.class) - public void testGroupByKeyWithValidGcpSecretOption() { - if (gcpSecretVersionName == null) { - // Skip test if we couldn't set up secret manager - return; - } - List> ungroupedPairs = - Arrays.asList( - KV.of("k1", 3), - KV.of("k5", Integer.MAX_VALUE), - KV.of("k5", Integer.MIN_VALUE), - KV.of("k2", 66), - KV.of("k1", 4), - KV.of("k2", -33), - KV.of("k3", 0)); - - PCollection> input = - p.apply( - Create.of(ungroupedPairs) - .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))); - - p.getOptions().setGbek(String.format("type:gcpsecret;version_name:%s", gcpSecretVersionName)); - PCollection>> output = input.apply(GroupByKey.create()); - - SerializableFunction>>, Void> checker = - containsKvs( - kv("k1", 3, 4), - kv("k5", Integer.MIN_VALUE, Integer.MAX_VALUE), - kv("k2", 66, -33), - kv("k3", 0)); - PAssert.that(output).satisfies(checker); - PAssert.that(output).inWindow(GlobalWindow.INSTANCE).satisfies(checker); - - p.run(); - } - - @Test - @Category(NeedsRunner.class) - public void testGroupByKeyWithInvalidGcpSecretOption() { - if (gcpSecretVersionName == null) { - // Skip test if we couldn't set up secret manager - return; - } - p.getOptions().setGbek("type:gcpsecret;version_name:bad_path/versions/latest"); - p.apply(Create.of(KV.of("k1", 1))).apply(GroupByKey.create()); - assertThrows(RuntimeException.class, () -> p.run()); - } - } + // @Test + // @Category(NeedsRunner.class) + // public void testGroupByKeyWithValidGcpSecretOption() { + // if (gcpSecretVersionName == null) { + // // Skip test if we couldn't set up secret manager + // return; + // } + // List> ungroupedPairs = + // Arrays.asList( + // KV.of("k1", 3), + // KV.of("k5", Integer.MAX_VALUE), + // KV.of("k5", Integer.MIN_VALUE), + // KV.of("k2", 66), + // KV.of("k1", 4), + // KV.of("k2", -33), + // KV.of("k3", 0)); + + // PCollection> input = + // p.apply( + // Create.of(ungroupedPairs) + // .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))); + + // p.getOptions().setGbek(String.format("type:gcpsecret;version_name:%s", gcpSecretVersionName)); + // PCollection>> output = input.apply(GroupByKey.create()); + + // SerializableFunction>>, Void> checker = + // containsKvs( + // kv("k1", 3, 4), + // kv("k5", Integer.MIN_VALUE, Integer.MAX_VALUE), + // kv("k2", 66, -33), + // kv("k3", 0)); + // PAssert.that(output).satisfies(checker); + // PAssert.that(output).inWindow(GlobalWindow.INSTANCE).satisfies(checker); + + // p.run(); + // } + + // @Test + // @Category(NeedsRunner.class) + // public void testGroupByKeyWithInvalidGcpSecretOption() { + // if (gcpSecretVersionName == null) { + // // Skip test if we couldn't set up secret manager + // return; + // } + // p.getOptions().setGbek("type:gcpsecret;version_name:bad_path/versions/latest"); + // p.apply(Create.of(KV.of("k1", 1))).apply(GroupByKey.create()); + // assertThrows(RuntimeException.class, () -> p.run()); + // } + // } /** Tests validating GroupByKey behaviors with windowing. */ @RunWith(JUnit4.class) From 4394d9ff38dc643f7f9b19cc829b9e78a2dc4fd5 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Fri, 10 Oct 2025 09:55:14 -0400 Subject: [PATCH 14/15] Parentheses :( --- .../main/java/org/apache/beam/sdk/transforms/GroupByKey.java | 2 +- .../java/org/apache/beam/sdk/transforms/GroupByKeyTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java index 95ff73f55e74..1970b42a658a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java @@ -122,7 +122,7 @@ public class GroupByKey private GroupByKey(boolean fewKeys) { this.fewKeys = fewKeys; this.insideGBEK = false; - surroundsGBEK = false; + this.surroundsGBEK = false; } /** diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java index c7c4d9595311..01e0c9b22a81 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java @@ -720,7 +720,7 @@ public void testLargeKeys100MB() throws Exception { // p.apply(Create.of(KV.of("k1", 1))).apply(GroupByKey.create()); // assertThrows(RuntimeException.class, () -> p.run()); // } - // } + } /** Tests validating GroupByKey behaviors with windowing. */ @RunWith(JUnit4.class) From 0bb9b4c241b289a8ba84ddff3fb9f80ba55fa826 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Fri, 10 Oct 2025 12:17:53 -0400 Subject: [PATCH 15/15] Remove setup call --- .../beam/sdk/transforms/GroupByKeyTest.java | 84 +++++++++---------- 1 file changed, 42 insertions(+), 42 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java index 01e0c9b22a81..cdc405c94cce 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java @@ -123,49 +123,49 @@ public abstract static class SharedTestBase { public static String gcpSecretVersionName; private static String secretId; - @BeforeClass - public static void setup() throws IOException { - secretId = String.format("%s-%d", SECRET_ID, new SecureRandom().nextInt(10000)); - SecretManagerServiceClient client; - try { - client = SecretManagerServiceClient.create(); - } catch (IOException e) { - gcpSecretVersionName = null; - return; - } - ProjectName projectName = ProjectName.of(PROJECT_ID); - SecretName secretName = SecretName.of(PROJECT_ID, secretId); - - try { - client.getSecret(secretName); - } catch (Exception e) { - com.google.cloud.secretmanager.v1.Secret secret = - com.google.cloud.secretmanager.v1.Secret.newBuilder() - .setReplication( - com.google.cloud.secretmanager.v1.Replication.newBuilder() - .setAutomatic( - com.google.cloud.secretmanager.v1.Replication.Automatic.newBuilder() - .build()) - .build()) - .build(); - client.createSecret(projectName, secretId, secret); - byte[] secretBytes = new byte[32]; - new SecureRandom().nextBytes(secretBytes); - client.addSecretVersion( - secretName, - SecretPayload.newBuilder().setData(ByteString.copyFrom(secretBytes)).build()); - } - gcpSecretVersionName = secretName.toString() + "/versions/latest"; - } + // @BeforeClass + // public static void setup() throws IOException { + // secretId = String.format("%s-%d", SECRET_ID, new SecureRandom().nextInt(10000)); + // SecretManagerServiceClient client; + // try { + // client = SecretManagerServiceClient.create(); + // } catch (IOException e) { + // gcpSecretVersionName = null; + // return; + // } + // ProjectName projectName = ProjectName.of(PROJECT_ID); + // SecretName secretName = SecretName.of(PROJECT_ID, secretId); + + // try { + // client.getSecret(secretName); + // } catch (Exception e) { + // com.google.cloud.secretmanager.v1.Secret secret = + // com.google.cloud.secretmanager.v1.Secret.newBuilder() + // .setReplication( + // com.google.cloud.secretmanager.v1.Replication.newBuilder() + // .setAutomatic( + // com.google.cloud.secretmanager.v1.Replication.Automatic.newBuilder() + // .build()) + // .build()) + // .build(); + // client.createSecret(projectName, secretId, secret); + // byte[] secretBytes = new byte[32]; + // new SecureRandom().nextBytes(secretBytes); + // client.addSecretVersion( + // secretName, + // SecretPayload.newBuilder().setData(ByteString.copyFrom(secretBytes)).build()); + // } + // gcpSecretVersionName = secretName.toString() + "/versions/latest"; + // } - @AfterClass - public static void tearDown() throws IOException { - if (gcpSecretVersionName != null) { - SecretManagerServiceClient client = SecretManagerServiceClient.create(); - SecretName secretName = SecretName.of(PROJECT_ID, secretId); - client.deleteSecret(secretName); - } - } + // @AfterClass + // public static void tearDown() throws IOException { + // if (gcpSecretVersionName != null) { + // SecretManagerServiceClient client = SecretManagerServiceClient.create(); + // SecretName secretName = SecretName.of(PROJECT_ID, secretId); + // client.deleteSecret(secretName); + // } + // } } /** Tests validating basic {@link GroupByKey} scenarios. */