Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,7 @@ public void process(ProcessContext c) {
MatchResult.Metadata.builder()
.setResourceId(metadata.resourceId())
.setSizeBytes(metadata.sizeBytes())
.setLastModifiedMillis(metadata.lastModifiedMillis())
.setIsReadSeekEfficient(
metadata.isReadSeekEfficient() && compression == Compression.UNCOMPRESSED)
.build(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ private Metadata toMetadata(File file) {
.setResourceId(LocalResourceId.fromPath(file.toPath(), file.isDirectory()))
.setIsReadSeekEfficient(true)
.setSizeBytes(file.length())
.setLastModifiedMillis(file.lastModified())
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
package org.apache.beam.sdk.io.fs;

import com.google.auto.value.AutoValue;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.io.FileSystems;

/** The result of {@link org.apache.beam.sdk.io.FileSystem#match}. */
Expand Down Expand Up @@ -78,14 +80,39 @@ public static MatchResult unknown() {
/** {@link Metadata} of a matched file. */
@AutoValue
public abstract static class Metadata implements Serializable {
public static final long UNKNOWN_LAST_MODIFIED_MILLIS = 0L;

public abstract ResourceId resourceId();

public abstract long sizeBytes();

public abstract boolean isReadSeekEfficient();

/**
* Last modification timestamp in milliseconds since Unix epoch.
*
* <p>Note that this field is not encoded with the default {@link MetadataCoder} due to a need
* for compatibility with previous versions of the Beam SDK. If you want to rely on {@code
* lastModifiedMillis} values, be sure to explicitly set the coder to {@link MetadataCoderV2}.
* Otherwise, all instances will have the default value of 0, consistent with the behavior of
* {@link File#lastModified()}.
*
* <p>The following example sets the coder explicitly and accesses {@code lastModifiedMillis} to
* set record timestamps:
*
* <pre>{@code
* PCollection<Metadata> metadataWithTimestamp = p
* .apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
* .setCoder(MetadataCoderV2.of())
* .apply(WithTimestamps.of(metadata -> new Instant(metadata.lastModifiedMillis())));
* }</pre>
*/
@Experimental
public abstract long lastModifiedMillis();

public static Builder builder() {
return new AutoValue_MatchResult_Metadata.Builder();
return new AutoValue_MatchResult_Metadata.Builder()
.setLastModifiedMillis(UNKNOWN_LAST_MODIFIED_MILLIS);
}

/** Builder class for {@link Metadata}. */
Expand All @@ -97,6 +124,8 @@ public abstract static class Builder {

public abstract Builder setIsReadSeekEfficient(boolean value);

public abstract Builder setLastModifiedMillis(long value);

public abstract Metadata build();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,24 @@
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;

/** A {@link Coder} for {@link Metadata}. */
/**
* A {@link Coder} for {@link Metadata}.
*
* <p>The {@link Metadata#lastModifiedMillis()} field was added after this coder was already
* deployed, so this class decodes a default value for backwards compatibility. See {@link
* MetadataCoderV2} for retaining timestamp information.
*/
public class MetadataCoder extends AtomicCoder<Metadata> {
private static final MetadataCoder INSTANCE = new MetadataCoder();
private static final ResourceIdCoder RESOURCE_ID_CODER = ResourceIdCoder.of();
private static final VarIntCoder INT_CODER = VarIntCoder.of();
private static final VarLongCoder LONG_CODER = VarLongCoder.of();

/** Creates a {@link MetadataCoder}. */
private MetadataCoder() {}

/** Returns the singleton {@link MetadataCoder} instance. */
public static MetadataCoder of() {
return new MetadataCoder();
return INSTANCE;
}

@Override
Expand All @@ -46,14 +55,17 @@ public void encode(Metadata value, OutputStream os) throws IOException {

@Override
public Metadata decode(InputStream is) throws IOException {
return decodeBuilder(is).build();
}

Metadata.Builder decodeBuilder(InputStream is) throws IOException {
ResourceId resourceId = RESOURCE_ID_CODER.decode(is);
boolean isReadSeekEfficient = INT_CODER.decode(is) == 1;
long sizeBytes = LONG_CODER.decode(is);
return Metadata.builder()
.setResourceId(resourceId)
.setIsReadSeekEfficient(isReadSeekEfficient)
.setSizeBytes(sizeBytes)
.build();
.setSizeBytes(sizeBytes);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.fs;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata.Builder;

/** A {@link Coder} for {@link Metadata} that includes {@link Metadata#lastModifiedMillis()}. */
@Experimental
public class MetadataCoderV2 extends AtomicCoder<Metadata> {
private static final MetadataCoderV2 INSTANCE = new MetadataCoderV2();
private static final MetadataCoder V1_CODER = MetadataCoder.of();
private static final VarLongCoder LONG_CODER = VarLongCoder.of();

private MetadataCoderV2() {}

/** Returns the singleton {@link MetadataCoderV2} instance. */
public static MetadataCoderV2 of() {
return INSTANCE;
}

@Override
public void encode(Metadata value, OutputStream os) throws IOException {
V1_CODER.encode(value, os);
LONG_CODER.encode(value.lastModifiedMillis(), os);
}

@Override
public Metadata decode(InputStream is) throws IOException {
Builder builder = V1_CODER.decodeBuilder(is);
long lastModifiedMillis = LONG_CODER.decode(is);
return builder.setLastModifiedMillis(lastModifiedMillis).build();
}

@Override
public boolean consistentWithEquals() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.io.Writer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;
import java.util.Arrays;
import java.util.List;
import java.util.zip.GZIPOutputStream;
Expand Down Expand Up @@ -79,24 +80,30 @@ public void testMatchAndMatchAll() throws IOException {
Path secondPath = tmpFolder.newFile("second").toPath();
int firstSize = 37;
int secondSize = 42;
long firstModified = 1541097000L;
long secondModified = 1541098000L;
Files.write(firstPath, new byte[firstSize]);
Files.write(secondPath, new byte[secondSize]);
Files.setLastModifiedTime(firstPath, FileTime.fromMillis(firstModified));
Files.setLastModifiedTime(secondPath, FileTime.fromMillis(secondModified));
MatchResult.Metadata firstMetadata = metadata(firstPath, firstSize, firstModified);
MatchResult.Metadata secondMetadata = metadata(secondPath, secondSize, secondModified);

PAssert.that(
p.apply(
"Match existing",
FileIO.match().filepattern(tmpFolder.getRoot().getAbsolutePath() + "/*")))
.containsInAnyOrder(metadata(firstPath, firstSize), metadata(secondPath, secondSize));
.containsInAnyOrder(firstMetadata, secondMetadata);
PAssert.that(
p.apply(
"Match existing with provider",
FileIO.match()
.filepattern(p.newProvider(tmpFolder.getRoot().getAbsolutePath() + "/*"))))
.containsInAnyOrder(metadata(firstPath, firstSize), metadata(secondPath, secondSize));
.containsInAnyOrder(firstMetadata, secondMetadata);
PAssert.that(
p.apply("Create existing", Create.of(tmpFolder.getRoot().getAbsolutePath() + "/*"))
.apply("MatchAll existing", FileIO.matchAll()))
.containsInAnyOrder(metadata(firstPath, firstSize), metadata(secondPath, secondSize));
.containsInAnyOrder(firstMetadata, secondMetadata);

PAssert.that(
p.apply(
Expand Down Expand Up @@ -232,9 +239,18 @@ public void testMatchWatchForNewFiles() throws IOException, InterruptedException

List<MatchResult.Metadata> expected =
Arrays.asList(
metadata(basePath.resolve("first"), 42),
metadata(basePath.resolve("second"), 37),
metadata(basePath.resolve("third"), 99));
metadata(
basePath.resolve("first"),
42,
Files.getLastModifiedTime(basePath.resolve("first")).toMillis()),
metadata(
basePath.resolve("second"),
37,
Files.getLastModifiedTime(basePath.resolve("second")).toMillis()),
metadata(
basePath.resolve("third"),
99,
Files.getLastModifiedTime(basePath.resolve("third")).toMillis()));
PAssert.that(matchMetadata).containsInAnyOrder(expected);
PAssert.that(matchAllMetadata).containsInAnyOrder(expected);
p.run();
Expand Down Expand Up @@ -309,11 +325,12 @@ public void testRead() throws IOException {
p.run();
}

private static MatchResult.Metadata metadata(Path path, int size) {
private static MatchResult.Metadata metadata(Path path, int size, long lastModifiedMillis) {
return MatchResult.Metadata.builder()
.setResourceId(FileSystems.matchNewResource(path.toString(), false /* isDirectory */))
.setIsReadSeekEfficient(true)
.setSizeBytes(size)
.setLastModifiedMillis(lastModifiedMillis)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.fs;

import java.nio.file.Path;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.testing.CoderProperties;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/** Tests for {@link MetadataCoder}. */
public class MetadataCoderTest {

@Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder();

@Test
public void testEncodeDecodeWithDefaultLastModifiedMills() throws Exception {
Path filePath = tmpFolder.newFile("somefile").toPath();
Metadata metadata =
Metadata.builder()
.setResourceId(
FileSystems.matchNewResource(filePath.toString(), false /* isDirectory */))
.setIsReadSeekEfficient(true)
.setSizeBytes(1024)
.build();
CoderProperties.coderDecodeEncodeEqual(MetadataCoder.of(), metadata);
}

@Test(expected = AssertionError.class)
public void testEncodeDecodeWithCustomLastModifiedMills() throws Exception {
Path filePath = tmpFolder.newFile("somefile").toPath();
Metadata metadata =
Metadata.builder()
.setResourceId(
FileSystems.matchNewResource(filePath.toString(), false /* isDirectory */))
.setIsReadSeekEfficient(true)
.setSizeBytes(1024)
.setLastModifiedMillis(1541097000L)
.build();
// This should throw because the decoded Metadata has default lastModifiedMills.
CoderProperties.coderDecodeEncodeEqual(MetadataCoder.of(), metadata);
}

@Test
public void testCoderSerializable() {
CoderProperties.coderSerializable(MetadataCoder.of());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.fs;

import java.nio.file.Path;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.testing.CoderProperties;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/** Tests for {@link MetadataCoderV2}. */
public class MetadataCoderV2Test {

@Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder();

@Test
public void testEncodeDecodeWithDefaultLastModifiedMills() throws Exception {
Path filePath = tmpFolder.newFile("somefile").toPath();
Metadata metadata =
Metadata.builder()
.setResourceId(
FileSystems.matchNewResource(filePath.toString(), false /* isDirectory */))
.setIsReadSeekEfficient(true)
.setSizeBytes(1024)
.build();
CoderProperties.coderDecodeEncodeEqual(MetadataCoderV2.of(), metadata);
}

@Test
public void testEncodeDecodeWithCustomLastModifiedMills() throws Exception {
Path filePath = tmpFolder.newFile("somefile").toPath();
Metadata metadata =
Metadata.builder()
.setResourceId(
FileSystems.matchNewResource(filePath.toString(), false /* isDirectory */))
.setIsReadSeekEfficient(true)
.setSizeBytes(1024)
.setLastModifiedMillis(1541097000L)
.build();
CoderProperties.coderDecodeEncodeEqual(MetadataCoderV2.of(), metadata);
}

@Test
public void testCoderSerializable() {
CoderProperties.coderSerializable(MetadataCoderV2.of());
}
}
Loading