From db2334d7d86b4ca4d3be40d9dbcb19a31d6f0fdd Mon Sep 17 00:00:00 2001 From: Jeff Klukas Date: Thu, 1 Nov 2018 14:50:32 -0400 Subject: [PATCH 1/7] [BEAM-5910] Add lastModified field to MatchResult.Metadata --- .../java/org/apache/beam/sdk/io/FileIO.java | 1 + .../apache/beam/sdk/io/LocalFileSystem.java | 1 + .../apache/beam/sdk/io/fs/MatchResult.java | 4 +++ .../apache/beam/sdk/io/fs/MetadataCoder.java | 3 ++ .../org/apache/beam/sdk/io/FileIOTest.java | 31 ++++++++++++++----- .../extensions/gcp/storage/GcsFileSystem.java | 3 ++ .../beam/sdk/io/aws/s3/S3FileSystem.java | 9 ++++-- .../beam/sdk/io/aws/s3/S3ResourceId.java | 18 +++++++++-- .../sdk/io/aws/s3/MatchResultMatcher.java | 3 +- .../beam/sdk/io/aws/s3/S3FileSystemTest.java | 23 +++++++++++++- .../beam/sdk/io/hdfs/HadoopFileSystem.java | 1 + .../sdk/io/hdfs/HadoopFileSystemTest.java | 14 +++++++++ 12 files changed, 97 insertions(+), 14 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java index 99dddc51767d..8d48a2868145 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java @@ -745,6 +745,7 @@ public void process(ProcessContext c) { MatchResult.Metadata.builder() .setResourceId(metadata.resourceId()) .setSizeBytes(metadata.sizeBytes()) + .setLastModifiedMillis(metadata.lastModifiedMillis()) .setIsReadSeekEfficient( metadata.isReadSeekEfficient() && compression == Compression.UNCOMPRESSED) .build(), diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java index 386253637df8..8d231ac05b22 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java @@ -272,6 +272,7 @@ private Metadata toMetadata(File file) { .setResourceId(LocalResourceId.fromPath(file.toPath(), file.isDirectory())) .setIsReadSeekEfficient(true) .setSizeBytes(file.length()) + .setLastModifiedMillis(file.lastModified()) .build(); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java index c0e2bfa9d973..91b0bdab1b55 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java @@ -82,6 +82,8 @@ public abstract static class Metadata implements Serializable { public abstract long sizeBytes(); + public abstract long lastModifiedMillis(); + public abstract boolean isReadSeekEfficient(); public static Builder builder() { @@ -95,6 +97,8 @@ public abstract static class Builder { public abstract Builder setSizeBytes(long value); + public abstract Builder setLastModifiedMillis(long value); + public abstract Builder setIsReadSeekEfficient(boolean value); public abstract Metadata build(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java index 5c9c4d763723..0998224fb217 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java @@ -42,6 +42,7 @@ public void encode(Metadata value, OutputStream os) throws IOException { RESOURCE_ID_CODER.encode(value.resourceId(), os); INT_CODER.encode(value.isReadSeekEfficient() ? 1 : 0, os); LONG_CODER.encode(value.sizeBytes(), os); + LONG_CODER.encode(value.lastModifiedMillis(), os); } @Override @@ -49,10 +50,12 @@ public Metadata decode(InputStream is) throws IOException { ResourceId resourceId = RESOURCE_ID_CODER.decode(is); boolean isReadSeekEfficient = INT_CODER.decode(is) == 1; long sizeBytes = LONG_CODER.decode(is); + long lastModifiedMillis = LONG_CODER.decode(is); return Metadata.builder() .setResourceId(resourceId) .setIsReadSeekEfficient(isReadSeekEfficient) .setSizeBytes(sizeBytes) + .setLastModifiedMillis(lastModifiedMillis) .build(); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java index a282acfb4f2b..d46ec115b3ac 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java @@ -32,6 +32,7 @@ import java.io.Writer; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.attribute.FileTime; import java.util.Arrays; import java.util.List; import java.util.zip.GZIPOutputStream; @@ -79,24 +80,30 @@ public void testMatchAndMatchAll() throws IOException { Path secondPath = tmpFolder.newFile("second").toPath(); int firstSize = 37; int secondSize = 42; + long firstModified = 1541097000L; + long secondModified = 1541098000L; Files.write(firstPath, new byte[firstSize]); Files.write(secondPath, new byte[secondSize]); + Files.setLastModifiedTime(firstPath, FileTime.fromMillis(firstModified)); + Files.setLastModifiedTime(secondPath, FileTime.fromMillis(secondModified)); + MatchResult.Metadata firstMetadata = metadata(firstPath, firstSize, firstModified); + MatchResult.Metadata secondMetadata = metadata(secondPath, secondSize, secondModified); PAssert.that( p.apply( "Match existing", FileIO.match().filepattern(tmpFolder.getRoot().getAbsolutePath() + "/*"))) - .containsInAnyOrder(metadata(firstPath, firstSize), metadata(secondPath, secondSize)); + .containsInAnyOrder(firstMetadata, secondMetadata); PAssert.that( p.apply( "Match existing with provider", FileIO.match() .filepattern(p.newProvider(tmpFolder.getRoot().getAbsolutePath() + "/*")))) - .containsInAnyOrder(metadata(firstPath, firstSize), metadata(secondPath, secondSize)); + .containsInAnyOrder(firstMetadata, secondMetadata); PAssert.that( p.apply("Create existing", Create.of(tmpFolder.getRoot().getAbsolutePath() + "/*")) .apply("MatchAll existing", FileIO.matchAll())) - .containsInAnyOrder(metadata(firstPath, firstSize), metadata(secondPath, secondSize)); + .containsInAnyOrder(firstMetadata, secondMetadata); PAssert.that( p.apply( @@ -232,9 +239,18 @@ public void testMatchWatchForNewFiles() throws IOException, InterruptedException List expected = Arrays.asList( - metadata(basePath.resolve("first"), 42), - metadata(basePath.resolve("second"), 37), - metadata(basePath.resolve("third"), 99)); + metadata( + basePath.resolve("first"), + 42, + Files.getLastModifiedTime(basePath.resolve("first")).toMillis()), + metadata( + basePath.resolve("second"), + 37, + Files.getLastModifiedTime(basePath.resolve("second")).toMillis()), + metadata( + basePath.resolve("third"), + 99, + Files.getLastModifiedTime(basePath.resolve("third")).toMillis())); PAssert.that(matchMetadata).containsInAnyOrder(expected); PAssert.that(matchAllMetadata).containsInAnyOrder(expected); p.run(); @@ -309,11 +325,12 @@ public void testRead() throws IOException { p.run(); } - private static MatchResult.Metadata metadata(Path path, int size) { + private static MatchResult.Metadata metadata(Path path, int size, long lastModifiedMillis) { return MatchResult.Metadata.builder() .setResourceId(FileSystems.matchNewResource(path.toString(), false /* isDirectory */)) .setIsReadSeekEfficient(true) .setSizeBytes(size) + .setLastModifiedMillis(lastModifiedMillis) .build(); } diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java index db6aea965e8c..84004bcb14ea 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java @@ -22,6 +22,7 @@ import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull; import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState; +import com.google.api.client.util.DateTime; import com.google.api.services.storage.model.Objects; import com.google.api.services.storage.model.StorageObject; import java.io.FileNotFoundException; @@ -268,6 +269,8 @@ private Metadata toMetadata(StorageObject storageObject) { .setResourceId(GcsResourceId.fromGcsPath(GcsPath.fromObject(storageObject))); BigInteger size = firstNonNull(storageObject.getSize(), BigInteger.ZERO); ret.setSizeBytes(size.longValue()); + DateTime lastModified = firstNonNull(storageObject.getUpdated(), new DateTime(0L)); + ret.setLastModifiedMillis(lastModified.getValue()); return ret.build(); } diff --git a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java index 1276a79c6ddf..39ebcbb51368 100644 --- a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java +++ b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java @@ -47,6 +47,7 @@ import java.nio.channels.WritableByteChannel; import java.util.ArrayList; import java.util.Collection; +import java.util.Date; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -318,7 +319,8 @@ private ExpandedGlob expandGlob(S3ResourceId glob) { if (wildcardRegexp.matcher(objectSummary.getKey()).matches()) { S3ResourceId expandedPath = S3ResourceId.fromComponents(objectSummary.getBucketName(), objectSummary.getKey()) - .withSize(objectSummary.getSize()); + .withSize(objectSummary.getSize()) + .withLastModified(objectSummary.getLastModified()); LOG.debug("Expanded S3 object path {}", expandedPath); expandedPaths.add(expandedPath); } @@ -373,7 +375,8 @@ MatchResult matchNonGlobPath(S3ResourceId path) { MatchResult.Status.OK, ImmutableList.of( createBeamMetadata( - path.withSize(s3Metadata.getContentLength()), + path.withSize(s3Metadata.getContentLength()) + .withLastModified(s3Metadata.getLastModified()), Strings.nullToEmpty(s3Metadata.getContentEncoding())))); } @@ -382,10 +385,12 @@ private static MatchResult.Metadata createBeamMetadata( checkArgument(path.getSize().isPresent(), "path has size"); checkNotNull(contentEncoding, "contentEncoding"); boolean isReadSeekEfficient = !NON_READ_SEEK_EFFICIENT_ENCODINGS.contains(contentEncoding); + return MatchResult.Metadata.builder() .setIsReadSeekEfficient(isReadSeekEfficient) .setResourceId(path) .setSizeBytes(path.getSize().get()) + .setLastModifiedMillis(path.getLastModified().transform(Date::getTime).or(0L)) .build(); } diff --git a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3ResourceId.java b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3ResourceId.java index 120d08b7dc61..1c9a2c3f2553 100644 --- a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3ResourceId.java +++ b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3ResourceId.java @@ -21,6 +21,7 @@ import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull; import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState; +import java.util.Date; import java.util.Objects; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -44,19 +45,22 @@ class S3ResourceId implements ResourceId { private final String bucket; private final String key; private final Long size; + private final Date lastModified; - private S3ResourceId(String bucket, String key, @Nullable Long size) { + private S3ResourceId( + String bucket, String key, @Nullable Long size, @Nullable Date lastModified) { checkArgument(!Strings.isNullOrEmpty(bucket), "bucket"); this.bucket = bucket; this.key = checkNotNull(key, "key"); this.size = size; + this.lastModified = lastModified; } static S3ResourceId fromComponents(String bucket, String key) { if (!key.startsWith("/")) { key = "/" + key; } - return new S3ResourceId(bucket, key, null); + return new S3ResourceId(bucket, key, null, null); } static S3ResourceId fromUri(String uri) { @@ -85,7 +89,15 @@ Optional getSize() { } S3ResourceId withSize(long size) { - return new S3ResourceId(bucket, key, size); + return new S3ResourceId(bucket, key, size, lastModified); + } + + Optional getLastModified() { + return Optional.fromNullable(lastModified); + } + + S3ResourceId withLastModified(Date lastModified) { + return new S3ResourceId(bucket, key, size, lastModified); } @Override diff --git a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/MatchResultMatcher.java b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/MatchResultMatcher.java index aabbfb8d603c..c0b31051797f 100644 --- a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/MatchResultMatcher.java +++ b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/MatchResultMatcher.java @@ -58,10 +58,11 @@ private static MatchResultMatcher create(MatchResult.Metadata expectedMetadata) } static MatchResultMatcher create( - long sizeBytes, ResourceId resourceId, boolean isReadSeekEfficient) { + long sizeBytes, long lastModifiedMillis, ResourceId resourceId, boolean isReadSeekEfficient) { return create( MatchResult.Metadata.builder() .setSizeBytes(sizeBytes) + .setLastModifiedMillis(lastModifiedMillis) .setResourceId(resourceId) .setIsReadSeekEfficient(isReadSeekEfficient) .build()); diff --git a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java index 46640e39ab96..9f2650ec1165 100644 --- a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java +++ b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java @@ -67,6 +67,7 @@ import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; import java.util.ArrayList; +import java.util.Date; import java.util.List; import org.apache.beam.sdk.io.aws.options.S3Options; import org.apache.beam.sdk.io.fs.MatchResult; @@ -330,9 +331,11 @@ public void matchNonGlob() { S3FileSystem s3FileSystem = buildMockedS3FileSystem(s3Options()); S3ResourceId path = S3ResourceId.fromUri("s3://testbucket/testdirectory/filethatexists"); + long lastModifiedMillis = 1540000000000L; ObjectMetadata s3ObjectMetadata = new ObjectMetadata(); s3ObjectMetadata.setContentLength(100); s3ObjectMetadata.setContentEncoding("read-seek-efficient"); + s3ObjectMetadata.setLastModified(new Date(lastModifiedMillis)); when(s3FileSystem .getAmazonS3Client() .getObjectMetadata( @@ -348,6 +351,7 @@ public void matchNonGlob() { ImmutableList.of( MatchResult.Metadata.builder() .setSizeBytes(100) + .setLastModifiedMillis(lastModifiedMillis) .setResourceId(path) .setIsReadSeekEfficient(true) .build()))); @@ -358,8 +362,10 @@ public void matchNonGlobNotReadSeekEfficient() { S3FileSystem s3FileSystem = buildMockedS3FileSystem(s3Options()); S3ResourceId path = S3ResourceId.fromUri("s3://testbucket/testdirectory/filethatexists"); + long lastModifiedMillis = 1540000000000L; ObjectMetadata s3ObjectMetadata = new ObjectMetadata(); s3ObjectMetadata.setContentLength(100); + s3ObjectMetadata.setLastModified(new Date(lastModifiedMillis)); s3ObjectMetadata.setContentEncoding("gzip"); when(s3FileSystem .getAmazonS3Client() @@ -376,6 +382,7 @@ public void matchNonGlobNotReadSeekEfficient() { ImmutableList.of( MatchResult.Metadata.builder() .setSizeBytes(100) + .setLastModifiedMillis(lastModifiedMillis) .setResourceId(path) .setIsReadSeekEfficient(false) .build()))); @@ -386,8 +393,10 @@ public void matchNonGlobNullContentEncoding() { S3FileSystem s3FileSystem = buildMockedS3FileSystem(s3Options()); S3ResourceId path = S3ResourceId.fromUri("s3://testbucket/testdirectory/filethatexists"); + long lastModifiedMillis = 1540000000000L; ObjectMetadata s3ObjectMetadata = new ObjectMetadata(); s3ObjectMetadata.setContentLength(100); + s3ObjectMetadata.setLastModified(new Date(lastModifiedMillis)); s3ObjectMetadata.setContentEncoding(null); when(s3FileSystem .getAmazonS3Client() @@ -404,6 +413,7 @@ public void matchNonGlobNullContentEncoding() { ImmutableList.of( MatchResult.Metadata.builder() .setSizeBytes(100) + .setLastModifiedMillis(lastModifiedMillis) .setResourceId(path) .setIsReadSeekEfficient(true) .build()))); @@ -488,12 +498,14 @@ public void matchGlob() throws IOException { firstMatch.setBucketName(path.getBucket()); firstMatch.setKey("foo/bar0baz"); firstMatch.setSize(100); + firstMatch.setLastModified(new Date(1540000000001L)); // Expected to not be returned; prefix matches, but substring after wildcard does not S3ObjectSummary secondMatch = new S3ObjectSummary(); secondMatch.setBucketName(path.getBucket()); secondMatch.setKey("foo/bar1qux"); secondMatch.setSize(200); + secondMatch.setLastModified(new Date(1540000000002L)); // Expected first request returns continuation token ListObjectsV2Result firstResult = new ListObjectsV2Result(); @@ -517,6 +529,7 @@ public void matchGlob() throws IOException { thirdMatch.setBucketName(path.getBucket()); thirdMatch.setKey("foo/bar2baz"); thirdMatch.setSize(300); + thirdMatch.setLastModified(new Date(1540000000003L)); // Expected second request returns third prefix match and no continuation token ListObjectsV2Result secondResult = new ListObjectsV2Result(); @@ -542,6 +555,7 @@ public void matchGlob() throws IOException { S3ResourceId.fromComponents( firstMatch.getBucketName(), firstMatch.getKey())) .setSizeBytes(firstMatch.getSize()) + .setLastModifiedMillis(firstMatch.getLastModified().getTime()) .build(), MatchResult.Metadata.builder() .setIsReadSeekEfficient(true) @@ -549,6 +563,7 @@ public void matchGlob() throws IOException { S3ResourceId.fromComponents( thirdMatch.getBucketName(), thirdMatch.getKey())) .setSizeBytes(thirdMatch.getSize()) + .setLastModifiedMillis(thirdMatch.getLastModified().getTime()) .build()))); } @@ -569,12 +584,14 @@ public void matchGlobWithSlashes() throws IOException { firstMatch.setBucketName(path.getBucket()); firstMatch.setKey("foo/bar\\baz0"); firstMatch.setSize(100); + firstMatch.setLastModified(new Date(1540000000001L)); // Expected to not be returned; prefix matches, but substring after wildcard does not S3ObjectSummary secondMatch = new S3ObjectSummary(); secondMatch.setBucketName(path.getBucket()); secondMatch.setKey("foo/bar/baz1"); secondMatch.setSize(200); + secondMatch.setLastModified(new Date(1540000000002L)); // Expected first request returns continuation token ListObjectsV2Result result = new ListObjectsV2Result(); @@ -600,6 +617,7 @@ public void matchGlobWithSlashes() throws IOException { S3ResourceId.fromComponents( firstMatch.getBucketName(), firstMatch.getKey())) .setSizeBytes(firstMatch.getSize()) + .setLastModifiedMillis(firstMatch.getLastModified().getTime()) .build()))); } @@ -636,6 +654,7 @@ public void matchVariousInvokeThreadPool() throws IOException { S3ResourceId pathExist = S3ResourceId.fromUri("s3://testbucket/testdirectory/filethatexists"); ObjectMetadata s3ObjectMetadata = new ObjectMetadata(); s3ObjectMetadata.setContentLength(100); + s3ObjectMetadata.setLastModified(new Date(1540000000000L)); s3ObjectMetadata.setContentEncoding("not-gzip"); when(s3FileSystem .getAmazonS3Client() @@ -651,6 +670,7 @@ public void matchVariousInvokeThreadPool() throws IOException { foundListObject.setBucketName(pathGlob.getBucket()); foundListObject.setKey("path/part-0"); foundListObject.setSize(200); + foundListObject.setLastModified(new Date(1541000000000L)); ListObjectsV2Result listObjectsResult = new ListObjectsV2Result(); listObjectsResult.setNextContinuationToken(null); @@ -679,9 +699,10 @@ public void matchVariousInvokeThreadPool() throws IOException { MatchResultMatcher.create(MatchResult.Status.NOT_FOUND, new FileNotFoundException()), MatchResultMatcher.create( MatchResult.Status.ERROR, new IOException(forbiddenException)), - MatchResultMatcher.create(100, pathExist, true), + MatchResultMatcher.create(100, 1540000000000L, pathExist, true), MatchResultMatcher.create( 200, + 1541000000000L, S3ResourceId.fromComponents(pathGlob.getBucket(), foundListObject.getKey()), true))); } diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java index 0bd556f092f8..4eefc86e8c4d 100644 --- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java +++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java @@ -102,6 +102,7 @@ protected List match(List specs) { .setResourceId(new HadoopResourceId(uri)) .setIsReadSeekEfficient(true) .setSizeBytes(fileStatus.getLen()) + .setLastModifiedMillis(fileStatus.getModificationTime()) .build()); } } diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java index 782e75ce3f6f..acb550f24548 100644 --- a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java +++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java @@ -158,6 +158,7 @@ public void testDelete() throws Exception { .setResourceId(testPath("testFileB")) .setIsReadSeekEfficient(true) .setSizeBytes("testDataB".getBytes(StandardCharsets.UTF_8).length) + .setLastModifiedMillis(lastModified("testFileB")) .build())))); } @@ -188,11 +189,13 @@ public void testMatch() throws Exception { .setResourceId(testPath("testFileAA")) .setIsReadSeekEfficient(true) .setSizeBytes("testDataAA".getBytes(StandardCharsets.UTF_8).length) + .setLastModifiedMillis(lastModified("testFileAA")) .build(), Metadata.builder() .setResourceId(testPath("testFileA")) .setIsReadSeekEfficient(true) .setSizeBytes("testDataA".getBytes(StandardCharsets.UTF_8).length) + .setLastModifiedMillis(lastModified("testFileA")) .build())); } @@ -223,6 +226,7 @@ public void testMatchForNonExistentFile() throws Exception { .setResourceId(testPath("testFileAA")) .setIsReadSeekEfficient(true) .setSizeBytes("testDataAA".getBytes(StandardCharsets.UTF_8).length) + .setLastModifiedMillis(lastModified("testFileAA")) .build())), MatchResult.create(Status.NOT_FOUND, ImmutableList.of()), MatchResult.create( @@ -232,6 +236,7 @@ public void testMatchForNonExistentFile() throws Exception { .setResourceId(testPath("testFileBB")) .setIsReadSeekEfficient(true) .setSizeBytes("testDataBB".getBytes(StandardCharsets.UTF_8).length) + .setLastModifiedMillis(lastModified("testFileBB")) .build()))); assertThat(matchResults, equalTo(expected)); } @@ -258,11 +263,13 @@ public void testRename() throws Exception { .setResourceId(testPath("renameFileA")) .setIsReadSeekEfficient(true) .setSizeBytes("testDataA".getBytes(StandardCharsets.UTF_8).length) + .setLastModifiedMillis(lastModified("renameFileA")) .build(), Metadata.builder() .setResourceId(testPath("renameFileB")) .setIsReadSeekEfficient(true) .setSizeBytes("testDataB".getBytes(StandardCharsets.UTF_8).length) + .setLastModifiedMillis(lastModified("renameFileB")) .build())); // ensure files exist @@ -377,6 +384,13 @@ private byte[] read(String relativePath, long bytesToSkip) throws Exception { } } + private long lastModified(String relativePath) throws Exception { + return fileSystem + .fileSystem + .getFileStatus(testPath(relativePath).toPath()) + .getModificationTime(); + } + private HadoopResourceId testPath(String relativePath) { return new HadoopResourceId(hdfsClusterBaseUri.resolve(relativePath)); } From 9fdb9a511b017ef4c216f9550d993d5d180af277 Mon Sep 17 00:00:00 2001 From: Jeff Klukas Date: Wed, 26 Dec 2018 16:33:55 -0500 Subject: [PATCH 2/7] Add MetadataCoderV2 for encoding lastModifiedMillis --- .../apache/beam/sdk/io/fs/MatchResult.java | 14 +++-- .../apache/beam/sdk/io/fs/MetadataCoder.java | 19 +++++-- .../beam/sdk/io/fs/MetadataCoderV2.java | 56 +++++++++++++++++++ 3 files changed, 80 insertions(+), 9 deletions(-) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoderV2.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java index 91b0bdab1b55..3beafe2c2f68 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java @@ -82,10 +82,16 @@ public abstract static class Metadata implements Serializable { public abstract long sizeBytes(); - public abstract long lastModifiedMillis(); - public abstract boolean isReadSeekEfficient(); + /** + * Last modification timestamp in milliseconds since Unix epoch. + * + *

Note that this field is not encoded with the default {@link MetadataCoder} due to a need + * for compatibility with previous versions of the Beam SDK. + */ + public abstract long lastModifiedMillis(); + public static Builder builder() { return new AutoValue_MatchResult_Metadata.Builder(); } @@ -97,10 +103,10 @@ public abstract static class Builder { public abstract Builder setSizeBytes(long value); - public abstract Builder setLastModifiedMillis(long value); - public abstract Builder setIsReadSeekEfficient(boolean value); + public abstract Builder setLastModifiedMillis(long value); + public abstract Metadata build(); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java index 0998224fb217..3095d808b643 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java @@ -26,8 +26,16 @@ import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.io.fs.MatchResult.Metadata; -/** A {@link Coder} for {@link Metadata}. */ +/** + * A {@link Coder} for {@link Metadata}. + * + *

The {@link Metadata#lastModifiedMillis()} field was added after this coder was already + * deployed, so this class decodes a default value for backwards compatibility. See {@link + * MetadataCoderV2} for retaining timestamp information. + */ public class MetadataCoder extends AtomicCoder { + public static final long UNKNOWN_LAST_MODIFIED_MILLIS = -1L; + private static final ResourceIdCoder RESOURCE_ID_CODER = ResourceIdCoder.of(); private static final VarIntCoder INT_CODER = VarIntCoder.of(); private static final VarLongCoder LONG_CODER = VarLongCoder.of(); @@ -42,21 +50,22 @@ public void encode(Metadata value, OutputStream os) throws IOException { RESOURCE_ID_CODER.encode(value.resourceId(), os); INT_CODER.encode(value.isReadSeekEfficient() ? 1 : 0, os); LONG_CODER.encode(value.sizeBytes(), os); - LONG_CODER.encode(value.lastModifiedMillis(), os); } @Override public Metadata decode(InputStream is) throws IOException { + return decodeBuilder(is).build(); + } + + Metadata.Builder decodeBuilder(InputStream is) throws IOException { ResourceId resourceId = RESOURCE_ID_CODER.decode(is); boolean isReadSeekEfficient = INT_CODER.decode(is) == 1; long sizeBytes = LONG_CODER.decode(is); - long lastModifiedMillis = LONG_CODER.decode(is); return Metadata.builder() .setResourceId(resourceId) .setIsReadSeekEfficient(isReadSeekEfficient) .setSizeBytes(sizeBytes) - .setLastModifiedMillis(lastModifiedMillis) - .build(); + .setLastModifiedMillis(UNKNOWN_LAST_MODIFIED_MILLIS); } @Override diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoderV2.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoderV2.java new file mode 100644 index 000000000000..4eef169d6bed --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoderV2.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.fs; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.io.fs.MatchResult.Metadata; +import org.apache.beam.sdk.io.fs.MatchResult.Metadata.Builder; + +/** A {@link Coder} for {@link Metadata} that includes {@link Metadata#lastModifiedMillis()}. */ +public class MetadataCoderV2 extends AtomicCoder { + private static final MetadataCoder V1_CODER = MetadataCoder.of(); + private static final VarLongCoder LONG_CODER = VarLongCoder.of(); + + /** Creates a {@link MetadataCoder}. */ + public static MetadataCoder of() { + return new MetadataCoder(); + } + + @Override + public void encode(Metadata value, OutputStream os) throws IOException { + V1_CODER.encode(value, os); + LONG_CODER.encode(value.lastModifiedMillis(), os); + } + + @Override + public Metadata decode(InputStream is) throws IOException { + Builder builder = V1_CODER.decodeBuilder(is); + long lastModifiedMillis = LONG_CODER.decode(is); + return builder.setLastModifiedMillis(lastModifiedMillis).build(); + } + + @Override + public boolean consistentWithEquals() { + return true; + } +} From caa4f64152eac6280516df7e95d99cc31a746e54 Mon Sep 17 00:00:00 2001 From: Jeff Klukas Date: Wed, 16 Jan 2019 12:33:46 -0500 Subject: [PATCH 3/7] Mark new Metadata field and coder @Experimental --- .../src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java | 2 ++ .../main/java/org/apache/beam/sdk/io/fs/MetadataCoderV2.java | 2 ++ 2 files changed, 4 insertions(+) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java index 3beafe2c2f68..8365e183489b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.Serializable; import java.util.List; +import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.io.FileSystems; /** The result of {@link org.apache.beam.sdk.io.FileSystem#match}. */ @@ -90,6 +91,7 @@ public abstract static class Metadata implements Serializable { *

Note that this field is not encoded with the default {@link MetadataCoder} due to a need * for compatibility with previous versions of the Beam SDK. */ + @Experimental public abstract long lastModifiedMillis(); public static Builder builder() { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoderV2.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoderV2.java index 4eef169d6bed..6072fac87f31 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoderV2.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoderV2.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.VarLongCoder; @@ -27,6 +28,7 @@ import org.apache.beam.sdk.io.fs.MatchResult.Metadata.Builder; /** A {@link Coder} for {@link Metadata} that includes {@link Metadata#lastModifiedMillis()}. */ +@Experimental public class MetadataCoderV2 extends AtomicCoder { private static final MetadataCoder V1_CODER = MetadataCoder.of(); private static final VarLongCoder LONG_CODER = VarLongCoder.of(); From b7df051a1109db11ee3811ed6059860c17dcc26a Mon Sep 17 00:00:00 2001 From: Jeff Klukas Date: Wed, 16 Jan 2019 13:07:20 -0500 Subject: [PATCH 4/7] Document an example of explicitly setting the Metadata coder --- .../java/org/apache/beam/sdk/io/fs/MatchResult.java | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java index 8365e183489b..d24b85bd4a31 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java @@ -89,7 +89,18 @@ public abstract static class Metadata implements Serializable { * Last modification timestamp in milliseconds since Unix epoch. * *

Note that this field is not encoded with the default {@link MetadataCoder} due to a need - * for compatibility with previous versions of the Beam SDK. + * for compatibility with previous versions of the Beam SDK. If you want to rely on {@code + * lastModifiedMillis} values, be sure to explicitly set the coder to {@link MetadataCoderV2}. + * + *

The following example sets the coder explicitly and accesses {@code lastModifiedMillis} to + * set record timestamps: + * + *

{@code
+     * PCollection metadataWithTimestamp = p
+     *     .apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
+     *     .setCoder(MetadataCoderV2.of())
+     *     .apply(WithTimestamps.of(metadata -> new Instant(metadata.lastModifiedMillis())));
+     * }
*/ @Experimental public abstract long lastModifiedMillis(); From 4c7a5081c3c2974b257b40826e1e544c82275617 Mon Sep 17 00:00:00 2001 From: Jeff Klukas Date: Thu, 17 Jan 2019 10:46:58 -0500 Subject: [PATCH 5/7] Have both MetadataCoder variants return singletons --- .../java/org/apache/beam/sdk/io/fs/MetadataCoder.java | 7 +++++-- .../java/org/apache/beam/sdk/io/fs/MetadataCoderV2.java | 9 ++++++--- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java index 3095d808b643..cd770d5a87c6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java @@ -36,13 +36,16 @@ public class MetadataCoder extends AtomicCoder { public static final long UNKNOWN_LAST_MODIFIED_MILLIS = -1L; + private static final MetadataCoder INSTANCE = new MetadataCoder(); private static final ResourceIdCoder RESOURCE_ID_CODER = ResourceIdCoder.of(); private static final VarIntCoder INT_CODER = VarIntCoder.of(); private static final VarLongCoder LONG_CODER = VarLongCoder.of(); - /** Creates a {@link MetadataCoder}. */ + private MetadataCoder() {} + + /** Returns the singleton {@link MetadataCoder} instance. */ public static MetadataCoder of() { - return new MetadataCoder(); + return INSTANCE; } @Override diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoderV2.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoderV2.java index 6072fac87f31..4e164d538870 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoderV2.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoderV2.java @@ -30,12 +30,15 @@ /** A {@link Coder} for {@link Metadata} that includes {@link Metadata#lastModifiedMillis()}. */ @Experimental public class MetadataCoderV2 extends AtomicCoder { + private static final MetadataCoderV2 INSTANCE = new MetadataCoderV2(); private static final MetadataCoder V1_CODER = MetadataCoder.of(); private static final VarLongCoder LONG_CODER = VarLongCoder.of(); - /** Creates a {@link MetadataCoder}. */ - public static MetadataCoder of() { - return new MetadataCoder(); + private MetadataCoderV2() {} + + /** Returns the singleton {@link MetadataCoderV2} instance. */ + public static MetadataCoderV2 of() { + return INSTANCE; } @Override From 3b4bbebf59d693e39676d7ccb2968c3f9640770b Mon Sep 17 00:00:00 2001 From: Jeff Klukas Date: Thu, 17 Jan 2019 11:35:56 -0500 Subject: [PATCH 6/7] Add tests for MetadataCoder and MetadataCoderV2 --- .../beam/sdk/io/fs/MetadataCoderTest.java | 66 +++++++++++++++++++ .../beam/sdk/io/fs/MetadataCoderV2Test.java | 65 ++++++++++++++++++ 2 files changed, 131 insertions(+) create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/MetadataCoderTest.java create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/MetadataCoderV2Test.java diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/MetadataCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/MetadataCoderTest.java new file mode 100644 index 000000000000..207bf547c868 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/MetadataCoderTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.fs; + +import java.nio.file.Path; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.MatchResult.Metadata; +import org.apache.beam.sdk.testing.CoderProperties; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +/** Tests for {@link MetadataCoder}. */ +public class MetadataCoderTest { + + @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder(); + + @Test + public void testEncodeDecodeWithDefaultLastModifiedMills() throws Exception { + Path filePath = tmpFolder.newFile("somefile").toPath(); + Metadata metadata = + Metadata.builder() + .setResourceId( + FileSystems.matchNewResource(filePath.toString(), false /* isDirectory */)) + .setIsReadSeekEfficient(true) + .setSizeBytes(1024) + .setLastModifiedMillis(MetadataCoder.UNKNOWN_LAST_MODIFIED_MILLIS) + .build(); + CoderProperties.coderDecodeEncodeEqual(MetadataCoder.of(), metadata); + } + + @Test(expected = AssertionError.class) + public void testEncodeDecodeWithCustomLastModifiedMills() throws Exception { + Path filePath = tmpFolder.newFile("somefile").toPath(); + Metadata metadata = + Metadata.builder() + .setResourceId( + FileSystems.matchNewResource(filePath.toString(), false /* isDirectory */)) + .setIsReadSeekEfficient(true) + .setSizeBytes(1024) + .setLastModifiedMillis(1541097000L) + .build(); + // This should throw because the decoded Metadata has default lastModifiedMills. + CoderProperties.coderDecodeEncodeEqual(MetadataCoder.of(), metadata); + } + + @Test + public void testCoderSerializable() { + CoderProperties.coderSerializable(MetadataCoder.of()); + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/MetadataCoderV2Test.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/MetadataCoderV2Test.java new file mode 100644 index 000000000000..e3a49e737b62 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/MetadataCoderV2Test.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.fs; + +import java.nio.file.Path; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.MatchResult.Metadata; +import org.apache.beam.sdk.testing.CoderProperties; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +/** Tests for {@link MetadataCoderV2}. */ +public class MetadataCoderV2Test { + + @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder(); + + @Test + public void testEncodeDecodeWithDefaultLastModifiedMills() throws Exception { + Path filePath = tmpFolder.newFile("somefile").toPath(); + Metadata metadata = + Metadata.builder() + .setResourceId( + FileSystems.matchNewResource(filePath.toString(), false /* isDirectory */)) + .setIsReadSeekEfficient(true) + .setSizeBytes(1024) + .setLastModifiedMillis(MetadataCoder.UNKNOWN_LAST_MODIFIED_MILLIS) + .build(); + CoderProperties.coderDecodeEncodeEqual(MetadataCoderV2.of(), metadata); + } + + @Test + public void testEncodeDecodeWithCustomLastModifiedMills() throws Exception { + Path filePath = tmpFolder.newFile("somefile").toPath(); + Metadata metadata = + Metadata.builder() + .setResourceId( + FileSystems.matchNewResource(filePath.toString(), false /* isDirectory */)) + .setIsReadSeekEfficient(true) + .setSizeBytes(1024) + .setLastModifiedMillis(1541097000L) + .build(); + CoderProperties.coderDecodeEncodeEqual(MetadataCoderV2.of(), metadata); + } + + @Test + public void testCoderSerializable() { + CoderProperties.coderSerializable(MetadataCoderV2.of()); + } +} From 6bc7794f0dedd9e97bfdfadb64d78c8e2f99b0ab Mon Sep 17 00:00:00 2001 From: Jeff Klukas Date: Thu, 17 Jan 2019 12:33:28 -0500 Subject: [PATCH 7/7] Set lastModifiedMillis default in Metadata rather than MetadataCoder --- .../main/java/org/apache/beam/sdk/io/fs/MatchResult.java | 8 +++++++- .../java/org/apache/beam/sdk/io/fs/MetadataCoder.java | 5 +---- .../java/org/apache/beam/sdk/io/fs/MetadataCoderTest.java | 1 - .../org/apache/beam/sdk/io/fs/MetadataCoderV2Test.java | 1 - 4 files changed, 8 insertions(+), 7 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java index d24b85bd4a31..143340f593d5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.fs; import com.google.auto.value.AutoValue; +import java.io.File; import java.io.IOException; import java.io.Serializable; import java.util.List; @@ -79,6 +80,8 @@ public static MatchResult unknown() { /** {@link Metadata} of a matched file. */ @AutoValue public abstract static class Metadata implements Serializable { + public static final long UNKNOWN_LAST_MODIFIED_MILLIS = 0L; + public abstract ResourceId resourceId(); public abstract long sizeBytes(); @@ -91,6 +94,8 @@ public abstract static class Metadata implements Serializable { *

Note that this field is not encoded with the default {@link MetadataCoder} due to a need * for compatibility with previous versions of the Beam SDK. If you want to rely on {@code * lastModifiedMillis} values, be sure to explicitly set the coder to {@link MetadataCoderV2}. + * Otherwise, all instances will have the default value of 0, consistent with the behavior of + * {@link File#lastModified()}. * *

The following example sets the coder explicitly and accesses {@code lastModifiedMillis} to * set record timestamps: @@ -106,7 +111,8 @@ public abstract static class Metadata implements Serializable { public abstract long lastModifiedMillis(); public static Builder builder() { - return new AutoValue_MatchResult_Metadata.Builder(); + return new AutoValue_MatchResult_Metadata.Builder() + .setLastModifiedMillis(UNKNOWN_LAST_MODIFIED_MILLIS); } /** Builder class for {@link Metadata}. */ diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java index cd770d5a87c6..65261e6ae630 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java @@ -34,8 +34,6 @@ * MetadataCoderV2} for retaining timestamp information. */ public class MetadataCoder extends AtomicCoder { - public static final long UNKNOWN_LAST_MODIFIED_MILLIS = -1L; - private static final MetadataCoder INSTANCE = new MetadataCoder(); private static final ResourceIdCoder RESOURCE_ID_CODER = ResourceIdCoder.of(); private static final VarIntCoder INT_CODER = VarIntCoder.of(); @@ -67,8 +65,7 @@ Metadata.Builder decodeBuilder(InputStream is) throws IOException { return Metadata.builder() .setResourceId(resourceId) .setIsReadSeekEfficient(isReadSeekEfficient) - .setSizeBytes(sizeBytes) - .setLastModifiedMillis(UNKNOWN_LAST_MODIFIED_MILLIS); + .setSizeBytes(sizeBytes); } @Override diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/MetadataCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/MetadataCoderTest.java index 207bf547c868..47b7a314ec7d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/MetadataCoderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/MetadataCoderTest.java @@ -39,7 +39,6 @@ public void testEncodeDecodeWithDefaultLastModifiedMills() throws Exception { FileSystems.matchNewResource(filePath.toString(), false /* isDirectory */)) .setIsReadSeekEfficient(true) .setSizeBytes(1024) - .setLastModifiedMillis(MetadataCoder.UNKNOWN_LAST_MODIFIED_MILLIS) .build(); CoderProperties.coderDecodeEncodeEqual(MetadataCoder.of(), metadata); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/MetadataCoderV2Test.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/MetadataCoderV2Test.java index e3a49e737b62..7527f9ffa602 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/MetadataCoderV2Test.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/MetadataCoderV2Test.java @@ -39,7 +39,6 @@ public void testEncodeDecodeWithDefaultLastModifiedMills() throws Exception { FileSystems.matchNewResource(filePath.toString(), false /* isDirectory */)) .setIsReadSeekEfficient(true) .setSizeBytes(1024) - .setLastModifiedMillis(MetadataCoder.UNKNOWN_LAST_MODIFIED_MILLIS) .build(); CoderProperties.coderDecodeEncodeEqual(MetadataCoderV2.of(), metadata); }