Skip to content
Closed
201 changes: 201 additions & 0 deletions dev/archery/archery/benchmark/jmh.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from itertools import filterfalse, groupby, tee
import json
import subprocess
from tempfile import NamedTemporaryFile

from .core import Benchmark
from ..utils.command import Command
from ..utils.maven import Maven


def partition(pred, iterable):
    """ Split `iterable` into the items accepted and rejected by `pred`.

    Adapted from the itertools recipes in the Python documentation. The
    input is duplicated with `tee`, so a one-shot iterator can be passed.

    Parameters
    ----------
    pred: callable
          Predicate applied to every item (once per output list).
    iterable: iterable
          Items to split.

    Returns
    -------
    tuple(list, list)
          Items for which `pred` is truthy, then items for which it is
          falsy, each in their original order.
    """
    accepted_src, rejected_src = tee(iterable)
    accepted = list(filter(pred, accepted_src))
    rejected = list(filterfalse(pred, rejected_src))
    return accepted, rejected


class JavaMicrobenchmarkHarnessCommand(Command):
    """ Run a Java Micro Benchmark Harness

    This assumes the build supports the standard command line options,
    notably `-Dbenchmark.filter`
    """

    def __init__(self, build, benchmark_filter=None):
        """ Initialize a JavaMicrobenchmarkHarnessCommand.

        Parameters
        ----------
        build: object
               Build wrapper exposing `list(...)` and `benchmark(...)`.
        benchmark_filter: str, optional
               Value forwarded as `-Dbenchmark.filter=...` when truthy.
        """
        self.benchmark_filter = benchmark_filter
        self.build = build
        self.maven = Maven()

    def list_benchmarks(self):
        """ Return the benchmark names reported by the build.

        Names are extracted from the output between "Benchmarks:" and
        "[INFO]". Assume the following output:
          ...
          Benchmarks:
          org.apache.arrow.vector.IntBenchmarks.setIntDirectly
          ...
          org.apache.arrow.vector.IntBenchmarks.setWithValueHolder
          org.apache.arrow.vector.IntBenchmarks.setWithWriter
          ...
          [INFO]

        Returns
        -------
        list(str)
            Lines starting with "org.apache.arrow" found in that section;
            empty when no "Benchmarks:" line is present.
        """
        argv = []
        if self.benchmark_filter:
            argv.append("-Dbenchmark.filter={}".format(self.benchmark_filter))
        result = self.build.list(
            *argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        names = []
        in_listing = False
        for line in result.stdout.decode("utf-8").splitlines():
            if not in_listing:
                # Skip everything up to and including the section header.
                if line.startswith("Benchmarks:"):
                    in_listing = True
            else:
                if line.startswith("org.apache.arrow"):
                    names.append(line)
                # The first "[INFO]" line after the header ends the listing.
                if line.startswith("[INFO]"):
                    break
        return names

    def results(self, repetitions):
        """ Run the benchmarks and return the parsed JSON result.

        Parameters
        ----------
        repetitions: int
            Value forwarded as `-Dbenchmark.runs=...`.

        Returns
        -------
        The object decoded by `json.load` from the result file.
        """
        with NamedTemporaryFile(suffix=".json") as out:
            argv = ["-Dbenchmark.runs={}".format(repetitions),
                    "-Dbenchmark.resultfile={}".format(out.name),
                    "-Dbenchmark.resultformat=json"]
            if self.benchmark_filter:
                argv.append(
                    "-Dbenchmark.filter={}".format(self.benchmark_filter)
                )

            self.build.benchmark(*argv, check=True)
            # NOTE(review): the result is read through the handle opened
            # before the build ran; this presumably relies on the benchmark
            # writing to the existing path in place -- confirm.
            return json.load(out)


class JavaMicrobenchmarkHarnessObservation:
    """ Represents one run of a single Java Microbenchmark Harness

    The constructor takes the keys of one result entry as keyword
    arguments. Besides the named parameters, `counters` must provide
    "mode", "threads", "warmupTime", "measurementTime" and "jvmArgs".
    """

    def __init__(self, benchmark, primaryMetric,
                 forks, warmupIterations, measurementIterations, **counters):
        self.name = benchmark
        self.primaryMetric = primaryMetric
        self.score = primaryMetric["score"]
        self.score_unit = primaryMetric["scoreUnit"]
        self.forks = forks
        self.warmups = warmupIterations
        self.runs = measurementIterations
        self.counters = {
            "mode": counters["mode"],
            "threads": counters["threads"],
            "warmups": warmupIterations,
            "warmupTime": counters["warmupTime"],
            "measurements": measurementIterations,
            "measurementTime": counters["measurementTime"],
            "jvmArgs": counters["jvmArgs"]
        }

        # "<time>/op" scores are time per operation and get inverted in
        # `value`; "ops/<time>" scores are already a rate.
        time_per_op = self.score_unit.endswith("/op")
        self.reciprocal_value = time_per_op

        # Split on the first "/" to isolate the time unit on either side.
        left, _, right = self.score_unit.partition("/")
        if self.score_unit.startswith("ops/"):
            self.normalizePerSec(right)
        elif time_per_op:
            self.normalizePerSec(left)
        else:
            self.normalizeFactor = 1

    @property
    def value(self):
        """ Return the benchmark value."""
        if self.reciprocal_value:
            rate = 1 / self.score
        else:
            rate = self.score
        return rate * self.normalizeFactor

    def normalizePerSec(self, unit):
        """ Set `normalizeFactor` to scale a per-`unit` rate to per-second.

        Unrecognized units leave the rate unscaled (factor 1).
        """
        per_second = {
            "ns": 1000 * 1000 * 1000,
            "us": 1000 * 1000,
            "ms": 1000,
            "min": 1 / 60,
            "hr": 1 / (60 * 60),
            "day": 1 / (60 * 60 * 24),
        }
        self.normalizeFactor = per_second.get(unit, 1)

    @property
    def unit(self):
        """ Return the unit name of `value`, or "?" when it is unknown."""
        is_rate = self.score_unit.startswith("ops/")
        if is_rate or self.score_unit.endswith("/op"):
            return "items_per_second"
        return "?"

    def __repr__(self):
        return str(self.value)


class JavaMicrobenchmarkHarness(Benchmark):
    """ A set of JavaMicrobenchmarkHarnessObservations. """

    def __init__(self, name, runs):
        """ Initialize a JavaMicrobenchmarkHarness.

        Parameters
        ----------
        name: str
              Name of the benchmark
        runs: list(JavaMicrobenchmarkHarnessObservation)
              Repetitions of JavaMicrobenchmarkHarnessObservation run.
              Must not be empty: unit and counters come from the first
              run after sorting by value.
        """
        self.name = name
        self.runs = sorted(runs, key=lambda run: run.value)

        # The run with the smallest value stands in for the whole group.
        reference = self.runs[0]
        unit = reference.unit
        counters = reference.counters
        # Rates ("..._per_second") are better when higher.
        less_is_better = not unit.endswith("per_second")
        values = [run.value for run in self.runs]

        # No timing data is provided: time unit is "N/A", times are empty.
        super().__init__(name, unit, less_is_better, values, "N/A", [],
                         counters)

    def __repr__(self):
        return "JavaMicrobenchmark[name={},runs={}]".format(
            self.name, self.runs)

    @classmethod
    def from_json(cls, payload):
        """ Build one instance per benchmark name found in `payload`.

        Each entry of `payload` is expanded as keyword arguments of
        JavaMicrobenchmarkHarnessObservation; observations sharing a name
        are grouped together. Returns a list ordered by benchmark name.
        """
        def by_name(observation):
            return observation.name

        observations = [
            JavaMicrobenchmarkHarnessObservation(**entry) for entry in payload
        ]
        # groupby only merges adjacent items, so order by the same key first.
        observations.sort(key=by_name)
        return [cls(name, list(members))
                for name, members in groupby(observations, by_name)]
169 changes: 135 additions & 34 deletions dev/archery/archery/benchmark/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@

from .core import BenchmarkSuite
from .google import GoogleBenchmarkCommand, GoogleBenchmark
from .jmh import JavaMicrobenchmarkHarnessCommand, JavaMicrobenchmarkHarness
from ..lang.cpp import CppCMakeDefinition, CppConfiguration
from ..lang.java import JavaMavenDefinition, JavaConfiguration
from ..utils.cmake import CMakeBuild
from ..utils.maven import MavenBuild
from ..utils.logger import logger


Expand All @@ -50,40 +53,8 @@ def suites(self):

@staticmethod
def from_rev_or_path(src, root, rev_or_path, cmake_conf, **kwargs):
    """ Returns a BenchmarkRunner from a path or a git revision.

    First, it checks if `rev_or_path` is a valid path (or string) of a json
    object that can deserialize to a BenchmarkRunner. If so, it initializes
    a StaticBenchmarkRunner from it. This allows memoizing the result of a
    run in a file or a string.

    Second, it checks if `rev_or_path` points to a valid CMake build
    directory. If so, it creates a CppBenchmarkRunner with this existing
    CMakeBuild.

    Otherwise, it assumes `rev_or_path` is a revision, clones/checks out
    the given revision and creates a fresh CMakeBuild.
    """
    build = None
    if StaticBenchmarkRunner.is_json_result(rev_or_path):
        return StaticBenchmarkRunner.from_json(rev_or_path, **kwargs)
    elif CMakeBuild.is_build_dir(rev_or_path):
        build = CMakeBuild.from_path(rev_or_path)
        return CppBenchmarkRunner(build, **kwargs)
    else:
        # Revisions can reference a remote via the `/` character, ensure
        # that the revision is path friendly
        path_rev = rev_or_path.replace("/", "_")
        root_rev = os.path.join(root, path_rev)
        os.mkdir(root_rev)

        clone_dir = os.path.join(root_rev, "arrow")
        # Possibly checkout the sources at given revision, no need to
        # perform cleanup on cloned repository as root_rev is reclaimed.
        src_rev, _ = src.at_revision(rev_or_path, clone_dir)
        cmake_def = CppCMakeDefinition(src_rev.cpp, cmake_conf)
        build_dir = os.path.join(root_rev, "build")
        return CppBenchmarkRunner(cmake_def.build(build_dir), **kwargs)
    # NOTE(review): every branch above returns, so the raise below is
    # unreachable as these lines are laid out. This span looks like a
    # flattened diff in which the body above was removed and replaced by
    # the raise -- confirm against the real file which body is intended.
    raise NotImplementedError(
        "BenchmarkRunner must implement from_rev_or_path")


class StaticBenchmarkRunner(BenchmarkRunner):
Expand Down Expand Up @@ -210,3 +181,133 @@ def suites(self):
continue

yield suite

@staticmethod
def from_rev_or_path(src, root, rev_or_path, cmake_conf, **kwargs):
    """ Returns a BenchmarkRunner from a path or a git revision.

    Three cases are tried in order:

    1. `rev_or_path` is a json result (per
       `StaticBenchmarkRunner.is_json_result`): a StaticBenchmarkRunner
       is built from it, which allows memoizing the result of a run.
    2. `rev_or_path` is an existing CMake build directory: a
       CppBenchmarkRunner is created on top of that CMakeBuild.
    3. Otherwise `rev_or_path` is treated as a revision: it is checked
       out under `root` and a fresh CMakeBuild is created for it.
    """
    if StaticBenchmarkRunner.is_json_result(rev_or_path):
        return StaticBenchmarkRunner.from_json(rev_or_path, **kwargs)

    if CMakeBuild.is_build_dir(rev_or_path):
        existing_build = CMakeBuild.from_path(rev_or_path)
        return CppBenchmarkRunner(existing_build, **kwargs)

    # A revision may name a remote using `/`; flatten it so it can be used
    # as a single directory name.
    workspace = os.path.join(root, rev_or_path.replace("/", "_"))
    os.mkdir(workspace)

    # Sources are possibly checked out at the given revision; the clone is
    # not cleaned up here because the workspace directory is reclaimed.
    checkout_dir = os.path.join(workspace, "arrow")
    sources, _ = src.at_revision(rev_or_path, checkout_dir)
    definition = CppCMakeDefinition(sources.cpp, cmake_conf)
    fresh_build = definition.build(os.path.join(workspace, "build"))
    return CppBenchmarkRunner(fresh_build, **kwargs)


class JavaBenchmarkRunner(BenchmarkRunner):
    """ Run suites for Java. """

    # NOTE(review): the original comment here states that the default
    # repetitions is 5 for the Java microbenchmark harness; nothing in this
    # class sets that default -- confirm where it is applied.
    def __init__(self, build, **kwargs):
        """ Initialize a JavaBenchmarkRunner. """
        self.build = build
        super().__init__(**kwargs)

    @staticmethod
    def default_configuration(**kwargs):
        """ Returns the default benchmark configuration. """
        return JavaConfiguration(**kwargs)

    def suite(self, name):
        """ Returns the resulting benchmarks for a given suite.

        Returns None when listing the benchmarks (with the runner's
        filter applied) yields nothing.
        """
        # update .m2 directory, which installs target jars
        self.build.build()

        command = JavaMicrobenchmarkHarnessCommand(
            self.build, self.benchmark_filter)

        # Listing first avoids running when there would be no data.
        if not command.list_benchmarks():
            return None

        raw_results = command.results(repetitions=self.repetitions)
        return BenchmarkSuite(
            name, JavaMicrobenchmarkHarness.from_json(raw_results))

    @property
    def list_benchmarks(self):
        """ Returns all benchmark names, unfiltered, as a generator. """
        # Ensure build is up-to-date to run benchmarks
        self.build.build()

        command = JavaMicrobenchmarkHarnessCommand(self.build)
        for benchmark_name in command.list_benchmarks():
            yield "{}".format(benchmark_name)

    @property
    def suites(self):
        """ Returns all suites for a runner.

        A single suite named "JavaBenchmark" is yielded, or nothing when
        it produced no results.
        """
        suite_name = "JavaBenchmark"
        suite = self.suite(suite_name)

        if suite:
            yield suite
        else:
            # Filter may exclude all benchmarks
            logger.debug("Suite {} executed but no results"
                         .format(suite_name))

    @staticmethod
    def from_rev_or_path(src, root, rev_or_path, maven_conf, **kwargs):
        """ Returns a BenchmarkRunner from a path or a git revision.

        Three cases are tried in order:

        1. `rev_or_path` is a json result (per
           `StaticBenchmarkRunner.is_json_result`): a StaticBenchmarkRunner
           is built from it, which allows memoizing the result of a run.
        2. `rev_or_path` is an existing Maven build directory: a
           JavaBenchmarkRunner is created with a MavenBuild for it.
        3. Otherwise `rev_or_path` is treated as a revision: it is checked
           out under `root` and a fresh MavenBuild is created for it.
        """
        if StaticBenchmarkRunner.is_json_result(rev_or_path):
            return StaticBenchmarkRunner.from_json(rev_or_path, **kwargs)

        if MavenBuild.is_build_dir(rev_or_path):
            definition = JavaMavenDefinition(rev_or_path, maven_conf)
            return JavaBenchmarkRunner(
                definition.build(rev_or_path), **kwargs)

        # A revision may name a remote using `/`; flatten it so it can be
        # used as a single directory name.
        workspace = os.path.join(root, rev_or_path.replace("/", "_"))
        os.mkdir(workspace)

        # Sources are possibly checked out at the given revision; the clone
        # is not cleaned up here because the workspace is reclaimed.
        checkout_dir = os.path.join(workspace, "arrow")
        sources, _ = src.at_revision(rev_or_path, checkout_dir)
        definition = JavaMavenDefinition(sources.java, maven_conf)
        java_dir = os.path.join(workspace, "arrow/java")
        return JavaBenchmarkRunner(definition.build(java_dir), **kwargs)
Loading