@@ -1010,7 +1010,6 @@ public static Sink sink(Schema schema) {
10101010 .setPageSize (ParquetWriter .DEFAULT_PAGE_SIZE )
10111011 .setEnableDictionary (ParquetWriter .DEFAULT_IS_DICTIONARY_ENABLED )
10121012 .setEnableBloomFilter (ParquetProperties .DEFAULT_BLOOM_FILTER_ENABLED )
1013- .setMinRowCountForPageSizeCheck (ParquetProperties .DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK )
10141013 .build ();
10151014 }
10161015
@@ -1032,7 +1031,7 @@ public abstract static class Sink implements FileIO.Sink<GenericRecord> {
10321031
10331032 abstract boolean getEnableBloomFilter ();
10341033
1035- abstract int getMinRowCountForPageSizeCheck ();
1034+ abstract @ Nullable ValueProvider < Integer > getMinRowCountForPageSizeCheck ();
10361035
10371036 abstract @ Nullable Class <? extends GenericData > getAvroDataModelClass ();
10381037
@@ -1054,7 +1053,8 @@ abstract static class Builder {
10541053
10551054 abstract Builder setEnableBloomFilter (boolean enableBloomFilter );
10561055
1057- abstract Builder setMinRowCountForPageSizeCheck (int minRowCountForPageSizeCheck );
1056+ abstract Builder setMinRowCountForPageSizeCheck (
1057+ ValueProvider <Integer > minRowCountForPageSizeCheck );
10581058
10591059 abstract Builder setAvroDataModelClass (Class <? extends GenericData > modelClass );
10601060
@@ -1109,6 +1109,24 @@ public Sink withBloomFilterEnabled(boolean enableBloomFilter) {
11091109 public Sink withMinRowCountForPageSizeCheck (int minRowCountForPageSizeCheck ) {
11101110 checkArgument (
11111111 minRowCountForPageSizeCheck > 0 , "minRowCountForPageSizeCheck must be positive" );
1112+ return toBuilder ()
1113+ .setMinRowCountForPageSizeCheck (
1114+ ValueProvider .StaticValueProvider .of (minRowCountForPageSizeCheck ))
1115+ .build ();
1116+ }
1117+
1118+ /**
1119+ * Like {@link #withMinRowCountForPageSizeCheck(int)}, but accepts a {@link ValueProvider} so
1120+ * the value can be supplied at runtime (required for classic Dataflow templates).
1121+ */
1122+ public Sink withMinRowCountForPageSizeCheck (
1123+ ValueProvider <Integer > minRowCountForPageSizeCheck ) {
1124+ checkNotNull (minRowCountForPageSizeCheck , "minRowCountForPageSizeCheck can not be null" );
1125+ if (minRowCountForPageSizeCheck .isAccessible ()) {
1126+ Integer value = minRowCountForPageSizeCheck .get ();
1127+ checkNotNull (value , "minRowCountForPageSizeCheck value cannot be null" );
1128+ checkArgument (value > 0 , "minRowCountForPageSizeCheck must be positive" );
1129+ }
11121130 return toBuilder ().setMinRowCountForPageSizeCheck (minRowCountForPageSizeCheck ).build ();
11131131 }
11141132
@@ -1141,8 +1159,16 @@ public void open(WritableByteChannel channel) throws IOException {
11411159 .withRowGroupSize (getRowGroupSize ())
11421160 .withPageSize (getPageSize ())
11431161 .withDictionaryEncoding (getEnableDictionary ())
1144- .withBloomFilterEnabled (getEnableBloomFilter ())
1145- .withMinRowCountForPageSizeCheck (getMinRowCountForPageSizeCheck ());
1162+ .withBloomFilterEnabled (getEnableBloomFilter ());
1163+
1164+ ValueProvider <Integer > minRowCountProvider = getMinRowCountForPageSizeCheck ();
1165+ if (minRowCountProvider != null ) {
1166+ Integer minRowCount = minRowCountProvider .get ();
1167+ if (minRowCount != null ) {
1168+ checkArgument (minRowCount > 0 , "minRowCountForPageSizeCheck must be positive" );
1169+ builder = builder .withMinRowCountForPageSizeCheck (minRowCount );
1170+ }
1171+ }
11461172
11471173 if (modelClass != null ) {
11481174 try {
0 commit comments