From 03764d6e5a09001901265c3b04bc07cf5fa20706 Mon Sep 17 00:00:00 2001 From: Alexey Romanenko Date: Mon, 19 Feb 2018 18:06:21 +0100 Subject: [PATCH] [BEAM-3538] Remove (or merge) Java 8 specific tests module into the main one. --- .../options/PipelineOptionsFactoryTest.java | 34 ++++ .../beam/sdk/transforms/CombineTest.java | 132 +++++++++++++- .../beam/sdk/transforms/DistinctTest.java | 61 +++++++ .../beam/sdk/transforms/FilterTest.java | 75 ++++++++ .../sdk/transforms/FlatMapElementsTest.java | 43 +++++ .../beam/sdk/transforms/MapElementsTest.java | 63 +++++++ .../beam/sdk/transforms/PartitionTest.java | 31 ++++ .../sdk/transforms/SimpleFunctionTest.java | 33 ++++ .../beam/sdk/transforms/WithKeysTest.java | 34 ++++ .../sdk/transforms/WithTimestampsTest.java | 30 +++ sdks/java/java8tests/build.gradle | 31 ---- sdks/java/java8tests/pom.xml | 89 --------- .../PipelineOptionsFactoryJava8Test.java | 92 ---------- .../beam/sdk/transforms/CombineJava8Test.java | 171 ------------------ .../sdk/transforms/DistinctJava8Test.java | 97 ---------- .../beam/sdk/transforms/FilterJava8Test.java | 114 ------------ .../transforms/FlatMapElementsJava8Test.java | 84 --------- .../sdk/transforms/MapElementsJava8Test.java | 97 ---------- .../sdk/transforms/PartitionJava8Test.java | 74 -------- .../transforms/SimpleFunctionJava8Test.java | 68 ------- .../sdk/transforms/WithKeysJava8Test.java | 73 -------- .../transforms/WithTimestampsJava8Test.java | 71 -------- sdks/java/pom.xml | 1 - settings.gradle | 1 - 24 files changed, 535 insertions(+), 1064 deletions(-) delete mode 100644 sdks/java/java8tests/build.gradle delete mode 100644 sdks/java/java8tests/pom.xml delete mode 100644 sdks/java/java8tests/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryJava8Test.java delete mode 100644 sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/CombineJava8Test.java delete mode 100644 sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/DistinctJava8Test.java delete mode 100644 sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java delete mode 100644 sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsJava8Test.java delete mode 100644 sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/MapElementsJava8Test.java delete mode 100644 sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/PartitionJava8Test.java delete mode 100644 sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/SimpleFunctionJava8Test.java delete mode 100644 sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java delete mode 100644 sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsJava8Test.java diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java index 5c9c7865ad2c..e4c4102d6b64 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java @@ -19,8 +19,10 @@ import static java.util.Locale.ROOT; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -1826,4 +1828,36 @@ public void testPipelineOptionsFactoryUsesTccl() throws Exception { PipelineOptionsFactory.resetCache(); } } + + @Test + public void testDefaultMethodIgnoresDefaultImplementation() { + OptionsWithDefaultMethod optsWithDefault = + PipelineOptionsFactory.as(OptionsWithDefaultMethod.class); + assertThat(optsWithDefault.getValue(), nullValue()); + + optsWithDefault.setValue(12.25); + assertThat(optsWithDefault.getValue(), equalTo(12.25)); + } + + private interface ExtendedOptionsWithDefault extends OptionsWithDefaultMethod {} + + @Test + public void testDefaultMethodInExtendedClassIgnoresDefaultImplementation() { + OptionsWithDefaultMethod extendedOptsWithDefault = + PipelineOptionsFactory.as(ExtendedOptionsWithDefault.class); + assertThat(extendedOptsWithDefault.getValue(), nullValue()); + + extendedOptsWithDefault.setValue(Double.NEGATIVE_INFINITY); + assertThat(extendedOptsWithDefault.getValue(), equalTo(Double.NEGATIVE_INFINITY)); + } + + private interface OptionsWithDefaultMethod extends PipelineOptions { + default Number getValue() { + return 1024; + } + + void setValue(Number value); + } + + } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java index 11e40d987a72..0200b0887987 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java @@ -23,14 +23,17 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasNamespace; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFor; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -72,15 +75,18 @@ import org.apache.beam.sdk.transforms.windowing.SlidingWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; +import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.common.ElementByteSizeObserver; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.TimestampedValue; +import org.hamcrest.MatcherAssert; import org.hamcrest.Matchers; import org.joda.time.Duration; import org.joda.time.Instant; +import org.junit.Assume; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -88,7 +94,7 @@ import org.junit.runners.JUnit4; /** - * Tests for Combine transforms. + * Tests for {@link Combine} transforms. */ @RunWith(JUnit4.class) public class CombineTest implements Serializable { @@ -1235,4 +1241,128 @@ public void processElement(ProcessContext c) throws Exception { } })); } + + /** + * Class for use in testing use of Java 8 method references. + */ + private static class Summer implements Serializable { + public int sum(Iterable integers) { + int sum = 0; + for (int i : integers) { + sum += i; + } + return sum; + } + } + + /** + * Tests creation of a global {@link Combine} via Java 8 lambda. + */ + @Test + @Category(ValidatesRunner.class) + public void testCombineGloballyLambda() { + + PCollection output = pipeline + .apply(Create.of(1, 2, 3, 4)) + .apply(Combine.globally(integers -> { + int sum = 0; + for (int i : integers) { + sum += i; + } + return sum; + })); + + PAssert.that(output).containsInAnyOrder(10); + pipeline.run(); + } + + /** + * Tests creation of a global {@link Combine} via a Java 8 method reference. + */ + @Test + @Category(ValidatesRunner.class) + public void testCombineGloballyInstanceMethodReference() { + + PCollection output = pipeline + .apply(Create.of(1, 2, 3, 4)) + .apply(Combine.globally(new Summer()::sum)); + + PAssert.that(output).containsInAnyOrder(10); + pipeline.run(); + } + + /** + * Tests creation of a per-key {@link Combine} via a Java 8 lambda. + */ + @Test + @Category(ValidatesRunner.class) + public void testCombinePerKeyLambda() { + + PCollection> output = pipeline + .apply(Create.of(KV.of("a", 1), KV.of("b", 2), KV.of("a", 3), KV.of("c", 4))) + .apply(Combine.perKey(integers -> { + int sum = 0; + for (int i : integers) { + sum += i; + } + return sum; + })); + + PAssert.that(output).containsInAnyOrder( + KV.of("a", 4), + KV.of("b", 2), + KV.of("c", 4)); + pipeline.run(); + } + + /** + * Tests creation of a per-key {@link Combine} via a Java 8 method reference. + */ + @Test + @Category(ValidatesRunner.class) + public void testCombinePerKeyInstanceMethodReference() { + + PCollection> output = pipeline + .apply(Create.of(KV.of("a", 1), KV.of("b", 2), KV.of("a", 3), KV.of("c", 4))) + .apply(Combine.perKey(new Summer()::sum)); + + PAssert.that(output).containsInAnyOrder( + KV.of("a", 4), + KV.of("b", 2), + KV.of("c", 4)); + pipeline.run(); + } + + /** + * Tests that we can serialize {@link Combine.CombineFn CombineFns} constructed from a lambda. + * Lambdas can be problematic because the {@link Class} object is synthetic and cannot be + * deserialized. + */ + @Test + public void testLambdaSerialization() { + SerializableFunction, Object> combiner = xs -> Iterables.getFirst(xs, 0); + + boolean lambdaClassSerializationThrows; + try { + SerializableUtils.clone(combiner.getClass()); + lambdaClassSerializationThrows = false; + } catch (IllegalArgumentException e) { + // Expected + lambdaClassSerializationThrows = true; + } + Assume.assumeTrue("Expected lambda class serialization to fail. " + + "If it's fixed, we can remove special behavior in Combine.", + lambdaClassSerializationThrows); + + + Combine.Globally combine = Combine.globally(combiner); + SerializableUtils.clone(combine); // should not throw. + } + + @Test + public void testLambdaDisplayData() { + Combine.Globally combine = Combine.globally(xs -> Iterables.getFirst(xs, 0)); + DisplayData displayData = DisplayData.from(combine); + MatcherAssert.assertThat(displayData.items(), not(empty())); + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DistinctTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DistinctTest.java index 802b937cd6b1..5835c1e50503 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DistinctTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DistinctTest.java @@ -17,13 +17,21 @@ */ package org.apache.beam.sdk.transforms; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; @@ -48,6 +56,7 @@ import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -57,6 +66,9 @@ public class DistinctTest { @Rule public final TestPipeline p = TestPipeline.create(); + @Rule public ExpectedException thrown = ExpectedException.none(); + + @Test @Category(ValidatesRunner.class) public void testDistinct() { @@ -273,4 +285,53 @@ public void testTriggeredDistinctRepresentativeValuesEmpty() { PAssert.that(distinctValues).containsInAnyOrder(KV.of(1, "k1")); triggeredDistinctRepresentativePipeline.run(); } + + @Test + public void withLambdaRepresentativeValuesFnNoTypeDescriptorShouldThrow() { + + Multimap predupedContents = HashMultimap.create(); + predupedContents.put(3, "foo"); + predupedContents.put(4, "foos"); + predupedContents.put(6, "barbaz"); + predupedContents.put(6, "bazbar"); + PCollection dupes = + p.apply(Create.of("foo", "foos", "barbaz", "barbaz", "bazbar", "foo")); + + thrown.expect(IllegalStateException.class); + thrown.expectMessage("Unable to return a default Coder for RemoveRepresentativeDupes"); + + // Thrown when applying a transform to the internal WithKeys that withRepresentativeValueFn is + // implemented with + dupes.apply("RemoveRepresentativeDupes", Distinct.withRepresentativeValueFn(String::length)); + } + + @Test + @Category(NeedsRunner.class) + public void withLambdaRepresentativeValuesFnAndTypeDescriptorShouldApplyFn() { + + PCollection dupes = + p.apply(Create.of("foo", "foos", "barbaz", "barbaz", "bazbar", "foo")); + PCollection deduped = + dupes.apply( + Distinct.withRepresentativeValueFn(String::length) + .withRepresentativeType(TypeDescriptor.of(Integer.class))); + + PAssert.that(deduped).satisfies((Iterable strs) -> { + Multimap predupedContents = HashMultimap.create(); + predupedContents.put(3, "foo"); + predupedContents.put(4, "foos"); + predupedContents.put(6, "barbaz"); + predupedContents.put(6, "bazbar"); + + Set seenLengths = new HashSet<>(); + for (String s : strs) { + assertThat(predupedContents.values(), hasItem(s)); + assertThat(seenLengths, not(contains(s.length()))); + seenLengths.add(s.length()); + } + return null; + }); + + p.run(); + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java index a2c5ad532609..afe5b7cfeee1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java @@ -21,6 +21,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import java.io.Serializable; +import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.ValidatesRunner; @@ -29,6 +30,7 @@ import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -61,6 +63,9 @@ public Boolean apply(Integer elem) { @Rule public final TestPipeline p = TestPipeline.create(); + @Rule + public transient ExpectedException thrown = ExpectedException.none(); + @Test @Category(ValidatesRunner.class) public void testIdentityFilterByPredicate() { @@ -161,4 +166,74 @@ public void testDisplayData() { assertThat(DisplayData.from(Filter.equal(567)), hasDisplayItem("predicate", "x == 567")); } + + @Test + @Category(ValidatesRunner.class) + public void testIdentityFilterByPredicateWithLambda() { + + PCollection output = p + .apply(Create.of(591, 11789, 1257, 24578, 24799, 307)) + .apply(Filter.by(i -> true)); + + PAssert.that(output).containsInAnyOrder(591, 11789, 1257, 24578, 24799, 307); + p.run(); + } + + @Test + @Category(ValidatesRunner.class) + public void testNoFilterByPredicateWithLambda() { + + PCollection output = p + .apply(Create.of(1, 2, 4, 5)) + .apply(Filter.by(i -> false)); + + PAssert.that(output).empty(); + p.run(); + } + + @Test + @Category(ValidatesRunner.class) + public void testFilterByPredicateWithLambda() { + + PCollection output = p + .apply(Create.of(1, 2, 3, 4, 5, 6, 7)) + .apply(Filter.by(i -> i % 2 == 0)); + + PAssert.that(output).containsInAnyOrder(2, 4, 6); + p.run(); + } + + /** + * Confirms that in Java 8 style, where a lambda results in a rawtype, the output type token is + * not useful. If this test ever fails there may be simplifications available to us. + */ + @Test + public void testFilterParDoOutputTypeDescriptorRawWithLambda() throws Exception { + + @SuppressWarnings({"unchecked", "rawtypes"}) + PCollection output = p + .apply(Create.of("hello")) + .apply(Filter.by(s -> true)); + + thrown.expect(CannotProvideCoderException.class); + p.getCoderRegistry().getCoder(output.getTypeDescriptor()); + } + + @Test + @Category(ValidatesRunner.class) + public void testFilterByMethodReferenceWithLambda() { + + PCollection output = p + .apply(Create.of(1, 2, 3, 4, 5, 6, 7)) + .apply(Filter.by(new EvenFilter()::isEven)); + + PAssert.that(output).containsInAnyOrder(2, 4, 6); + p.run(); + } + + private static class EvenFilter implements Serializable { + public boolean isEven(int i) { + return i % 2 == 0; + } + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java index 18bd8413d94b..7c15c0965ab0 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java @@ -39,6 +39,7 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeDescriptors; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -224,4 +225,46 @@ public Iterable> apply(KV input) { })); } } + + /** + * Basic test of {@link FlatMapElements} with a lambda (which is instantiated as a + * {@link SerializableFunction}). + */ + @Test + @Category(NeedsRunner.class) + public void testFlatMapBasicWithLambda() throws Exception { + PCollection output = pipeline + .apply(Create.of(1, 2, 3)) + .apply(FlatMapElements + // Note that the input type annotation is required. + .into(TypeDescriptors.integers()) + .via((Integer i) -> ImmutableList.of(i, -i))); + + PAssert.that(output).containsInAnyOrder(1, 3, -1, -3, 2, -2); + pipeline.run(); + } + + /** + * Basic test of {@link FlatMapElements} with a method reference. + */ + @Test + @Category(NeedsRunner.class) + public void testFlatMapMethodReference() throws Exception { + + PCollection output = pipeline + .apply(Create.of(1, 2, 3)) + .apply(FlatMapElements + // Note that the input type annotation is required. + .into(TypeDescriptors.integers()) + .via(new Negater()::numAndNegation)); + + PAssert.that(output).containsInAnyOrder(1, 3, -1, -3, 2, -2); + pipeline.run(); + } + + private static class Negater implements Serializable { + public List numAndNegation(int input) { + return ImmutableList.of(input, -input); + } + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java index 39a65d15366e..96a4cc8b8580 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java @@ -38,6 +38,7 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeDescriptors; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -296,4 +297,66 @@ public KV apply(KV input) { })); } } + + /** + * Basic test of {@link MapElements} with a lambda (which is instantiated as a {@link + * SerializableFunction}). + */ + @Test + @Category(NeedsRunner.class) + public void testMapLambda() throws Exception { + + PCollection output = pipeline + .apply(Create.of(1, 2, 3)) + .apply(MapElements + // Note that the type annotation is required. + .into(TypeDescriptors.integers()) + .via((Integer i) -> i * 2)); + + PAssert.that(output).containsInAnyOrder(6, 2, 4); + pipeline.run(); + } + + /** + * Basic test of {@link MapElements} with a lambda wrapped into a {@link SimpleFunction} to + * remember its type. + */ + @Test + @Category(NeedsRunner.class) + public void testMapWrappedLambda() throws Exception { + + PCollection output = + pipeline + .apply(Create.of(1, 2, 3)) + .apply( + MapElements + .via(new SimpleFunction((Integer i) -> i * 2) {})); + + PAssert.that(output).containsInAnyOrder(6, 2, 4); + pipeline.run(); + } + + /** + * Basic test of {@link MapElements} with a method reference. + */ + @Test + @Category(NeedsRunner.class) + public void testMapMethodReference() throws Exception { + + PCollection output = pipeline + .apply(Create.of(1, 2, 3)) + .apply(MapElements + // Note that the type annotation is required. + .into(TypeDescriptors.integers()) + .via(new Doubler()::doubleIt)); + + PAssert.that(output).containsInAnyOrder(6, 2, 4); + pipeline.run(); + } + + private static class Doubler implements Serializable { + public int doubleIt(int val) { + return val * 2; + } + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java index f43c162c7232..4977d0e8b2c8 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java @@ -25,6 +25,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.List; +import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -148,4 +149,34 @@ public void testDisplayData() { assertThat(displayData, hasDisplayItem("numPartitions", 123)); assertThat(displayData, hasDisplayItem("partitionFn", IdentityFn.class)); } + + @Test + @Category(NeedsRunner.class) + public void testModPartitionWithLambda() { + + PCollectionList outputs = pipeline + .apply(Create.of(1, 2, 4, 5)) + .apply(Partition.of(3, (element, numPartitions) -> element % numPartitions)); + assertEquals(3, outputs.size()); + PAssert.that(outputs.get(0)).empty(); + PAssert.that(outputs.get(1)).containsInAnyOrder(1, 4); + PAssert.that(outputs.get(2)).containsInAnyOrder(2, 5); + pipeline.run(); + } + + /** + * Confirms that in Java 8 style, where a lambda results in a rawtype, the output type token is + * not useful. If this test ever fails there may be simplifications available to us. + */ + @Test + @Category(NeedsRunner.class) + public void testPartitionFnOutputTypeDescriptorRaw() throws Exception { + + PCollectionList output = pipeline + .apply(Create.of("hello")) + .apply(Partition.of(1, (element, numPartitions) -> 0)); + + thrown.expect(CannotProvideCoderException.class); + pipeline.getCoderRegistry().getCoder(output.get(0).getTypeDescriptor()); + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SimpleFunctionTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SimpleFunctionTest.java index bcfb5588396a..4fcaea303f1e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SimpleFunctionTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SimpleFunctionTest.java @@ -17,6 +17,10 @@ */ package org.apache.beam.sdk.transforms; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import org.apache.beam.sdk.values.TypeDescriptors; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -40,4 +44,33 @@ public void testFailureIfNotOverridden() { SimpleFunction broken = new SimpleFunction() {}; } + + /** + * Basic test of {@link MapElements} with a lambda (which is instantiated as a {@link + * SerializableFunction}). + */ + @Test + public void testGoodTypeForLambda() throws Exception { + SimpleFunction fn = new SimpleFunction(Object::toString) {}; + + assertThat(fn.getInputTypeDescriptor(), equalTo(TypeDescriptors.integers())); + assertThat(fn.getOutputTypeDescriptor(), equalTo(TypeDescriptors.strings())); + } + + /** + * Basic test of {@link MapElements} with a lambda wrapped into a {@link SimpleFunction} to + * remember its type. + */ + @Test + public void testGoodTypeForMethodRef() throws Exception { + SimpleFunction fn = + new SimpleFunction(SimpleFunctionTest::toStringThisThing) {}; + + assertThat(fn.getInputTypeDescriptor(), equalTo(TypeDescriptors.integers())); + assertThat(fn.getOutputTypeDescriptor(), equalTo(TypeDescriptors.strings())); + } + + private static String toStringThisThing(Integer i) { + return i.toString(); + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithKeysTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithKeysTest.java index 57b887ff32a3..97614d68633f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithKeysTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithKeysTest.java @@ -25,12 +25,14 @@ import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.ValidatesRunner; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TypeDescriptor; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -74,6 +76,9 @@ public class WithKeysTest { @Rule public final TestPipeline p = TestPipeline.create(); + @Rule + public ExpectedException thrown = ExpectedException.none(); + @Test @Category(NeedsRunner.class) public void testExtractKeys() { @@ -150,4 +155,33 @@ public Integer apply(String value) { return value.length(); } } + + @Test + @Category(ValidatesRunner.class) + public void withLambdaAndTypeDescriptorShouldSucceed() { + + PCollection values = p.apply(Create.of("1234", "3210", "0", "-12")); + PCollection> kvs = values.apply( + WithKeys.of((SerializableFunction) Integer::valueOf) + .withKeyType(TypeDescriptor.of(Integer.class))); + + PAssert.that(kvs).containsInAnyOrder( + KV.of(1234, "1234"), KV.of(0, "0"), KV.of(-12, "-12"), KV.of(3210, "3210")); + + p.run(); + } + + @Test + @Category(NeedsRunner.class) + public void withLambdaAndNoTypeDescriptorShouldThrow() { + + PCollection values = p.apply(Create.of("1234", "3210", "0", "-12")); + + values.apply("ApplyKeysWithWithKeys", WithKeys.of(Integer::valueOf)); + + thrown.expect(IllegalStateException.class); + thrown.expectMessage("Unable to return a default Coder for ApplyKeysWithWithKeys"); + + p.run(); + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java index 02ce55968dd5..77cac64fde7a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java @@ -173,4 +173,34 @@ public void withTimestampsWithNullFnShouldThrowOnConstruction() { p.run(); } + + @Test + @Category(ValidatesRunner.class) + public void withTimestampsLambdaShouldApplyTimestamps() { + + final String yearTwoThousand = "946684800000"; + PCollection timestamped = + p.apply(Create.of("1234", "0", Integer.toString(Integer.MAX_VALUE), yearTwoThousand)) + .apply(WithTimestamps.of((String input) -> new Instant(Long.valueOf(input)))); + + PCollection> timestampedVals = + timestamped.apply(ParDo.of(new DoFn>() { + @ProcessElement + public void processElement(ProcessContext c) + throws Exception { + c.output(KV.of(c.element(), c.timestamp())); + } + })); + + PAssert.that(timestamped) + .containsInAnyOrder(yearTwoThousand, "0", "1234", Integer.toString(Integer.MAX_VALUE)); + PAssert.that(timestampedVals) + .containsInAnyOrder( + KV.of("0", new Instant(0)), + KV.of("1234", new Instant(Long.valueOf("1234"))), + KV.of(Integer.toString(Integer.MAX_VALUE), new Instant(Integer.MAX_VALUE)), + KV.of(yearTwoThousand, new Instant(Long.valueOf(yearTwoThousand)))); + + p.run(); + } } diff --git a/sdks/java/java8tests/build.gradle b/sdks/java/java8tests/build.gradle deleted file mode 100644 index 48e75bdce31f..000000000000 --- a/sdks/java/java8tests/build.gradle +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -apply from: project(":").file("build_rules.gradle") -applyJavaNature() - -description = "Apache Beam :: SDKs :: Java :: Java 8 Tests" - -dependencies { - testCompile project(path: ":sdks:java:core", configuration: "shadow") - testCompile project(path: ":runners:direct-java", configuration: "shadow") - testCompile library.java.guava - testCompile library.java.joda_time - testCompile library.java.hamcrest_core - testCompile library.java.junit -} diff --git a/sdks/java/java8tests/pom.xml b/sdks/java/java8tests/pom.xml deleted file mode 100644 index 3709339a9f35..000000000000 --- a/sdks/java/java8tests/pom.xml +++ /dev/null @@ -1,89 +0,0 @@ - - - - - 4.0.0 - - - org.apache.beam - beam-sdks-java-parent - 2.4.0-SNAPSHOT - ../pom.xml - - - beam-sdks-java-java8tests - Apache Beam :: SDKs :: Java :: Java 8 Tests - Apache Beam Java SDK provides a simple, Java-based - interface for processing virtually any size data. - This artifact includes tests of the SDK from a Java 8 - user. - - - - - - org.jacoco - jacoco-maven-plugin - - - - - - - org.apache.beam - beam-sdks-java-core - test - - - - org.apache.beam - beam-runners-direct-java - test - - - - com.google.guava - guava - test - - - - joda-time - joda-time - test - - - - org.hamcrest - hamcrest-core - test - - - - org.hamcrest - hamcrest-library - test - - - - junit - junit - test - - - diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryJava8Test.java deleted file mode 100644 index bc0c70bb1446..000000000000 --- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryJava8Test.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.options; - -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.nullValue; -import static org.junit.Assert.assertThat; - -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Java 8 tests for {@link PipelineOptionsFactory}. - */ -@RunWith(JUnit4.class) -public class PipelineOptionsFactoryJava8Test { - @Rule public ExpectedException thrown = ExpectedException.none(); - - private interface OptionsWithDefaultMethod extends PipelineOptions { - default Number getValue() { - return 1024; - } - - void setValue(Number value); - } - - @Test - public void testDefaultMethodIgnoresDefaultImplementation() { - OptionsWithDefaultMethod optsWithDefault = - PipelineOptionsFactory.as(OptionsWithDefaultMethod.class); - assertThat(optsWithDefault.getValue(), nullValue()); - - optsWithDefault.setValue(12.25); - assertThat(optsWithDefault.getValue(), equalTo(12.25)); - } - - private interface ExtendedOptionsWithDefault extends OptionsWithDefaultMethod {} - - @Test - public void testDefaultMethodInExtendedClassIgnoresDefaultImplementation() { - OptionsWithDefaultMethod extendedOptsWithDefault = - PipelineOptionsFactory.as(ExtendedOptionsWithDefault.class); - assertThat(extendedOptsWithDefault.getValue(), nullValue()); - - extendedOptsWithDefault.setValue(Double.NEGATIVE_INFINITY); - assertThat(extendedOptsWithDefault.getValue(), equalTo(Double.NEGATIVE_INFINITY)); - } - - private interface Options extends PipelineOptions { - Number getValue(); - - void setValue(Number value); - } - - private interface SubtypeReturingOptions extends Options { - @Override - Integer getValue(); - void setValue(Integer value); - } - - @Test - public void testReturnTypeConflictThrows() throws Exception { - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage( - "Method [getValue] has multiple definitions [public abstract java.lang.Integer " - + "org.apache.beam.sdk.options.PipelineOptionsFactoryJava8Test$" - + "SubtypeReturingOptions.getValue(), public abstract java.lang.Number " - + "org.apache.beam.sdk.options.PipelineOptionsFactoryJava8Test$Options" - + ".getValue()] with different return types for [" - + "org.apache.beam.sdk.options.PipelineOptionsFactoryJava8Test$" - + "SubtypeReturingOptions]."); - PipelineOptionsFactory.as(SubtypeReturingOptions.class); - } -} diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/CombineJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/CombineJava8Test.java deleted file mode 100644 index a0f7ce65f87a..000000000000 --- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/CombineJava8Test.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.empty; -import static org.hamcrest.Matchers.not; - -import com.google.common.collect.Iterables; -import java.io.Serializable; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.util.SerializableUtils; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.junit.Assume; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Java 8 Tests for {@link Combine}. - */ -@RunWith(JUnit4.class) -@SuppressWarnings("serial") -public class CombineJava8Test implements Serializable { - - @Rule - public final transient TestPipeline pipeline = TestPipeline.create(); - - @Rule - public transient ExpectedException thrown = ExpectedException.none(); - - /** - * Class for use in testing use of Java 8 method references. - */ - private static class Summer implements Serializable { - public int sum(Iterable integers) { - int sum = 0; - for (int i : integers) { - sum += i; - } - return sum; - } - } - - /** - * Tests creation of a global {@link Combine} via Java 8 lambda. - */ - @Test - public void testCombineGloballyLambda() { - - PCollection output = pipeline - .apply(Create.of(1, 2, 3, 4)) - .apply(Combine.globally(integers -> { - int sum = 0; - for (int i : integers) { - sum += i; - } - return sum; - })); - - PAssert.that(output).containsInAnyOrder(10); - pipeline.run(); - } - - /** - * Tests creation of a global {@link Combine} via a Java 8 method reference. - */ - @Test - public void testCombineGloballyInstanceMethodReference() { - - PCollection output = pipeline - .apply(Create.of(1, 2, 3, 4)) - .apply(Combine.globally(new Summer()::sum)); - - PAssert.that(output).containsInAnyOrder(10); - pipeline.run(); - } - - /** - * Tests creation of a per-key {@link Combine} via a Java 8 lambda. - */ - @Test - public void testCombinePerKeyLambda() { - - PCollection> output = pipeline - .apply(Create.of(KV.of("a", 1), KV.of("b", 2), KV.of("a", 3), KV.of("c", 4))) - .apply(Combine.perKey(integers -> { - int sum = 0; - for (int i : integers) { - sum += i; - } - return sum; - })); - - PAssert.that(output).containsInAnyOrder( - KV.of("a", 4), - KV.of("b", 2), - KV.of("c", 4)); - pipeline.run(); - } - - /** - * Tests creation of a per-key {@link Combine} via a Java 8 method reference. - */ - @Test - public void testCombinePerKeyInstanceMethodReference() { - - PCollection> output = pipeline - .apply(Create.of(KV.of("a", 1), KV.of("b", 2), KV.of("a", 3), KV.of("c", 4))) - .apply(Combine.perKey(new Summer()::sum)); - - PAssert.that(output).containsInAnyOrder( - KV.of("a", 4), - KV.of("b", 2), - KV.of("c", 4)); - pipeline.run(); - } - - /** - * Tests that we can serialize {@link Combine.CombineFn CombineFns} constructed from a lambda. - * Lambdas can be problematic because the {@link Class} object is synthetic and cannot be - * deserialized. - */ - @Test - public void testLambdaSerialization() { - SerializableFunction, Object> combiner = xs -> Iterables.getFirst(xs, 0); - - boolean lambdaClassSerializationThrows; - try { - SerializableUtils.clone(combiner.getClass()); - lambdaClassSerializationThrows = false; - } catch (IllegalArgumentException e) { - // Expected - lambdaClassSerializationThrows = true; - } - Assume.assumeTrue("Expected lambda class serialization to fail. " - + "If it's fixed, we can remove special behavior in Combine.", - lambdaClassSerializationThrows); - - - Combine.Globally combine = Combine.globally(combiner); - SerializableUtils.clone(combine); // should not throw. - } - - @Test - public void testLambdaDisplayData() { - Combine.Globally combine = Combine.globally(xs -> Iterables.getFirst(xs, 0)); - DisplayData displayData = DisplayData.from(combine); - assertThat(displayData.items(), not(empty())); - } -} diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/DistinctJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/DistinctJava8Test.java deleted file mode 100644 index 4b71a40946f3..000000000000 --- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/DistinctJava8Test.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms; - -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.hasItem; -import static org.hamcrest.Matchers.not; -import static org.junit.Assert.assertThat; - -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Multimap; -import java.util.HashSet; -import java.util.Set; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TypeDescriptor; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Java 8 tests for {@link Distinct}. - */ -@RunWith(JUnit4.class) -public class DistinctJava8Test { - - @Rule - public final transient TestPipeline p = TestPipeline.create(); - - @Rule - public ExpectedException thrown = ExpectedException.none(); - - @Test - public void withLambdaRepresentativeValuesFnAndTypeDescriptorShouldApplyFn() { - - Multimap predupedContents = HashMultimap.create(); - predupedContents.put(3, "foo"); - predupedContents.put(4, "foos"); - predupedContents.put(6, "barbaz"); - predupedContents.put(6, "bazbar"); - PCollection dupes = - p.apply(Create.of("foo", "foos", "barbaz", "barbaz", "bazbar", "foo")); - PCollection deduped = - dupes.apply( - Distinct.withRepresentativeValueFn(String::length) - .withRepresentativeType(TypeDescriptor.of(Integer.class))); - - PAssert.that(deduped).satisfies((Iterable strs) -> { - Set seenLengths = new HashSet<>(); - for (String s : strs) { - assertThat(predupedContents.values(), hasItem(s)); - assertThat(seenLengths, not(contains(s.length()))); - seenLengths.add(s.length()); - } - return null; - }); - - p.run(); - } - - @Test - public void withLambdaRepresentativeValuesFnNoTypeDescriptorShouldThrow() { - - Multimap predupedContents = HashMultimap.create(); - predupedContents.put(3, "foo"); - predupedContents.put(4, "foos"); - predupedContents.put(6, "barbaz"); - predupedContents.put(6, "bazbar"); - PCollection dupes = - p.apply(Create.of("foo", "foos", "barbaz", "barbaz", "bazbar", "foo")); - - thrown.expect(IllegalStateException.class); - thrown.expectMessage("Unable to return a default Coder for RemoveRepresentativeDupes"); - - // Thrown when applying a transform to the internal WithKeys that withRepresentativeValueFn is - // implemented with - dupes.apply("RemoveRepresentativeDupes", Distinct.withRepresentativeValueFn(String::length)); - } -} diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java deleted file mode 100644 index b38250e6147a..000000000000 --- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms; - -import java.io.Serializable; -import org.apache.beam.sdk.coders.CannotProvideCoderException; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.testing.ValidatesRunner; -import org.apache.beam.sdk.values.PCollection; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Java 8 Tests for {@link Filter}. - */ -@RunWith(JUnit4.class) -@SuppressWarnings("serial") -public class FilterJava8Test implements Serializable { - - @Rule - public final transient TestPipeline pipeline = TestPipeline.create(); - - @Rule - public transient ExpectedException thrown = ExpectedException.none(); - - @Test - @Category(ValidatesRunner.class) - public void testIdentityFilterByPredicate() { - - PCollection output = pipeline - .apply(Create.of(591, 11789, 1257, 24578, 24799, 307)) - .apply(Filter.by(i -> true)); - - PAssert.that(output).containsInAnyOrder(591, 11789, 1257, 24578, 24799, 307); - pipeline.run(); - } - - @Test - public void testNoFilterByPredicate() { - - PCollection output = pipeline - .apply(Create.of(1, 2, 4, 5)) - .apply(Filter.by(i -> false)); - - PAssert.that(output).empty(); - pipeline.run(); - } - - @Test - @Category(ValidatesRunner.class) - public void testFilterByPredicate() { - - PCollection output = pipeline - .apply(Create.of(1, 2, 3, 4, 5, 6, 7)) - .apply(Filter.by(i -> i % 2 == 0)); - - PAssert.that(output).containsInAnyOrder(2, 4, 6); - pipeline.run(); - } - - /** - * Confirms that in Java 8 style, where a lambda results in a rawtype, the output type token is - * not useful. If this test ever fails there may be simplifications available to us. - */ - @Test - public void testFilterParDoOutputTypeDescriptorRaw() throws Exception { - - @SuppressWarnings({"unchecked", "rawtypes"}) - PCollection output = pipeline - .apply(Create.of("hello")) - .apply(Filter.by(s -> true)); - - thrown.expect(CannotProvideCoderException.class); - pipeline.getCoderRegistry().getCoder(output.getTypeDescriptor()); - } - - @Test - @Category(ValidatesRunner.class) - public void testFilterByMethodReference() { - - PCollection output = pipeline - .apply(Create.of(1, 2, 3, 4, 5, 6, 7)) - .apply(Filter.by(new EvenFilter()::isEven)); - - PAssert.that(output).containsInAnyOrder(2, 4, 6); - pipeline.run(); - } - - private static class EvenFilter implements Serializable { - public boolean isEven(int i) { - return i % 2 == 0; - } - } -} diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsJava8Test.java deleted file mode 100644 index 501b0d1bd8aa..000000000000 --- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsJava8Test.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms; - -import com.google.common.collect.ImmutableList; -import java.io.Serializable; -import java.util.List; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TypeDescriptors; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Java 8 Tests for {@link FlatMapElements}. - */ -@RunWith(JUnit4.class) -public class FlatMapElementsJava8Test implements Serializable { - - @Rule - public final transient TestPipeline pipeline = TestPipeline.create(); - - @Rule - public transient ExpectedException thrown = ExpectedException.none(); - - /** - * Basic test of {@link FlatMapElements} with a lambda (which is instantiated as a - * {@link SerializableFunction}). - */ - @Test - public void testFlatMapBasic() throws Exception { - PCollection output = pipeline - .apply(Create.of(1, 2, 3)) - .apply(FlatMapElements - // Note that the input type annotation is required. - .into(TypeDescriptors.integers()) - .via((Integer i) -> ImmutableList.of(i, -i))); - - PAssert.that(output).containsInAnyOrder(1, 3, -1, -3, 2, -2); - pipeline.run(); - } - - /** - * Basic test of {@link FlatMapElements} with a method reference. - */ - @Test - public void testFlatMapMethodReference() throws Exception { - - PCollection output = pipeline - .apply(Create.of(1, 2, 3)) - .apply(FlatMapElements - // Note that the input type annotation is required. - .into(TypeDescriptors.integers()) - .via(new Negater()::numAndNegation)); - - PAssert.that(output).containsInAnyOrder(1, 3, -1, -3, 2, -2); - pipeline.run(); - } - - private static class Negater implements Serializable { - public List numAndNegation(int input) { - return ImmutableList.of(input, -input); - } - } -} diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/MapElementsJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/MapElementsJava8Test.java deleted file mode 100644 index dbd5ef3d209e..000000000000 --- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/MapElementsJava8Test.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms; - -import java.io.Serializable; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TypeDescriptors; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Java 8 tests for {@link MapElements}. - */ -@RunWith(JUnit4.class) -public class MapElementsJava8Test implements Serializable { - - @Rule - public final transient TestPipeline pipeline = TestPipeline.create(); - - /** - * Basic test of {@link MapElements} with a lambda (which is instantiated as a {@link - * SerializableFunction}). - */ - @Test - public void testMapLambda() throws Exception { - - PCollection output = pipeline - .apply(Create.of(1, 2, 3)) - .apply(MapElements - // Note that the type annotation is required. - .into(TypeDescriptors.integers()) - .via((Integer i) -> i * 2)); - - PAssert.that(output).containsInAnyOrder(6, 2, 4); - pipeline.run(); - } - - /** - * Basic test of {@link MapElements} with a lambda wrapped into a {@link SimpleFunction} to - * remember its type. - */ - @Test - public void testMapWrappedLambda() throws Exception { - - PCollection output = - pipeline - .apply(Create.of(1, 2, 3)) - .apply( - MapElements - .via(new SimpleFunction((Integer i) -> i * 2) {})); - - PAssert.that(output).containsInAnyOrder(6, 2, 4); - pipeline.run(); - } - - /** - * Basic test of {@link MapElements} with a method reference. - */ - @Test - public void testMapMethodReference() throws Exception { - - PCollection output = pipeline - .apply(Create.of(1, 2, 3)) - .apply(MapElements - // Note that the type annotation is required. - .into(TypeDescriptors.integers()) - .via(new Doubler()::doubleIt)); - - PAssert.that(output).containsInAnyOrder(6, 2, 4); - pipeline.run(); - } - - private static class Doubler implements Serializable { - public int doubleIt(int val) { - return val * 2; - } - } -} diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/PartitionJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/PartitionJava8Test.java deleted file mode 100644 index 94353a5bf7df..000000000000 --- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/PartitionJava8Test.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms; - -import static org.junit.Assert.assertEquals; - -import java.io.Serializable; -import org.apache.beam.sdk.coders.CannotProvideCoderException; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.values.PCollectionList; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Java 8 Tests for {@link Filter}. - */ -@RunWith(JUnit4.class) -@SuppressWarnings("serial") -public class PartitionJava8Test implements Serializable { - - @Rule - public final transient TestPipeline pipeline = TestPipeline.create(); - - @Rule - public transient ExpectedException thrown = ExpectedException.none(); - - @Test - public void testModPartition() { - - - PCollectionList outputs = pipeline - .apply(Create.of(1, 2, 4, 5)) - .apply(Partition.of(3, (element, numPartitions) -> element % numPartitions)); - assertEquals(3, outputs.size()); - PAssert.that(outputs.get(0)).empty(); - PAssert.that(outputs.get(1)).containsInAnyOrder(1, 4); - PAssert.that(outputs.get(2)).containsInAnyOrder(2, 5); - pipeline.run(); - } - - /** - * Confirms that in Java 8 style, where a lambda results in a rawtype, the output type token is - * not useful. If this test ever fails there may be simplifications available to us. - */ - @Test - public void testPartitionFnOutputTypeDescriptorRaw() throws Exception { - - PCollectionList output = pipeline - .apply(Create.of("hello")) - .apply(Partition.of(1, (element, numPartitions) -> 0)); - - thrown.expect(CannotProvideCoderException.class); - pipeline.getCoderRegistry().getCoder(output.get(0).getTypeDescriptor()); - } -} diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/SimpleFunctionJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/SimpleFunctionJava8Test.java deleted file mode 100644 index 327fa589536d..000000000000 --- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/SimpleFunctionJava8Test.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms; - -import static org.hamcrest.Matchers.equalTo; -import static org.junit.Assert.assertThat; - -import java.io.Serializable; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.values.TypeDescriptors; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Java 8 tests for {@link SimpleFunction}. - */ -@RunWith(JUnit4.class) -public class SimpleFunctionJava8Test implements Serializable { - - @Rule - public final transient TestPipeline pipeline = TestPipeline.create(); - - /** - * Basic test of {@link MapElements} with a lambda (which is instantiated as a {@link - * SerializableFunction}). - */ - @Test - public void testGoodTypeForLambda() throws Exception { - SimpleFunction fn = new SimpleFunction(Object::toString) {}; - - assertThat(fn.getInputTypeDescriptor(), equalTo(TypeDescriptors.integers())); - assertThat(fn.getOutputTypeDescriptor(), equalTo(TypeDescriptors.strings())); - } - - /** - * Basic test of {@link MapElements} with a lambda wrapped into a {@link SimpleFunction} to - * remember its type. - */ - @Test - public void testGoodTypeForMethodRef() throws Exception { - SimpleFunction fn = - new SimpleFunction(SimpleFunctionJava8Test::toStringThisThing) {}; - - assertThat(fn.getInputTypeDescriptor(), equalTo(TypeDescriptors.integers())); - assertThat(fn.getOutputTypeDescriptor(), equalTo(TypeDescriptors.strings())); - } - - private static String toStringThisThing(Integer i) { - return i.toString(); - } -} diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java deleted file mode 100644 index 34e42aca2555..000000000000 --- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms; - -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.testing.ValidatesRunner; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TypeDescriptor; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Java 8 Tests for {@link WithKeys}. - */ -@RunWith(JUnit4.class) -public class WithKeysJava8Test { - - @Rule - public final transient TestPipeline p = TestPipeline.create(); - - @Rule - public ExpectedException thrown = ExpectedException.none(); - - @Test - @Category(ValidatesRunner.class) - public void withLambdaAndTypeDescriptorShouldSucceed() { - - - PCollection values = p.apply(Create.of("1234", "3210", "0", "-12")); - PCollection> kvs = values.apply( - WithKeys.of((SerializableFunction) Integer::valueOf) - .withKeyType(TypeDescriptor.of(Integer.class))); - - PAssert.that(kvs).containsInAnyOrder( - KV.of(1234, "1234"), KV.of(0, "0"), KV.of(-12, "-12"), KV.of(3210, "3210")); - - p.run(); - } - - @Test - public void withLambdaAndNoTypeDescriptorShouldThrow() { - - PCollection values = p.apply(Create.of("1234", "3210", "0", "-12")); - - values.apply("ApplyKeysWithWithKeys", WithKeys.of(Integer::valueOf)); - - thrown.expect(IllegalStateException.class); - thrown.expectMessage("Unable to return a default Coder for ApplyKeysWithWithKeys"); - - p.run(); - } -} diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsJava8Test.java deleted file mode 100644 index ee23d95dabf0..000000000000 --- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsJava8Test.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms; - -import java.io.Serializable; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.testing.ValidatesRunner; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.joda.time.Instant; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Java 8 tests for {@link WithTimestamps}. - */ -@RunWith(JUnit4.class) -public class WithTimestampsJava8Test implements Serializable { - - @Rule - public final transient TestPipeline p = TestPipeline.create(); - - @Test - @Category(ValidatesRunner.class) - public void withTimestampsLambdaShouldApplyTimestamps() { - - final String yearTwoThousand = "946684800000"; - PCollection timestamped = - p.apply(Create.of("1234", "0", Integer.toString(Integer.MAX_VALUE), yearTwoThousand)) - .apply(WithTimestamps.of((String input) -> new Instant(Long.valueOf(input)))); - - PCollection> timestampedVals = - timestamped.apply(ParDo.of(new DoFn>() { - @ProcessElement - public void processElement(ProcessContext c) - throws Exception { - c.output(KV.of(c.element(), c.timestamp())); - } - })); - - PAssert.that(timestamped) - .containsInAnyOrder(yearTwoThousand, "0", "1234", Integer.toString(Integer.MAX_VALUE)); - PAssert.that(timestampedVals) - .containsInAnyOrder( - KV.of("0", new Instant(0)), - KV.of("1234", new Instant(Long.valueOf("1234"))), - KV.of(Integer.toString(Integer.MAX_VALUE), new Instant(Integer.MAX_VALUE)), - KV.of(yearTwoThousand, new Instant(Long.valueOf(yearTwoThousand)))); - - p.run(); - } -} diff --git a/sdks/java/pom.xml b/sdks/java/pom.xml index e037e946b195..78b7c21a819e 100644 --- a/sdks/java/pom.xml +++ b/sdks/java/pom.xml @@ -40,7 +40,6 @@ container core io - java8tests maven-archetypes extensions fn-execution diff --git a/settings.gradle b/settings.gradle index 09e68f504341..8446c45c6c9b 100644 --- a/settings.gradle +++ b/settings.gradle @@ -74,7 +74,6 @@ include ":sdks:java:io:redis" include ":sdks:java:io:solr" include ":sdks:java:io:tika" include ":sdks:java:io:xml" -include ":sdks:java:java8tests" include ":sdks:java:maven-archetypes:examples" include ":sdks:java:maven-archetypes:starter" include ":sdks:java:nexmark"