Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_Java.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 4
"modification": 2
}
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_Java_DataflowV1.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@
"https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder",
"https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners",
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 4,
"modification": 2,
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
}
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_Java_DataflowV2.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"modification": 6,
"modification": 4,
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 4
"modification": 5
}
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ public Long create(PipelineOptions options) {
* <p>Beam will infer the secret type and value based on the secret itself. This guarantees that
* any data at rest while performing a GBK is encrypted, so this can be used to guarantee that data
* is not stored unencrypted. Runners with this behavior include the Dataflow, Flink, and Spark runners. The
* secret should be a url safe base64 encoded 32 byte value. The option should be structured like:
* option should be structured like:
*
* <pre><code>
* --gbek=type:<secret_type>;<secret_param>:<value>
Expand All @@ -432,19 +432,14 @@ public Long create(PipelineOptions options) {
* --gbek=type:GcpSecret;version_name:my_secret/versions/latest"
* </code></pre>
*
* All variables should use snake case to allow consistency across languages. For an example of
* generating a properly formatted secret, see
* https://github.com/apache/beam/blob/c8df4da229da49d533491857e1bb4ab5dbf4fd37/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java#L82
* All variables should use snake case to allow consistency across languages.
*/
@Description(
"When set, will replace all GroupByKey transforms in the pipeline the option. Beam will"
+ " infer the secret type and value based on the secret itself. This guarantees that"
+ " any data at rest during the performing a GBK, so this can be used to guarantee"
+ " that data is not unencrypted. Runners with this behavior include the Dataflow,"
+ " Flink, and Spark runners. The secret should be a url safe base64 encoded 32 byte"
+ " value. For an example of generating a properly formatted secret, see"
+ " https://github.com/apache/beam/blob/c8df4da229da49d533491857e1bb4ab5dbf4fd37/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java#L82"
+ " When passing in the gbek option, it should be structured like:"
+ " Flink, and Spark runners. The option should be structured like:"
+ " --gbek=type:<secret_type>;<secret_param>:<value>, for example "
+ " --gbek=type:GcpSecret;version_name:my_secret/versions/latest. All variables "
+ " should use snake case to allow consistency across languages.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,6 @@
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Combine.AccumulatingCombineFn;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.Combine.Globally;
import org.apache.beam.sdk.transforms.Combine.PerKey;
import org.apache.beam.sdk.transforms.CombineFnBase.AbstractGlobalCombineFn;
import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
Expand Down Expand Up @@ -1503,7 +1499,6 @@ public static class PerKey<K, InputT, OutputT>
private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
private final boolean fewKeys;
private final List<PCollectionView<?>> sideInputs;
private boolean shouldSkipReplacement;

private PerKey(
GlobalCombineFn<? super InputT, ?, OutputT> fn,
Expand All @@ -1513,7 +1508,6 @@ private PerKey(
this.fnDisplayData = fnDisplayData;
this.fewKeys = fewKeys;
this.sideInputs = ImmutableList.of();
this.shouldSkipReplacement = false;
}

private PerKey(
Expand All @@ -1525,7 +1519,6 @@ private PerKey(
this.fnDisplayData = fnDisplayData;
this.fewKeys = fewKeys;
this.sideInputs = sideInputs;
this.shouldSkipReplacement = false;
}

@Override
Expand Down Expand Up @@ -1599,11 +1592,6 @@ public List<PCollectionView<?>> getSideInputs() {
return sideInputs;
}

/** Returns whether a runner should skip replacing this transform. For runner use only. */
public boolean shouldSkipReplacement() {
return this.shouldSkipReplacement;
}

/**
* Returns the side inputs of this {@link Combine}, tagged with the tag of the {@link
* PCollectionView}. The values of the returned map will be equal to the result of {@link
Expand All @@ -1616,13 +1604,6 @@ public Map<TupleTag<?>, PValue> getAdditionalInputs() {

@Override
public PCollection<KV<K, OutputT>> expand(PCollection<KV<K, InputT>> input) {
PipelineOptions options = input.getPipeline().getOptions();
String gbekOveride = options.getGbek();
if (gbekOveride != null && !gbekOveride.trim().isEmpty()) {
// Don't replace this transform if we're using GBEK since the runner may insert
// its own GBK which doesn't perform encryption.
this.shouldSkipReplacement = true;
}
return input
.apply(fewKeys ? GroupByKey.createWithFewKeys() : GroupByKey.create())
.apply(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@
* the output. This is useful when the keys contain sensitive data that should not be stored at rest
* by the runner.
*
* <p>The transform requires a {@link Secret} which returns a base64 encoded 32 byte secret which
* can be used to generate a {@link SecretKeySpec} object using the HmacSHA256 algorithm.
* <p>The transform requires a {@link Secret} which returns a 32 byte secret which can be used to
* generate a {@link SecretKeySpec} object using the HmacSHA256 algorithm.
*
* <p>Note the following caveats: 1) Runners can implement arbitrary materialization steps, so this
* does not guarantee that the whole pipeline will not have unencrypted data at rest by itself. 2)
Expand Down Expand Up @@ -153,7 +153,7 @@ private static class EncryptMessage<K, V> extends DoFn<KV<K, V>, KV<byte[], KV<b
@Setup
public void setup() {
try {
byte[] secretBytes = java.util.Base64.getUrlDecoder().decode(this.hmacKey.getSecretBytes());
byte[] secretBytes = this.hmacKey.getSecretBytes();
this.mac = Mac.getInstance("HmacSHA256");
this.mac.init(new SecretKeySpec(secretBytes, "HmacSHA256"));
this.cipher = Cipher.getInstance("AES/GCM/NoPadding");
Expand Down Expand Up @@ -229,9 +229,7 @@ private static class DecryptMessage<K, V>
public void setup() {
try {
this.cipher = Cipher.getInstance("AES/GCM/NoPadding");
this.secretKeySpec =
new SecretKeySpec(
java.util.Base64.getUrlDecoder().decode(this.hmacKey.getSecretBytes()), "AES");
this.secretKeySpec = new SecretKeySpec(this.hmacKey.getSecretBytes(), "AES");
} catch (Exception ex) {
throw new RuntimeException(
"Failed to initialize cryptography libraries needed for GroupByEncryptedKey", ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public class GroupByKey<K, V>
private GroupByKey(boolean fewKeys) {
this.fewKeys = fewKeys;
this.insideGBEK = false;
surroundsGBEK = false;
this.surroundsGBEK = false;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,25 +61,12 @@ public String getUrn() {
return PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN;
}

@Override
public String getUrn(Combine.PerKey<?, ?, ?> transform) {
if (transform.shouldSkipReplacement()) {
return "beam:transform:combine_per_key_wrapper:v1";
}
return PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN;
}

@Override
public FunctionSpec translate(
AppliedPTransform<?, ?, Combine.PerKey<?, ?, ?>> transform, SdkComponents components)
throws IOException {
Combine.PerKey underlyingCombine = transform.getTransform();
if (underlyingCombine.shouldSkipReplacement()) {
// Can use null for spec for generic composite.
return null;
}
if (underlyingCombine.getSideInputs().isEmpty()) {
GlobalCombineFn<?, ?, ?> combineFn = underlyingCombine.getFn();
if (transform.getTransform().getSideInputs().isEmpty()) {
GlobalCombineFn<?, ?, ?> combineFn = transform.getTransform().getFn();
Coder<?> accumulatorCoder =
extractAccumulatorCoder(combineFn, (AppliedPTransform) transform);
return FunctionSpec.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class GroupByEncryptedKeyTest implements Serializable {

private static class FakeSecret implements Secret {
private final byte[] secret =
"YUt3STJQbXFZRnQycDV0TktDeUJTNXFZV0hoSHNHWmM".getBytes(Charset.defaultCharset());
"aKwI2PmqYFt2p5tNKCyBS5qYmHhHsGZc".getBytes(Charset.defaultCharset());

@Override
public byte[] getSecretBytes() {
Expand Down Expand Up @@ -123,10 +123,7 @@ public static void setup() throws IOException {
byte[] secretBytes = new byte[32];
new SecureRandom().nextBytes(secretBytes);
client.addSecretVersion(
secretName,
SecretPayload.newBuilder()
.setData(ByteString.copyFrom(java.util.Base64.getUrlEncoder().encode(secretBytes)))
.build());
secretName, SecretPayload.newBuilder().setData(ByteString.copyFrom(secretBytes)).build());
}
gcpSecret = new GcpSecret(secretName.toString() + "/versions/latest");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,7 @@ public static void setup() throws IOException {
byte[] secretBytes = new byte[32];
new SecureRandom().nextBytes(secretBytes);
client.addSecretVersion(
secretName,
SecretPayload.newBuilder()
.setData(ByteString.copyFrom(java.util.Base64.getUrlEncoder().encode(secretBytes)))
.build());
secretName, SecretPayload.newBuilder().setData(ByteString.copyFrom(secretBytes)).build());
}
gcpSecretVersionName = secretName.toString() + "/versions/latest";
}
Expand Down Expand Up @@ -151,7 +148,7 @@ public void testGroupByKeyWithInvalidGcpSecretOption() throws Exception {

// Redistribute depends on GBK under the hood and can have runner-specific implementations
@Test
public void testRedistributeWithValidGcpSecretOption() throws Exception {
public void redistributeWithValidGcpSecretOption() throws Exception {
if (gcpSecretVersionName == null) {
// Skip test if we couldn't set up secret manager
return;
Expand Down Expand Up @@ -192,44 +189,4 @@ public void testRedistributeWithInvalidGcpSecretOption() throws Exception {
thrown.expect(RuntimeException.class);
p.run();
}

// Combine.PerKey depends on GBK under the hood, but can be overridden by a runner. This can
// fail unless it is handled specially, so we should test it specifically
@Test
public void testCombinePerKeyWithValidGcpSecretOption() throws Exception {
if (gcpSecretVersionName == null) {
// Skip test if we couldn't set up secret manager
return;
}
PipelineOptions options = TestPipeline.testingPipelineOptions();
options.setGbek(String.format("type:gcpsecret;version_name:%s", gcpSecretVersionName));
Pipeline p = Pipeline.create(options);

List<KV<String, Integer>> ungroupedPairs =
Arrays.asList(
KV.of("k1", 3), KV.of("k2", 66), KV.of("k1", 4), KV.of("k2", -33), KV.of("k3", 0));
List<KV<String, Integer>> sums = Arrays.asList(KV.of("k1", 7), KV.of("k2", 33), KV.of("k3", 0));
PCollection<KV<String, Integer>> input =
p.apply(
Create.of(ungroupedPairs)
.withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())));
PCollection<KV<String, Integer>> output = input.apply(Combine.perKey(Sum.ofIntegers()));
PAssert.that(output).containsInAnyOrder(sums);

p.run();
}

@Test
public void testCombinePerKeyWithInvalidGcpSecretOption() throws Exception {
if (gcpSecretVersionName == null) {
// Skip test if we couldn't set up secret manager
return;
}
PipelineOptions options = TestPipeline.testingPipelineOptions();
options.setGbek("type:gcpsecret;version_name:bad_path/versions/latest");
Pipeline p = Pipeline.create(options);
p.apply(Create.of(KV.of("k1", 1))).apply(Combine.perKey(Sum.ofIntegers()));
thrown.expect(RuntimeException.class);
p.run();
}
}
Loading
Loading