diff --git a/HDF5-Benchmark-Run-Scenarios.md b/HDF5-Benchmark-Run-Scenarios.md new file mode 100644 index 00000000000..afda95a45b6 --- /dev/null +++ b/HDF5-Benchmark-Run-Scenarios.md @@ -0,0 +1,433 @@ +# HDF5 Benchmark Run Scenarios + +This guide explains simple ways to run the test below: +```text +HDF5BenchmarkTest#benchmarkHDF5ReadWrite +``` + +You can find the test file under this path: +```text +src/test/java/org/apache/sysds/test/functions/io/hdf5/HDF5BenchmarkTest.java +``` + +The benchmark is disabled by default. Every scenario must include: +```text +-Dsysds.test.hdf5.benchmark=true +``` + +The test provides two data profiles: +```text +dense_double_only +sparse_like_double +``` + +For each data profile above, the test measures the following operations: +```text +raw_write +raw_read +hdf5_write +seq hdf5_read +par hdf5_read +``` + +After a successful run, the results are written to: +```text +target/hdf5-benchmark-results.csv +target/hdf5-benchmark-results.json +``` + +For performance analysis, use only the measured (non-warmup) rows of the results file, i.e. rows with: +```json +"is_warmup": false +``` + +--- + +## Scenario 1: Quick test + +### What it does + +Runs the benchmark with the default small matrix size. + +Default size: +```text +10000 x 100 +``` + +### Why this scenario? + +This scenario is a smoke test: it confirms that the test compiles, HDF5 files can be written/read, and CSV/JSON output is created. 
+ +### Windows PowerShell + +```powershell +mvn ` + "-Dhadoop.home.dir=$hadoopHome" ` + "-Dtest=HDF5BenchmarkTest#benchmarkHDF5ReadWrite" ` + "-Dsysds.test.hdf5.benchmark=true" ` + "-Dsysds.test.hdf5.keep.files=true" ` + test +``` + +### macOS + +```bash +mvn \ + -Dtest=HDF5BenchmarkTest#benchmarkHDF5ReadWrite \ + -Dsysds.test.hdf5.benchmark=true \ + -Dsysds.test.hdf5.keep.files=true \ + test +``` + +### Expected result +The test should pass and create below files: +```text +target/hdf5-benchmark-results.csv +target/hdf5-benchmark-results.json +``` + +--- + +## Scenario 2: Full baseline benchmark +### What it does +Runs the main benchmark size used for performance comparison. + +Size: +```text +100000 x 100 +``` +The sparse-like data profile is created as below: +```text +1 nonzero per row = 1% logical density +``` + +### Why this scenario? +I defined this test as the standard benchmark run for comparing raw I/O, HDF5 write, sequential HDF5 read, and parallel HDF5 read. + +### Windows PowerShell +```powershell +mvn ` + "-Dhadoop.home.dir=$hadoopHome" ` + "-Dtest=HDF5BenchmarkTest#benchmarkHDF5ReadWrite" ` + "-Dsysds.test.hdf5.benchmark=true" ` + "-Dsysds.test.hdf5.rows=100000" ` + "-Dsysds.test.hdf5.cols=100" ` + "-Dsysds.test.hdf5.block.size=1024" ` + "-Dsysds.test.hdf5.sparse.nnz.per.row=1" ` + "-Dsysds.test.hdf5.warmup.reps=1" ` + "-Dsysds.test.hdf5.measure.reps=3" ` + "-Dsysds.hdf5.read.parallel.threads=4" ` + "-Dsysds.hdf5.read.parallel.min.bytes=67108864" ` + "-Dsysds.test.hdf5.keep.files=true" ` + test +``` + +### macOS +```bash +mvn \ + -Dtest=HDF5BenchmarkTest#benchmarkHDF5ReadWrite \ + -Dsysds.test.hdf5.benchmark=true \ + -Dsysds.test.hdf5.rows=100000 \ + -Dsysds.test.hdf5.cols=100 \ + -Dsysds.test.hdf5.block.size=1024 \ + -Dsysds.test.hdf5.sparse.nnz.per.row=1 \ + -Dsysds.test.hdf5.warmup.reps=1 \ + -Dsysds.test.hdf5.measure.reps=3 \ + -Dsysds.hdf5.read.parallel.threads=4 \ + -Dsysds.hdf5.read.parallel.min.bytes=67108864 \ + 
-Dsysds.test.hdf5.keep.files=true \ + test +``` + +### Expected result +The output should have 40 result rows: +```text +2 profiles x 5 operations x (1 warmup + 3 measured reps) +``` + +--- + +## Scenario 3: Full baseline without keeping temporary files + +### What it does +Runs the baseline benchmark and deletes temporary raw/HDF5 work files after success. + +### Why this scenario? +When you only need the CSV/JSON output and do not want to keep the temporary HDF5 files. + +### Windows PowerShell +```powershell +mvn ` + "-Dhadoop.home.dir=$hadoopHome" ` + "-Dtest=HDF5BenchmarkTest#benchmarkHDF5ReadWrite" ` + "-Dsysds.test.hdf5.benchmark=true" ` + "-Dsysds.test.hdf5.rows=100000" ` + "-Dsysds.test.hdf5.cols=100" ` + "-Dsysds.test.hdf5.keep.files=false" ` + test +``` + +### macOS +```bash +mvn \ + -Dtest=HDF5BenchmarkTest#benchmarkHDF5ReadWrite \ + -Dsysds.test.hdf5.benchmark=true \ + -Dsysds.test.hdf5.rows=100000 \ + -Dsysds.test.hdf5.cols=100 \ + -Dsysds.test.hdf5.keep.files=false \ + test +``` + +### Expected result +CSV/JSON remain in: +```text +target/ +``` + +Temporary files under: +```text +target/hdf5-benchmark-work-... +``` +are deleted. + +Note for Windows: if cleanup fails because an HDF5 file is still locked, run with: +```text +-Dsysds.test.hdf5.keep.files=true +``` + +--- + +## Scenario 4: Skip nonzero counting during read +### What it does +Runs the same benchmark but sets: +```text +-Dsysds.hdf5.read.skip.nnz=true +``` + +### Why this scenario? +Use this to check how much read time is affected by nonzero counting. 
+ +### Windows PowerShell +```powershell +mvn ` + "-Dhadoop.home.dir=$hadoopHome" ` + "-Dtest=HDF5BenchmarkTest#benchmarkHDF5ReadWrite" ` + "-Dsysds.test.hdf5.benchmark=true" ` + "-Dsysds.test.hdf5.rows=100000" ` + "-Dsysds.test.hdf5.cols=100" ` + "-Dsysds.test.hdf5.sparse.nnz.per.row=1" ` + "-Dsysds.hdf5.read.parallel.threads=4" ` + "-Dsysds.hdf5.read.parallel.min.bytes=67108864" ` + "-Dsysds.hdf5.read.skip.nnz=true" ` + "-Dsysds.test.hdf5.keep.files=true" ` + test +``` + +### macOS +```bash +mvn \ + -Dtest=HDF5BenchmarkTest#benchmarkHDF5ReadWrite \ + -Dsysds.test.hdf5.benchmark=true \ + -Dsysds.test.hdf5.rows=100000 \ + -Dsysds.test.hdf5.cols=100 \ + -Dsysds.test.hdf5.sparse.nnz.per.row=1 \ + -Dsysds.hdf5.read.parallel.threads=4 \ + -Dsysds.hdf5.read.parallel.min.bytes=67108864 \ + -Dsysds.hdf5.read.skip.nnz=true \ + -Dsysds.test.hdf5.keep.files=true \ + test +``` + +--- + +## Scenario 5: Disable mmap for diagnostic comparison +### What it does +Runs the benchmark with: +```text +-Dsysds.hdf5.read.mmap=false +``` + +### Why this scenario? +This is only for diagnostic purposes. It helps check whether memory-mapped reading affects parallel read performance or Windows file locking. 
+ +### Windows PowerShell +```powershell +mvn ` + "-Dhadoop.home.dir=$hadoopHome" ` + "-Dtest=HDF5BenchmarkTest#benchmarkHDF5ReadWrite" ` + "-Dsysds.test.hdf5.benchmark=true" ` + "-Dsysds.test.hdf5.rows=100000" ` + "-Dsysds.test.hdf5.cols=100" ` + "-Dsysds.hdf5.read.parallel.threads=4" ` + "-Dsysds.hdf5.read.parallel.min.bytes=67108864" ` + "-Dsysds.hdf5.read.mmap=false" ` + "-Dsysds.test.hdf5.keep.files=true" ` + test +``` + +### macOS +```bash +mvn \ + -Dtest=HDF5BenchmarkTest#benchmarkHDF5ReadWrite \ + -Dsysds.test.hdf5.benchmark=true \ + -Dsysds.test.hdf5.rows=100000 \ + -Dsysds.test.hdf5.cols=100 \ + -Dsysds.hdf5.read.parallel.threads=4 \ + -Dsysds.hdf5.read.parallel.min.bytes=67108864 \ + -Dsysds.hdf5.read.mmap=false \ + -Dsysds.test.hdf5.keep.files=true \ + test +``` + +--- + +## Scenario 6: Multi-thread Run +### What it does +Runs the same test several times with different parallel read thread counts: +```text +1, 2, 4, 8 +``` + +### Why this scenario? +Use this to see where parallel HDF5 read stops improving. +It is done by calling Maven multiple times. 
+ +### Windows PowerShell +```powershell +foreach ($threads in 1,2,4,8) { + Write-Host "Running HDF5 benchmark with threads=$threads" + + mvn ` + "-Dhadoop.home.dir=$hadoopHome" ` + "-Dtest=HDF5BenchmarkTest#benchmarkHDF5ReadWrite" ` + "-Dsysds.test.hdf5.benchmark=true" ` + "-Dsysds.test.hdf5.rows=100000" ` + "-Dsysds.test.hdf5.cols=100" ` + "-Dsysds.test.hdf5.sparse.nnz.per.row=1" ` + "-Dsysds.test.hdf5.warmup.reps=1" ` + "-Dsysds.test.hdf5.measure.reps=3" ` + "-Dsysds.hdf5.read.parallel.threads=$threads" ` + "-Dsysds.hdf5.read.parallel.min.bytes=67108864" ` + "-Dsysds.test.hdf5.keep.files=true" ` + test + + Copy-Item ".\target\hdf5-benchmark-results.json" ".\target\hdf5-benchmark-results-threads-$threads.json" -Force + Copy-Item ".\target\hdf5-benchmark-results.csv" ".\target\hdf5-benchmark-results-threads-$threads.csv" -Force +} +``` + +### macOS / Linux +```bash +for threads in 1 2 4 8; do + echo "Running HDF5 benchmark with threads=$threads" + + mvn \ + -Dtest=HDF5BenchmarkTest#benchmarkHDF5ReadWrite \ + -Dsysds.test.hdf5.benchmark=true \ + -Dsysds.test.hdf5.rows=100000 \ + -Dsysds.test.hdf5.cols=100 \ + -Dsysds.test.hdf5.sparse.nnz.per.row=1 \ + -Dsysds.test.hdf5.warmup.reps=1 \ + -Dsysds.test.hdf5.measure.reps=3 \ + -Dsysds.hdf5.read.parallel.threads=$threads \ + -Dsysds.hdf5.read.parallel.min.bytes=67108864 \ + -Dsysds.test.hdf5.keep.files=true \ + test + + cp target/hdf5-benchmark-results.json target/hdf5-benchmark-results-threads-$threads.json + cp target/hdf5-benchmark-results.csv target/hdf5-benchmark-results-threads-$threads.csv +done +``` + +### Expected result +The copied outputs are stored as: +```text +target/hdf5-benchmark-results-threads-1.json +target/hdf5-benchmark-results-threads-2.json +target/hdf5-benchmark-results-threads-4.json +target/hdf5-benchmark-results-threads-8.json +``` +and matching `.csv` files. + +--- + +## Scenario 7: Change sparse density +### What it does +Runs the sparse-like data profile with more nonzeros per row. 
+ +Example: +```text +5 nonzeros per row / 100 columns = 5% logical density +``` + +### Why this scenario? +It's there to check how logical sparsity affects HDF5 file size, write time, and sparse read time. + +### Windows PowerShell +```powershell +mvn ` + "-Dhadoop.home.dir=$hadoopHome" ` + "-Dtest=HDF5BenchmarkTest#benchmarkHDF5ReadWrite" ` + "-Dsysds.test.hdf5.benchmark=true" ` + "-Dsysds.test.hdf5.rows=100000" ` + "-Dsysds.test.hdf5.cols=100" ` + "-Dsysds.test.hdf5.sparse.nnz.per.row=5" ` + "-Dsysds.hdf5.read.parallel.threads=4" ` + "-Dsysds.test.hdf5.keep.files=true" ` + test +``` + +### macOS / Linux +```bash +mvn \ + -Dtest=HDF5BenchmarkTest#benchmarkHDF5ReadWrite \ + -Dsysds.test.hdf5.benchmark=true \ + -Dsysds.test.hdf5.rows=100000 \ + -Dsysds.test.hdf5.cols=100 \ + -Dsysds.test.hdf5.sparse.nnz.per.row=5 \ + -Dsysds.hdf5.read.parallel.threads=4 \ + -Dsysds.test.hdf5.keep.files=true \ + test +``` + +--- + +## Scenario 8: Change raw I/O buffer size +### What it does +Changes the Java stream buffer used by the raw byte baseline. + +Example: +```text +-Dsysds.test.hdf5.raw.buffer.bytes=1048576 +``` + +### Why run it +Only to check the raw I/O baseline. It does not change HDF5 writer/reader logic. + +### Windows PowerShell +```powershell +mvn ` + "-Dhadoop.home.dir=$hadoopHome" ` + "-Dtest=HDF5BenchmarkTest#benchmarkHDF5ReadWrite" ` + "-Dsysds.test.hdf5.benchmark=true" ` + "-Dsysds.test.hdf5.rows=100000" ` + "-Dsysds.test.hdf5.cols=100" ` + "-Dsysds.test.hdf5.raw.buffer.bytes=1048576" ` + "-Dsysds.test.hdf5.keep.files=true" ` + test +``` + +### macOS / Linux +```bash +mvn \ + -Dtest=HDF5BenchmarkTest#benchmarkHDF5ReadWrite \ + -Dsysds.test.hdf5.benchmark=true \ + -Dsysds.test.hdf5.rows=100000 \ + -Dsysds.test.hdf5.cols=100 \ + -Dsysds.test.hdf5.raw.buffer.bytes=1048576 \ + -Dsysds.test.hdf5.keep.files=true \ + test +``` + +### Expected result +Only raw baseline timings should be directly affected. 
diff --git a/Parquet-Benchmark-Run-Scenarios.md b/Parquet-Benchmark-Run-Scenarios.md new file mode 100644 index 00000000000..8b3c6e4d9a5 --- /dev/null +++ b/Parquet-Benchmark-Run-Scenarios.md @@ -0,0 +1,366 @@ +# Parquet Benchmark Run Scenarios + +This guide explains simple ways to run the test below: +```text +ParquetBenchmarkTest#benchmarkParquetReadWrite +``` + +You can find the test file under this path: +```text +src/test/java/org/apache/sysds/test/functions/io/ParquetBenchmarkTest.java +``` + +The benchmark is disabled by default. Every scenario must include: +```text +-Dsysds.test.parquet.benchmark=true +``` + +After running the tests successfully, you can find the results under the below path: +```text +target/parquet-benchmark.csv +target/parquet-benchmark.json +``` + +For better analysis of performance, on each row of the results file, you can check rows with: +```json +"is_warmup": false +``` + +--- + +## Scenario 1: Quick test +### What it does +Runs a very small dense Double benchmark. + +Size: +```text +10000 x 20 +``` + +Profile: +```text +dense_double_only +``` + +### Why this scenario: +It's there to check that the test compiles, Parquet files can be written/read, and CSV/JSON output is created. 
+ +### Windows PowerShell +```powershell +mvn ` + "-Dhadoop.home.dir=$hadoopHome" ` + "-Dtest=ParquetBenchmarkTest#benchmarkParquetReadWrite" ` + "-Dsysds.test.parquet.benchmark=true" ` + "-Dsysds.test.parquet.rows=10000" ` + "-Dsysds.test.parquet.cols=20" ` + "-Dsysds.test.parquet.warmup=0" ` + "-Dsysds.test.parquet.reps=1" ` + "-Dsysds.test.parquet.profiles=dense_double_only" ` + test +``` + +### macOS +```bash +mvn \ + -Dtest=ParquetBenchmarkTest#benchmarkParquetReadWrite \ + -Dsysds.test.parquet.benchmark=true \ + -Dsysds.test.parquet.rows=10000 \ + -Dsysds.test.parquet.cols=20 \ + -Dsysds.test.parquet.warmup=0 \ + -Dsysds.test.parquet.reps=1 \ + -Dsysds.test.parquet.profiles=dense_double_only \ + test +``` + +### Expected result +The test should pass and create: +```text +target/parquet-benchmark.csv +target/parquet-benchmark.json +``` + +Expected number of result rows: +```text +1 profile x 8 operations x 1 measured rep = 8 rows +``` + +--- + +## Scenario 2: Final profile benchmark +### What it does +Runs the default final Parquet benchmark across three data profiles: +```text +dense_double_only +mixed_schema +sparse_like_double +``` + +Default size: +```text +100000 x 50 +``` + +For each profile, the scenario runs: +```text +seq write +seq raw_io_read +seq footer_read +seq read +parallel write +parallel raw_io_read +parallel footer_read +parallel read +``` + +### Why this scenario +This is intended for a standard final Parquet benchmark. It gives evidence for dense numeric data, mixed type handling, and sparse-like numeric behavior. 
+ +### Windows PowerShell +```powershell +mvn ` + "-Dhadoop.home.dir=$hadoopHome" ` + "-Dtest=ParquetBenchmarkTest#benchmarkParquetReadWrite" ` + "-Dsysds.test.parquet.benchmark=true" ` + "-Dsysds.test.parquet.rows=100000" ` + "-Dsysds.test.parquet.cols=50" ` + "-Dsysds.test.parquet.warmup=1" ` + "-Dsysds.test.parquet.reps=3" ` + test +``` + +### macOS +```bash +mvn \ + -Dtest=ParquetBenchmarkTest#benchmarkParquetReadWrite \ + -Dsysds.test.parquet.benchmark=true \ + -Dsysds.test.parquet.rows=100000 \ + -Dsysds.test.parquet.cols=50 \ + -Dsysds.test.parquet.warmup=1 \ + -Dsysds.test.parquet.reps=3 \ + test +``` + +### Expected result +Expected number of result rows: +```text +3 profiles x 8 operations x (1 warmup + 3 measured reps) = 96 rows +``` + +--- + +## Scenario 3: Dense Double baseline benchmark +### What it does +Runs a larger dense double-only benchmark. + +Size: +```text +200000 x 50 +``` + +Profile: +```text +dense_double_only +``` + +### Why this scenario +This will reproduce the earlier dense-only baseline used to compare sequential and current parallel Parquet read/write behavior. 
+ +### Windows PowerShell +```powershell +mvn ` + "-Dhadoop.home.dir=$hadoopHome" ` + "-Dtest=ParquetBenchmarkTest#benchmarkParquetReadWrite" ` + "-Dsysds.test.parquet.benchmark=true" ` + "-Dsysds.test.parquet.rows=200000" ` + "-Dsysds.test.parquet.cols=50" ` + "-Dsysds.test.parquet.warmup=2" ` + "-Dsysds.test.parquet.reps=5" ` + "-Dsysds.test.parquet.profiles=dense_double_only" ` + test +``` + +### macOS +```bash +mvn \ + -Dtest=ParquetBenchmarkTest#benchmarkParquetReadWrite \ + -Dsysds.test.parquet.benchmark=true \ + -Dsysds.test.parquet.rows=200000 \ + -Dsysds.test.parquet.cols=50 \ + -Dsysds.test.parquet.warmup=2 \ + -Dsysds.test.parquet.reps=5 \ + -Dsysds.test.parquet.profiles=dense_double_only \ + test +``` + +### Expected result +Expected number of result rows: +```text +1 profile x 8 operations x (2 warmup + 5 measured reps) = 56 rows +``` + +--- + +## Scenario 4: Manual multipart parallel-reader experiment +### What it does +Manually creates Parquet input directories with multiple part files: +```text +2, 4, 8 part files +``` +Then it benchmarks the current parallel reader on these multi-file inputs. + +### Why this scenario +This checks whether the current parallel reader benefits from multiple input part files, independent of the current writer partitioning behavior. 
+ +### Windows PowerShell +```powershell +mvn ` + "-Dhadoop.home.dir=$hadoopHome" ` + "-Dtest=ParquetBenchmarkTest#benchmarkParquetReadWrite" ` + "-Dsysds.test.parquet.benchmark=true" ` + "-Dsysds.test.parquet.rows=200000" ` + "-Dsysds.test.parquet.cols=50" ` + "-Dsysds.test.parquet.warmup=2" ` + "-Dsysds.test.parquet.reps=5" ` + "-Dsysds.test.parquet.profiles=dense_double_only" ` + "-Dsysds.test.parquet.manual.parts=2,4,8" ` + test +``` + +### macOS +```bash +mvn \ + -Dtest=ParquetBenchmarkTest#benchmarkParquetReadWrite \ + -Dsysds.test.parquet.benchmark=true \ + -Dsysds.test.parquet.rows=200000 \ + -Dsysds.test.parquet.cols=50 \ + -Dsysds.test.parquet.warmup=2 \ + -Dsysds.test.parquet.reps=5 \ + -Dsysds.test.parquet.profiles=dense_double_only \ + -Dsysds.test.parquet.manual.parts=2,4,8 \ + test +``` + +### Expected result +Expected number of result rows: +```text +Normal dense benchmark: 8 operations x 7 reps = 56 rows +Manual multipart: 3 part settings x 3 operations x 7 reps = 63 rows +Total = 119 rows +``` + +--- + +## Scenario 5: Run only one data profile +### What it does +Runs only one selected data profile, for example: +```text +mixed_schema +``` + +or: + +```text +sparse_like_double +``` + +### Why this scenario? +Use this when you only want to inspect one behavior: mixed type handling or sparse-like compression/materialization. 
+ +### Windows PowerShell: For mixed schema only +```powershell +mvn ` + "-Dhadoop.home.dir=$hadoopHome" ` + "-Dtest=ParquetBenchmarkTest#benchmarkParquetReadWrite" ` + "-Dsysds.test.parquet.benchmark=true" ` + "-Dsysds.test.parquet.rows=100000" ` + "-Dsysds.test.parquet.cols=50" ` + "-Dsysds.test.parquet.warmup=1" ` + "-Dsysds.test.parquet.reps=3" ` + "-Dsysds.test.parquet.profiles=mixed_schema" ` + test +``` + +### macOS: For mixed schema only +```bash +mvn \ + -Dtest=ParquetBenchmarkTest#benchmarkParquetReadWrite \ + -Dsysds.test.parquet.benchmark=true \ + -Dsysds.test.parquet.rows=100000 \ + -Dsysds.test.parquet.cols=50 \ + -Dsysds.test.parquet.warmup=1 \ + -Dsysds.test.parquet.reps=3 \ + -Dsysds.test.parquet.profiles=mixed_schema \ + test +``` + +### Windows PowerShell: For sparse-like only +```powershell +mvn ` + "-Dhadoop.home.dir=$hadoopHome" ` + "-Dtest=ParquetBenchmarkTest#benchmarkParquetReadWrite" ` + "-Dsysds.test.parquet.benchmark=true" ` + "-Dsysds.test.parquet.rows=100000" ` + "-Dsysds.test.parquet.cols=50" ` + "-Dsysds.test.parquet.warmup=1" ` + "-Dsysds.test.parquet.reps=3" ` + "-Dsysds.test.parquet.profiles=sparse_like_double" ` + test +``` + +### macOS: For sparse-like only +```bash +mvn \ + -Dtest=ParquetBenchmarkTest#benchmarkParquetReadWrite \ + -Dsysds.test.parquet.benchmark=true \ + -Dsysds.test.parquet.rows=100000 \ + -Dsysds.test.parquet.cols=50 \ + -Dsysds.test.parquet.warmup=1 \ + -Dsysds.test.parquet.reps=3 \ + -Dsysds.test.parquet.profiles=sparse_like_double \ + test +``` + +### Expected result +Expected number of result rows: +```text +1 profile x 8 operations x (1 warmup + 3 measured reps) = 32 rows +``` + +--- + +## Scenario 6: Change benchmark size +### What it does +Runs the same benchmark with a custom matrix/frame size. + +Example size: +```text +50000 x 100 +``` + +### Why this scenario? +Use this to check whether results scale roughly with the number of cells and file size. 
+ +### Windows PowerShell +```powershell +mvn ` + "-Dhadoop.home.dir=$hadoopHome" ` + "-Dtest=ParquetBenchmarkTest#benchmarkParquetReadWrite" ` + "-Dsysds.test.parquet.benchmark=true" ` + "-Dsysds.test.parquet.rows=50000" ` + "-Dsysds.test.parquet.cols=100" ` + "-Dsysds.test.parquet.warmup=1" ` + "-Dsysds.test.parquet.reps=3" ` + test +``` + +### macOS +```bash +mvn \ + -Dtest=ParquetBenchmarkTest#benchmarkParquetReadWrite \ + -Dsysds.test.parquet.benchmark=true \ + -Dsysds.test.parquet.rows=50000 \ + -Dsysds.test.parquet.cols=100 \ + -Dsysds.test.parquet.warmup=1 \ + -Dsysds.test.parquet.reps=3 \ + test +``` \ No newline at end of file diff --git a/src/test/java/org/apache/sysds/test/functions/io/ParquetBenchmarkTest.java b/src/test/java/org/apache/sysds/test/functions/io/ParquetBenchmarkTest.java new file mode 100644 index 00000000000..9772d4b317f --- /dev/null +++ b/src/test/java/org/apache/sysds/test/functions/io/ParquetBenchmarkTest.java @@ -0,0 +1,983 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysds.test.functions.io; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; +import java.lang.management.GarbageCollectorMXBean; +import java.lang.management.ManagementFactory; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.sysds.common.Types.ValueType; +import org.apache.sysds.conf.ConfigurationManager; +import org.apache.sysds.hops.OptimizerUtils; +import org.apache.sysds.runtime.frame.data.FrameBlock; +import org.apache.sysds.runtime.io.FrameReader; +import org.apache.sysds.runtime.io.FrameReaderParquet; +import org.apache.sysds.runtime.io.FrameReaderParquetParallel; +import org.apache.sysds.runtime.io.FrameWriter; +import org.apache.sysds.runtime.io.FrameWriterParquet; +import org.apache.sysds.runtime.io.FrameWriterParquetParallel; +import org.apache.sysds.test.AutomatedTestBase; +import org.apache.sysds.test.TestConfiguration; +import org.apache.sysds.utils.stats.InfrastructureAnalyzer; +import org.junit.Assume; +import org.junit.Test; + +/** + * Manual benchmark for current Parquet frame IO implementations. + * This benchmark is intentionally disabled by default and must be enabled with: + * -Dsysds.test.parquet.benchmark=true + * + * The benchmark is diagnostic only. It is not a value-level correctness test. 
+ * + * Default measured paths: + * - sequential Parquet frame write/read + * - current parallel Parquet frame write/read + * - raw byte streaming read as an IO-only control + * - Parquet footer-only read as a metadata-only control + * + * Default data profiles: + * - dense_double_only + * - mixed_schema + * - sparse_like_double + * + * Optional manual multipart experiment: + * - enable with -Dsysds.test.parquet.manual.parts=2,4,8 + * - this manually creates multiple Parquet part files and benchmarks the current + * parallel reader on those multi-file inputs + * + * Output: + * - target/parquet-benchmark.csv + * - target/parquet-benchmark.json + * + * Known limitations: + * - results are affected by JVM warm-up and OS page cache + * - heap_before/heap_after are rough diagnostics, not exact allocation counts + * - raw_io_read measures byte streaming only, not Parquet decoding + * - manual multipart input is performance-only and not correctness-validated + */ +public class ParquetBenchmarkTest extends AutomatedTestBase { + private static final String TEST_NAME = "ParquetBenchmarkTest"; + private static final String TEST_DIR = "functions/io/"; + private static final String TEST_CLASS_DIR = TEST_DIR + ParquetBenchmarkTest.class.getSimpleName() + "/"; + + private static final String PROFILE_DENSE_DOUBLE = "dense_double_only"; + private static final String PROFILE_MIXED_SCHEMA = "mixed_schema"; + private static final String PROFILE_SPARSE_LIKE_DOUBLE = "sparse_like_double"; + + private static volatile long _blackhole = 0; + + @Override + public void setUp() { + addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"Rout"})); + } + + @Test + public void benchmarkParquetReadWrite() throws Exception { + Assume.assumeTrue("Manual benchmark. 
Enable with -Dsysds.test.parquet.benchmark=true", Boolean.getBoolean("sysds.test.parquet.benchmark")); + + final int rows = Integer.getInteger("sysds.test.parquet.rows", 100_000); + final int cols = Integer.getInteger("sysds.test.parquet.cols", 50); + final int warmup = Integer.getInteger("sysds.test.parquet.warmup", 1); + final int reps = Integer.getInteger("sysds.test.parquet.reps", 3); + + final String[] profiles = parseProfileList(System.getProperty("sysds.test.parquet.profiles", PROFILE_DENSE_DOUBLE + "," + PROFILE_MIXED_SCHEMA + "," + PROFILE_SPARSE_LIKE_DOUBLE)); + + final int[] manualParts = parsePositiveIntList(System.getProperty("sysds.test.parquet.manual.parts", "")); + + File csvFile = new File("target/parquet-benchmark.csv"); + File jsonFile = new File("target/parquet-benchmark.json"); + + if(csvFile.getParentFile() != null) + csvFile.getParentFile().mkdirs(); + + try(PrintWriter csv = new PrintWriter(new FileWriter(csvFile)); PrintWriter json = new PrintWriter(new FileWriter(jsonFile))) { + writeCsvHeader(csv); + + json.println("["); + JsonState jsonState = new JsonState(); + + for(String dataProfile : profiles) { + FrameBlock frameBlock = generateFrame(dataProfile, rows, cols); + benchmarkProfile(csv, json, jsonState, dataProfile, frameBlock, warmup, reps); + + if(manualParts.length > 0) + benchmarkManualMultipart(csv, json, jsonState, dataProfile, frameBlock, manualParts, warmup, reps); + } + + json.println(); + json.println("]"); + } + + System.out.println("Parquet CSV benchmark results saved under: " + csvFile.getAbsolutePath()); + System.out.println("Parquet JSON benchmark results saved under: " + jsonFile.getAbsolutePath()); + } + + private void benchmarkProfile( + PrintWriter csv, + PrintWriter json, + JsonState jsonState, + String dataProfile, + FrameBlock frameBlock, + int warmup, + int reps) throws Exception { + int rows = frameBlock.getNumRows(); + int cols = frameBlock.getNumColumns(); + String profileName = safeName(dataProfile); + + 
String seqInput = output("parquet_" + profileName + "_seq_input"); + new FrameWriterParquet().writeFrameToHDFS(frameBlock, seqInput, rows, cols); + + String parallelInput = output("parquet_" + profileName + "_parallel_input"); + new FrameWriterParquetParallel().writeFrameToHDFS(frameBlock, parallelInput, rows, cols); + + benchmarkWrite(csv, json, jsonState, dataProfile, "seq", new FrameWriterParquet(), frameBlock, warmup, reps, ""); + benchmarkRawIORead(csv, json, jsonState, dataProfile, "seq", seqInput, frameBlock, warmup, reps, "raw_bytes_only"); + benchmarkFooterRead(csv, json, jsonState, dataProfile, "seq", seqInput, frameBlock, warmup, reps, "parquet_footer_only"); + benchmarkRead(csv, json, jsonState, dataProfile, "seq", new FrameReaderParquet(), seqInput, frameBlock, warmup, reps, ""); + + benchmarkWrite(csv, json, jsonState, dataProfile, "parallel", new FrameWriterParquetParallel(), frameBlock, warmup, reps, "current_parallel_write_path"); + benchmarkRawIORead(csv, json, jsonState, dataProfile, "parallel", parallelInput, frameBlock, warmup, reps, "raw_bytes_only"); + benchmarkFooterRead(csv, json, jsonState, dataProfile, "parallel", parallelInput, frameBlock, warmup, reps, "parquet_footer_only"); + benchmarkRead(csv, json, jsonState, dataProfile, "parallel", new FrameReaderParquetParallel(), parallelInput, frameBlock, warmup, reps, "current_parallel_read_path_not_value_validated"); + } + + private void benchmarkManualMultipart( + PrintWriter csv, + PrintWriter json, + JsonState jsonState, + String dataProfile, + FrameBlock frameBlock, + int[] manualParts, + int warmup, + int reps) throws Exception { + for(int numParts : manualParts) { + if(numParts <= 1 || numParts > frameBlock.getNumRows()) + continue; + + String manualInput = output("parquet_" + safeName(dataProfile) + "_manual_multipart_" + numParts); + + writeManualMultipartInput(frameBlock, manualInput, numParts); + + String impl = "parallel_manual_parts_" + numParts; + + benchmarkRawIORead(csv, json, 
jsonState, dataProfile, impl, manualInput, frameBlock, warmup, reps, "raw_bytes_only"); + benchmarkFooterRead(csv, json, jsonState, dataProfile, impl, manualInput, frameBlock, warmup, reps, "parquet_footer_only"); + benchmarkRead(csv, json, jsonState, dataProfile, impl, new FrameReaderParquetParallel(), manualInput, frameBlock, warmup, reps, "manual_multipart_input_not_value_validated"); + } + } + + private void writeCsvHeader(PrintWriter csv) { + csv.println("data_profile,impl,operation,rows,cols,cells,rep,is_warmup,wall_ms," + + "file_size_bytes,num_part_files,avg_part_file_size_bytes," + + "hdfs_block_size_bytes,current_writer_estimated_num_part_files," + + "current_writer_estimated_num_threads," + + "configured_parallel_read_parallelism,configured_parallel_write_parallelism," + + "estimated_parallel_read_tasks," + + "dense_double_reference_size_bytes,actual_file_to_dense_double_reference_ratio," + + "expected_nonzero_fraction," + + "reader_value_extraction_mode,frame_materialization," + + "heap_before_bytes,heap_after_bytes,heap_delta_bytes," + + "gc_count_delta,gc_time_delta_ms,notes"); + } + + private void benchmarkWrite( + PrintWriter csv, + PrintWriter json, + JsonState jsonState, + String dataProfile, + String impl, + FrameWriter writer, + FrameBlock frameBlock, + int warmup, + int reps, + String notes) throws Exception { + int rows = frameBlock.getNumRows(); + int cols = frameBlock.getNumColumns(); + + for(int i = 0; i < warmup + reps; i++) { + boolean isWarmup = i < warmup; + String path = output("parquet_" + safeName(dataProfile) + "_" + impl + "_write_" + i); + + long heapBefore = usedHeap(); + GcStats gcBefore = getGcStats(); + + long t0 = System.nanoTime(); + writer.writeFrameToHDFS(frameBlock, path, rows, cols); + long t1 = System.nanoTime(); + + GcStats gcAfter = getGcStats(); + long heapAfter = usedHeap(); + + PathStats pathStats = getPathStats(path); + + writeResult(csv, json, jsonState, dataProfile, impl, "write", rows, cols, i, isWarmup, t0, 
t1, pathStats.fileSizeBytes, pathStats.numPartFiles, heapBefore, heapAfter, gcAfter.count - gcBefore.count, gcAfter.timeMs - gcBefore.timeMs, notes); + } + } + + private void benchmarkRead( + PrintWriter csv, + PrintWriter json, + JsonState jsonState, + String dataProfile, + String impl, + FrameReader reader, + String path, + FrameBlock reference, + int warmup, + int reps, + String notes) throws Exception { + int rows = reference.getNumRows(); + int cols = reference.getNumColumns(); + ValueType[] schema = reference.getSchema(); + String[] names = reference.getColumnNames(); + + for(int i = 0; i < warmup + reps; i++) { + boolean isWarmup = i < warmup; + + PathStats pathStats = getPathStats(path); + + long heapBefore = usedHeap(); + GcStats gcBefore = getGcStats(); + + long t0 = System.nanoTime(); + FrameBlock ret = reader.readFrameFromHDFS(path, schema, names, rows, cols); + long t1 = System.nanoTime(); + + GcStats gcAfter = getGcStats(); + long heapAfter = usedHeap(); + + blackhole(ret); + + writeResult(csv, json, jsonState, dataProfile, impl, "read", rows, cols, i, isWarmup, t0, t1, pathStats.fileSizeBytes, pathStats.numPartFiles, heapBefore, heapAfter, gcAfter.count - gcBefore.count, gcAfter.timeMs - gcBefore.timeMs, notes); + } + } + + private void benchmarkRawIORead( + PrintWriter csv, + PrintWriter json, + JsonState jsonState, + String dataProfile, + String impl, + String path, + FrameBlock reference, + int warmup, + int reps, + String notes) throws Exception { + int rows = reference.getNumRows(); + int cols = reference.getNumColumns(); + + for(int i = 0; i < warmup + reps; i++) { + boolean isWarmup = i < warmup; + + PathStats pathStats = getPathStats(path); + + long heapBefore = usedHeap(); + GcStats gcBefore = getGcStats(); + + long t0 = System.nanoTime(); + long bytesRead = readRawBytes(path); + long t1 = System.nanoTime(); + + GcStats gcAfter = getGcStats(); + long heapAfter = usedHeap(); + + blackhole(bytesRead); + + writeResult(csv, json, jsonState, 
dataProfile, impl, "raw_io_read", rows, cols, i, isWarmup, t0, t1, pathStats.fileSizeBytes, pathStats.numPartFiles, heapBefore, heapAfter, gcAfter.count - gcBefore.count, gcAfter.timeMs - gcBefore.timeMs, notes); + } + } + + private void benchmarkFooterRead( + PrintWriter csv, + PrintWriter json, + JsonState jsonState, + String dataProfile, + String impl, + String path, + FrameBlock reference, + int warmup, + int reps, + String notes) throws Exception { + int rows = reference.getNumRows(); + int cols = reference.getNumColumns(); + + for(int i = 0; i < warmup + reps; i++) { + boolean isWarmup = i < warmup; + + PathStats pathStats = getPathStats(path); + + long heapBefore = usedHeap(); + GcStats gcBefore = getGcStats(); + + long t0 = System.nanoTime(); + long footerInfo = readParquetFooters(path); + long t1 = System.nanoTime(); + + GcStats gcAfter = getGcStats(); + long heapAfter = usedHeap(); + + blackhole(footerInfo); + + writeResult(csv, json, jsonState, dataProfile, impl, "footer_read", rows, cols, i, isWarmup, t0, t1, pathStats.fileSizeBytes, pathStats.numPartFiles, heapBefore, heapAfter, gcAfter.count - gcBefore.count, gcAfter.timeMs - gcBefore.timeMs, notes); + } + } + + private void writeManualMultipartInput(FrameBlock frameBlock, String dirName, int numParts) throws Exception { + Configuration conf = ConfigurationManager.getCachedJobConf(); + Path dir = new Path(dirName); + FileSystem fs = dir.getFileSystem(conf); + + if(fs.exists(dir)) + fs.delete(dir, true); + + fs.mkdirs(dir); + + FrameWriter writer = new FrameWriterParquet(); + + int rows = frameBlock.getNumRows(); + int cols = frameBlock.getNumColumns(); + int chunkSize = (int) Math.ceil((double) rows / numParts); + + for(int part = 0; part < numParts; part++) { + int startRow = part * chunkSize; + int endRow = Math.min((part + 1) * chunkSize, rows); + + if(startRow >= endRow) + continue; + + FrameBlock slice = frameBlock.slice(startRow, endRow - 1); + Path partPath = new Path(dir, 
getManualPartFileName(part)); + + writer.writeFrameToHDFS(slice, partPath.toString(), slice.getNumRows(), cols); + } + } + + private String getManualPartFileName(int part) { + return String.format(Locale.US, "part-%05d", part); + } + + private void writeResult( + PrintWriter csv, + PrintWriter json, + JsonState jsonState, + String dataProfile, + String impl, + String operation, + int rows, + int cols, + int rep, + boolean isWarmup, + long t0, + long t1, + long fileSizeBytes, + int numPartFiles, + long heapBefore, + long heapAfter, + long gcCountDelta, + long gcTimeDeltaMs, + String notes) { + double wallMs = (t1 - t0) / 1e6; + BenchmarkDiagnostics diag = getBenchmarkDiagnostics(dataProfile, impl, operation, rows, cols, fileSizeBytes, numPartFiles); + + csv.printf(Locale.US, + "%s,%s,%s,%d,%d,%d,%d,%s,%.3f,%d,%d,%.3f,%d,%d,%d,%d,%d,%d,%d,%.6f,%.6f,%s,%s,%d,%d,%d,%d,%d,%s%n", + escapeCsv(dataProfile), + escapeCsv(impl), + escapeCsv(operation), + rows, + cols, + diag.cells, + rep, + isWarmup, + wallMs, + fileSizeBytes, + numPartFiles, + diag.avgPartFileSizeBytes, + diag.hdfsBlockSizeBytes, + diag.currentWriterEstimatedNumPartFiles, + diag.currentWriterEstimatedNumThreads, + diag.configuredParallelReadParallelism, + diag.configuredParallelWriteParallelism, + diag.estimatedParallelReadTasks, + diag.denseDoubleReferenceSizeBytes, + diag.actualFileToDenseDoubleReferenceRatio, + diag.expectedNonZeroFraction, + escapeCsv(diag.readerValueExtractionMode), + escapeCsv(diag.frameMaterialization), + heapBefore, + heapAfter, + heapAfter - heapBefore, + gcCountDelta, + gcTimeDeltaMs, + escapeCsv(notes)); + csv.flush(); + + if(jsonState.hasEntries) + json.println(","); + else + jsonState.hasEntries = true; + + json.printf(Locale.US, + " {\"data_profile\":\"%s\",\"impl\":\"%s\",\"operation\":\"%s\"," + + "\"rows\":%d,\"cols\":%d,\"cells\":%d," + + "\"rep\":%d,\"is_warmup\":%s,\"wall_ms\":%.3f," + + "\"file_size_bytes\":%d,\"num_part_files\":%d," + + 
"\"avg_part_file_size_bytes\":%.3f," + + "\"hdfs_block_size_bytes\":%d," + + "\"current_writer_estimated_num_part_files\":%d," + + "\"current_writer_estimated_num_threads\":%d," + + "\"configured_parallel_read_parallelism\":%d," + + "\"configured_parallel_write_parallelism\":%d," + + "\"estimated_parallel_read_tasks\":%d," + + "\"dense_double_reference_size_bytes\":%d," + + "\"actual_file_to_dense_double_reference_ratio\":%.6f," + + "\"expected_nonzero_fraction\":%.6f," + + "\"reader_value_extraction_mode\":\"%s\"," + + "\"frame_materialization\":\"%s\"," + + "\"heap_before_bytes\":%d,\"heap_after_bytes\":%d," + + "\"heap_delta_bytes\":%d," + + "\"gc_count_delta\":%d,\"gc_time_delta_ms\":%d," + + "\"notes\":\"%s\"}", + escapeJson(dataProfile), + escapeJson(impl), + escapeJson(operation), + rows, + cols, + diag.cells, + rep, + isWarmup, + wallMs, + fileSizeBytes, + numPartFiles, + diag.avgPartFileSizeBytes, + diag.hdfsBlockSizeBytes, + diag.currentWriterEstimatedNumPartFiles, + diag.currentWriterEstimatedNumThreads, + diag.configuredParallelReadParallelism, + diag.configuredParallelWriteParallelism, + diag.estimatedParallelReadTasks, + diag.denseDoubleReferenceSizeBytes, + diag.actualFileToDenseDoubleReferenceRatio, + diag.expectedNonZeroFraction, + escapeJson(diag.readerValueExtractionMode), + escapeJson(diag.frameMaterialization), + heapBefore, + heapAfter, + heapAfter - heapBefore, + gcCountDelta, + gcTimeDeltaMs, + escapeJson(notes)); + json.flush(); + } + + private BenchmarkDiagnostics getBenchmarkDiagnostics( + String dataProfile, + String impl, + String operation, + int rows, + int cols, + long fileSizeBytes, + int numPartFiles) { + long cells = (long) rows * cols; + long hdfsBlockSize = InfrastructureAnalyzer.getHDFSBlockSize(); + + int configuredParallelReadParallelism = OptimizerUtils.getParallelBinaryReadParallelism(); + int configuredParallelWriteParallelism = OptimizerUtils.getParallelBinaryWriteParallelism(); + + long currentWriterEstimatedNumPartFiles 
= hdfsBlockSize > 0 ? Math.max(cells / hdfsBlockSize, 1) : 1; + int currentWriterEstimatedNumThreads = (int) Math.min(configuredParallelWriteParallelism, currentWriterEstimatedNumPartFiles); + + int estimatedParallelReadTasks = estimateParallelReadTasks(impl, operation, numPartFiles, configuredParallelReadParallelism); + + long denseDoubleReferenceSizeBytes = cells * 8; + double avgPartFileSizeBytes = numPartFiles > 0 ? ((double) fileSizeBytes / numPartFiles) : -1.0; + double actualFileToDenseDoubleReferenceRatio = denseDoubleReferenceSizeBytes > 0 ? ((double) fileSizeBytes / denseDoubleReferenceSizeBytes) : -1.0; + + String readerValueExtractionMode = getReaderValueExtractionMode(impl, operation); + String frameMaterialization = "FrameBlock"; + double expectedNonZeroFraction = getExpectedNonZeroFraction(dataProfile); + + return new BenchmarkDiagnostics(cells, hdfsBlockSize, currentWriterEstimatedNumPartFiles, currentWriterEstimatedNumThreads, configuredParallelReadParallelism, configuredParallelWriteParallelism, estimatedParallelReadTasks, denseDoubleReferenceSizeBytes, avgPartFileSizeBytes, actualFileToDenseDoubleReferenceRatio, expectedNonZeroFraction, readerValueExtractionMode, frameMaterialization); + } + + private int estimateParallelReadTasks( + String impl, + String operation, + int numPartFiles, + int configuredParallelReadParallelism) { + if(!"read".equals(operation)) + return 0; + + if(!impl.startsWith("parallel")) + return 1; + + if(numPartFiles <= 0) + return 0; + + return Math.min(configuredParallelReadParallelism, numPartFiles); + } + + private String getReaderValueExtractionMode(String impl, String operation) { + if(!"read".equals(operation)) + return "not_applicable"; + + if("seq".equals(impl)) + return "typed_getters"; + + if(impl.startsWith("parallel")) + return "getValueToString"; + + return "unknown"; + } + + private double getExpectedNonZeroFraction(String dataProfile) { + if(PROFILE_DENSE_DOUBLE.equals(dataProfile)) + return 1.0; + else 
if(PROFILE_SPARSE_LIKE_DOUBLE.equals(dataProfile)) + return 0.05; + else + return -1.0; + } + + private FrameBlock generateFrame(String dataProfile, int rows, int cols) { + if(PROFILE_DENSE_DOUBLE.equals(dataProfile)) + return generateDenseDoubleFrame(rows, cols); + else if(PROFILE_MIXED_SCHEMA.equals(dataProfile)) + return generateMixedSchemaFrame(rows, cols); + else if(PROFILE_SPARSE_LIKE_DOUBLE.equals(dataProfile)) + return generateSparseLikeDoubleFrame(rows, cols); + else + throw new RuntimeException("Unknown Parquet benchmark data profile: " + dataProfile); + } + + private FrameBlock generateDenseDoubleFrame(int rows, int cols) { + ValueType[] schema = new ValueType[cols]; + for(int j = 0; j < cols; j++) + schema[j] = ValueType.FP64; + + FrameBlock frameBlock = new FrameBlock(schema); + + for(int i = 0; i < rows; i++) { + Object[] row = new Object[cols]; + for(int j = 0; j < cols; j++) + row[j] = (double) (i * cols + j + 1); + frameBlock.appendRow(row); + } + + return frameBlock; + } + + private FrameBlock generateSparseLikeDoubleFrame(int rows, int cols) { + ValueType[] schema = new ValueType[cols]; + for(int j = 0; j < cols; j++) + schema[j] = ValueType.FP64; + + FrameBlock frameBlock = new FrameBlock(schema); + + for(int i = 0; i < rows; i++) { + Object[] row = new Object[cols]; + for(int j = 0; j < cols; j++) { + int linearIndex = i * cols + j; + if(linearIndex % 20 == 0) + row[j] = (double) (linearIndex + 1); + else + row[j] = 0.0; + } + frameBlock.appendRow(row); + } + + return frameBlock; + } + + private FrameBlock generateMixedSchemaFrame(int rows, int cols) { + ValueType[] schema = new ValueType[cols]; + for(int j = 0; j < cols; j++) { + switch(j % 5) { + case 0: + schema[j] = ValueType.FP64; + break; + case 1: + schema[j] = ValueType.INT64; + break; + case 2: + schema[j] = ValueType.BOOLEAN; + break; + case 3: + schema[j] = ValueType.STRING; + break; + default: + schema[j] = ValueType.INT32; + } + } + + FrameBlock frameBlock = new FrameBlock(schema); 
+ + for(int i = 0; i < rows; i++) { + Object[] row = new Object[cols]; + for(int j = 0; j < cols; j++) { + switch(schema[j]) { + case FP64: + row[j] = (double) (i * cols + j + 1); + break; + case INT64: + row[j] = (long) (i * cols + j + 1); + break; + case BOOLEAN: + row[j] = ((i + j) % 2 == 0); + break; + case STRING: + row[j] = "s_" + (i % 1000) + "_" + j; + break; + case INT32: + row[j] = i + j; + break; + default: + throw new RuntimeException("Unsupported generated type: " + schema[j]); + } + } + frameBlock.appendRow(row); + } + + return frameBlock; + } + + private long readRawBytes(String fname) throws IOException { + Configuration conf = ConfigurationManager.getCachedJobConf(); + List<Path> files = listDataFiles(fname, conf); + + byte[] buffer = new byte[1024 * 1024]; + long total = 0; + + for(Path file : files) { + FileSystem fs = file.getFileSystem(conf); + + try(FSDataInputStream in = fs.open(file)) { + int n; + while((n = in.read(buffer)) != -1) + total += n; + } + } + + return total; + } + + private long readParquetFooters(String fname) throws IOException { + Configuration conf = ConfigurationManager.getCachedJobConf(); + List<Path> files = listDataFiles(fname, conf); + + long checksum = 0; + + for(Path file : files) { + try(ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(file, conf))) { + ParquetMetadata metadata = reader.getFooter(); + + checksum += metadata.getBlocks().size(); + checksum += metadata.getFileMetaData().getSchema().getFieldCount(); + + for(int i = 0; i < metadata.getBlocks().size(); i++) + checksum += metadata.getBlocks().get(i).getRowCount(); + } + } + + return checksum; + } + + private List<Path> listDataFiles(String fname, Configuration conf) throws IOException { + Path path = new Path(fname); + FileSystem fileSystem = path.getFileSystem(conf); + + List<Path> files = new ArrayList<>(); + + if(!fileSystem.exists(path)) + return files; + + FileStatus status = fileSystem.getFileStatus(path); + + if(status.isFile()) { 
files.add(path); + return files; + } + + for(FileStatus child : fileSystem.listStatus(path)) { + if(child.isFile() && isDataFile(child.getPath())) + files.add(child.getPath()); + } + + return files; + } + + private boolean isDataFile(Path path) { + String name = path.getName(); + + return !name.startsWith("_") + && !name.startsWith(".") + && !name.endsWith(".crc"); + } + + private PathStats getPathStats(String fname) { + try { + Configuration conf = ConfigurationManager.getCachedJobConf(); + List<Path> files = listDataFiles(fname, conf); + + long totalSize = 0; + int numFiles = 0; + + for(Path file : files) { + FileSystem fileSystem = file.getFileSystem(conf); + FileStatus status = fileSystem.getFileStatus(file); + + if(status.isFile()) { + totalSize += status.getLen(); + numFiles++; + } + } + + return new PathStats(totalSize, numFiles); + } + catch(Exception ex) { + return new PathStats(-1, -1); + } + } + + private String[] parseProfileList(String value) { + if(value == null || value.trim().isEmpty()) + return new String[] {PROFILE_DENSE_DOUBLE, PROFILE_MIXED_SCHEMA, PROFILE_SPARSE_LIKE_DOUBLE}; + + String[] tokens = value.split(","); + List<String> profiles = new ArrayList<>(); + + for(String token : tokens) { + String profile = token.trim(); + + if(profile.isEmpty()) + continue; + + if(!PROFILE_DENSE_DOUBLE.equals(profile) && !PROFILE_MIXED_SCHEMA.equals(profile) + && !PROFILE_SPARSE_LIKE_DOUBLE.equals(profile)) { + throw new RuntimeException("Unknown Parquet benchmark data profile: " + profile); + } + + profiles.add(profile); + } + + if(profiles.isEmpty()) + throw new RuntimeException("No valid Parquet benchmark data profiles specified."); + + return profiles.toArray(new String[0]); + } + + private int[] parsePositiveIntList(String value) { + if(value == null || value.trim().isEmpty()) + return new int[0]; + + String[] tokens = value.split(","); + int[] tmp = new int[tokens.length]; + int count = 0; + + for(String token : tokens) { + String trimmed = token.trim(); + + 
if(trimmed.isEmpty()) + continue; + + int v = Integer.parseInt(trimmed); + + if(v > 0) + tmp[count++] = v; + } + + int[] ret = new int[count]; + System.arraycopy(tmp, 0, ret, 0, count); + return ret; + } + + private String safeName(String s) { + return s.replaceAll("[^A-Za-z0-9_\\-]", "_"); + } + + private long usedHeap() { + Runtime rt = Runtime.getRuntime(); + return rt.totalMemory() - rt.freeMemory(); + } + + private GcStats getGcStats() { + long count = 0; + long timeMs = 0; + + for(GarbageCollectorMXBean bean : ManagementFactory.getGarbageCollectorMXBeans()) { + long c = bean.getCollectionCount(); + long t = bean.getCollectionTime(); + + if(c >= 0) + count += c; + if(t >= 0) + timeMs += t; + } + + return new GcStats(count, timeMs); + } + + private void blackhole(FrameBlock frameBlock) { + if(frameBlock == null) + throw new RuntimeException("Unexpected null FrameBlock."); + + _blackhole ^= frameBlock.getNumRows(); + _blackhole ^= frameBlock.getNumColumns(); + } + + private void blackhole(long value) { + _blackhole ^= value; + } + + private String escapeCsv(String s) { + if(s == null) + return ""; + + boolean needsQuotes = + s.indexOf(',') >= 0 || s.indexOf('"') >= 0 || s.indexOf('\n') >= 0 || s.indexOf('\r') >= 0; + + if(!needsQuotes) + return s; + + return "\"" + s.replace("\"", "\"\"") + "\""; + } + + private String escapeJson(String s) { + if(s == null) + return ""; + + StringBuilder stringBuilder = new StringBuilder(); + for(int i = 0; i < s.length(); i++) { + char c = s.charAt(i); + switch(c) { + case '"': + stringBuilder.append("\\\""); + break; + case '\\': + stringBuilder.append("\\\\"); + break; + case '\b': + stringBuilder.append("\\b"); + break; + case '\f': + stringBuilder.append("\\f"); + break; + case '\n': + stringBuilder.append("\\n"); + break; + case '\r': + stringBuilder.append("\\r"); + break; + case '\t': + stringBuilder.append("\\t"); + break; + default: + if(c < 0x20) + stringBuilder.append(String.format("\\u%04x", (int) c)); + else + 
stringBuilder.append(c); + } + } + return stringBuilder.toString(); + } + + private static class JsonState { + private boolean hasEntries = false; + } + + private static class GcStats { + private final long count; + private final long timeMs; + + private GcStats(long count, long timeMs) { + this.count = count; + this.timeMs = timeMs; + } + } + + private static class PathStats { + private final long fileSizeBytes; + private final int numPartFiles; + + private PathStats(long fileSizeBytes, int numPartFiles) { + this.fileSizeBytes = fileSizeBytes; + this.numPartFiles = numPartFiles; + } + } + + private static class BenchmarkDiagnostics { + private final long cells; + private final long hdfsBlockSizeBytes; + private final long currentWriterEstimatedNumPartFiles; + private final int currentWriterEstimatedNumThreads; + private final int configuredParallelReadParallelism; + private final int configuredParallelWriteParallelism; + private final int estimatedParallelReadTasks; + private final long denseDoubleReferenceSizeBytes; + private final double avgPartFileSizeBytes; + private final double actualFileToDenseDoubleReferenceRatio; + private final double expectedNonZeroFraction; + private final String readerValueExtractionMode; + private final String frameMaterialization; + + private BenchmarkDiagnostics( + long cells, + long hdfsBlockSizeBytes, + long currentWriterEstimatedNumPartFiles, + int currentWriterEstimatedNumThreads, + int configuredParallelReadParallelism, + int configuredParallelWriteParallelism, + int estimatedParallelReadTasks, + long denseDoubleReferenceSizeBytes, + double avgPartFileSizeBytes, + double actualFileToDenseDoubleReferenceRatio, + double expectedNonZeroFraction, + String readerValueExtractionMode, + String frameMaterialization) { + this.cells = cells; + this.hdfsBlockSizeBytes = hdfsBlockSizeBytes; + this.currentWriterEstimatedNumPartFiles = currentWriterEstimatedNumPartFiles; + this.currentWriterEstimatedNumThreads = 
currentWriterEstimatedNumThreads; + this.configuredParallelReadParallelism = configuredParallelReadParallelism; + this.configuredParallelWriteParallelism = configuredParallelWriteParallelism; + this.estimatedParallelReadTasks = estimatedParallelReadTasks; + this.denseDoubleReferenceSizeBytes = denseDoubleReferenceSizeBytes; + this.avgPartFileSizeBytes = avgPartFileSizeBytes; + this.actualFileToDenseDoubleReferenceRatio = actualFileToDenseDoubleReferenceRatio; + this.expectedNonZeroFraction = expectedNonZeroFraction; + this.readerValueExtractionMode = readerValueExtractionMode; + this.frameMaterialization = frameMaterialization; + } + } +} \ No newline at end of file diff --git a/src/test/java/org/apache/sysds/test/functions/io/hdf5/HDF5BenchmarkTest.java b/src/test/java/org/apache/sysds/test/functions/io/hdf5/HDF5BenchmarkTest.java new file mode 100644 index 00000000000..08f2243b4dd --- /dev/null +++ b/src/test/java/org/apache/sysds/test/functions/io/hdf5/HDF5BenchmarkTest.java @@ -0,0 +1,752 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysds.test.functions.io.hdf5; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.lang.management.GarbageCollectorMXBean; +import java.lang.management.ManagementFactory; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.Locale; +import java.util.stream.Stream; + +import org.apache.sysds.hops.OptimizerUtils; +import org.apache.sysds.runtime.data.DenseBlock; +import org.apache.sysds.runtime.data.SparseBlock; +import org.apache.sysds.runtime.io.FileFormatPropertiesHDF5; +import org.apache.sysds.runtime.io.ReaderHDF5; +import org.apache.sysds.runtime.io.ReaderHDF5Parallel; +import org.apache.sysds.runtime.io.WriterHDF5; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.junit.Assume; +import org.junit.Test; + +/** + * Manual HDF5 I/O benchmark. + * + * This test is disabled by default and only runs with + * -Dsysds.test.hdf5.benchmark=true. It writes CSV/JSON results under target/ and is intended for local + * performance diagnostics, not CI performance checking. 
+ */ +public class HDF5BenchmarkTest { + private static final String ENABLE_PROPERTY = "sysds.test.hdf5.benchmark"; + private static final String KEEP_FILES_PROPERTY = "sysds.test.hdf5.keep.files"; + + private static final String DATASET_NAME = "DATASET_1"; + + private static final int ROWS = Integer.getInteger("sysds.test.hdf5.rows", 10_000); + private static final int COLS = Integer.getInteger("sysds.test.hdf5.cols", 100); + private static final int BLOCK_SIZE = Integer.getInteger("sysds.test.hdf5.block.size", 1024); + + private static final int WARMUP_REPS = Integer.getInteger("sysds.test.hdf5.warmup.reps", 1); + private static final int MEASURE_REPS = Integer.getInteger("sysds.test.hdf5.measure.reps", 3); + private static final int RAW_BUFFER_SIZE = Integer.getInteger("sysds.test.hdf5.raw.buffer.bytes", 8 * 1024 * 1024); + + private static final int SPARSE_NNZ_PER_ROW = Integer.getInteger("sysds.test.hdf5.sparse.nnz.per.row", + Math.max(1, COLS / 100)); + + private static final byte RAW_BYTE_VALUE = (byte) 7; + + @Test + public void benchmarkHDF5ReadWrite() throws Exception { + Assume.assumeTrue("HDF5 benchmark disabled. 
Enable with -D" + ENABLE_PROPERTY + "=true", Boolean.parseBoolean(System.getProperty(ENABLE_PROPERTY, "false"))); + + validateBenchmarkProperties(); + + final long cells = Math.multiplyExact((long) ROWS, (long) COLS); + final long denseDoubleReferenceSizeBytes = Math.multiplyExact(cells, (long) Double.BYTES); + + Path targetDir = Paths.get("target").toAbsolutePath().normalize(); + Files.createDirectories(targetDir); + + Path workDir = targetDir.resolve("hdf5-benchmark-work-" + System.currentTimeMillis()); + Files.createDirectories(workDir); + + Path csvOut = targetDir.resolve("hdf5-benchmark-results.csv"); + Path jsonOut = targetDir.resolve("hdf5-benchmark-results.json"); + + boolean success = false; + try { + FileFormatPropertiesHDF5 props = new FileFormatPropertiesHDF5(DATASET_NAME); + + WriterHDF5 writer = new WriterHDF5(props); + ReaderHDF5 sequentialReader = new ReaderHDF5(props); + ReaderHDF5Parallel parallelReader = new ReaderHDF5Parallel(props); + + List<DataProfile> profiles = new ArrayList<>(); + profiles.add(createDenseProfile(cells)); + profiles.add(createSparseLikeProfile()); + + List<Result> results = new ArrayList<>(); + + int totalReps = WARMUP_REPS + MEASURE_REPS; + for(DataProfile profile : profiles) { + for(int rep = 0; rep < totalReps; rep++) { + boolean warmup = rep < WARMUP_REPS; + int logicalRep = warmup ? 
rep : rep - WARMUP_REPS; + + Path rawPath = workDir.resolve(profile.name + "_raw_rep_" + rep + ".bin").toAbsolutePath().normalize(); + Path hdf5Path = workDir.resolve(profile.name + "_hdf5_rep_" + rep + ".h5").toAbsolutePath().normalize(); + + String hdf5Filename = hdf5Path.toUri().toString(); + + Result rawWriteResult = measure( + profile.name, + "raw", + "raw_write", + warmup, + logicalRep, + rawPath, + denseDoubleReferenceSizeBytes, + profile.logicalNnz, + "Chunked raw byte write; dense-Double-size I/O control; no HDF5 encoding; no MatrixBlock materialization", + new ThrowingRunnable() { + @Override + public void run() throws Exception { + writeRawBytes(rawPath, denseDoubleReferenceSizeBytes, RAW_BUFFER_SIZE); + } + } + ); + results.add(rawWriteResult); + + assertTrue("Raw file was not created: " + rawPath, Files.exists(rawPath)); + assertEquals("Raw file size mismatch.", denseDoubleReferenceSizeBytes, Files.size(rawPath)); + + Result rawReadResult = measure( + profile.name, + "raw", + "raw_read", + warmup, + logicalRep, + rawPath, + denseDoubleReferenceSizeBytes, + profile.logicalNnz, + "Chunked raw byte read; dense-Double-size I/O control; no HDF5 decoding; no MatrixBlock materialization", + new ThrowingRunnable() { + @Override + public void run() throws Exception { + readRawBytes(rawPath, denseDoubleReferenceSizeBytes, RAW_BUFFER_SIZE); + } + } + ); + results.add(rawReadResult); + + Result hdf5WriteResult = measure( + profile.name, + "seq", + "hdf5_write", + warmup, + logicalRep, + hdf5Path, + denseDoubleReferenceSizeBytes, + profile.logicalNnz, + profile.writePathNote, + new ThrowingRunnable() { + @Override + public void run() throws Exception { + writer.writeMatrixToHDFS(profile.matrix, hdf5Filename, ROWS, COLS, BLOCK_SIZE, profile.logicalNnz, false); + } + } + ); + results.add(hdf5WriteResult); + + assertTrue("HDF5 file was not created: " + hdf5Path, Files.exists(hdf5Path)); + assertTrue("HDF5 file size must be positive", totalFileSizeBytes(hdf5Path) > 0); 
+ + Result hdf5SeqReadResult = measure( + profile.name, + "seq", + "hdf5_read", + warmup, + logicalRep, + hdf5Path, + denseDoubleReferenceSizeBytes, + profile.logicalNnz, + profile.sequentialReadPathNote, + new ThrowingRunnable() { + @Override + public void run() throws Exception { + MatrixBlock out = sequentialReader.readMatrixFromHDFS(hdf5Filename, ROWS, COLS, BLOCK_SIZE, profile.logicalNnz); + validateProfileMatrix(profile, out); + } + } + ); + results.add(hdf5SeqReadResult); + + Result hdf5ParReadResult = measure( + profile.name, + "par", + "hdf5_read", + warmup, + logicalRep, + hdf5Path, + denseDoubleReferenceSizeBytes, + profile.logicalNnz, + profile.parallelReadPathNote, + new ThrowingRunnable() { + @Override + public void run() throws Exception { + MatrixBlock out = parallelReader.readMatrixFromHDFS(hdf5Filename, ROWS, COLS, BLOCK_SIZE, profile.logicalNnz); + validateProfileMatrix(profile, out); + } + } + ); + results.add(hdf5ParReadResult); + } + } + + writeCsv(results, csvOut); + writeJson(results, jsonOut); + + System.out.println("HDF5 benchmark CSV: " + csvOut); + System.out.println("HDF5 benchmark JSON: " + jsonOut); + System.out.println("HDF5 benchmark work: " + workDir); + success = true; + } + finally { + if(success && !Boolean.parseBoolean(System.getProperty(KEEP_FILES_PROPERTY, "false"))) + deleteRecursivelySiltently(workDir); + } + } + + private static void validateBenchmarkProperties() { + if(ROWS <= 0) + throw new IllegalArgumentException("Invalid sysds.test.hdf5.rows=" + ROWS + ". Must be > 0."); + if(COLS <= 0) + throw new IllegalArgumentException("Invalid sysds.test.hdf5.cols=" + COLS + ". Must be > 0."); + if(BLOCK_SIZE <= 0) + throw new IllegalArgumentException("Invalid sysds.test.hdf5.block.size=" + BLOCK_SIZE + ". Must be > 0."); + if(WARMUP_REPS < 0) + throw new IllegalArgumentException("Invalid sysds.test.hdf5.warmup.reps=" + WARMUP_REPS + ". 
Must be >= 0."); + if(MEASURE_REPS <= 0) + throw new IllegalArgumentException("Invalid sysds.test.hdf5.measure.reps=" + MEASURE_REPS + ". Must be > 0."); + if(RAW_BUFFER_SIZE <= 0) + throw new IllegalArgumentException( + "Invalid sysds.test.hdf5.raw.buffer.bytes=" + RAW_BUFFER_SIZE + ". Must be > 0."); + if(SPARSE_NNZ_PER_ROW <= 0 || SPARSE_NNZ_PER_ROW > COLS) { + throw new IllegalArgumentException("Invalid sysds.test.hdf5.sparse.nnz.per.row=" + SPARSE_NNZ_PER_ROW + ". Must be in [1, COLS]. COLS=" + COLS); + } + } + + private static DataProfile createDenseProfile(long cells) { + MatrixBlock matrixBlock = new MatrixBlock(ROWS, COLS, false); + matrixBlock.allocateDenseBlockUnsafe(ROWS, COLS); + + DenseBlock db = matrixBlock.getDenseBlock(); + for(int i = 0; i < ROWS; i++) { + for(int j = 0; j < COLS; j++) + db.set(i, j, expectedDenseValue(i, j)); + } + + matrixBlock.recomputeNonZeros(); + matrixBlock.examSparsity(); + + DataProfile p = new DataProfile(); + p.name = "dense_double_only"; + p.matrix = matrixBlock; + p.logicalNnz = cells; + p.expectedSparse = false; + p.writePathNote = "WriterHDF5.writeMatrixToHDFS(MatrixBlock,...); dense MatrixBlock write path"; + p.sequentialReadPathNote = "ReaderHDF5.readMatrixFromHDFS(String,...); dense double MatrixBlock read path"; + p.parallelReadPathNote = "ReaderHDF5Parallel.readMatrixFromHDFS(String,...)"; + return p; + } + + private static DataProfile createSparseLikeProfile() { + long nnz = Math.multiplyExact((long) ROWS, (long) SPARSE_NNZ_PER_ROW); + + MatrixBlock matrixBlock = new MatrixBlock(ROWS, COLS, true, nnz); + matrixBlock.allocateSparseRowsBlock(); + + SparseBlock sparseBlock = matrixBlock.getSparseBlock(); + for(int i = 0; i < ROWS; i++) { + sparseBlock.allocate(i, SPARSE_NNZ_PER_ROW); + for(int j = 0; j < SPARSE_NNZ_PER_ROW; j++) + sparseBlock.append(i, j, expectedSparseLikeValue(i, j)); + } + + matrixBlock.setNonZeros(nnz); + matrixBlock.examSparsity(); + + assertTrue("Sparse-like input unexpectedly converted to 
dense. Reduce sparse.nnz.per.row.", matrixBlock.isInSparseFormat()); + + DataProfile p = new DataProfile(); + p.name = "sparse_like_double"; + p.matrix = matrixBlock; + p.logicalNnz = nnz; + p.expectedSparse = true; + p.writePathNote = "WriterHDF5.writeMatrixToHDFS(MatrixBlock,...)"; + p.sequentialReadPathNote = "ReaderHDF5.readMatrixFromHDFS(String,...)"; + p.parallelReadPathNote = "ReaderHDF5Parallel.readMatrixFromHDFS(String,...)"; + return p; + } + + private static Result measure( + String dataProfile, + String impl, + String operation, + boolean warmup, + int rep, + Path measuredPath, + long denseDoubleReferenceSizeBytes, + long logicalNnz, + String implementationPath, + ThrowingRunnable runnable + ) throws Exception { + long heapBefore = usedHeapBytes(); + GcSnapshot gcBefore = GcSnapshot.capture(); + + long t0 = System.nanoTime(); + runnable.run(); + long t1 = System.nanoTime(); + + GcSnapshot gcAfter = GcSnapshot.capture(); + long heapAfter = usedHeapBytes(); + + long fileSize = Files.exists(measuredPath) ? totalFileSizeBytes(measuredPath) : 0L; + int numPartFiles = Files.exists(measuredPath) ? countRegularFiles(measuredPath) : 0; + + Result r = new Result(); + r.dataProfile = dataProfile; + r.impl = impl; + r.operation = operation; + r.rows = ROWS; + r.cols = COLS; + r.cells = Math.multiplyExact((long) ROWS, (long) COLS); + r.logicalNnz = logicalNnz; + r.logicalSparsity = r.cells > 0 ? ((double) logicalNnz) / r.cells : 0.0; + r.sparseNnzPerRow = dataProfile.equals("sparse_like_double") ? SPARSE_NNZ_PER_ROW : COLS; + r.rep = rep; + r.isWarmup = warmup; + r.wallMs = (t1 - t0) / 1_000_000.0; + r.fileSizeBytes = fileSize; + r.numPartFiles = numPartFiles; + r.avgPartFileSizeBytes = numPartFiles > 0 ? ((double) fileSize) / numPartFiles : 0.0; + r.denseDoubleReferenceSizeBytes = denseDoubleReferenceSizeBytes; + r.actualFileToDenseDoubleReferenceRatio = denseDoubleReferenceSizeBytes > 0 ? 
((double) fileSize) / denseDoubleReferenceSizeBytes : 0.0; + r.heapBeforeBytes = heapBefore; + r.heapAfterBytes = heapAfter; + r.heapDeltaBytes = heapAfter - heapBefore; + r.gcCountDelta = gcAfter.count - gcBefore.count; + r.gcTimeDeltaMs = gcAfter.timeMs - gcBefore.timeMs; + r.configuredParallelReadParallelism = OptimizerUtils.getParallelBinaryReadParallelism(); + r.configuredParallelWriteParallelism = OptimizerUtils.getParallelTextWriteParallelism(); + r.rawBufferBytesProperty = Integer.toString(RAW_BUFFER_SIZE); + r.keepFilesProperty = System.getProperty(KEEP_FILES_PROPERTY, "false"); + r.hdf5ReadParallelThreadsProperty = System.getProperty("sysds.hdf5.read.parallel.threads", ""); + r.hdf5ReadParallelMinBytesProperty = System.getProperty("sysds.hdf5.read.parallel.min.bytes", ""); + r.hdf5ReadBlockBytesProperty = System.getProperty("sysds.hdf5.read.block.bytes", ""); + r.hdf5ReadBufferBytesProperty = System.getProperty("sysds.hdf5.read.buffer.bytes", ""); + r.hdf5ReadMapBytesProperty = System.getProperty("sysds.hdf5.read.map.bytes", ""); + r.hdf5ReadMmapProperty = System.getProperty("sysds.hdf5.read.mmap", ""); + r.hdf5ReadTraceProperty = System.getProperty("sysds.hdf5.read.trace", ""); + r.hdf5SkipNnzProperty = System.getProperty("sysds.hdf5.read.skip.nnz", ""); + r.hdf5ForceDenseProperty = System.getProperty("sysds.hdf5.read.force.dense", ""); + r.implementationPath = implementationPath; + return r; + } + + private static double expectedDenseValue(int row, int col) { + return 1.0 + ((double) row * 1000.0) + (double) col; + } + + private static double expectedSparseLikeValue(int row, int col) { + if(col < SPARSE_NNZ_PER_ROW) + return 1.0 + ((double) row * 1000.0) + (double) col; + return 0.0; + } + + private static void validateProfileMatrix(DataProfile profile, MatrixBlock matrixBlock) { + assertEquals(ROWS, matrixBlock.getNumRows()); + assertEquals(COLS, matrixBlock.getNumColumns()); + assertEquals(profile.logicalNnz, matrixBlock.getNonZeros()); + + 
if(profile.expectedSparse) { + assertTrue("Sparse-like output unexpectedly dense.", matrixBlock.isInSparseFormat()); + + assertEquals(expectedSparseLikeValue(0, 0), getMatrixValue(matrixBlock, 0, 0), 0.0); + assertEquals(expectedSparseLikeValue(ROWS / 2, 0), getMatrixValue(matrixBlock, ROWS / 2, 0), 0.0); + assertEquals(expectedSparseLikeValue(ROWS - 1, 0), getMatrixValue(matrixBlock, ROWS - 1, 0), 0.0); + + if(SPARSE_NNZ_PER_ROW < COLS) { + assertEquals(0.0, getMatrixValue(matrixBlock, 0, COLS - 1), 0.0); + assertEquals(0.0, getMatrixValue(matrixBlock, ROWS / 2, COLS - 1), 0.0); + assertEquals(0.0, getMatrixValue(matrixBlock, ROWS - 1, COLS - 1), 0.0); + } + } + else { + assertTrue("Dense Double output unexpectedly sparse.", !matrixBlock.isInSparseFormat()); + + assertEquals(expectedDenseValue(0, 0), getMatrixValue(matrixBlock, 0, 0), 0.0); + assertEquals(expectedDenseValue(ROWS / 2, COLS / 2), getMatrixValue(matrixBlock, ROWS / 2, COLS / 2), 0.0); + assertEquals(expectedDenseValue(ROWS - 1, COLS - 1), getMatrixValue(matrixBlock, ROWS - 1, COLS - 1), 0.0); + } + } + + private static double getMatrixValue(MatrixBlock matrixBlock, int row, int col) { + if(!matrixBlock.isInSparseFormat()) + return matrixBlock.getDenseBlock().get(row, col); + + SparseBlock sparseBlock = matrixBlock.getSparseBlock(); + if(sparseBlock == null || sparseBlock.isEmpty(row)) + return 0.0; + + int rowStartPosition = sparseBlock.pos(row); + int rowNonZeroCount = sparseBlock.size(row); + int[] columnIndexes = sparseBlock.indexes(row); + double[] nonZeroValues = sparseBlock.values(row); + + for(int k = rowStartPosition; k < rowStartPosition + rowNonZeroCount; k++) { + if(columnIndexes[k] == col) + return nonZeroValues[k]; + } + + return 0.0; + } + + private static void writeRawBytes(Path path, long totalBytes, int bufferSize) throws IOException { + byte[] buffer = new byte[bufferSize]; + Arrays.fill(buffer, RAW_BYTE_VALUE); + + try(OutputStream outputStream = new BufferedOutputStream( + 
Files.newOutputStream( + path, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE), + bufferSize)) { + long written = 0; + while(written < totalBytes) { + int len = (int) Math.min(buffer.length, totalBytes - written); + outputStream.write(buffer, 0, len); + written += len; + } + } + } + + private static void readRawBytes(Path path, long expectedBytes, int bufferSize) throws IOException { + byte[] buffer = new byte[bufferSize]; + + try(InputStream inputStream = new BufferedInputStream(Files.newInputStream(path, StandardOpenOption.READ), bufferSize)) { + long total = 0; + int len; + while((len = inputStream.read(buffer)) >= 0) + total += len; + assertEquals("Raw byte read length mismatch.", expectedBytes, total); + } + } + + private static long usedHeapBytes() { + Runtime runtime = Runtime.getRuntime(); + return runtime.totalMemory() - runtime.freeMemory(); + } + + private static long totalFileSizeBytes(Path p) throws IOException { + if(Files.isRegularFile(p)) + return Files.size(p); + if(!Files.isDirectory(p)) + return 0L; + + final long[] size = new long[] {0L}; + try(Stream<Path> stream = Files.walk(p)) { + stream.filter(Files::isRegularFile).forEach(x -> { + try { + size[0] += Files.size(x); + } + catch(IOException exception) { + throw new RuntimeException(exception); + } + }); + } + return size[0]; + } + + private static int countRegularFiles(Path p) throws IOException { + if(Files.isRegularFile(p)) + return 1; + if(!Files.isDirectory(p)) + return 0; + + final int[] count = new int[] {0}; + try(Stream<Path> stream = Files.walk(p)) { + stream.filter(Files::isRegularFile).forEach(x -> count[0]++); + } + return count[0]; + } + + private static void deleteRecursively(Path p) throws IOException { + if(!Files.exists(p)) + return; + + try(Stream<Path> stream = Files.walk(p)) { + stream.sorted(Comparator.reverseOrder()).forEach(x -> { + try { + Files.deleteIfExists(x); + } + catch(IOException exception) { + throw new RuntimeException(exception); + } 
+ }); + } + } + + private static void deleteRecursivelySiltently(Path p) { + final int maxAttempts = 3; + + for(int attempt = 1; attempt <= maxAttempts; attempt++) { + try { + deleteRecursively(p); + return; + } + catch(IOException | RuntimeException exception) { + if(attempt == maxAttempts) { + System.err.println("Could not delete HDF5 benchmark work directory: " + p); + System.err.println("This does not invalidate the benchmark results. " + + "On Windows, HDF5 or memory-mapped I/O can keep files locked briefly."); + System.err.println("Delete the directory manually later, or run with -D" + KEEP_FILES_PROPERTY + "=true."); + System.err.println("Cleanup error: " + exception.getMessage()); + return; + } + + System.gc(); + try { + Thread.sleep(250L); + } + catch(InterruptedException interrupted) { + Thread.currentThread().interrupt(); + return; + } + } + } + } + + private static void writeCsv(List<Result> results, Path out) throws IOException { + StringBuilder stringBuilder = new StringBuilder(); + + stringBuilder.append("data_profile,impl,operation,rows,cols,cells,logical_nnz,logical_sparsity,sparse_nnz_per_row,") + .append("rep,is_warmup,wall_ms,") + .append("file_size_bytes,num_part_files,avg_part_file_size_bytes,") + .append("dense_double_reference_size_bytes,actual_file_to_dense_double_reference_ratio,") + .append("heap_before_bytes,heap_after_bytes,heap_delta_bytes,") + .append("gc_count_delta,gc_time_delta_ms,") + .append("configured_parallel_read_parallelism,configured_parallel_write_parallelism,") + .append("raw_buffer_bytes_property,keep_files_property,") + .append("hdf5_read_parallel_threads_property,hdf5_read_parallel_min_bytes_property,") + .append("hdf5_read_block_bytes_property,hdf5_read_buffer_bytes_property,hdf5_read_map_bytes_property,") + .append("hdf5_read_mmap_property,hdf5_read_trace_property,hdf5_skip_nnz_property,hdf5_force_dense_property,") + .append("implementation_path\n"); + + for(Result r : results) { + 
stringBuilder.append(csv(r.dataProfile)).append(',') + .append(csv(r.impl)).append(',') + .append(csv(r.operation)).append(',') + .append(r.rows).append(',') + .append(r.cols).append(',') + .append(r.cells).append(',') + .append(r.logicalNnz).append(',') + .append(String.format(Locale.US, "%.8f", r.logicalSparsity)).append(',') + .append(r.sparseNnzPerRow).append(',') + .append(r.rep).append(',') + .append(r.isWarmup).append(',') + .append(String.format(Locale.US, "%.3f", r.wallMs)).append(',') + .append(r.fileSizeBytes).append(',') + .append(r.numPartFiles).append(',') + .append(String.format(Locale.US, "%.3f", r.avgPartFileSizeBytes)).append(',') + .append(r.denseDoubleReferenceSizeBytes).append(',') + .append(String.format(Locale.US, "%.6f", r.actualFileToDenseDoubleReferenceRatio)).append(',') + .append(r.heapBeforeBytes).append(',') + .append(r.heapAfterBytes).append(',') + .append(r.heapDeltaBytes).append(',') + .append(r.gcCountDelta).append(',') + .append(r.gcTimeDeltaMs).append(',') + .append(r.configuredParallelReadParallelism).append(',') + .append(r.configuredParallelWriteParallelism).append(',') + .append(csv(r.rawBufferBytesProperty)).append(',') + .append(csv(r.keepFilesProperty)).append(',') + .append(csv(r.hdf5ReadParallelThreadsProperty)).append(',') + .append(csv(r.hdf5ReadParallelMinBytesProperty)).append(',') + .append(csv(r.hdf5ReadBlockBytesProperty)).append(',') + .append(csv(r.hdf5ReadBufferBytesProperty)).append(',') + .append(csv(r.hdf5ReadMapBytesProperty)).append(',') + .append(csv(r.hdf5ReadMmapProperty)).append(',') + .append(csv(r.hdf5ReadTraceProperty)).append(',') + .append(csv(r.hdf5SkipNnzProperty)).append(',') + .append(csv(r.hdf5ForceDenseProperty)).append(',') + .append(csv(r.implementationPath)).append('\n'); + } + + Files.write(out, stringBuilder.toString().getBytes(StandardCharsets.UTF_8)); + } + + private static void writeJson(List<Result> results, Path out) throws IOException { + StringBuilder stringBuilder = new StringBuilder(); 
+ stringBuilder.append("[\n"); + for(int i = 0; i < results.size(); i++) { + if(i > 0) + stringBuilder.append(",\n"); + stringBuilder.append(results.get(i).toJson()); + } + stringBuilder.append("\n]\n"); + + Files.write(out, stringBuilder.toString().getBytes(StandardCharsets.UTF_8)); + } + + private static String csv(String s) { + if(s == null) + return ""; + return "\"" + s.replace("\"", "\"\"") + "\""; + } + + private static String json(String s) { + if(s == null) + return "null"; + return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\""; + } + + private interface ThrowingRunnable { + void run() throws Exception; + } + + private static class DataProfile { + String name; + MatrixBlock matrix; + long logicalNnz; + boolean expectedSparse; + String writePathNote; + String sequentialReadPathNote; + String parallelReadPathNote; + } + + private static class GcSnapshot { + long count; + long timeMs; + + static GcSnapshot capture() { + GcSnapshot snapshot = new GcSnapshot(); + for(GarbageCollectorMXBean bean : ManagementFactory.getGarbageCollectorMXBeans()) { + long c = bean.getCollectionCount(); + long t = bean.getCollectionTime(); + + if(c > 0) + snapshot.count += c; + if(t > 0) + snapshot.timeMs += t; + } + return snapshot; + } + } + + private static class Result { + String dataProfile; + String impl; + String operation; + int rows; + int cols; + long cells; + long logicalNnz; + double logicalSparsity; + int sparseNnzPerRow; + int rep; + boolean isWarmup; + double wallMs; + + long fileSizeBytes; + int numPartFiles; + double avgPartFileSizeBytes; + long denseDoubleReferenceSizeBytes; + double actualFileToDenseDoubleReferenceRatio; + + long heapBeforeBytes; + long heapAfterBytes; + long heapDeltaBytes; + long gcCountDelta; + long gcTimeDeltaMs; + + int configuredParallelReadParallelism; + int configuredParallelWriteParallelism; + + String rawBufferBytesProperty; + String keepFilesProperty; + String hdf5ReadParallelThreadsProperty; + String 
hdf5ReadParallelMinBytesProperty; + String hdf5ReadBlockBytesProperty; + String hdf5ReadBufferBytesProperty; + String hdf5ReadMapBytesProperty; + String hdf5ReadMmapProperty; + String hdf5ReadTraceProperty; + String hdf5SkipNnzProperty; + String hdf5ForceDenseProperty; + + String implementationPath; + + String toJson() { + return "{" + + "\"data_profile\":" + json(dataProfile) + + ",\"impl\":" + json(impl) + + ",\"operation\":" + json(operation) + + ",\"rows\":" + rows + + ",\"cols\":" + cols + + ",\"cells\":" + cells + + ",\"logical_nnz\":" + logicalNnz + + ",\"logical_sparsity\":" + String.format(Locale.US, "%.8f", logicalSparsity) + + ",\"sparse_nnz_per_row\":" + sparseNnzPerRow + + ",\"rep\":" + rep + + ",\"is_warmup\":" + isWarmup + + ",\"wall_ms\":" + String.format(Locale.US, "%.3f", wallMs) + + ",\"file_size_bytes\":" + fileSizeBytes + + ",\"num_part_files\":" + numPartFiles + + ",\"avg_part_file_size_bytes\":" + String.format(Locale.US, "%.3f", avgPartFileSizeBytes) + + ",\"dense_double_reference_size_bytes\":" + denseDoubleReferenceSizeBytes + + ",\"actual_file_to_dense_double_reference_ratio\":" + + String.format(Locale.US, "%.6f", actualFileToDenseDoubleReferenceRatio) + + ",\"heap_before_bytes\":" + heapBeforeBytes + + ",\"heap_after_bytes\":" + heapAfterBytes + + ",\"heap_delta_bytes\":" + heapDeltaBytes + + ",\"gc_count_delta\":" + gcCountDelta + + ",\"gc_time_delta_ms\":" + gcTimeDeltaMs + + ",\"configured_parallel_read_parallelism\":" + configuredParallelReadParallelism + + ",\"configured_parallel_write_parallelism\":" + configuredParallelWriteParallelism + + ",\"raw_buffer_bytes_property\":" + json(rawBufferBytesProperty) + + ",\"keep_files_property\":" + json(keepFilesProperty) + + ",\"hdf5_read_parallel_threads_property\":" + json(hdf5ReadParallelThreadsProperty) + + ",\"hdf5_read_parallel_min_bytes_property\":" + json(hdf5ReadParallelMinBytesProperty) + + ",\"hdf5_read_block_bytes_property\":" + json(hdf5ReadBlockBytesProperty) + + 
",\"hdf5_read_buffer_bytes_property\":" + json(hdf5ReadBufferBytesProperty) + + ",\"hdf5_read_map_bytes_property\":" + json(hdf5ReadMapBytesProperty) + + ",\"hdf5_read_mmap_property\":" + json(hdf5ReadMmapProperty) + + ",\"hdf5_read_trace_property\":" + json(hdf5ReadTraceProperty) + + ",\"hdf5_skip_nnz_property\":" + json(hdf5SkipNnzProperty) + + ",\"hdf5_force_dense_property\":" + json(hdf5ForceDenseProperty) + + ",\"implementation_path\":" + json(implementationPath) + + "}"; + } + } +}
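Review note on the `json(String)` helper in this patch: it escapes only backslashes and double quotes. That is likely enough for the property values recorded here, but a value containing a newline or another control character would produce invalid JSON. A minimal sketch of a stricter quoting helper follows; the class name `JsonQuoteSketch` and the method name `quote` are hypothetical and not part of the patch.

```java
public class JsonQuoteSketch {
    // Hypothetical helper, not part of the patch: renders a string as a JSON
    // string literal, or the JSON literal null for a null reference.
    public static String quote(String s) {
        if (s == null)
            return "null";
        StringBuilder sb = new StringBuilder(s.length() + 2);
        sb.append('"');
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:
                    if (c < 0x20)
                        // Remaining control characters use the \ uXXXX form (written without the space).
                        sb.append(String.format("\\u%04x", (int) c));
                    else
                        sb.append(c);
            }
        }
        sb.append('"');
        return sb.toString();
    }

    public static void main(String[] args) {
        // A Windows-style path with quotes and a trailing newline.
        System.out.println(quote("C:\\tmp\\run \"1\"\n"));
    }
}
```

If adopted, `Result.toJson()` could call such a helper in place of `json(...)` without changing the output for values that contain no control characters.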