Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

import static java.util.Locale.ROOT;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -1826,4 +1828,36 @@ public void testPipelineOptionsFactoryUsesTccl() throws Exception {
PipelineOptionsFactory.resetCache();
}
}

@Test
public void testDefaultMethodIgnoresDefaultImplementation() {
OptionsWithDefaultMethod optsWithDefault =
PipelineOptionsFactory.as(OptionsWithDefaultMethod.class);
assertThat(optsWithDefault.getValue(), nullValue());

optsWithDefault.setValue(12.25);
assertThat(optsWithDefault.getValue(), equalTo(12.25));
}

private interface ExtendedOptionsWithDefault extends OptionsWithDefaultMethod {}

@Test
public void testDefaultMethodInExtendedClassIgnoresDefaultImplementation() {
OptionsWithDefaultMethod extendedOptsWithDefault =
PipelineOptionsFactory.as(ExtendedOptionsWithDefault.class);
assertThat(extendedOptsWithDefault.getValue(), nullValue());

extendedOptsWithDefault.setValue(Double.NEGATIVE_INFINITY);
assertThat(extendedOptsWithDefault.getValue(), equalTo(Double.NEGATIVE_INFINITY));
}

private interface OptionsWithDefaultMethod extends PipelineOptions {
default Number getValue() {
return 1024;
}

void setValue(Number value);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,17 @@
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasNamespace;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFor;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;

import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand Down Expand Up @@ -72,23 +75,26 @@
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.TimestampedValue;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assume;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
* Tests for Combine transforms.
* Tests for {@link Combine} transforms.
*/
@RunWith(JUnit4.class)
public class CombineTest implements Serializable {
Expand Down Expand Up @@ -1235,4 +1241,128 @@ public void processElement(ProcessContext c) throws Exception {
}
}));
}

/**
* Class for use in testing use of Java 8 method references.
*/
private static class Summer implements Serializable {
public int sum(Iterable<Integer> integers) {
int sum = 0;
for (int i : integers) {
sum += i;
}
return sum;
}
}

/**
* Tests creation of a global {@link Combine} via Java 8 lambda.
*/
@Test
@Category(ValidatesRunner.class)
public void testCombineGloballyLambda() {

PCollection<Integer> output = pipeline
.apply(Create.of(1, 2, 3, 4))
.apply(Combine.globally(integers -> {
int sum = 0;
for (int i : integers) {
sum += i;
}
return sum;
}));

PAssert.that(output).containsInAnyOrder(10);
pipeline.run();
}

/**
* Tests creation of a global {@link Combine} via a Java 8 method reference.
*/
@Test
@Category(ValidatesRunner.class)
public void testCombineGloballyInstanceMethodReference() {

PCollection<Integer> output = pipeline
.apply(Create.of(1, 2, 3, 4))
.apply(Combine.globally(new Summer()::sum));

PAssert.that(output).containsInAnyOrder(10);
pipeline.run();
}

/**
* Tests creation of a per-key {@link Combine} via a Java 8 lambda.
*/
@Test
@Category(ValidatesRunner.class)
public void testCombinePerKeyLambda() {

PCollection<KV<String, Integer>> output = pipeline
.apply(Create.of(KV.of("a", 1), KV.of("b", 2), KV.of("a", 3), KV.of("c", 4)))
.apply(Combine.perKey(integers -> {
int sum = 0;
for (int i : integers) {
sum += i;
}
return sum;
}));

PAssert.that(output).containsInAnyOrder(
KV.of("a", 4),
KV.of("b", 2),
KV.of("c", 4));
pipeline.run();
}

/**
* Tests creation of a per-key {@link Combine} via a Java 8 method reference.
*/
@Test
@Category(ValidatesRunner.class)
public void testCombinePerKeyInstanceMethodReference() {

PCollection<KV<String, Integer>> output = pipeline
.apply(Create.of(KV.of("a", 1), KV.of("b", 2), KV.of("a", 3), KV.of("c", 4)))
.apply(Combine.perKey(new Summer()::sum));

PAssert.that(output).containsInAnyOrder(
KV.of("a", 4),
KV.of("b", 2),
KV.of("c", 4));
pipeline.run();
}

/**
* Tests that we can serialize {@link Combine.CombineFn CombineFns} constructed from a lambda.
* Lambdas can be problematic because the {@link Class} object is synthetic and cannot be
* deserialized.
*/
@Test
public void testLambdaSerialization() {
SerializableFunction<Iterable<Object>, Object> combiner = xs -> Iterables.getFirst(xs, 0);

boolean lambdaClassSerializationThrows;
try {
SerializableUtils.clone(combiner.getClass());
lambdaClassSerializationThrows = false;
} catch (IllegalArgumentException e) {
// Expected
lambdaClassSerializationThrows = true;
}
Assume.assumeTrue("Expected lambda class serialization to fail. "
+ "If it's fixed, we can remove special behavior in Combine.",
lambdaClassSerializationThrows);


Combine.Globally<?, ?> combine = Combine.globally(combiner);
SerializableUtils.clone(combine); // should not throw.
}

@Test
public void testLambdaDisplayData() {
Combine.Globally<?, ?> combine = Combine.globally(xs -> Iterables.getFirst(xs, 0));
DisplayData displayData = DisplayData.from(combine);
MatcherAssert.assertThat(displayData.items(), not(empty()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,21 @@
*/
package org.apache.beam.sdk.transforms;

import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
Expand All @@ -48,6 +56,7 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

Expand All @@ -57,6 +66,9 @@ public class DistinctTest {

@Rule public final TestPipeline p = TestPipeline.create();

@Rule public ExpectedException thrown = ExpectedException.none();


@Test
@Category(ValidatesRunner.class)
public void testDistinct() {
Expand Down Expand Up @@ -273,4 +285,53 @@ public void testTriggeredDistinctRepresentativeValuesEmpty() {
PAssert.that(distinctValues).containsInAnyOrder(KV.of(1, "k1"));
triggeredDistinctRepresentativePipeline.run();
}

@Test
public void withLambdaRepresentativeValuesFnNoTypeDescriptorShouldThrow() {

Multimap<Integer, String> predupedContents = HashMultimap.create();
predupedContents.put(3, "foo");
predupedContents.put(4, "foos");
predupedContents.put(6, "barbaz");
predupedContents.put(6, "bazbar");
PCollection<String> dupes =
p.apply(Create.of("foo", "foos", "barbaz", "barbaz", "bazbar", "foo"));

thrown.expect(IllegalStateException.class);
thrown.expectMessage("Unable to return a default Coder for RemoveRepresentativeDupes");

// Thrown when applying a transform to the internal WithKeys that withRepresentativeValueFn is
// implemented with
dupes.apply("RemoveRepresentativeDupes", Distinct.withRepresentativeValueFn(String::length));
}

@Test
@Category(NeedsRunner.class)
public void withLambdaRepresentativeValuesFnAndTypeDescriptorShouldApplyFn() {

PCollection<String> dupes =
p.apply(Create.of("foo", "foos", "barbaz", "barbaz", "bazbar", "foo"));
PCollection<String> deduped =
dupes.apply(
Distinct.withRepresentativeValueFn(String::length)
.withRepresentativeType(TypeDescriptor.of(Integer.class)));

PAssert.that(deduped).satisfies((Iterable<String> strs) -> {
Multimap<Integer, String> predupedContents = HashMultimap.create();
predupedContents.put(3, "foo");
predupedContents.put(4, "foos");
predupedContents.put(6, "barbaz");
predupedContents.put(6, "bazbar");

Set<Integer> seenLengths = new HashSet<>();
for (String s : strs) {
assertThat(predupedContents.values(), hasItem(s));
assertThat(seenLengths, not(contains(s.length())));
seenLengths.add(s.length());
}
return null;
});

p.run();
}
}
Loading