Skip to content

reuse container when reading parquet records#1522

Merged
rdblue merged 2 commits into
apache:masterfrom
chenjunjiedada:reuse-container-for-flink-parquet-reader
Oct 13, 2020
Merged

reuse container when reading parquet records#1522
rdblue merged 2 commits into
apache:masterfrom
chenjunjiedada:reuse-container-for-flink-parquet-reader

Conversation

@chenjunjiedada

Copy link
Copy Markdown
Contributor

This is for discussion here. I don't add the unit test here since the existing end to end unit tests should cover.

@chenjunjiedada

Copy link
Copy Markdown
Contributor Author

@rdblue , I see the spark parquet reader doesn't reuse the container while vectorized code path reuses the container. Any consideration on it?

@rdblue

rdblue commented Sep 30, 2020

Copy link
Copy Markdown
Contributor

I noticed this a few months ago and tested the performance of the two paths. I didn't really see a speed up in my initial test so I didn't change the default. We just need to evaluate the performance to know whether to do this.

@holdenk

holdenk commented Sep 30, 2020

Copy link
Copy Markdown
Contributor

Would the values produced by the ParquetReader iterable ever be returned directly to Spark? Say in the distributed planning that folks are considering? Because if so we should check to make sure the code path is one of the ones in Spark where it's ok.

@rdblue

rdblue commented Sep 30, 2020

Copy link
Copy Markdown
Contributor

Would the values produced by the ParquetReader iterable ever be returned directly to Spark?

Yes, we return an iterator with the reused containers. I believe that this is okay because Spark generally converts to unsafe immediately. What code paths can't handle reused containers?

@chenjunjiedada

chenjunjiedada commented Oct 12, 2020

Copy link
Copy Markdown
Contributor Author

I executed some spark jmh cases with NUM_RECORDS = 5000000 (I didn't use 10000000 in current code, because that causes OOM on my machine with jmh jvmArgs=-Xmx4096m, not sure how we passed before.), the results are shown as following:

when not reuse the container

Benchmark                                                                          Mode  Cnt  Score   Error  Units
SparkParquetReadersNestedDataBenchmark.readUsingIcebergReader                        ss    5  2.801 ?0.089   s/op
SparkParquetReadersNestedDataBenchmark.readUsingIcebergReaderUnsafe                  ss    5  3.383 ?0.090   s/op
SparkParquetReadersNestedDataBenchmark.readUsingSparkReader                          ss    5  4.353 ?0.162   s/op
SparkParquetReadersNestedDataBenchmark.readWithProjectionUsingIcebergReader          ss    5  1.488 ?0.051   s/op
SparkParquetReadersNestedDataBenchmark.readWithProjectionUsingIcebergReaderUnsafe    ss    5  1.886 ?0.250   s/op
SparkParquetReadersNestedDataBenchmark.readWithProjectionUsingSparkReader            ss    5  2.078 ?0.227   s/op

When reusing the container

Benchmark                                                                          Mode  Cnt  Score   Error  Units
SparkParquetReadersNestedDataBenchmark.readUsingIcebergReader                        ss    5  2.707 ?0.053   s/op
SparkParquetReadersNestedDataBenchmark.readUsingIcebergReaderUnsafe                  ss    5  3.149 ?0.144   s/op
SparkParquetReadersNestedDataBenchmark.readUsingSparkReader                          ss    5  4.344 ?0.168   s/op
SparkParquetReadersNestedDataBenchmark.readWithProjectionUsingIcebergReader          ss    5  1.360 ?0.155   s/op
SparkParquetReadersNestedDataBenchmark.readWithProjectionUsingIcebergReaderUnsafe    ss    5  1.863 ?0.180   s/op
SparkParquetReadersNestedDataBenchmark.readWithProjectionUsingSparkReader            ss    5  2.072 ?0.147   s/op

It shows slight benefit when reusing the container. @rdblue, Does that make sense for spark side change? I will try to write some jmh benchmark for Flink input format and try it again.

Plus I found two issues:

  1. The jmh benchmarks were moved to spark2 module while the comments haven't updated.
  2. The jmh benchmarks cases throw an exception like below:
org.apache.iceberg.exceptions.AlreadyExistsException: File already exists: /tmp/parquet-nested-data-benchmark3999980592702894424.parquet
        at org.apache.iceberg.Files$LocalOutputFile.create(Files.java:58)
        at org.apache.iceberg.parquet.ParquetIO$ParquetOutputFile.create(ParquetIO.java:148)
        at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:295)
        at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:283)
        at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:564)
        at org.apache.iceberg.parquet.Parquet$WriteBuilder.build(Parquet.java:265)
        at org.apache.iceberg.spark.data.parquet.SparkParquetReadersNestedDataBenchmark.setupBenchmark(SparkParquetReadersNestedDataBenchmark.java:102)
        at org.apache.iceberg.spark.data.parquet.generated.SparkParquetReadersNestedDataBenchmark_readUsingSparkReader_jmhTest._jmh_tryInit_f_sparkparquetreadersnesteddatabenchmark0_G(SparkParquetReadersNestedDataBenchmark_readUsingSparkReader_jmhTest.java:438)
        at org.apache.iceberg.spark.data.parquet.generated.SparkParquetReadersNestedDataBenchmark_readUsingSparkReader_jmhTest.readUsingSparkReader_SingleShotTime(SparkParquetReadersNestedDataBenchmark_readUsingSparkReader_jmhTest.java:363)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.openjdk.jmh.runner.BenchmarkHandler$BenchmarkTask.call(BenchmarkHandler.java:453)
        at org.openjdk.jmh.runner.BenchmarkHandler$BenchmarkTask.call(BenchmarkHandler.java:437)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

We need to delete the created temp file at first.

I will fix found issues tmr.

@rdblue

rdblue commented Oct 12, 2020

Copy link
Copy Markdown
Contributor

Thanks for running the benchmarks, @chenjunjiedada! I think it looks like we should commit this. @aokolnychyi, what do you think?

@rdblue

rdblue commented Oct 13, 2020

Copy link
Copy Markdown
Contributor

I'm restarting the failed JDK 8 tests because the failure was flaky Hive tests.

@rdblue rdblue merged commit 3ff9df3 into apache:master Oct 13, 2020
@rdblue

rdblue commented Oct 13, 2020

Copy link
Copy Markdown
Contributor

Yes, we return an iterator with the reused containers. I believe that this is okay because Spark generally converts to unsafe immediately. What code paths can't handle reused containers?

Following up on this, it is safe for Spark in the v2 path because Spark ensures that there is a projection that converts to unsafe rows. That's because some Spark exec nodes expect unsafe.

@rdblue rdblue added this to the Java 0.10.0 Release milestone Nov 16, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants