|
| 1 | +# Copyright (c) 2021, NVIDIA CORPORATION. |
| 2 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
| 3 | +# you may not use this file except in compliance with the License. |
| 4 | +# You may obtain a copy of the License at |
| 5 | +# |
| 6 | +# http://www.apache.org/licenses/LICENSE-2.0 |
| 7 | +# |
| 8 | +# Unless required by applicable law or agreed to in writing, software |
| 9 | +# distributed under the License is distributed on an "AS IS" BASIS, |
| 10 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 11 | +# See the License for the specific language governing permissions and |
| 12 | +# limitations under the License. |
| 13 | + |
| 14 | +import sys |
| 15 | +import time |
| 16 | +from functools import wraps |
| 17 | + |
| 18 | + |
class BenchmarkedResult:
    """
    Container for the outcome of one benchmarked call: the wrapped
    function's return value plus timing and identification metadata.
    """
    def __init__(self, name, retval, runtime, params=None):
        # Benchmark identifier (usually the wrapped function's name).
        self.name = name
        # The wrapped callable's return value.
        self.retval = retval
        # Wall-clock runtime of the call, in seconds.
        self.runtime = runtime
        # Captured keyword args; falls back to an empty dict when absent.
        self.params = params or {}
        # Overwritten later with the validator's verdict by BenchmarkRun.run().
        self.validator_result = True
| 30 | + |
| 31 | + |
def benchmark(func):
    """
    Wrap *func* so each call is timed and returns a BenchmarkedResult
    (the original return value plus runtime metadata). The wrapper accepts
    the same args/kwargs as *func*.

    Only keyword args are captured into the result's params; positional
    args pass through uncaptured. Callers who want params reported must
    therefore use kwargs — which conveniently keeps benchmark-irrelevant
    positional args out of the report.

    Usable either as a decorator or called directly on a function.
    """
    # Prefer an explicit "benchmark_name" attribute; fall back to __name__.
    name = getattr(func, "benchmark_name", func.__name__)

    @wraps(func)
    def timed_call(*args, **kwargs):
        start = time.perf_counter()
        value = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        return BenchmarkedResult(name=name,
                                 retval=value,
                                 runtime=elapsed,
                                 params=kwargs,
                                 )

    # Expose the benchmark name on the wrapper too, for debug prints, etc.
    timed_call.name = name
    return timed_call
| 63 | + |
| 64 | + |
class BenchmarkRun:
    """
    Represents a benchmark "run": a timed graph-construction step followed
    by a series of timed algo calls. Execute by calling run(); results are
    saved as BenchmarkedResult instances in the `results` list member.

    Parameters
    ----------
    input_dataframe : edge-list dataframe handed to construct_graph_func.
        Assumed to have a "src" column when a bfs/sssp start vertex must be
        derived — TODO confirm against callers.
    construct_graph_func : callable, or (callable, args_tuple) pair, that
        builds the graph object the algos operate on.
    algo_func_param_list : sequence of callables, or (callable, params_dict)
        pairs, to benchmark against the constructed graph.
    algo_validator_list : optional sequence (same length as
        algo_func_param_list) of callables invoked as validator(retval, G);
        the result is stored on each BenchmarkedResult.
    """
    def __init__(self,
                 input_dataframe,
                 construct_graph_func,
                 algo_func_param_list,
                 algo_validator_list=None
                 ):
        self.input_dataframe = input_dataframe

        if isinstance(construct_graph_func, tuple):
            (construct_graph_func,
             self.construct_graph_func_args) = construct_graph_func
        else:
            # FIX: was None, which made run() raise TypeError when unpacking
            # with *self.construct_graph_func_args; use an empty tuple.
            self.construct_graph_func_args = ()

        # Create benchmark instances for each algo/func to be timed.
        # FIXME: need to accept and save individual algo args
        self.construct_graph = benchmark(construct_graph_func)

        # Normalize every entry to a (benchmarked_algo, params) pair without
        # mutating the caller's list (the original rewrote entries in place).
        self.algos = []
        for item in algo_func_param_list:
            if isinstance(item, tuple):
                (algo, params) = item
            else:
                (algo, params) = (item, {})
            # BFS and SSSP need a starting vertex: default it to the first
            # value in the "src" column, preserving caller-supplied params.
            algo_name = getattr(algo, "benchmark_name", algo.__name__)
            if algo_name in ["bfs", "sssp"] and "start" not in params:
                params = dict(params)
                params["start"] = self.input_dataframe['src'].head()[0]
            self.algos.append((benchmark(algo), params))

        self.validators = algo_validator_list or [None] * len(self.algos)
        self.results = []

    @staticmethod
    def __log(s, end="\n"):
        # Print and flush immediately so progress is visible mid-run.
        print(s, end=end)
        sys.stdout.flush()

    def run(self):
        """
        Run and time the graph construction step, then run and time each
        algo (one untimed warmup call, then the timed call). Returns True
        only if every validator passed (or no validators were supplied).
        """
        self.results = []

        self.__log(f"running {self.construct_graph.name}...", end="")
        result = self.construct_graph(self.input_dataframe,
                                      *self.construct_graph_func_args)
        self.__log("done.")
        G = result.retval
        self.results.append(result)

        # Renumbering pass. Algos needing transposed=True: pagerank, katz;
        # transposed=False: bfs, sssp, louvain.
        # NOTE(review): each iteration may re-renumber G, so the *last*
        # algo's transpose setting is the one in effect when the algos
        # actually run below — verify this ordering is intended.
        for (algo, params) in self.algos:
            if algo.name in ["pagerank", "katz"]:
                if algo.name == "katz":
                    # Katz alpha is derived from the largest out-degree,
                    # which must be computed before renumbering because
                    # out_degree() uses transposed=False.
                    builder = self.construct_graph.name
                    if builder == "from_dask_cudf_edgelist":
                        top = G.out_degree().compute().\
                            nlargest(n=1, columns="degree")
                    elif builder == "from_cudf_edgelist":
                        top = G.out_degree().nlargest(n=1, columns="degree")
                    else:
                        top = None
                    if top is not None:
                        largest_out_degree = top["degree"].iloc[0]
                        params["alpha"] = 1 / (largest_out_degree + 1)
                if hasattr(G, "compute_renumber_edge_list"):
                    G.compute_renumber_edge_list(transposed=True)
            else:
                self.__log("running compute_renumber_edge_list...", end="")
                if hasattr(G, "compute_renumber_edge_list"):
                    G.compute_renumber_edge_list(transposed=False)
                self.__log("done.")

        # FIXME: need to handle individual algo args
        for ((algo, params), validator) in zip(self.algos, self.validators):
            self.__log(f"running {algo.name} (warmup)...", end="")
            algo(G, **params)
            self.__log("done.")
            self.__log(f"running {algo.name}...", end="")
            result = algo(G, **params)
            self.__log("done.")

            if validator:
                result.validator_result = validator(result.retval, G)

            self.results.append(result)
            # Reclaim memory since computed algo result is no longer needed
            result.retval = None

        # Preserve original semantics: only an explicit False (or 0) fails.
        return False not in [r.validator_result for r in self.results]
0 commit comments