From bc2db29569047357727477442b9fae419246c753 Mon Sep 17 00:00:00 2001 From: domfournier Date: Sun, 28 Apr 2024 15:10:04 -0700 Subject: [PATCH 01/35] Rework fem parallelization --- .../frequency_domain/simulation.py | 351 +++++++++--------- .../time_domain/simulation.py | 5 +- SimPEG/dask/utils.py | 3 +- .../natural_source/receivers.py | 12 +- 4 files changed, 193 insertions(+), 178 deletions(-) diff --git a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py index d87040bbbe..f813749636 100644 --- a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py @@ -2,7 +2,7 @@ from ....utils import Zero import numpy as np import scipy.sparse as sp - +from multiprocessing import cpu_count from dask import array, compute, delayed from SimPEG.dask.simulation import dask_Jvec, dask_Jtvec, dask_getJtJdiag from SimPEG.dask.utils import get_parallel_blocks @@ -18,25 +18,93 @@ Sim.clean_on_model_update = ["_Jmatrix", "_jtjdiag"] +@delayed +def evaluate_receivers(block, mesh, fields): + data = [] + for source, ind, receiver in block: + data.append(receiver.eval(source, mesh, fields).flatten()) + + return np.hstack(data) + + +def dask_dpred(self, m=None, f=None, compute_J=False): + """ + dpred(m, f=None) + Create the projected data from a model. + The fields, f, (if provided) will be used for the predicted data + instead of recalculating the fields (which may be expensive!). + + .. math:: + + d_\\text{pred} = P(f(m)) + + Where P is a projection of the fields onto the data space. + """ + if self.survey is None: + raise AttributeError( + "The survey has not yet been set and is required to compute " + "data. Please set the survey for the simulation: " + "simulation.survey = survey" + ) + + if f is None: + if m is None: + m = self.model + f, Ainv = self.fields(m, return_Ainv=compute_J) + + all_receivers = [] + + for ind, src in enumerate(self.survey.source_list): + for rx in src.receiver_list: + all_receivers.append((src, ind, rx)) + + receiver_blocks = np.array_split(all_receivers, cpu_count()) + rows = [] + for block in receiver_blocks: + n_data = np.sum(rec.nD for _, _, rec in block) + if n_data == 0: + continue + + rows.append( + array.from_delayed( + evaluate_receivers(block, self.mesh, f), + dtype=np.float64, + shape=(n_data,), + ) + ) + + data = array.hstack(rows).compute() + + if compute_J and self._Jmatrix is None: + Jmatrix = self.compute_J(f=f, Ainv=Ainv) + return data, Jmatrix + + return data + + +Sim.dpred = dask_dpred +Sim.field_derivs = None + + def fields(self, m=None, return_Ainv=False): if m is not None: self.model = m f = self.fieldsPair(self) - Ainv = [] + Ainv = {} for freq in self.survey.frequencies: A = self.getA(freq) rhs = self.getRHS(freq) Ainv_solve = self.solver(sp.csr_matrix(A), **self.solver_opts) u = Ainv_solve * rhs - Srcs = self.survey.get_sources_by_frequency(freq) - f[Srcs, self._solutionType] = u - - Ainv_solve.clean() + sources = self.survey.get_sources_by_frequency(freq) + f[sources, self._solutionType] = u if return_Ainv: - Ainv += [self.solver(sp.csr_matrix(A.T), **self.solver_opts)] + Ainv[freq] = Ainv_solve + else: + Ainv_solve.clean() if return_Ainv: return f, Ainv @@ -51,95 +119,65 @@ def compute_J(self, f=None, Ainv=None): if f is None: f, Ainv = self.fields(self.model, return_Ainv=True) + if len(Ainv) > 1: + raise NotImplementedError( + "Current implementation of parallelization assumes a single frequency per simulation. " + "Consider creating one misfit per frequency." + ) + + A_i = list(Ainv.values())[0] m_size = self.model.size - Jmatrix = np.zeros((self.survey.nD, self.model.size), dtype=np.float32) - blocks = get_parallel_blocks( - self.survey.source_list, self.model.shape[0], self.max_chunk_size - ) + + if self.store_sensitivities == "disk": + Jmatrix = zarr.open( + self.sensitivity_path, + mode="w", + shape=(self.survey.nD, m_size), + chunks=(row_chunks, m_size), + ) + else: + Jmatrix = np.zeros((self.survey.nD, m_size), dtype=np.float32) + + compute_row_size = np.ceil(self.max_chunk_size / (A_i.A.shape[0] * 32.0 * 1e-6)) + blocks = get_parallel_blocks(self.survey.source_list, compute_row_size) count = 0 - block_count = 0 - col_chunks = None - for A_i, freq in zip(Ainv, self.survey.frequencies): - sources = [] - blocks_dfduT = [] - blocks_dfdmT = [] - block_count = 0 - - for ss, src in enumerate(self.survey.get_sources_by_frequency(freq)): - u_src = f[src, self._solutionType] - - if col_chunks is None: - col_chunks = int( - np.ceil( - float(self.survey.nD) - / np.ceil( - float(u_src.shape[0]) - * self.survey.nD - * 8.0 - * 1e-6 - / self.max_chunk_size - ) - ) - ) + fields_array = f[:, self._solutionType] + addresses = [] + blocks_receiver_derivs = [] - for rx in src.receiver_list: - v = np.eye(rx.nD, dtype=float) - n_blocs = np.ceil(u_src.shape[1] * rx.nD / col_chunks) - - for block in np.array_split(v, n_blocs, axis=1): - if block.shape[1] == 0: - continue - - block_count += block.shape[1] * u_src.shape[1] - blocks_dfduT.append( - array.from_delayed( - dfduT(src, rx, self.mesh, f, block), - dtype=np.float32, - shape=(u_src.shape[0], block.shape[1] * u_src.shape[1]), - ) - ) - blocks_dfdmT.append( - dfdmT(src, rx, self.mesh, f, block), - ) - sources.append(src) - - if block_count >= (col_chunks): - count = parallel_block_compute( - self, - A_i, - Jmatrix, - freq, - f, - sources, - blocks_dfduT, - blocks_dfdmT, - count, - m_size, - u_src.shape, - self._solutionType, - ) - blocks_dfduT = [] - blocks_dfdmT = [] - sources = [] - block_count = 0 - - if blocks_dfduT: - count = parallel_block_compute( - self, - A_i, - Jmatrix, - freq, - f, - sources, - blocks_dfduT, - blocks_dfdmT, - count, - m_size, - u_src.shape, - self._solutionType, - ) + for block in blocks: + chunk_ind = [0] + + for address in block: + src = self.survey.source_list[address[0][0]] + rx = src.receiver_list[address[0][1]] + # v = sp.diags(np.ones(rx.nD), dtype=float, format="csr")[:, address[1][0]] + v = np.eye(rx.nD, dtype=float)[:, address[1][0]] + + blocks_receiver_derivs.append(receiver_derivs(src, rx, self.mesh, f, v)) + + count += len(address[1][0]) + addresses.append(address) + + if count > compute_row_size * cpu_count(): + Jmatrix = parallel_block_compute( + self, Jmatrix, blocks_receiver_derivs, A_i, fields_array, addresses + ) + addresses = [] + blocks_receiver_derivs = [] + count = 0 + + if blocks_receiver_derivs: + Jmatrix = parallel_block_compute( + self, + Jmatrix, + blocks_receiver_derivs, + Ainv[src.frequency], + fields_array, + addresses, + ) - for A in Ainv: + for A in Ainv.values(): A.clean() if self.store_sensitivities == "disk": @@ -152,26 +190,66 @@ def compute_J(self, f=None, Ainv=None): Sim.compute_J = compute_J -@delayed -def dfduT(source, receiver, mesh, fields, block): - dfduT, _ = receiver.evalDeriv(source, mesh, fields, v=block, adjoint=True) +def parallel_block_compute( + self, Jmatrix, blocks_receiver_derivs, A_i, fields_array, addresses +): + m_size = self.model.size + eval = compute(blocks_receiver_derivs)[0] + blocks_dfduT, blocks_dfdmT = [], [] + for dfduT, dfdmT in eval: + blocks_dfduT.append(dfduT) + blocks_dfdmT.append(dfdmT) + + ATinvdf_duT = (A_i * array.hstack(blocks_dfduT).compute()).reshape( + (fields_array.shape[0], -1) + ) + count = 0 + rows = [] + block_delayed = [] + for address, dfdmT in zip(addresses, blocks_dfdmT): + n_rows = len(address[1][0]) + src = self.survey.source_list[address[0][0]] + u_src = fields_array[:, address[0][0]] + block_delayed.append( + array.from_delayed( + delayed(eval_block, pure=True)( + self, ATinvdf_duT[:, count : count + n_rows], dfdmT, u_src, src + ), + dtype=np.float32, + shape=(len(address[1][1]), m_size), + ) + ) + count += n_rows + rows.append(address[1][1]) + + indices = np.hstack(rows) + + if self.store_sensitivities == "disk": + Jmatrix.set_orthogonal_selection( + (np.r_[rows], slice(None)), + array.vstack(block_delayed).compute(), + ) + else: + Jmatrix[indices, :] = array.vstack(block_delayed).compute() - return dfduT + return Jmatrix @delayed -def dfdmT(source, receiver, mesh, fields, block): - _, dfdmT = receiver.evalDeriv(source, mesh, fields, v=block, adjoint=True) +def receiver_derivs(source, receiver, mesh, fields, block): + dfduT, dfdmT = receiver.evalDeriv(source, mesh, fields, v=block, adjoint=True) - return dfdmT + return dfduT, dfdmT -def eval_block(simulation, Ainv_deriv_u, frequency, deriv_m, fields, source): +def eval_block(simulation, Ainv_deriv_u, deriv_m, fields, source): """ Evaluate the sensitivities for the block or data and store to zarr """ - dA_dmT = simulation.getADeriv(frequency, fields, Ainv_deriv_u, adjoint=True) - dRHS_dmT = simulation.getRHSDeriv(frequency, source, Ainv_deriv_u, adjoint=True) + dA_dmT = simulation.getADeriv(source.frequency, fields, Ainv_deriv_u, adjoint=True) + dRHS_dmT = simulation.getRHSDeriv( + source.frequency, source, Ainv_deriv_u, adjoint=True + ) du_dmT = -dA_dmT if not isinstance(dRHS_dmT, Zero): du_dmT += dRHS_dmT @@ -179,72 +257,3 @@ def eval_block(simulation, Ainv_deriv_u, frequency, deriv_m, fields, source): du_dmT += deriv_m return np.array(du_dmT, dtype=complex).reshape((du_dmT.shape[0], -1)).real.T - - -def parallel_block_compute( - simulation, - A_i, - Jmatrix, - freq, - fields, - sources, - blocks_deriv_u, - blocks_deriv_m, - counter, - m_size, - f_shape, - solution_type, -): - field_derivs = array.hstack(blocks_deriv_u).compute() - - # Direct-solver call - ATinvdf_duT = (A_i * field_derivs).reshape((f_shape[0], -1)) - - # Even split - split = np.cumsum([block.shape[1] for block in blocks_deriv_u])[:-1] - sub_blocks_deriv_u = np.array_split(ATinvdf_duT, split, axis=1) - - if isinstance(compute(blocks_deriv_m[0])[0], Zero): - sub_blocks_dfdmt = [Zero()] * len(sub_blocks_deriv_u) - else: - compute_blocks_deriv_m = array.hstack( - [ - array.from_delayed( - dfdmT_block, - dtype=np.float32, - shape=(f_shape[0], dfdmT_block.shape[1] * f_shape[1]), - ) - for dfdmT_block in blocks_deriv_m - ] - ).compute() - sub_blocks_dfdmt = np.array_split(compute_blocks_deriv_m, split, axis=1) - - sub_process = [] - - for sub_block_dfduT, sub_block_dfdmT, src in zip( - sub_blocks_deriv_u, sub_blocks_dfdmt, sources - ): - u_src = fields[src, solution_type] - row_size = int(sub_block_dfduT.shape[1] / f_shape[1]) - sub_process.append( - array.from_delayed( - delayed(eval_block, pure=True)( - simulation, sub_block_dfduT, freq, sub_block_dfdmT, u_src, src - ), - dtype=np.float32, - shape=(row_size, m_size), - ) - ) - - block = array.vstack(sub_process).compute() - - if simulation.store_sensitivities == "disk": - Jmatrix.set_orthogonal_selection( - (np.arange(counter, counter + block.shape[0]), slice(None)), - block.astype(np.float32), - ) - else: - Jmatrix[counter : counter + block.shape[0], :] = block.astype(np.float32) - - counter += block.shape[0] - return counter diff --git a/SimPEG/dask/electromagnetics/time_domain/simulation.py b/SimPEG/dask/electromagnetics/time_domain/simulation.py index 7307531eb8..aefff85f7f 100644 --- a/SimPEG/dask/electromagnetics/time_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/time_domain/simulation.py @@ -534,9 +534,8 @@ def compute_J(self, f=None, Ainv=None): simulation_times = np.r_[0, np.cumsum(self.time_steps)] + self.t0 data_times = self.survey.source_list[0].receiver_list[0].times - blocks = get_parallel_blocks( - self.survey.source_list, self.model.shape[0], self.max_chunk_size - ) + compute_row_size = np.ceil(self.max_chunk_size / (self.model.shape[0] * 8.0 * 1e-6)) + blocks = get_parallel_blocks(self.survey.source_list, compute_row_size) fields_array = f[:, ftype, :] if len(self.survey.source_list) == 1: diff --git a/SimPEG/dask/utils.py b/SimPEG/dask/utils.py index 558fb08aa3..7e211710d8 100644 --- a/SimPEG/dask/utils.py +++ b/SimPEG/dask/utils.py @@ -40,7 +40,7 @@ def compute(self, job): return job.compute() -def get_parallel_blocks(source_list: list, m_size: int, max_chunk_size: int) -> list: +def get_parallel_blocks(source_list: list, data_block_size) -> list: """ Get the blocks of sources and receivers to be computed in parallel. @@ -48,7 +48,6 @@ def get_parallel_blocks(source_list: list, m_size: int, max_chunk_size: int) -> (source, receiver, block index) and array of indices for the rows of the sensitivity matrix. """ - data_block_size = np.ceil(max_chunk_size / (m_size * 8.0 * 1e-6)) row_count = 0 row_index = 0 block_count = 0 diff --git a/SimPEG/electromagnetics/natural_source/receivers.py b/SimPEG/electromagnetics/natural_source/receivers.py index 9aadd894a9..669450ab91 100644 --- a/SimPEG/electromagnetics/natural_source/receivers.py +++ b/SimPEG/electromagnetics/natural_source/receivers.py @@ -489,7 +489,11 @@ def orientation(self, var): def _eval_tipper(self, src, mesh, f): # will grab both primary and secondary and sum them! - h = f[src, "h"] + + if not isinstance(f, np.ndarray): + h = f[src, "h"] + else: + h = f hx = self.getP(mesh, "Fx", "h") @ h hy = self.getP(mesh, "Fy", "h") @ h @@ -506,7 +510,11 @@ def _eval_tipper(self, src, mesh, f): def _eval_tipper_deriv(self, src, mesh, f, du_dm_v=None, v=None, adjoint=False): # will grab both primary and secondary and sum them! - h = f[src, "h"] + + if not isinstance(f, np.ndarray): + h = f[src, "h"] + else: + h = f Phx = self.getP(mesh, "Fx", "h") Phy = self.getP(mesh, "Fy", "h") From bf3b1e4c7b53b2a807bc0a67e1f01d16200a54cc Mon Sep 17 00:00:00 2001 From: domfournier Date: Sun, 28 Apr 2024 16:02:15 -0700 Subject: [PATCH 02/35] Continue work, still not right --- .../dask/electromagnetics/frequency_domain/simulation.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py index f813749636..3115bc6f67 100644 --- a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py @@ -6,6 +6,7 @@ from dask import array, compute, delayed from SimPEG.dask.simulation import dask_Jvec, dask_Jtvec, dask_getJtJdiag from SimPEG.dask.utils import get_parallel_blocks +from SimPEG.electromagnetics.natural_source.sources import PlanewaveXYPrimary import zarr Sim.sensitivity_path = "./sensitivity/" @@ -209,7 +210,11 @@ def parallel_block_compute( for address, dfdmT in zip(addresses, blocks_dfdmT): n_rows = len(address[1][0]) src = self.survey.source_list[address[0][0]] - u_src = fields_array[:, address[0][0]] + if isinstance(src, PlanewaveXYPrimary): + u_src = fields_array + else: + u_src = fields_array[:, address[0][0]] + block_delayed.append( array.from_delayed( delayed(eval_block, pure=True)( From e19785b52f7fa073b301254a5e29b573f64565e7 Mon Sep 17 00:00:00 2001 From: domfournier Date: Sun, 28 Apr 2024 18:11:11 -0700 Subject: [PATCH 03/35] Fix issue wit dimensions. All tests pass --- .../dask/electromagnetics/frequency_domain/simulation.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py index 3115bc6f67..9d8b403e54 100644 --- a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py @@ -207,8 +207,8 @@ def parallel_block_compute( count = 0 rows = [] block_delayed = [] - for address, dfdmT in zip(addresses, blocks_dfdmT): - n_rows = len(address[1][0]) + for address, dfdmT, dfduT in zip(addresses, blocks_dfdmT, blocks_dfduT): + n_cols = dfduT.shape[1] src = self.survey.source_list[address[0][0]] if isinstance(src, PlanewaveXYPrimary): u_src = fields_array @@ -218,13 +218,13 @@ def parallel_block_compute( block_delayed.append( array.from_delayed( delayed(eval_block, pure=True)( - self, ATinvdf_duT[:, count : count + n_rows], dfdmT, u_src, src + self, ATinvdf_duT[:, count : count + n_cols], dfdmT, u_src, src ), dtype=np.float32, shape=(len(address[1][1]), m_size), ) ) - count += n_rows + count += n_cols rows.append(address[1][1]) indices = np.hstack(rows) From 236224f794e043e5ceb4be1cb67f116b06e5d05b Mon Sep 17 00:00:00 2001 From: domfournier Date: Sun, 28 Apr 2024 18:36:55 -0700 Subject: [PATCH 04/35] Reduce chunksize estimate --- SimPEG/dask/electromagnetics/frequency_domain/simulation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py index 9d8b403e54..38bd2d139f 100644 --- a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py @@ -139,7 +139,7 @@ def compute_J(self, f=None, Ainv=None): else: Jmatrix = np.zeros((self.survey.nD, m_size), dtype=np.float32) - compute_row_size = np.ceil(self.max_chunk_size / (A_i.A.shape[0] * 32.0 * 1e-6)) + compute_row_size = np.ceil(self.max_chunk_size / (A_i.A.shape[0] * 16.0 * 1e-6)) blocks = get_parallel_blocks(self.survey.source_list, compute_row_size) count = 0 fields_array = f[:, self._solutionType] From 8a3dc9f001a0063de8cd606c5f988dd89d2fe781 Mon Sep 17 00:00:00 2001 From: domfournier Date: Mon, 29 Apr 2024 07:57:44 -0700 Subject: [PATCH 05/35] Add prints for profiling --- .../frequency_domain/simulation.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py index 38bd2d139f..859797c34a 100644 --- a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py @@ -8,6 +8,7 @@ from SimPEG.dask.utils import get_parallel_blocks from SimPEG.electromagnetics.natural_source.sources import PlanewaveXYPrimary import zarr +from time import time Sim.sensitivity_path = "./sensitivity/" Sim.gtgdiag = None @@ -194,19 +195,29 @@ def compute_J(self, f=None, Ainv=None): def parallel_block_compute( self, Jmatrix, blocks_receiver_derivs, A_i, fields_array, addresses ): + print("In parallel block") m_size = self.model.size + + tc = time() + print(f"Compute blocks_receiver_derivs") eval = compute(blocks_receiver_derivs)[0] + print(f"Compute blocks_receiver_derivs time: {time() - tc}") blocks_dfduT, blocks_dfdmT = [], [] for dfduT, dfdmT in eval: blocks_dfduT.append(dfduT) blocks_dfdmT.append(dfdmT) + tc = time() + print(f"Compute direct solver") ATinvdf_duT = (A_i * array.hstack(blocks_dfduT).compute()).reshape( (fields_array.shape[0], -1) ) + print(f"Compute direct solver time: {time() - tc}") count = 0 rows = [] block_delayed = [] + tc = time() + print("Loop over addresses") for address, dfdmT, dfduT in zip(addresses, blocks_dfdmT, blocks_dfduT): n_cols = dfduT.shape[1] src = self.survey.source_list[address[0][0]] @@ -227,6 +238,8 @@ def parallel_block_compute( count += n_cols rows.append(address[1][1]) + print(f"Loop over addresses time: {time() - tc}") + indices = np.hstack(rows) if self.store_sensitivities == "disk": @@ -235,7 +248,10 @@ def parallel_block_compute( array.vstack(block_delayed).compute(), ) else: + tc = time() + print("Compute Jmatrix") Jmatrix[indices, :] = array.vstack(block_delayed).compute() + print(f"Compute Jmatrix time: {time() - tc}") return Jmatrix From 1fb556b0a44acac3a9e9cc7a9c2ea70ea7929afa Mon Sep 17 00:00:00 2001 From: domfournier Date: Mon, 29 Apr 2024 08:25:35 -0700 Subject: [PATCH 06/35] More prints --- .../electromagnetics/frequency_domain/simulation.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py index 859797c34a..6505896ef2 100644 --- a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py @@ -143,7 +143,7 @@ def compute_J(self, f=None, Ainv=None): compute_row_size = np.ceil(self.max_chunk_size / (A_i.A.shape[0] * 16.0 * 1e-6)) blocks = get_parallel_blocks(self.survey.source_list, compute_row_size) count = 0 - fields_array = f[:, self._solutionType] + fields_array = delayed(f[:, self._solutionType]) addresses = [] blocks_receiver_derivs = [] @@ -207,11 +207,14 @@ def parallel_block_compute( blocks_dfduT.append(dfduT) blocks_dfdmT.append(dfdmT) + tc = time() + print(f"Compute block stack") + block_stack = array.hstack(blocks_dfduT).compute() + print(f"Compute block stack time: {time() - tc}") + tc = time() print(f"Compute direct solver") - ATinvdf_duT = (A_i * array.hstack(blocks_dfduT).compute()).reshape( - (fields_array.shape[0], -1) - ) + ATinvdf_duT = (A_i * block_stack).reshape((fields_array.shape[0], -1)) print(f"Compute direct solver time: {time() - tc}") count = 0 rows = [] From c3d5da96335b02aa3e464713c061e2d7bd25e14b Mon Sep 17 00:00:00 2001 From: domfournier Date: Mon, 29 Apr 2024 09:45:33 -0700 Subject: [PATCH 07/35] Remove indexing of array in delayed. Move prints --- .../frequency_domain/simulation.py | 32 ++++++++++++------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py index 6505896ef2..be19d0fdfb 100644 --- a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py @@ -209,12 +209,12 @@ def parallel_block_compute( tc = time() print(f"Compute block stack") - block_stack = array.hstack(blocks_dfduT).compute() + block_stack = np.hstack(blocks_dfduT) print(f"Compute block stack time: {time() - tc}") tc = time() print(f"Compute direct solver") - ATinvdf_duT = (A_i * block_stack).reshape((fields_array.shape[0], -1)) + ATinvdf_duT = (A_i * block_stack).reshape((A_i.A.shape[0], -1)) print(f"Compute direct solver time: {time() - tc}") count = 0 rows = [] @@ -224,15 +224,16 @@ def parallel_block_compute( for address, dfdmT, dfduT in zip(addresses, blocks_dfdmT, blocks_dfduT): n_cols = dfduT.shape[1] src = self.survey.source_list[address[0][0]] - if isinstance(src, PlanewaveXYPrimary): - u_src = fields_array - else: - u_src = fields_array[:, address[0][0]] - block_delayed.append( array.from_delayed( delayed(eval_block, pure=True)( - self, ATinvdf_duT[:, count : count + n_cols], dfdmT, u_src, src + self, + ATinvdf_duT, + np.arange(count, count + n_cols), + dfdmT, + fields_array, + src, + address[0][0], ), dtype=np.float32, shape=(len(address[1][1]), m_size), @@ -266,13 +267,22 @@ def receiver_derivs(source, receiver, mesh, fields, block): return dfduT, dfdmT -def eval_block(simulation, Ainv_deriv_u, deriv_m, fields, source): +def eval_block( + simulation, Ainv_deriv_u, deriv_indices, deriv_m, fields, source, source_ind +): """ Evaluate the sensitivities for the block or data and store to zarr """ - dA_dmT = simulation.getADeriv(source.frequency, fields, Ainv_deriv_u, adjoint=True) + if isinstance(src, PlanewaveXYPrimary): + source_fields = fields + else: + source_fields = fields[:, source_ind] + + dA_dmT = simulation.getADeriv( + source.frequency, source_fields, Ainv_deriv_u[:, deriv_indices], adjoint=True + ) dRHS_dmT = simulation.getRHSDeriv( - source.frequency, source, Ainv_deriv_u, adjoint=True + source.frequency, source, Ainv_deriv_u[:, deriv_indices], adjoint=True ) du_dmT = -dA_dmT if not isinstance(dRHS_dmT, Zero): From 658eca3fc2f72747c36da7b9e72c6a1ce83970a1 Mon Sep 17 00:00:00 2001 From: domfournier Date: Mon, 29 Apr 2024 09:51:06 -0700 Subject: [PATCH 08/35] Avoid reshape if already 2d array --- SimPEG/dask/electromagnetics/frequency_domain/simulation.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py index be19d0fdfb..aff61b83c2 100644 --- a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py @@ -214,7 +214,11 @@ def parallel_block_compute( tc = time() print(f"Compute direct solver") - ATinvdf_duT = (A_i * block_stack).reshape((A_i.A.shape[0], -1)) + ATinvdf_duT = A_i * block_stack + + if ATinvdf_duT.ndim == 1: + ATinvdf_duT = ATinvdf_duT.reshape((A_i.A.shape[0], -1)) + print(f"Compute direct solver time: {time() - tc}") count = 0 rows = [] From 2370cdc010fdbad35b538681f54bbe7c89116527 Mon Sep 17 00:00:00 2001 From: domfournier Date: Mon, 29 Apr 2024 10:07:42 -0700 Subject: [PATCH 09/35] Pre-delay eval_block --- SimPEG/dask/electromagnetics/frequency_domain/simulation.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py index aff61b83c2..6ab0da24c0 100644 --- a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py @@ -230,7 +230,7 @@ def parallel_block_compute( src = self.survey.source_list[address[0][0]] block_delayed.append( array.from_delayed( - delayed(eval_block, pure=True)( + eval_block( self, ATinvdf_duT, np.arange(count, count + n_cols), @@ -271,6 +271,7 @@ def receiver_derivs(source, receiver, mesh, fields, block): return dfduT, dfdmT +@delayed def eval_block( simulation, Ainv_deriv_u, deriv_indices, deriv_m, fields, source, source_ind ): From a9687c7d8953222afaf005599fec8f92b443b376 Mon Sep 17 00:00:00 2001 From: domfournier Date: Mon, 29 Apr 2024 10:08:45 -0700 Subject: [PATCH 10/35] Delay fields deriv array --- SimPEG/dask/electromagnetics/frequency_domain/simulation.py | 1 + 1 file changed, 1 insertion(+) diff --git a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py index 6ab0da24c0..f5e94859a7 100644 --- a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py @@ -219,6 +219,7 @@ def parallel_block_compute( if ATinvdf_duT.ndim == 1: ATinvdf_duT = ATinvdf_duT.reshape((A_i.A.shape[0], -1)) + ATinvdf_duT = delayed(ATinvdf_duT) print(f"Compute direct solver time: {time() - tc}") count = 0 rows = [] From 8d38bf850c3b2d2dedce31717abac8d41ed1f46e Mon Sep 17 00:00:00 2001 From: domfournier Date: Mon, 29 Apr 2024 10:21:45 -0700 Subject: [PATCH 11/35] Fix bug in variable name --- .../frequency_domain/simulation.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py index f5e94859a7..fff5078f35 100644 --- a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py @@ -214,12 +214,7 @@ def parallel_block_compute( tc = time() print(f"Compute direct solver") - ATinvdf_duT = A_i * block_stack - - if ATinvdf_duT.ndim == 1: - ATinvdf_duT = ATinvdf_duT.reshape((A_i.A.shape[0], -1)) - - ATinvdf_duT = delayed(ATinvdf_duT) + ATinvdf_duT = delayed(A_i * block_stack) print(f"Compute direct solver time: {time() - tc}") count = 0 rows = [] @@ -279,16 +274,21 @@ def eval_block( """ Evaluate the sensitivities for the block or data and store to zarr """ - if isinstance(src, PlanewaveXYPrimary): + if isinstance(source, PlanewaveXYPrimary): source_fields = fields else: source_fields = fields[:, source_ind] + if Ainv_deriv_u.ndim == 1: + deriv_columns = Ainv_deriv_u[:, np.newaxis] + else: + deriv_columns = Ainv_deriv_u[:, deriv_indices] + dA_dmT = simulation.getADeriv( - source.frequency, source_fields, Ainv_deriv_u[:, deriv_indices], adjoint=True + source.frequency, source_fields, deriv_columns, adjoint=True ) dRHS_dmT = simulation.getRHSDeriv( - source.frequency, source, Ainv_deriv_u[:, deriv_indices], adjoint=True + source.frequency, source, deriv_columns, adjoint=True ) du_dmT = -dA_dmT if not isinstance(dRHS_dmT, Zero): From 8ee714e727b44d953dccc4fa4ca839761f0699e2 Mon Sep 17 00:00:00 2001 From: domfournier Date: Mon, 29 Apr 2024 11:36:18 -0700 Subject: [PATCH 12/35] Create array inside delayed call --- .../electromagnetics/frequency_domain/simulation.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py index fff5078f35..c1346b23c1 100644 --- a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py @@ -153,11 +153,9 @@ def compute_J(self, f=None, Ainv=None): for address in block: src = self.survey.source_list[address[0][0]] rx = src.receiver_list[address[0][1]] - # v = sp.diags(np.ones(rx.nD), dtype=float, format="csr")[:, address[1][0]] - v = np.eye(rx.nD, dtype=float)[:, address[1][0]] - - blocks_receiver_derivs.append(receiver_derivs(src, rx, self.mesh, f, v)) - + blocks_receiver_derivs.append( + receiver_derivs(src, rx, self.mesh, f, address[1][0]) + ) count += len(address[1][0]) addresses.append(address) @@ -199,7 +197,7 @@ def parallel_block_compute( m_size = self.model.size tc = time() - print(f"Compute blocks_receiver_derivs") + print(f"Compute blocks_receiver_derivs {len(blocks_receiver_derivs)}") eval = compute(blocks_receiver_derivs)[0] print(f"Compute blocks_receiver_derivs time: {time() - tc}") blocks_dfduT, blocks_dfdmT = [], [] @@ -262,7 +260,8 @@ def parallel_block_compute( @delayed def receiver_derivs(source, receiver, mesh, fields, block): - dfduT, dfdmT = receiver.evalDeriv(source, mesh, fields, v=block, adjoint=True) + v = np.eye(receiver.nD, dtype=float)[:, block] + dfduT, dfdmT = receiver.evalDeriv(source, mesh, fields, v=v, adjoint=True) return dfduT, dfdmT From 550146d54692e1c9e1effd59ab470687c023faf4 Mon Sep 17 00:00:00 2001 From: domfournier Date: Mon, 29 Apr 2024 13:20:32 -0700 Subject: [PATCH 13/35] Keep sparse operations in derivative comps --- .../dask/electromagnetics/frequency_domain/simulation.py | 2 +- SimPEG/electromagnetics/natural_source/receivers.py | 8 ++++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py index c1346b23c1..1d2f8ff25b 100644 --- a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py @@ -263,7 +263,7 @@ def receiver_derivs(source, receiver, mesh, fields, block): v = np.eye(receiver.nD, dtype=float)[:, block] dfduT, dfdmT = receiver.evalDeriv(source, mesh, fields, v=v, adjoint=True) - return dfduT, dfdmT + return dfduT.toarray(), dfdmT @delayed diff --git a/SimPEG/electromagnetics/natural_source/receivers.py b/SimPEG/electromagnetics/natural_source/receivers.py index 669450ab91..7ddb73b212 100644 --- a/SimPEG/electromagnetics/natural_source/receivers.py +++ b/SimPEG/electromagnetics/natural_source/receivers.py @@ -2,7 +2,7 @@ import numpy as np from scipy.constants import mu_0 - +import scipy.sparse as sp from ...survey import BaseRx @@ -555,7 +555,11 @@ def _eval_tipper_deriv(self, src, mesh, f, du_dm_v=None, v=None, adjoint=False): else: ghx_v += gh_v - gh_v = Phx.T @ ghx_v + Phy.T @ ghy_v + Phz.T @ ghz_v + gh_v = ( + Phx.T @ sp.csr_matrix(ghx_v) + + Phy.T @ sp.csr_matrix(ghy_v) + + Phz.T @ sp.csr_matrix(ghz_v) + ) return f._hDeriv(src, None, gh_v, adjoint=True) dh_v = f._hDeriv(src, du_dm_v, v, adjoint=False) From df7c58c3fde37c55ecd0768b5c77ba18805835e0 Mon Sep 17 00:00:00 2001 From: domfournier Date: Mon, 29 Apr 2024 13:26:10 -0700 Subject: [PATCH 14/35] Apply mod for AFEM --- .../dask/electromagnetics/frequency_domain/simulation.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py index 1d2f8ff25b..920b016939 100644 --- a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py @@ -260,8 +260,12 @@ def parallel_block_compute( @delayed def receiver_derivs(source, receiver, mesh, fields, block): - v = np.eye(receiver.nD, dtype=float)[:, block] - dfduT, dfdmT = receiver.evalDeriv(source, mesh, fields, v=v, adjoint=True) + if isinstance(source, PlanewaveXYPrimary): + v = np.eye(receiver.nD, dtype=float) + else: + v = sp.csr_matrix(np.ones(receiver.nD), dtype=float) + + dfduT, dfdmT = receiver.evalDeriv(source, mesh, fields, v=v[:, block], adjoint=True) return dfduT.toarray(), dfdmT From 76603a338429f58363a32da2f21716bc626fbdbf Mon Sep 17 00:00:00 2001 From: domfournier Date: Mon, 29 Apr 2024 14:07:00 -0700 Subject: [PATCH 15/35] TEst parallelization with returning Zero --- SimPEG/electromagnetics/frequency_domain/fields.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/SimPEG/electromagnetics/frequency_domain/fields.py b/SimPEG/electromagnetics/frequency_domain/fields.py index d39dd3b039..a71477e0bb 100644 --- a/SimPEG/electromagnetics/frequency_domain/fields.py +++ b/SimPEG/electromagnetics/frequency_domain/fields.py @@ -255,7 +255,7 @@ def _hDeriv(self, src, du_dm_v, v, adjoint=False): ) if adjoint: - return (self._hDeriv_u(src, v, adjoint), self._hDeriv_m(src, v, adjoint)) + return (self._hDeriv_u(src, v, adjoint), Zero()) return np.array( self._hDeriv_u(src, du_dm_v, adjoint) + self._hDeriv_m(src, v, adjoint), dtype=complex, From 3612224591f8f66cf88955700e07ff249e10d1bf Mon Sep 17 00:00:00 2001 From: domfournier Date: Mon, 29 Apr 2024 14:53:13 -0700 Subject: [PATCH 16/35] Skip over derivatives on model --- .../frequency_domain/simulation.py | 40 ++++++++++++------- .../frequency_domain/fields.py | 9 ++++- 2 files changed, 34 insertions(+), 15 deletions(-) diff --git a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py index 920b016939..a86f410449 100644 --- a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py @@ -153,9 +153,20 @@ def compute_J(self, f=None, Ainv=None): for address in block: src = self.survey.source_list[address[0][0]] rx = src.receiver_list[address[0][1]] + + if isinstance(src, PlanewaveXYPrimary): + shape = (A_i.A.shape[0], len(address[1][0]) * 2) + else: + shape = (A_i.A.shape[0], len(address[1][0])) + blocks_receiver_derivs.append( - receiver_derivs(src, rx, self.mesh, f, address[1][0]) + array.from_delayed( + receiver_derivs(src, rx, self.mesh, f, address[1][0]), + dtype=np.complex128, + shape=shape, + ) ) + count += len(address[1][0]) addresses.append(address) @@ -196,18 +207,18 @@ def parallel_block_compute( print("In parallel block") m_size = self.model.size - tc = time() - print(f"Compute blocks_receiver_derivs {len(blocks_receiver_derivs)}") - eval = compute(blocks_receiver_derivs)[0] - print(f"Compute blocks_receiver_derivs time: {time() - tc}") - blocks_dfduT, blocks_dfdmT = [], [] - for dfduT, dfdmT in eval: - blocks_dfduT.append(dfduT) - blocks_dfdmT.append(dfdmT) + # tc = time() + # print(f"Compute blocks_receiver_derivs {len(blocks_receiver_derivs)}") + # eval = compute(blocks_receiver_derivs)[0] + # print(f"Compute blocks_receiver_derivs time: {time() - tc}") + # blocks_dfduT, blocks_dfdmT = [], [] + # for dfduT, dfdmT in eval: + # blocks_dfduT.append(dfduT) + # blocks_dfdmT.append(dfdmT) tc = time() print(f"Compute block stack") - block_stack = np.hstack(blocks_dfduT) + block_stack = array.hstack(blocks_receiver_derivs).compute() print(f"Compute block stack time: {time() - tc}") tc = time() @@ -219,7 +230,7 @@ def parallel_block_compute( block_delayed = [] tc = time() print("Loop over addresses") - for address, dfdmT, dfduT in zip(addresses, blocks_dfdmT, blocks_dfduT): + for address, dfduT in zip(addresses, blocks_receiver_derivs): n_cols = dfduT.shape[1] src = self.survey.source_list[address[0][0]] block_delayed.append( @@ -228,7 +239,7 @@ def parallel_block_compute( self, ATinvdf_duT, np.arange(count, count + n_cols), - dfdmT, + Zero(), fields_array, src, address[0][0], @@ -265,9 +276,10 @@ def receiver_derivs(source, receiver, mesh, fields, block): else: v = sp.csr_matrix(np.ones(receiver.nD), dtype=float) - dfduT, dfdmT = receiver.evalDeriv(source, mesh, fields, v=v[:, block], adjoint=True) + # Assume the derivatives in terms of model are Zero (seems to always be case) + dfduT, _ = receiver.evalDeriv(source, mesh, fields, v=v[:, block], adjoint=True) - return dfduT.toarray(), dfdmT + return dfduT.toarray() @delayed diff --git a/SimPEG/electromagnetics/frequency_domain/fields.py b/SimPEG/electromagnetics/frequency_domain/fields.py index a71477e0bb..740dd10dac 100644 --- a/SimPEG/electromagnetics/frequency_domain/fields.py +++ b/SimPEG/electromagnetics/frequency_domain/fields.py @@ -255,7 +255,14 @@ def _hDeriv(self, src, du_dm_v, v, adjoint=False): ) if adjoint: - return (self._hDeriv_u(src, v, adjoint), Zero()) + if ( + isinstance(src.s_mDeriv(self.simulation, v, adjoint), Zero) + and isinstance(src.bPrimaryDeriv(self.simulation, v, adjoint), Zero) + and isinstance(self._MfMuiDeriv(v), Zero) + ): + return self._hDeriv_u(src, v, adjoint), Zero() + + return (self._hDeriv_u(src, v, adjoint), self._hDeriv_m(src, v, adjoint)) return np.array( self._hDeriv_u(src, du_dm_v, adjoint) + self._hDeriv_m(src, v, adjoint), dtype=complex, From 3a7e4156c583a83c86ca4fa981b005b43e2f3451 Mon Sep 17 00:00:00 2001 From: domfournier Date: Mon, 29 Apr 2024 15:09:23 -0700 Subject: [PATCH 17/35] Increase chunk size estimate --- SimPEG/dask/electromagnetics/frequency_domain/simulation.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py index a86f410449..f1cb6b03d8 100644 --- a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py @@ -140,7 +140,7 @@ def compute_J(self, f=None, Ainv=None): else: Jmatrix = np.zeros((self.survey.nD, m_size), dtype=np.float32) - compute_row_size = np.ceil(self.max_chunk_size / (A_i.A.shape[0] * 16.0 * 1e-6)) + compute_row_size = np.ceil(self.max_chunk_size / (A_i.A.shape[0] * 32.0 * 1e-6)) blocks = get_parallel_blocks(self.survey.source_list, compute_row_size) count = 0 fields_array = delayed(f[:, self._solutionType]) @@ -217,7 +217,7 @@ def parallel_block_compute( # blocks_dfdmT.append(dfdmT) tc = time() - print(f"Compute block stack") + print(f"Compute block stack {len(blocks_receiver_derivs)}") block_stack = array.hstack(blocks_receiver_derivs).compute() print(f"Compute block stack time: {time() - tc}") From 0aed3f531cf6ab8dded7a468cac0ed4820636f03 Mon Sep 17 00:00:00 2001 From: domfournier Date: Mon, 29 Apr 2024 19:52:56 -0700 Subject: [PATCH 18/35] Attempt shorter blocks of derivs calcs --- .../frequency_domain/simulation.py | 86 +++++++++++-------- SimPEG/dask/utils.py | 4 +- 2 files changed, 50 insertions(+), 40 deletions(-) diff --git a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py index f1cb6b03d8..bec94c0795 100644 --- a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py @@ -141,52 +141,53 @@ def compute_J(self, f=None, Ainv=None): Jmatrix = np.zeros((self.survey.nD, m_size), dtype=np.float32) compute_row_size = np.ceil(self.max_chunk_size / (A_i.A.shape[0] * 32.0 * 1e-6)) - blocks = get_parallel_blocks(self.survey.source_list, compute_row_size) + blocks = get_parallel_blocks( + self.survey.source_list, compute_row_size, optimize=False + ) count = 0 fields_array = delayed(f[:, self._solutionType]) - addresses = [] - blocks_receiver_derivs = [] for block in blocks: - chunk_ind = [0] + addresses = [] + blocks_receiver_derivs = [] + chunks = np.array_split(block, np.floor(cpu_count() / 2)) + + for chunk in chunks: + if len(chunk) == 0: + continue + + n_fields = np.sum([len(block[1][0]) for block in chunk]) - for address in block: - src = self.survey.source_list[address[0][0]] - rx = src.receiver_list[address[0][1]] + shape = [A_i.A.shape[0], n_fields] - if isinstance(src, PlanewaveXYPrimary): - shape = (A_i.A.shape[0], len(address[1][0]) * 2) - else: - shape = (A_i.A.shape[0], len(address[1][0])) + if isinstance(self.survey.source_list[0], PlanewaveXYPrimary): + shape[1] *= 2 blocks_receiver_derivs.append( array.from_delayed( - receiver_derivs(src, rx, self.mesh, f, address[1][0]), + receiver_derivs(self.survey, self.mesh, f, chunk), dtype=np.complex128, shape=shape, ) ) + addresses += chunk.tolist() - count += len(address[1][0]) - addresses.append(address) - - if count > compute_row_size * cpu_count(): - Jmatrix = parallel_block_compute( - self, Jmatrix, blocks_receiver_derivs, A_i, fields_array, addresses - ) - addresses = [] - blocks_receiver_derivs = [] - count = 0 - - if blocks_receiver_derivs: Jmatrix = parallel_block_compute( - self, - Jmatrix, - blocks_receiver_derivs, - Ainv[src.frequency], - fields_array, - addresses, + self, Jmatrix, blocks_receiver_derivs, A_i, fields_array, addresses ) + # addresses = [] + # blocks_receiver_derivs = [] + # count = 0 + # + # if blocks_receiver_derivs: + # Jmatrix = parallel_block_compute( + # self, + # Jmatrix, + # blocks_receiver_derivs, + # Ainv[src.frequency], + # fields_array, + # addresses, + # ) for A in Ainv.values(): A.clean() @@ -270,16 +271,25 @@ def parallel_block_compute( @delayed -def receiver_derivs(source, receiver, mesh, fields, block): - if isinstance(source, PlanewaveXYPrimary): - v = np.eye(receiver.nD, dtype=float) - else: - v = sp.csr_matrix(np.ones(receiver.nD), dtype=float) +def receiver_derivs(survey, mesh, fields, blocks): + field_derivatives = [] + for address in blocks: + source = survey.source_list[address[0][0]] + receiver = source.receiver_list[address[0][1]] + + if isinstance(source, PlanewaveXYPrimary): + v = np.eye(receiver.nD, dtype=float) + else: + v = sp.csr_matrix(np.ones(receiver.nD), dtype=float) - # Assume the derivatives in terms of model are Zero (seems to always be case) - dfduT, _ = receiver.evalDeriv(source, mesh, fields, v=v[:, block], adjoint=True) + # Assume the derivatives in terms of model are Zero (seems to always be case) + dfduT, _ = receiver.evalDeriv( + source, mesh, fields, v=v[:, address[1][0]], adjoint=True + ) + field_derivatives.append(dfduT) - return dfduT.toarray() + field_derivatives = sp.hstack(field_derivatives) + return field_derivatives.toarray() @delayed diff --git a/SimPEG/dask/utils.py b/SimPEG/dask/utils.py index 7e211710d8..ad292dc9a2 100644 --- a/SimPEG/dask/utils.py +++ b/SimPEG/dask/utils.py @@ -40,7 +40,7 @@ def compute(self, job): return job.compute() -def get_parallel_blocks(source_list: list, data_block_size) -> list: +def get_parallel_blocks(source_list: list, data_block_size, optimize=True) -> list: """ Get the blocks of sources and receivers to be computed in parallel. @@ -83,7 +83,7 @@ def get_parallel_blocks(source_list: list, data_block_size) -> list: row_count += chunk_size # Re-split over cpu_count if too few blocks - if len(blocks) < cpu_count(): + if len(blocks) < cpu_count() and optimize: flatten_blocks = [] for block in blocks: flatten_blocks += block From c3913bd6897ef95bb906714ea047a3e3567123fa Mon Sep 17 00:00:00 2001 From: domfournier Date: Mon, 29 Apr 2024 21:07:36 -0700 Subject: [PATCH 19/35] Fix indexing --- .../frequency_domain/simulation.py | 78 ++++++++++++------- 1 file changed, 49 insertions(+), 29 deletions(-) diff --git a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py index bec94c0795..18b699508b 100644 --- a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py @@ -150,7 +150,7 @@ def compute_J(self, f=None, Ainv=None): for block in blocks: addresses = [] blocks_receiver_derivs = [] - chunks = np.array_split(block, np.floor(cpu_count() / 2)) + chunks = np.array_split(block, cpu_count()) for chunk in chunks: if len(chunk) == 0: @@ -170,7 +170,7 @@ def compute_J(self, f=None, Ainv=None): shape=shape, ) ) - addresses += chunk.tolist() + addresses.append(chunk.tolist()) Jmatrix = parallel_block_compute( self, Jmatrix, blocks_receiver_derivs, A_i, fields_array, addresses @@ -231,9 +231,9 @@ def parallel_block_compute( block_delayed = [] tc = time() print("Loop over addresses") - for address, dfduT in zip(addresses, blocks_receiver_derivs): + for block_addresses, dfduT in zip(addresses, blocks_receiver_derivs): n_cols = dfduT.shape[1] - src = self.survey.source_list[address[0][0]] + n_rows = np.sum([address[1][2] for address in block_addresses]) block_delayed.append( array.from_delayed( eval_block( @@ -242,15 +242,16 @@ def parallel_block_compute( np.arange(count, count + n_cols), Zero(), fields_array, - src, - address[0][0], + block_addresses + # src, + # address[0][0], ), dtype=np.float32, - shape=(len(address[1][1]), m_size), + shape=(n_rows, m_size), ) ) count += n_cols - rows.append(address[1][1]) + rows += [address[1][1] for address in block_addresses] print(f"Loop over addresses time: {time() - tc}") @@ -293,32 +294,51 @@ def receiver_derivs(survey, mesh, fields, blocks): @delayed -def eval_block( - simulation, Ainv_deriv_u, deriv_indices, deriv_m, fields, source, source_ind -): +def eval_block(simulation, Ainv_deriv_u, deriv_indices, deriv_m, fields, addresses): """ - Evaluate the sensitivities for the block or data and store to zarr + Evaluate the sensitivities for the block or data """ - if isinstance(source, PlanewaveXYPrimary): - source_fields = fields - else: - source_fields = fields[:, source_ind] - + count = 0 + rows = [] if Ainv_deriv_u.ndim == 1: deriv_columns = Ainv_deriv_u[:, np.newaxis] else: deriv_columns = Ainv_deriv_u[:, deriv_indices] - dA_dmT = simulation.getADeriv( - source.frequency, source_fields, deriv_columns, adjoint=True - ) - dRHS_dmT = simulation.getRHSDeriv( - source.frequency, source, deriv_columns, adjoint=True - ) - du_dmT = -dA_dmT - if not isinstance(dRHS_dmT, Zero): - du_dmT += dRHS_dmT - if not isinstance(deriv_m, Zero): - du_dmT += deriv_m + for address in addresses: + n_receivers = address[1][2] + source = simulation.survey.source_list[address[0][0]] + + if isinstance(source, PlanewaveXYPrimary): + source_fields = fields + n_cols = 2 + else: + source_fields = fields[:, address[0][0]] + n_cols = 1 + + n_cols *= n_receivers + + dA_dmT = simulation.getADeriv( + source.frequency, + source_fields, + deriv_columns[:, count : count + n_cols], + adjoint=True, + ) + dRHS_dmT = simulation.getRHSDeriv( + source.frequency, + source, + deriv_columns[:, count : count + n_cols], + adjoint=True, + ) + du_dmT = -dA_dmT + if not isinstance(dRHS_dmT, Zero): + du_dmT += dRHS_dmT + if not isinstance(deriv_m, Zero): + du_dmT += deriv_m + + rows.append( + np.array(du_dmT, dtype=complex).reshape((du_dmT.shape[0], -1)).real.T + ) + count += n_cols - return np.array(du_dmT, dtype=complex).reshape((du_dmT.shape[0], -1)).real.T + return np.vstack(rows) From cc8d60d4bc3ff3611d86f6ba9cb62b0a343df558 Mon Sep 17 00:00:00 2001 From: domfournier Date: Mon, 29 Apr 2024 21:48:46 -0700 Subject: [PATCH 20/35] Fix warning of ragged array --- .../frequency_domain/simulation.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py index 18b699508b..ff9aec8e43 100644 --- a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py @@ -150,13 +150,15 @@ def compute_J(self, f=None, Ainv=None): for block in blocks: addresses = [] blocks_receiver_derivs = [] - chunks = np.array_split(block, cpu_count()) + chunks = np.array_split(np.arange(len(block)), cpu_count()) for chunk in chunks: if len(chunk) == 0: continue - n_fields = np.sum([len(block[1][0]) for block in chunk]) + n_fields = np.sum( + [len(elem[1][0]) for elem in block[chunk[0] : chunk[0] + len(chunk)]] + ) shape = [A_i.A.shape[0], n_fields] @@ -165,12 +167,17 @@ def compute_J(self, f=None, Ainv=None): blocks_receiver_derivs.append( array.from_delayed( - receiver_derivs(self.survey, self.mesh, f, chunk), + receiver_derivs( + self.survey, + self.mesh, + f, + block[chunk[0] : chunk[0] + len(chunk)], + ), dtype=np.complex128, shape=shape, ) ) - addresses.append(chunk.tolist()) + addresses.append(block[chunk[0] : chunk[0] + len(chunk)]) Jmatrix = parallel_block_compute( self, Jmatrix, blocks_receiver_derivs, A_i, fields_array, addresses From 23a270d0b3d564924769bf19530fc7d9d72a44a5 Mon Sep 17 00:00:00 2001 From: domfournier Date: Mon, 29 Apr 2024 22:14:58 -0700 Subject: [PATCH 21/35] Add print --- SimPEG/dask/electromagnetics/frequency_domain/simulation.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py index ff9aec8e43..705f27fd64 100644 --- a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py @@ -150,7 +150,8 @@ def compute_J(self, f=None, Ainv=None): for block in blocks: addresses = [] blocks_receiver_derivs = [] - chunks = np.array_split(np.arange(len(block)), cpu_count()) + print(f"Ncpu: {cpu_count()}") + chunks = np.array_split(np.arange(len(block)), int(cpu_count() / 2)) for chunk in chunks: if len(chunk) == 0: From 2cc036bd566691f7d3c545a48622051e70715bdc Mon Sep 17 00:00:00 2001 From: domfournier Date: Mon, 29 Apr 2024 22:18:37 -0700 Subject: [PATCH 22/35] More prints --- SimPEG/dask/electromagnetics/frequency_domain/simulation.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py index 705f27fd64..6f86eaca90 100644 --- a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py @@ -9,6 +9,7 @@ from SimPEG.electromagnetics.natural_source.sources import PlanewaveXYPrimary import zarr from time import time +from tqdm import tqdm Sim.sensitivity_path = "./sensitivity/" Sim.gtgdiag = None @@ -147,10 +148,10 @@ def compute_J(self, f=None, Ainv=None): count = 0 fields_array = delayed(f[:, self._solutionType]) - for block in blocks: + for block in tqdm(blocks): addresses = [] blocks_receiver_derivs = [] - print(f"Ncpu: {cpu_count()}") + print(f"Ncpu: {cpu_count()}. N data per chunk: {len(block[0][0][1])}") chunks = np.array_split(np.arange(len(block)), int(cpu_count() / 2)) for chunk in chunks: From 35c504be8db54f5418c3e904a1eb8dc6e6cf9de8 Mon Sep 17 00:00:00 2001 From: domfournier Date: Mon, 29 Apr 2024 22:38:49 -0700 Subject: [PATCH 23/35] Delay fields, survey and mesh --- .../electromagnetics/frequency_domain/simulation.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py index 6f86eaca90..2f7f8ab1d5 100644 --- a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py @@ -147,11 +147,14 @@ def compute_J(self, f=None, Ainv=None): ) count = 0 fields_array = delayed(f[:, self._solutionType]) + fields = delayed(f) + survey = delayed(self.survey) + mesh = delayed(self.mesh) for block in tqdm(blocks): addresses = [] blocks_receiver_derivs = [] - print(f"Ncpu: {cpu_count()}. N data per chunk: {len(block[0][0][1])}") + print(f"Ncpu: {cpu_count()}. N data per chunk: {len(block[0][1][1])}") chunks = np.array_split(np.arange(len(block)), int(cpu_count() / 2)) for chunk in chunks: @@ -170,9 +173,9 @@ def compute_J(self, f=None, Ainv=None): blocks_receiver_derivs.append( array.from_delayed( receiver_derivs( - self.survey, - self.mesh, - f, + survey, + mesh, + fields, block[chunk[0] : chunk[0] + len(chunk)], ), dtype=np.complex128, From bc880b5724e79ae7e965312cf5ee812248b1bd14 Mon Sep 17 00:00:00 2001 From: domfournier Date: Tue, 30 Apr 2024 08:02:22 -0700 Subject: [PATCH 24/35] Block derivs as single process --- .../frequency_domain/simulation.py | 40 +++++++++++++------ 1 file changed, 27 insertions(+), 13 deletions(-) diff --git a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py index 2f7f8ab1d5..7174747612 100644 --- a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py @@ -4,6 +4,7 @@ import scipy.sparse as sp from multiprocessing import cpu_count from dask import array, compute, delayed +from dask.distributed import get_client from SimPEG.dask.simulation import dask_Jvec, dask_Jtvec, dask_getJtJdiag from SimPEG.dask.utils import get_parallel_blocks from SimPEG.electromagnetics.natural_source.sources import PlanewaveXYPrimary @@ -119,6 +120,7 @@ def fields(self, m=None, return_Ainv=False): def compute_J(self, f=None, Ainv=None): + # client = get_client() if f is None: f, Ainv = self.fields(self.model, return_Ainv=True) @@ -150,12 +152,14 @@ def compute_J(self, f=None, Ainv=None): fields = delayed(f) survey = delayed(self.survey) mesh = delayed(self.mesh) + addresses = [] + blocks_receiver_derivs = [] - for block in tqdm(blocks): - addresses = [] - blocks_receiver_derivs = [] + for block in blocks: print(f"Ncpu: {cpu_count()}. N data per chunk: {len(block[0][1][1])}") - chunks = np.array_split(np.arange(len(block)), int(cpu_count() / 2)) + chunks = np.array_split(np.arange(len(block)), cpu_count()) + addresses_chunks = [] + block_derivs_chunks = [] for chunk in chunks: if len(chunk) == 0: @@ -170,7 +174,7 @@ def compute_J(self, f=None, Ainv=None): if isinstance(self.survey.source_list[0], PlanewaveXYPrimary): shape[1] *= 2 - blocks_receiver_derivs.append( + block_derivs_chunks.append( array.from_delayed( receiver_derivs( survey, @@ -182,10 +186,21 @@ def compute_J(self, f=None, Ainv=None): shape=shape, ) ) - addresses.append(block[chunk[0] : chunk[0] + len(chunk)]) + addresses_chunks.append(block[chunk[0] : chunk[0] + len(chunk)]) + addresses.append(addresses_chunks) + blocks_receiver_derivs.append(block_derivs_chunks) + + tc = time() + print("Derivative blocks") + blocks_receiver_derivs = compute(blocks_receiver_derivs)[0] + print(f"Derivative blocks time: {time() - tc}") + + for block_derivs_chunks, addresses_chunks in tqdm( + zip(blocks_receiver_derivs, addresses), desc="Sensitivity rows" + ): Jmatrix = parallel_block_compute( - self, Jmatrix, blocks_receiver_derivs, A_i, fields_array, addresses + self, Jmatrix, block_derivs_chunks, A_i, fields_array, addresses_chunks ) # addresses = [] # blocks_receiver_derivs = [] @@ -217,9 +232,8 @@ def compute_J(self, f=None, Ainv=None): def parallel_block_compute( self, Jmatrix, blocks_receiver_derivs, A_i, fields_array, addresses ): - print("In parallel block") m_size = self.model.size - + # client = get_client() # tc = time() # print(f"Compute blocks_receiver_derivs {len(blocks_receiver_derivs)}") # eval = compute(blocks_receiver_derivs)[0] @@ -231,7 +245,7 @@ def parallel_block_compute( tc = time() print(f"Compute block stack {len(blocks_receiver_derivs)}") - block_stack = array.hstack(blocks_receiver_derivs).compute() + block_stack = sp.hstack(blocks_receiver_derivs).toarray() print(f"Compute block stack time: {time() - tc}") tc = time() @@ -277,7 +291,7 @@ def parallel_block_compute( else: tc = time() print("Compute Jmatrix") - Jmatrix[indices, :] = array.vstack(block_delayed).compute() + Jmatrix[indices, :] = compute(array.vstack(block_delayed))[0] print(f"Compute Jmatrix time: {time() - tc}") return Jmatrix @@ -301,8 +315,8 @@ def receiver_derivs(survey, mesh, fields, blocks): ) field_derivatives.append(dfduT) - field_derivatives = sp.hstack(field_derivatives) - return field_derivatives.toarray() + # field_derivatives = sp.hstack(field_derivatives) + return sp.hstack(field_derivatives) @delayed From 8314d89e89db7ef9b44a31d9d033c04afadca2b8 Mon Sep 17 00:00:00 2001 From: domfournier Date: Tue, 30 Apr 2024 10:25:41 -0700 Subject: [PATCH 25/35] Use array instead of fields in dpred --- .../electromagnetics/frequency_domain/simulation.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py index 7174747612..b126d83430 100644 --- a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py @@ -3,8 +3,8 @@ import numpy as np import scipy.sparse as sp from multiprocessing import cpu_count -from dask import array, compute, delayed -from dask.distributed import get_client +from dask import array, compute, delayed, config +from dask.distributed import get_client, Client from SimPEG.dask.simulation import dask_Jvec, dask_Jtvec, dask_getJtJdiag from SimPEG.dask.utils import get_parallel_blocks from SimPEG.electromagnetics.natural_source.sources import PlanewaveXYPrimary @@ -64,6 +64,8 @@ def dask_dpred(self, m=None, f=None, compute_J=False): receiver_blocks = np.array_split(all_receivers, cpu_count()) rows = [] + fields_array = delayed(f[:, "h"]) + mesh = delayed(self.mesh) for block in receiver_blocks: n_data = np.sum(rec.nD for _, _, rec in block) if n_data == 0: @@ -71,13 +73,13 @@ def dask_dpred(self, m=None, f=None, compute_J=False): rows.append( array.from_delayed( - evaluate_receivers(block, self.mesh, f), + evaluate_receivers(block, mesh, fields_array), dtype=np.float64, shape=(n_data,), ) ) - data = array.hstack(rows).compute() + data = compute(array.hstack(rows))[0] if compute_J and self._Jmatrix is None: Jmatrix = self.compute_J(f=f, Ainv=Ainv) @@ -156,7 +158,7 @@ def compute_J(self, f=None, Ainv=None): blocks_receiver_derivs = [] for block in blocks: - print(f"Ncpu: {cpu_count()}. N data per chunk: {len(block[0][1][1])}") + # print(f"Ncpu: {cpu_count()}. N data per chunk: {len(block[0][1][1])}") chunks = np.array_split(np.arange(len(block)), cpu_count()) addresses_chunks = [] block_derivs_chunks = [] From a959a5f76e9ba83958cc18b2d3eccf0d16f898a4 Mon Sep 17 00:00:00 2001 From: domfournier Date: Tue, 30 Apr 2024 10:58:08 -0700 Subject: [PATCH 26/35] Paralleize RHD creation for FEM. --- .../frequency_domain/simulation.py | 55 ++++++++++++++++++- .../time_domain/simulation.py | 24 ++++---- 2 files changed, 64 insertions(+), 15 deletions(-) diff --git a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py index b126d83430..89e4925783 100644 --- a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py @@ -22,6 +22,56 @@ Sim.clean_on_model_update = ["_Jmatrix", "_jtjdiag"] +@delayed +def source_evaluation(simulation, sources): + s_m, s_e = [], [] + for source in sources: + sm, se = source.eval(simulation) + s_m.append(sm) + s_e.append(se) + + return s_m, s_e + + +def dask_getSourceTerm(self, freq): + """ + Assemble the source term. This ensures that the RHS is a vector / array + of the correct size + """ + source_list = self.survey.get_sources_by_frequency(freq) + source_block = np.array_split(source_list, cpu_count()) + + block_compute = [] + for block in source_block: + if len(block) == 0: + continue + + block_compute.append(source_evaluation(self, block)) + + eval = compute(block_compute)[0] + + s_m, s_e = [], [] + for block in eval: + if block[0]: + s_m.append(block[0]) + s_e.append(block[1]) + + if isinstance(s_m[0][0], Zero): + s_m = Zero() + else: + s_m = np.vstack(s_m).T + + if isinstance(s_e[0][0], Zero): + s_e = Zero() + else: + s_e = np.vstack(s_e).T + + return s_m, s_e + + +Sim.getSourceTerm = dask_getSourceTerm + + @delayed def evaluate_receivers(block, mesh, fields): data = [] @@ -64,7 +114,6 @@ def dask_dpred(self, m=None, f=None, compute_J=False): receiver_blocks = np.array_split(all_receivers, cpu_count()) rows = [] - fields_array = delayed(f[:, "h"]) mesh = delayed(self.mesh) for block in receiver_blocks: n_data = np.sum(rec.nD for _, _, rec in block) @@ -73,7 +122,7 @@ def dask_dpred(self, m=None, f=None, compute_J=False): rows.append( array.from_delayed( - evaluate_receivers(block, mesh, fields_array), + evaluate_receivers(block, mesh, f), dtype=np.float64, shape=(n_data,), ) @@ -97,7 +146,7 @@ def fields(self, m=None, return_Ainv=False): self.model = m f = self.fieldsPair(self) - + print("Computing fields") Ainv = {} for freq in self.survey.frequencies: A = self.getA(freq) diff --git a/SimPEG/dask/electromagnetics/time_domain/simulation.py b/SimPEG/dask/electromagnetics/time_domain/simulation.py index aefff85f7f..bcca19adbe 100644 --- a/SimPEG/dask/electromagnetics/time_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/time_domain/simulation.py @@ -135,6 +135,17 @@ def fields(self, m=None, return_Ainv=False): Sim.fields = fields +@delayed +def source_evaluation(simulation, sources, time): + s_m, s_e = [], [] + for source in sources: + sm, se = source.eval(simulation, time) + s_m.append(sm) + s_e.append(se) + + return s_m, s_e + + def dask_getSourceTerm(self, tInd): """ Assemble the source term. This ensures that the RHS is a vector / array @@ -143,20 +154,9 @@ def dask_getSourceTerm(self, tInd): source_list = self.survey.source_list source_block = np.array_split(source_list, cpu_count()) - def source_evaluation(simulation, sources, time): - s_m, s_e = [], [] - for source in sources: - sm, se = source.eval(simulation, time) - s_m.append(sm) - s_e.append(se) - - return s_m, s_e - block_compute = [] for block in source_block: - block_compute.append( - delayed(source_evaluation, pure=True)(self, block, self.times[tInd]) - ) + block_compute.append(source_evaluation(self, block, self.times[tInd])) eval = dask.compute(block_compute)[0] From 938e4ffb067237334f6e154b639897935576ac61 Mon Sep 17 00:00:00 2001 From: domfournier Date: Tue, 30 Apr 2024 11:10:10 -0700 Subject: [PATCH 27/35] Add prints with some cleanups --- .../frequency_domain/simulation.py | 24 +------------------ 1 file changed, 1 insertion(+), 23 deletions(-) diff --git a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py index 89e4925783..e2f8c62d45 100644 --- a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py @@ -253,19 +253,6 @@ def compute_J(self, f=None, Ainv=None): Jmatrix = parallel_block_compute( self, Jmatrix, block_derivs_chunks, A_i, fields_array, addresses_chunks ) - # addresses = [] - # blocks_receiver_derivs = [] - # count = 0 - # - # if blocks_receiver_derivs: - # Jmatrix = parallel_block_compute( - # self, - # Jmatrix, - # blocks_receiver_derivs, - # Ainv[src.frequency], - # fields_array, - # addresses, - # ) for A in Ainv.values(): A.clean() @@ -284,20 +271,11 @@ def parallel_block_compute( self, Jmatrix, blocks_receiver_derivs, A_i, fields_array, addresses ): m_size = self.model.size - # client = get_client() - # tc = time() - # print(f"Compute blocks_receiver_derivs {len(blocks_receiver_derivs)}") - # eval = compute(blocks_receiver_derivs)[0] - # print(f"Compute blocks_receiver_derivs time: {time() - tc}") - # blocks_dfduT, blocks_dfdmT = [], [] - # for dfduT, dfdmT in eval: - # blocks_dfduT.append(dfduT) - # blocks_dfdmT.append(dfdmT) tc = time() print(f"Compute block stack {len(blocks_receiver_derivs)}") block_stack = sp.hstack(blocks_receiver_derivs).toarray() - print(f"Compute block stack time: {time() - tc}") + print(f"Compute block stack time: {time() - tc}: shape{block_stack.shape}") tc = time() print(f"Compute direct solver") From 77592c167b4a6d5bc752207e111156d7eb756d01 Mon Sep 17 00:00:00 2001 From: domfournier Date: Tue, 30 Apr 2024 13:51:54 -0700 Subject: [PATCH 28/35] Fix dimension issues --- .../frequency_domain/simulation.py | 45 +++++++------------ 1 file changed, 15 insertions(+), 30 deletions(-) diff --git a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py index e2f8c62d45..25b12688d4 100644 --- a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py @@ -9,7 +9,6 @@ from SimPEG.dask.utils import get_parallel_blocks from SimPEG.electromagnetics.natural_source.sources import PlanewaveXYPrimary import zarr -from time import time from tqdm import tqdm Sim.sensitivity_path = "./sensitivity/" @@ -53,19 +52,22 @@ def dask_getSourceTerm(self, freq): s_m, s_e = [], [] for block in eval: if block[0]: - s_m.append(block[0]) - s_e.append(block[1]) + s_m += block[0] + s_e += block[1] if isinstance(s_m[0][0], Zero): s_m = Zero() else: - s_m = np.vstack(s_m).T + s_m = np.vstack(s_m) + if s_m.shape[0] < s_m.shape[1]: + s_m = s_m.T if isinstance(s_e[0][0], Zero): s_e = Zero() else: - s_e = np.vstack(s_e).T - + s_e = np.vstack(s_e) + if s_e.shape[0] < s_e.shape[1]: + s_e = s_e.T return s_m, s_e @@ -146,7 +148,6 @@ def fields(self, m=None, return_Ainv=False): self.model = m f = self.fieldsPair(self) - print("Computing fields") Ainv = {} for freq in self.survey.frequencies: A = self.getA(freq) @@ -189,7 +190,7 @@ def compute_J(self, f=None, Ainv=None): self.sensitivity_path, mode="w", shape=(self.survey.nD, m_size), - chunks=(row_chunks, m_size), + chunks=(self.max_chunk_size, m_size), ) else: Jmatrix = np.zeros((self.survey.nD, m_size), dtype=np.float32) @@ -207,7 +208,6 @@ def compute_J(self, f=None, Ainv=None): blocks_receiver_derivs = [] for block in blocks: - # print(f"Ncpu: {cpu_count()}. N data per chunk: {len(block[0][1][1])}") chunks = np.array_split(np.arange(len(block)), cpu_count()) addresses_chunks = [] block_derivs_chunks = [] @@ -242,10 +242,8 @@ def compute_J(self, f=None, Ainv=None): addresses.append(addresses_chunks) blocks_receiver_derivs.append(block_derivs_chunks) - tc = time() - print("Derivative blocks") + # Dask process for all derivatives blocks_receiver_derivs = compute(blocks_receiver_derivs)[0] - print(f"Derivative blocks time: {time() - tc}") for block_derivs_chunks, addresses_chunks in tqdm( zip(blocks_receiver_derivs, addresses), desc="Sensitivity rows" @@ -259,7 +257,7 @@ def compute_J(self, f=None, Ainv=None): if self.store_sensitivities == "disk": del Jmatrix - return array.from_zarr(self.sensitivity_path + f"J.zarr") + return array.from_zarr(self.sensitivity_path) else: return Jmatrix @@ -271,21 +269,12 @@ def parallel_block_compute( self, Jmatrix, blocks_receiver_derivs, A_i, fields_array, addresses ): m_size = self.model.size - - tc = time() - print(f"Compute block stack {len(blocks_receiver_derivs)}") block_stack = sp.hstack(blocks_receiver_derivs).toarray() - print(f"Compute block stack time: {time() - tc}: shape{block_stack.shape}") - - tc = time() - print(f"Compute direct solver") ATinvdf_duT = delayed(A_i * block_stack) - print(f"Compute direct solver time: {time() - tc}") count = 0 rows = [] block_delayed = [] - tc = time() - print("Loop over addresses") + for block_addresses, dfduT in zip(addresses, blocks_receiver_derivs): n_cols = dfduT.shape[1] n_rows = np.sum([address[1][2] for address in block_addresses]) @@ -308,20 +297,16 @@ def parallel_block_compute( count += n_cols rows += [address[1][1] for address in block_addresses] - print(f"Loop over addresses time: {time() - tc}") - indices = np.hstack(rows) if self.store_sensitivities == "disk": Jmatrix.set_orthogonal_selection( - (np.r_[rows], slice(None)), - array.vstack(block_delayed).compute(), + (indices, slice(None)), + compute(array.vstack(block_delayed))[0], ) else: - tc = time() - print("Compute Jmatrix") + # Dask process to compute row and store Jmatrix[indices, :] = compute(array.vstack(block_delayed))[0] - print(f"Compute Jmatrix time: {time() - tc}") return Jmatrix From 0bb7053e152210446e9f8061edd85fb09edc50f4 Mon Sep 17 00:00:00 2001 From: domfournier Date: Tue, 30 Apr 2024 15:59:22 -0700 Subject: [PATCH 29/35] FIx issue with compute inside delayed function --- .../frequency_domain/simulation.py | 36 ++++++++++--------- .../frequency_domain/simulation.py | 12 ++++--- .../natural_source/receivers.py | 4 +-- 3 files changed, 30 insertions(+), 22 deletions(-) diff --git a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py index 25b12688d4..5731b75945 100644 --- a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py @@ -32,37 +32,41 @@ def source_evaluation(simulation, sources): return s_m, s_e -def dask_getSourceTerm(self, freq): +def dask_getSourceTerm(self, freq, source=None): """ Assemble the source term. This ensures that the RHS is a vector / array of the correct size """ - source_list = self.survey.get_sources_by_frequency(freq) - source_block = np.array_split(source_list, cpu_count()) + if source is None: + source_list = self.survey.get_sources_by_frequency(freq) + source_block = np.array_split(source_list, cpu_count()) - block_compute = [] - for block in source_block: - if len(block) == 0: - continue + block_compute = [] + for block in source_block: + if len(block) == 0: + continue - block_compute.append(source_evaluation(self, block)) + block_compute.append(source_evaluation(self, block)) - eval = compute(block_compute)[0] + eval = compute(block_compute)[0] + s_m, s_e = [], [] + for block in eval: + if block[0]: + s_m += block[0] + s_e += block[1] - s_m, s_e = [], [] - for block in eval: - if block[0]: - s_m += block[0] - s_e += block[1] + else: + sm, se = source.eval(self) + s_m, s_e = [sm], [se] - if isinstance(s_m[0][0], Zero): + if isinstance(s_m[0][0], Zero): # Assume the rest is all Zero s_m = Zero() else: s_m = np.vstack(s_m) if s_m.shape[0] < s_m.shape[1]: s_m = s_m.T - if isinstance(s_e[0][0], Zero): + if isinstance(s_e[0][0], Zero): # Assume the rest is all Zero s_e = Zero() else: s_e = np.vstack(s_e) diff --git a/SimPEG/electromagnetics/frequency_domain/simulation.py b/SimPEG/electromagnetics/frequency_domain/simulation.py index 9c1372b80f..4a13430f6a 100644 --- a/SimPEG/electromagnetics/frequency_domain/simulation.py +++ b/SimPEG/electromagnetics/frequency_domain/simulation.py @@ -200,7 +200,7 @@ def Jtvec(self, m, v, f=None): return mkvc(Jtv) # @profile - def getSourceTerm(self, freq): + def getSourceTerm(self, freq, source=None): """ Evaluates the sources for a given frequency and puts them in matrix form @@ -209,7 +209,11 @@ def getSourceTerm(self, freq): :rtype: tuple :return: (s_m, s_e) (nE or nF, nSrc) """ - Srcs = self.survey.get_sources_by_frequency(freq) + if source is not None: + Srcs = [source] + else: + Srcs = self.survey.get_sources_by_frequency(freq) + n_fields = sum(src._fields_per_source for src in Srcs) if self._formulation == "EB": s_m = np.zeros((self.mesh.nF, n_fields), dtype=complex, order="F") @@ -362,7 +366,7 @@ def getRHSDeriv(self, freq, src, v, adjoint=False): C = self.mesh.edge_curl MfMui = self.MfMui - s_m, s_e = self.getSourceTerm(freq) + s_m, s_e = self.getSourceTerm(freq, source=src) s_mDeriv, s_eDeriv = src.evalDeriv(self, adjoint=adjoint) MfMuiDeriv = self.MfMuiDeriv(s_m) @@ -716,7 +720,7 @@ def getRHSDeriv(self, freq, src, v, adjoint=False): MeMuI = self.MeMuI MeMuIDeriv = self.MeMuIDeriv s_mDeriv, s_eDeriv = src.evalDeriv(self, adjoint=adjoint) - s_m, _ = self.getSourceTerm(freq) + s_m, _ = self.getSourceTerm(freq, source=src) if adjoint: if self._makeASymmetric: diff --git a/SimPEG/electromagnetics/natural_source/receivers.py b/SimPEG/electromagnetics/natural_source/receivers.py index 7ddb73b212..b9e822e4fc 100644 --- a/SimPEG/electromagnetics/natural_source/receivers.py +++ b/SimPEG/electromagnetics/natural_source/receivers.py @@ -315,8 +315,8 @@ def _eval_impedance_deriv(self, src, mesh, f, du_dm_v=None, v=None, adjoint=Fals else: ghx_v -= gh_v - gh_v = Phx.T @ ghx_v + Phy.T @ ghy_v - ge_v = Pe.T @ ge_v + gh_v = Phx.T @ sp.csr_matrix(ghx_v) + Phy.T @ sp.csr_matrix(ghy_v) + ge_v = Pe.T @ sp.csr_matrix(ge_v) else: if mesh.dim == 1 and self.orientation != f.field_directions: gbot_v = -gbot_v From 1aec17aae250af0c4ca000e405058a42922ceb75 Mon Sep 17 00:00:00 2001 From: domfournier Date: Tue, 30 Apr 2024 16:15:32 -0700 Subject: [PATCH 30/35] Add dask report for testing in J calcs --- .../dask/electromagnetics/frequency_domain/simulation.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py index 5731b75945..ac7976c201 100644 --- a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py @@ -4,7 +4,7 @@ import scipy.sparse as sp from multiprocessing import cpu_count from dask import array, compute, delayed, config -from dask.distributed import get_client, Client +from dask.distributed import get_client, Client, performance_report from SimPEG.dask.simulation import dask_Jvec, dask_Jtvec, dask_getJtJdiag from SimPEG.dask.utils import get_parallel_blocks from SimPEG.electromagnetics.natural_source.sources import PlanewaveXYPrimary @@ -246,8 +246,10 @@ def compute_J(self, f=None, Ainv=None): addresses.append(addresses_chunks) blocks_receiver_derivs.append(block_derivs_chunks) - # Dask process for all derivatives - blocks_receiver_derivs = compute(blocks_receiver_derivs)[0] + with Client(processes=False) as client: + with performance_report(filename="dask-report.html"): + # Dask process for all derivatives + blocks_receiver_derivs = compute(blocks_receiver_derivs)[0] for block_derivs_chunks, addresses_chunks in tqdm( zip(blocks_receiver_derivs, addresses), desc="Sensitivity rows" From 9f4015476372953538be05873028c5db41654cbb Mon Sep 17 00:00:00 2001 From: domfournier Date: Tue, 30 Apr 2024 16:22:52 -0700 Subject: [PATCH 31/35] Comment out dask profiler --- .../dask/electromagnetics/frequency_domain/simulation.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py index ac7976c201..83d553cf18 100644 --- a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py @@ -246,10 +246,11 @@ def compute_J(self, f=None, Ainv=None): addresses.append(addresses_chunks) blocks_receiver_derivs.append(block_derivs_chunks) - with Client(processes=False) as client: - with performance_report(filename="dask-report.html"): - # Dask process for all derivatives - blocks_receiver_derivs = compute(blocks_receiver_derivs)[0] + # with Client(processes=False) as client: + # with performance_report(filename="dask-report.html"): + + # Dask process for all derivatives + blocks_receiver_derivs = compute(blocks_receiver_derivs)[0] for block_derivs_chunks, addresses_chunks in tqdm( zip(blocks_receiver_derivs, addresses), desc="Sensitivity rows" From 84529e915564855771820cb8a14586b259415bd4 Mon Sep 17 00:00:00 2001 From: domfournier Date: Tue, 30 Apr 2024 20:23:18 -0700 Subject: [PATCH 32/35] Try flatter implementation --- .../frequency_domain/simulation.py | 158 +++++++++--------- 1 file changed, 79 insertions(+), 79 deletions(-) diff --git a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py index 83d553cf18..86c4831ffd 100644 --- a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py @@ -212,48 +212,48 @@ def compute_J(self, f=None, Ainv=None): blocks_receiver_derivs = [] for block in blocks: - chunks = np.array_split(np.arange(len(block)), cpu_count()) - addresses_chunks = [] - block_derivs_chunks = [] - - for chunk in chunks: - if len(chunk) == 0: - continue - - n_fields = np.sum( - [len(elem[1][0]) for elem in block[chunk[0] : chunk[0] + len(chunk)]] + # chunks = np.array_split(np.arange(len(block)), cpu_count()) + # addresses_chunks = [] + # block_derivs_chunks = [] + + # for chunk in chunks: + # if len(chunk) == 0: + # continue + # + # n_fields = np.sum( + # [len(elem[1][0]) for elem in block[chunk[0] : chunk[0] + len(chunk)]] + # ) + # + # shape = [A_i.A.shape[0], n_fields] + # + # if isinstance(self.survey.source_list[0], PlanewaveXYPrimary): + # shape[1] *= 2 + + blocks_receiver_derivs.append( + # array.from_delayed( + receiver_derivs( + survey, + mesh, + fields, + block, ) + # ), + # dtype=np.complex128, + # shape=shape, + # ) + ) + # addresses_chunks.append(block[chunk[0] : chunk[0] + len(chunk)]) - shape = [A_i.A.shape[0], n_fields] - - if isinstance(self.survey.source_list[0], PlanewaveXYPrimary): - shape[1] *= 2 - - block_derivs_chunks.append( - array.from_delayed( - receiver_derivs( - survey, - mesh, - fields, - block[chunk[0] : chunk[0] + len(chunk)], - ), - dtype=np.complex128, - shape=shape, - ) - ) - addresses_chunks.append(block[chunk[0] : chunk[0] + len(chunk)]) - - addresses.append(addresses_chunks) - blocks_receiver_derivs.append(block_derivs_chunks) - - # with Client(processes=False) as client: - # with performance_report(filename="dask-report.html"): + # addresses.append(addresses_chunks) + # blocks_receiver_derivs.append(block_derivs_chunks) - # Dask process for all derivatives - blocks_receiver_derivs = compute(blocks_receiver_derivs)[0] + with Client(processes=False) as client: + with performance_report(filename="dask-report.html"): + # Dask process for all derivatives + blocks_receiver_derivs = compute(blocks_receiver_derivs)[0] for block_derivs_chunks, addresses_chunks in tqdm( - zip(blocks_receiver_derivs, addresses), desc="Sensitivity rows" + zip(blocks_receiver_derivs, blocks), desc="Sensitivity rows" ): Jmatrix = parallel_block_compute( self, Jmatrix, block_derivs_chunks, A_i, fields_array, addresses_chunks @@ -282,9 +282,9 @@ def parallel_block_compute( rows = [] block_delayed = [] - for block_addresses, dfduT in zip(addresses, blocks_receiver_derivs): + for address, dfduT in zip(addresses, blocks_receiver_derivs): n_cols = dfduT.shape[1] - n_rows = np.sum([address[1][2] for address in block_addresses]) + n_rows = address[1][2] block_delayed.append( array.from_delayed( eval_block( @@ -293,7 +293,7 @@ def parallel_block_compute( np.arange(count, count + n_cols), Zero(), fields_array, - block_addresses + address # src, # address[0][0], ), @@ -302,7 +302,7 @@ def parallel_block_compute( ) ) count += n_cols - rows += [address[1][1] for address in block_addresses] + rows += address[1][1].tolist() indices = np.hstack(rows) @@ -337,55 +337,55 @@ def receiver_derivs(survey, mesh, fields, blocks): field_derivatives.append(dfduT) # field_derivatives = sp.hstack(field_derivatives) - return sp.hstack(field_derivatives) + return field_derivatives @delayed -def eval_block(simulation, Ainv_deriv_u, deriv_indices, deriv_m, fields, addresses): +def eval_block(simulation, Ainv_deriv_u, deriv_indices, deriv_m, fields, address): """ Evaluate the sensitivities for the block or data """ - count = 0 - rows = [] + # count = 0 + # rows = [] if Ainv_deriv_u.ndim == 1: deriv_columns = Ainv_deriv_u[:, np.newaxis] else: deriv_columns = Ainv_deriv_u[:, deriv_indices] - for address in addresses: - n_receivers = address[1][2] - source = simulation.survey.source_list[address[0][0]] + # for address in addresses: + n_receivers = address[1][2] + source = simulation.survey.source_list[address[0][0]] - if isinstance(source, PlanewaveXYPrimary): - source_fields = fields - n_cols = 2 - else: - source_fields = fields[:, address[0][0]] - n_cols = 1 - - n_cols *= n_receivers - - dA_dmT = simulation.getADeriv( - source.frequency, - source_fields, - deriv_columns[:, count : count + n_cols], - adjoint=True, - ) - dRHS_dmT = simulation.getRHSDeriv( - source.frequency, - source, - deriv_columns[:, count : count + n_cols], - adjoint=True, - ) - du_dmT = -dA_dmT - if not isinstance(dRHS_dmT, Zero): - du_dmT += dRHS_dmT - if not isinstance(deriv_m, Zero): - du_dmT += deriv_m + if isinstance(source, PlanewaveXYPrimary): + source_fields = fields + n_cols = 2 + else: + source_fields = fields[:, address[0][0]] + n_cols = 1 - rows.append( - np.array(du_dmT, dtype=complex).reshape((du_dmT.shape[0], -1)).real.T - ) - count += n_cols + n_cols *= n_receivers - return np.vstack(rows) + dA_dmT = simulation.getADeriv( + source.frequency, + source_fields, + deriv_columns, + adjoint=True, + ) + dRHS_dmT = simulation.getRHSDeriv( + source.frequency, + source, + deriv_columns, + adjoint=True, + ) + du_dmT = -dA_dmT + if not isinstance(dRHS_dmT, Zero): + du_dmT += dRHS_dmT + if not isinstance(deriv_m, Zero): + du_dmT += deriv_m + + # rows.append( + # np.array(du_dmT, dtype=complex).reshape((du_dmT.shape[0], -1)).real.T + # ) + # count += n_cols + + return np.array(du_dmT, dtype=complex).reshape((du_dmT.shape[0], -1)).real.T From 0a5919185cf1be7449d421d161e3900d9d13a67b Mon Sep 17 00:00:00 2001 From: domfournier Date: Tue, 30 Apr 2024 22:00:34 -0700 Subject: [PATCH 33/35] Cleanups --- .../frequency_domain/simulation.py | 48 +++---------------- 1 file changed, 6 insertions(+), 42 deletions(-) diff --git a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py index 86c4831ffd..4947df591b 100644 --- a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py @@ -212,48 +212,23 @@ def compute_J(self, f=None, Ainv=None): blocks_receiver_derivs = [] for block in blocks: - # chunks = np.array_split(np.arange(len(block)), cpu_count()) - # addresses_chunks = [] - # block_derivs_chunks = [] - - # for chunk in chunks: - # if len(chunk) == 0: - # continue - # - # n_fields = np.sum( - # [len(elem[1][0]) for elem in block[chunk[0] : chunk[0] + len(chunk)]] - # ) - # - # shape = [A_i.A.shape[0], n_fields] - # - # if isinstance(self.survey.source_list[0], PlanewaveXYPrimary): - # shape[1] *= 2 - blocks_receiver_derivs.append( - # array.from_delayed( receiver_derivs( survey, mesh, fields, block, ) - # ), - # dtype=np.complex128, - # shape=shape, - # ) ) - # addresses_chunks.append(block[chunk[0] : chunk[0] + len(chunk)]) - # addresses.append(addresses_chunks) - # blocks_receiver_derivs.append(block_derivs_chunks) + # with Client(processes=False) as client: + # with performance_report(filename="dask-report.html"): - with Client(processes=False) as client: - with performance_report(filename="dask-report.html"): - # Dask process for all derivatives - blocks_receiver_derivs = compute(blocks_receiver_derivs)[0] + # Dask process for all derivatives + blocks_receiver_derivs = compute(blocks_receiver_derivs)[0] for block_derivs_chunks, addresses_chunks in tqdm( - zip(blocks_receiver_derivs, blocks), desc="Sensitivity rows" + zip(blocks_receiver_derivs, blocks), desc=f"Sensitivities at {list(Ainv)} Hz" ): Jmatrix = parallel_block_compute( self, Jmatrix, block_derivs_chunks, A_i, fields_array, addresses_chunks @@ -293,9 +268,7 @@ def parallel_block_compute( np.arange(count, count + n_cols), Zero(), fields_array, - address - # src, - # address[0][0], + address, ), dtype=np.float32, shape=(n_rows, m_size), @@ -336,7 +309,6 @@ def receiver_derivs(survey, mesh, fields, blocks): ) field_derivatives.append(dfduT) - # field_derivatives = sp.hstack(field_derivatives) return field_derivatives @@ -345,14 +317,11 @@ def eval_block(simulation, Ainv_deriv_u, deriv_indices, deriv_m, fields, address """ Evaluate the sensitivities for the block or data """ - # count = 0 - # rows = [] if Ainv_deriv_u.ndim == 1: deriv_columns = Ainv_deriv_u[:, np.newaxis] else: deriv_columns = Ainv_deriv_u[:, deriv_indices] - # for address in addresses: n_receivers = address[1][2] source = simulation.survey.source_list[address[0][0]] @@ -383,9 +352,4 @@ def eval_block(simulation, Ainv_deriv_u, deriv_indices, deriv_m, fields, address if not isinstance(deriv_m, Zero): du_dmT += deriv_m - # rows.append( - # np.array(du_dmT, dtype=complex).reshape((du_dmT.shape[0], -1)).real.T - # ) - # count += n_cols - return np.array(du_dmT, dtype=complex).reshape((du_dmT.shape[0], -1)).real.T From 67ad2b30914f9bed1902d925bc6d0ddd9cfd77b1 Mon Sep 17 00:00:00 2001 From: domfournier Date: Tue, 30 Apr 2024 22:16:24 -0700 Subject: [PATCH 34/35] Improve progress bar --- SimPEG/dask/electromagnetics/frequency_domain/simulation.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py index 4947df591b..50ef866fa9 100644 --- a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py @@ -228,7 +228,9 @@ def compute_J(self, f=None, Ainv=None): blocks_receiver_derivs = compute(blocks_receiver_derivs)[0] for block_derivs_chunks, addresses_chunks in tqdm( - zip(blocks_receiver_derivs, blocks), desc=f"Sensitivities at {list(Ainv)} Hz" + zip(blocks_receiver_derivs, blocks), + ncols=len(blocks_receiver_derivs), + desc=f"Sensitivities at {list(Ainv)[0]} Hz", ): Jmatrix = parallel_block_compute( self, Jmatrix, block_derivs_chunks, A_i, fields_array, addresses_chunks From 43f83942837b4092f72fd9fecd5ab353f8b7f654 Mon Sep 17 00:00:00 2001 From: domfournier Date: Wed, 1 May 2024 14:29:51 -0700 Subject: [PATCH 35/35] Clean ups --- SimPEG/dask/electromagnetics/frequency_domain/simulation.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py index 50ef866fa9..a93884556d 100644 --- a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py @@ -176,7 +176,6 @@ def fields(self, m=None, return_Ainv=False): def compute_J(self, f=None, Ainv=None): - # client = get_client() if f is None: f, Ainv = self.fields(self.model, return_Ainv=True) @@ -203,12 +202,10 @@ def compute_J(self, f=None, Ainv=None): blocks = get_parallel_blocks( self.survey.source_list, compute_row_size, optimize=False ) - count = 0 fields_array = delayed(f[:, self._solutionType]) fields = delayed(f) survey = delayed(self.survey) mesh = delayed(self.mesh) - addresses = [] blocks_receiver_derivs = [] for block in blocks: