From a3c37697cb6caee626e8997cfc1cd05aa7e6c7b5 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Mon, 20 Feb 2023 13:39:50 +0800 Subject: [PATCH 1/4] [SPARK-41952] Fix Parquet zstd off-heap memory leak as a workaround for PARQUET-2160 --- .../parquet/ParquetCodecFactory.java | 110 ++++++++++++++++++ .../SpecificParquetRecordReaderBase.java | 2 + 2 files changed, 112 insertions(+) create mode 100644 sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCodecFactory.java diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCodecFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCodecFactory.java new file mode 100644 index 0000000000000..f1547375887ee --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCodecFactory.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.parquet; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.compress.CodecPool; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.Decompressor; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.hadoop.CodecFactory; +import org.apache.parquet.hadoop.codec.ZstandardCodec; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; + +/** + * This class implements a codec factory that is used when reading from Parquet. It adds a + * workaround for memory issues encountered when reading from zstd-compressed files. For + * details, see PARQUET-2160 + */ +public class ParquetCodecFactory extends CodecFactory { + + public ParquetCodecFactory(Configuration configuration, int pageSize) { + super(configuration, pageSize); + } + + /** + * Copied and modified from CodecFactory.HeapBytesDecompressor + */ + @SuppressWarnings("deprecation") + class HeapBytesDecompressor extends BytesDecompressor { + + private final CompressionCodec codec; + private final Decompressor decompressor; + + HeapBytesDecompressor(CompressionCodecName codecName) { + this.codec = getCodec(codecName); + if (codec != null) { + decompressor = CodecPool.getDecompressor(codec); + } else { + decompressor = null; + } + } + + @Override + public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException { + final BytesInput decompressed; + if (codec != null) { + if (decompressor != null) { + decompressor.reset(); + } + InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor); + + if (codec instanceof ZstandardCodec) { + // We need to explicitly close the ZstdDecompressorStream here to release the resources it holds to avoid + // off-heap memory fragmentation issue, see https://issues.apache.org/jira/browse/PARQUET-2160. 
+ // This change will load the decompressor stream into heap a little earlier, since the problem it solves + // only happens in the ZSTD codec, so this modification is only made for ZSTD streams. + decompressed = BytesInput.copy(BytesInput.from(is, uncompressedSize)); + is.close(); + } else { + decompressed = BytesInput.from(is, uncompressedSize); + } + } else { + decompressed = bytes; + } + return decompressed; + } + + @Override + public void decompress( + ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize) + throws IOException { + ByteBuffer decompressed = decompress(BytesInput.from(input), uncompressedSize).toByteBuffer(); + output.put(decompressed); + } + + @Override + public void release() { + if (decompressor != null) { + CodecPool.returnDecompressor(decompressor); + } + } + } + + @Override + @SuppressWarnings("deprecation") + protected BytesDecompressor createDecompressor(CompressionCodecName codecName) { + return new HeapBytesDecompressor(codecName); + } +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java index 8cefa589c0ecb..678b287a5e3a6 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java @@ -96,6 +96,7 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont ParquetReadOptions options = HadoopReadOptions .builder(configuration, file) .withRange(split.getStart(), split.getStart() + split.getLength()) + .withCodecFactory(new ParquetCodecFactory(configuration, 0)) .build(); ParquetFileReader fileReader = new ParquetFileReader( HadoopInputFile.fromPath(file, configuration), options); @@ -159,6 +160,7 @@ protected void initialize(String 
path, List columns) throws IOException ParquetReadOptions options = HadoopReadOptions .builder(config, file) .withRange(0, length) + .withCodecFactory(new ParquetCodecFactory(config, 0)) .build(); ParquetFileReader fileReader = ParquetFileReader.open( HadoopInputFile.fromPath(file, config), options); From 1391354610a7b1f8834d95ab04f8fcaddd89a234 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Mon, 20 Feb 2023 14:29:44 +0800 Subject: [PATCH 2/4] code style --- .../parquet/ParquetCodecFactory.java | 35 +++++++++---------- 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCodecFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCodecFactory.java index f1547375887ee..7e52f918bbe3b 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCodecFactory.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCodecFactory.java @@ -1,20 +1,18 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.spark.sql.execution.datasources.parquet; @@ -71,10 +69,11 @@ public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOEx InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor); if (codec instanceof ZstandardCodec) { - // We need to explicitly close the ZstdDecompressorStream here to release the resources it holds to avoid - // off-heap memory fragmentation issue, see https://issues.apache.org/jira/browse/PARQUET-2160. - // This change will load the decompressor stream into heap a little earlier, since the problem it solves - // only happens in the ZSTD codec, so this modification is only made for ZSTD streams. + // We need to explicitly close the ZstdDecompressorStream here to release the resources + // it holds to avoid off-heap memory fragmentation issue, see PARQUET-2160. + // This change will load the decompressor stream into heap a little earlier, since the + // problem it solves only happens in the ZSTD codec, so this modification is only made + // for ZSTD streams. 
decompressed = BytesInput.copy(BytesInput.from(is, uncompressedSize)); is.close(); } else { From 9ee8ba0db167e4538d18ac4b81f4ee8359abb9e0 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Mon, 20 Feb 2023 14:30:17 +0800 Subject: [PATCH 3/4] code style --- .../sql/execution/datasources/parquet/ParquetCodecFactory.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCodecFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCodecFactory.java index 7e52f918bbe3b..43c66d96a6180 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCodecFactory.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCodecFactory.java @@ -89,7 +89,8 @@ public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOEx public void decompress( ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize) throws IOException { - ByteBuffer decompressed = decompress(BytesInput.from(input), uncompressedSize).toByteBuffer(); + ByteBuffer decompressed = + decompress(BytesInput.from(input), uncompressedSize).toByteBuffer(); output.put(decompressed); } From d2b84d0fd566f794d2060e7e4d1d7a30eca2ffcd Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Mon, 20 Feb 2023 16:05:47 +0800 Subject: [PATCH 4/4] nit --- .../sql/execution/datasources/parquet/ParquetCodecFactory.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCodecFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCodecFactory.java index 43c66d96a6180..2edbdc70da2fd 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCodecFactory.java +++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCodecFactory.java @@ -34,6 +34,8 @@ * This class implements a codec factory that is used when reading from Parquet. It adds a * workaround for memory issues encountered when reading from zstd-compressed files. For * details, see PARQUET-2160 + * + * TODO: Remove this workaround after upgrading to a Parquet release that includes PARQUET-2160. */ public class ParquetCodecFactory extends CodecFactory {