# Copyright (c) 2023, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import json
import os
import random
import time
from argparse import ArgumentParser

import dgl
import numpy as np
import pandas as pd
import torch


def load_edges_from_disk(parquet_path, replication_factor, input_meta):
    """
    Load the edges from disk into a graph data dictionary.

    Args:
        parquet_path: Path to the parquet directory.
        replication_factor: Number of times to replicate the edges.
        input_meta: Input metadata dict; must contain "num_edges"
            (keys are "src__rel__dst" edge-type strings) and "num_nodes"
            (per-node-type counts, used as replica id offsets).

    Returns:
        dict: Maps canonical edge type (src_type, rel, dst_type) to a
        tuple of (src, dst) index tensors.
    """
    graph_data = {}
    for edge_type in input_meta["num_edges"].keys():
        # NOTE: fixed a missing space between the two f-string fragments
        # ("...{edge_type}for replication..." in the original message).
        print(
            f"Loading edge index for edge type {edge_type} "
            f"for replication factor = {replication_factor}"
        )
        can_edge_type = tuple(edge_type.split("__"))
        df = pd.read_parquet(
            os.path.join(parquet_path, edge_type, "edge_index.parquet")
        )
        src = torch.from_numpy(df.src.values)
        dst = torch.from_numpy(df.dst.values)
        if replication_factor > 1:
            # Offset each replica's endpoints by the base node count of the
            # corresponding node type so replicas occupy disjoint id ranges.
            src_offset = input_meta["num_nodes"][can_edge_type[0]]
            dst_offset = input_meta["num_nodes"][can_edge_type[2]]
            src = torch.cat(
                [src + r * src_offset for r in range(replication_factor)]
            ).contiguous()
            dst = torch.cat(
                [dst + r * dst_offset for r in range(replication_factor)]
            ).contiguous()
        graph_data[can_edge_type] = (src, dst)
    print("Graph Data compiled")
    return graph_data


| 71 | +def load_node_labels(dataset_path, replication_factor, input_meta): |
| 72 | + num_nodes_dict = { |
| 73 | + node_type: t * replication_factor |
| 74 | + for node_type, t in input_meta["num_nodes"].items() |
| 75 | + } |
| 76 | + node_data = {} |
| 77 | + for node_type in input_meta["num_nodes"].keys(): |
| 78 | + node_data[node_type] = {} |
| 79 | + label_path = os.path.join( |
| 80 | + dataset_path, "parquet", node_type, "node_label.parquet" |
| 81 | + ) |
| 82 | + if os.path.exists(label_path): |
| 83 | + node_label = pd.read_parquet(label_path) |
| 84 | + if replication_factor > 1: |
| 85 | + base_num_nodes = input_meta["num_nodes"][node_type] |
| 86 | + dfr = pd.DataFrame( |
| 87 | + { |
| 88 | + "node": pd.concat( |
| 89 | + [ |
| 90 | + node_label.node + (r * base_num_nodes) |
| 91 | + for r in range(1, replication_factor) |
| 92 | + ] |
| 93 | + ), |
| 94 | + "label": pd.concat( |
| 95 | + [ |
| 96 | + node_label.label |
| 97 | + for r in range(1, replication_factor) |
| 98 | + ] |
| 99 | + ), |
| 100 | + } |
| 101 | + ) |
| 102 | + node_label = pd.concat([node_label, dfr]).reset_index( |
| 103 | + drop=True |
| 104 | + ) |
| 105 | + |
| 106 | + node_label_tensor = torch.full( |
| 107 | + (num_nodes_dict[node_type],), -1, dtype=torch.float32 |
| 108 | + ) |
| 109 | + node_label_tensor[ |
| 110 | + torch.as_tensor(node_label.node.values) |
| 111 | + ] = torch.as_tensor(node_label.label.values) |
| 112 | + |
| 113 | + del node_label |
| 114 | + node_data[node_type]["train_idx"] = ( |
| 115 | + (node_label_tensor > -1).contiguous().nonzero().view(-1) |
| 116 | + ) |
| 117 | + node_data[node_type]["y"] = node_label_tensor.contiguous() |
| 118 | + else: |
| 119 | + node_data[node_type]["num_nodes"] = num_nodes_dict[node_type] |
| 120 | + return node_data |
| 121 | + |
| 122 | + |
def create_dgl_graph_from_disk(dataset_path, replication_factor=1):
    """
    Build a DGL heterograph plus node data from a dataset on disk.

    Args:
        dataset_path: Path to the dataset on disk (must contain meta.json
            and a "parquet" subdirectory).
        replication_factor: Number of times to replicate the edges.

    Returns:
        tuple: (DGLGraph, node_data dict as produced by load_node_labels).
    """
    meta_file = os.path.join(dataset_path, "meta.json")
    with open(meta_file, "r") as handle:
        input_meta = json.load(handle)

    edges = load_edges_from_disk(
        os.path.join(dataset_path, "parquet"),
        replication_factor,
        input_meta,
    )
    node_data = load_node_labels(dataset_path, replication_factor, input_meta)
    return dgl.heterograph(edges), node_data


def create_dataloader(g, train_idx, batch_size, fanouts, use_uva):
    """
    Construct a DGL neighbor-sampling dataloader over ``g``.

    Args:
        g: DGLGraph to sample from.
        train_idx: Dict of node type -> tensor of seed node ids.
        batch_size: Batch size for the dataloader.
        fanouts: Per-layer fanouts for the neighbor sampler.
        use_uva: Whether to use unified virtual addressing; when set, the
            seed indices are moved to the GPU first.

    Returns:
        dgl.dataloading.DataLoader: The configured dataloader.
    """
    print("Creating dataloader", flush=True)
    start = time.time()
    if use_uva:
        # UVA sampling requires the seed node ids to live on the GPU.
        train_idx = {
            ntype: idx.to("cuda") for ntype, idx in train_idx.items()
        }
    sampler = dgl.dataloading.MultiLayerNeighborSampler(fanouts=fanouts)
    loader = dgl.dataloading.DataLoader(
        g,
        train_idx,
        sampler,
        num_workers=0,
        batch_size=batch_size,
        use_uva=use_uva,
        shuffle=False,
        drop_last=False,
    )
    elapsed = time.time() - start
    print(f"Time to create dataloader = {elapsed:.2f} seconds")
    return loader


def dataloading_benchmark(g, train_idx, fanouts, batch_sizes, use_uva):
    """
    Time one full epoch of dataloading for every (fanout, batch size) pair.

    Args:
        g: DGLGraph to benchmark.
        train_idx: Dict of node type -> tensor of training node ids.
        fanouts: List of fanout lists to benchmark.
        batch_sizes: List of batch sizes to benchmark.
        use_uva: Whether to use unified virtual addressing.

    Returns:
        list[dict]: One timing record per (fanout, batch_size) combination.
    """
    results = []
    for fanout in fanouts:
        for batch_size in batch_sizes:
            loader = create_dataloader(
                g,
                train_idx,
                batch_size=batch_size,
                fanouts=fanout,
                use_uva=use_uva,
            )
            epoch_start = time.time()
            # Drain the loader; only sampling throughput is measured.
            for _input_nodes, _output_nodes, _blocks in loader:
                pass
            epoch_time = time.time() - epoch_start
            num_batches = len(loader)
            results.append(
                {
                    "fanout": fanout,
                    "batch_size": batch_size,
                    "dataloading_time_per_epoch": epoch_time,
                    "dataloading_time_per_batch": epoch_time / num_batches,
                    "num_edges": g.num_edges(),
                    "num_batches": num_batches,
                }
            )

            print("Dataloading completed")
            print(f"Fanout = {fanout}, batch_size = {batch_size}")
            print(
                f"Time taken {epoch_time:.2f} ",
                f"seconds for num batches {num_batches}",
                flush=True,
            )
            print("==============================================")
    return results


def set_seed(seed):
    """Seed every RNG the benchmark touches, for reproducible sampling."""
    for seed_fn in (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    ):
        seed_fn(seed)


if __name__ == "__main__":
    parser = ArgumentParser()
    parser.add_argument(
        "--dataset_path", type=str, default="/datasets/abarghi/ogbn_papers100M"
    )
    parser.add_argument("--replication_factors", type=str, default="1,2,4,8")
    parser.add_argument(
        "--fanouts", type=str, default="25_25,10_10_10,5_10_20"
    )
    parser.add_argument("--batch_sizes", type=str, default="512,1024")
    parser.add_argument("--do_not_use_uva", action="store_true")
    parser.add_argument("--seed", type=int, default=42)
    args = parser.parse_args()

    # UVA is on by default; --do_not_use_uva opts out.
    use_uva = not args.do_not_use_uva
    set_seed(args.seed)
    replication_factors = [int(x) for x in args.replication_factors.split(",")]
    fanouts = [[int(y) for y in x.split("_")] for x in args.fanouts.split(",")]
    batch_sizes = [int(x) for x in args.batch_sizes.split(",")]

    print("Running dgl dataloading benchmark with the following parameters:")
    print(f"Dataset path = {args.dataset_path}")
    print(f"Replication factors = {replication_factors}")
    print(f"Fanouts = {fanouts}")
    print(f"Batch sizes = {batch_sizes}")
    print(f"Use UVA = {use_uva}")
    print("==============================================")

    time_ls = []
    for replication_factor in replication_factors:
        load_start = time.time()
        g, node_data = create_dgl_graph_from_disk(
            dataset_path=args.dataset_path,
            replication_factor=replication_factor,
        )
        load_time = time.time() - load_start
        print(f"Replication factor = {replication_factor}")
        print(
            f"G has {g.num_edges()} edges and took",
            f" {load_time:.2f} seconds to load"
        )
        # Only "paper" nodes carry labels in ogbn-papers100M.
        train_idx = {"paper": node_data["paper"]["train_idx"]}
        r_time_ls = dataloading_benchmark(
            g, train_idx, fanouts, batch_sizes, use_uva=use_uva
        )
        print(
            "Benchmark completed for replication factor = ", replication_factor
        )
        print("==============================================")
        # Tag each record with the replication factor it was measured at
        # (plain loop; the original used a side-effecting list comprehension).
        for record in r_time_ls:
            record["replication_factor"] = replication_factor
        time_ls.extend(r_time_ls)

    df = pd.DataFrame(time_ls)
    df.to_csv("dgl_dataloading_benchmark.csv", index=False)
    print("Benchmark completed for all replication factors")
    print("==============================================")