From 21fb2db0024f63cd961a138ded3874ccd1d57af5 Mon Sep 17 00:00:00 2001 From: benjamink Date: Wed, 16 Nov 2022 15:17:24 -0800 Subject: [PATCH 01/25] Fix pole-pole behaviour --- SimPEG/dask/electromagnetics/static/resistivity/receivers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/SimPEG/dask/electromagnetics/static/resistivity/receivers.py b/SimPEG/dask/electromagnetics/static/resistivity/receivers.py index 8e4c04f68e..dc02272428 100644 --- a/SimPEG/dask/electromagnetics/static/resistivity/receivers.py +++ b/SimPEG/dask/electromagnetics/static/resistivity/receivers.py @@ -43,4 +43,4 @@ def dask_Pole_getP(self, mesh, Gloc): return P -Pole.getP = dask_Dipole_getP \ No newline at end of file +Pole.getP = dask_Pole_getP \ No newline at end of file From 31d4c3fe25da0ca573921e140a4e373bc2b77ca4 Mon Sep 17 00:00:00 2001 From: fourndo Date: Wed, 30 Nov 2022 12:41:23 -0800 Subject: [PATCH 02/25] Better handling of alphas in stashed reg operators --- SimPEG/dask/inverse_problem.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/SimPEG/dask/inverse_problem.py b/SimPEG/dask/inverse_problem.py index 41b8e30d90..8bc2f2a6c6 100644 --- a/SimPEG/dask/inverse_problem.py +++ b/SimPEG/dask/inverse_problem.py @@ -9,7 +9,7 @@ import gc from ..regularization import BaseComboRegularization, Sparse from ..data_misfit import BaseDataMisfit -from ..objective_function import BaseObjectiveFunction +from ..objective_function import BaseObjectiveFunction, ComboObjectiveFunction def dask_getFields(self, m, store=False, deleteWarmstart=True): @@ -190,7 +190,17 @@ def dask_evalFunction(self, m, return_g=True, return_H=True): phi_d = np.asarray(phi_d) # print(self.dpred[0]) - self.reg2Deriv = [obj.deriv2(m) for obj in self.reg.objfcts] + + + if isinstance(self.reg, ComboObjectiveFunction) and not isinstance( + self.reg, BaseComboRegularization + ): + self.reg2Deriv = [] + for fct in self.reg.objfcts: + self.reg2Deriv += [multi * obj.deriv2(m) for multi, obj in fct] + else: + self.reg2Deriv = [multi * obj.deriv2(m) for multi, obj in self.reg] + # reg = np.linalg.norm(self.reg2Deriv * self.reg._delta_m(m)) phi_m = self.reg(m) From e6493f13f4f9057a94d4d72016ee6b9c84259ccf Mon Sep 17 00:00:00 2001 From: fourndo Date: Wed, 30 Nov 2022 15:41:05 -0800 Subject: [PATCH 03/25] Simplify computation or reg2Deriv --- SimPEG/dask/inverse_problem.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/SimPEG/dask/inverse_problem.py b/SimPEG/dask/inverse_problem.py index 8bc2f2a6c6..abfd922cce 100644 --- a/SimPEG/dask/inverse_problem.py +++ b/SimPEG/dask/inverse_problem.py @@ -195,11 +195,13 @@ def dask_evalFunction(self, m, return_g=True, return_H=True): if isinstance(self.reg, ComboObjectiveFunction) and not isinstance( self.reg, BaseComboRegularization ): - self.reg2Deriv = [] + reg2Deriv = [] for fct in self.reg.objfcts: - self.reg2Deriv += [multi * obj.deriv2(m) for multi, obj in fct] + reg2Deriv += [multi * obj.deriv2(m) for multi, obj in fct] else: - self.reg2Deriv = [multi * obj.deriv2(m) for multi, obj in self.reg] + reg2Deriv = [multi * obj.deriv2(m) for multi, obj in self.reg] + + self.reg2Deriv = np.sum(reg2Deriv) # reg = np.linalg.norm(self.reg2Deriv * self.reg._delta_m(m)) phi_m = self.reg(m) @@ -254,7 +256,7 @@ def dask_evalFunction(self, m, return_g=True, return_H=True): def H_fun(v): phi_d2Deriv = self.dmisfit.deriv2(m, v) - phi_m2Deriv = np.sum([reg2Deriv * v for reg2Deriv in self.reg2Deriv], axis=0) + phi_m2Deriv = self.reg2Deriv * v H = phi_d2Deriv + self.beta * phi_m2Deriv From 5b2531cf5d5f2eda765135331cf46ec58853b0cc Mon Sep 17 00:00:00 2001 From: fourndo Date: Thu, 1 Dec 2022 15:09:27 -0800 Subject: [PATCH 04/25] Parallelize loop over receiver evals --- .../frequency_domain/simulation.py | 83 ++++++++++++------- SimPEG/dask/simulation.py | 20 +++-- 2 files changed, 68 insertions(+), 35 deletions(-) diff --git a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py index 558eed56ba..ad48dc2c13 100644 --- a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py @@ -2,7 +2,8 @@ from ....utils import Zero, mkvc import numpy as np import scipy.sparse as sp -import dask.array as da +import multiprocessing +from dask import array, compute, delayed from dask.distributed import Future import zarr from time import time @@ -56,9 +57,9 @@ def dask_getJtJdiag(self, m, W=None): else: W = W.diagonal() - diag = da.einsum('i,ij,ij->j', W, self.Jmatrix, self.Jmatrix) + diag = array.einsum('i,ij,ij->j', W, self.Jmatrix, self.Jmatrix) - if isinstance(diag, da.Array): + if isinstance(diag, array.Array): diag = np.asarray(diag.compute()) self.gtgdiag = diag @@ -80,7 +81,7 @@ def dask_Jvec(self, m, v): if isinstance(self.Jmatrix, Future): self.Jmatrix # Wait to finish - return da.dot(self.Jmatrix, v) + return array.dot(self.Jmatrix, v) Sim.Jvec = dask_Jvec @@ -98,7 +99,7 @@ def dask_Jtvec(self, m, v): if isinstance(self.Jmatrix, Future): self.Jmatrix # Wait to finish - return da.dot(v, self.Jmatrix) + return array.dot(v, self.Jmatrix) Sim.Jtvec = dask_Jtvec @@ -127,8 +128,7 @@ def eval_store_block(A, freq, df_duT, df_dmT, u_src, src, row_count): """ Evaluate the sensitivities for the block or data and store to zarr """ - df_duT = np.hstack(df_duT) - ATinvdf_duT = (A * df_duT).reshape((dfduT.shape[0], -1)) + ATinvdf_duT = (A * np.hstack(df_duT)).reshape((df_duT[0].shape[0], -1)) dA_dmT = self.getADeriv(freq, u_src, ATinvdf_duT, adjoint=True) dRHS_dmT = self.getRHSDeriv(freq, src, ATinvdf_duT, adjoint=True) du_dmT = -dA_dmT @@ -152,56 +152,79 @@ def eval_store_block(A, freq, df_duT, df_dmT, u_src, src, row_count): row_count += block.shape[0] return row_count - blocks = [] + + def evaluate_receiver(source, receiver, mesh, fields, block): + dfduT, dfdmT = receiver.evalDeriv( + source, mesh, fields, v=block, adjoint=True + ) + + return dfduT, dfdmT + + + count = 0 block_count = 0 for A_i, freq in zip(Ainv, self.survey.frequencies): for src in self.survey.get_sources_by_frequency(freq): df_duT, df_dmT = [], [] + blocks = [] u_src = f[src, self._solutionType] for rx in src.receiver_list: v = np.eye(rx.nD, dtype=float) - n_blocs = np.ceil(2 * rx.nD / row_chunks) + n_blocs = np.ceil(2 * rx.nD / row_chunks * (int(multiprocessing.cpu_count() / 2))) for block in np.array_split(v, n_blocs, axis=1): - dfduT, dfdmT = rx.evalDeriv( - src, self.mesh, f, v=block, adjoint=True - ) - df_duT += [dfduT] - df_dmT += [dfdmT] - - block_count += dfduT.shape[1] + block_count += block.shape[1] * 2 + blocks.append(delayed(evaluate_receiver, pure=True)(src, rx, self.mesh, f, block)) if block_count >= row_chunks: - count = eval_store_block(A_i, freq, df_duT, df_dmT, u_src, src, count) - df_duT, df_dmT = [], [] + field_derivs = compute(blocks)[0] + count = eval_store_block( + A_i, + freq, + [deriv[0] for deriv in field_derivs], + [deriv[1] for deriv in field_derivs], + u_src, + src, + count + ) + blocks = [] block_count = 0 # blocks, count = store_block(blocks, count) if df_duT: - count = eval_store_block(A_i, freq, df_duT, df_dmT, u_src, src, count) + field_derivs = compute(blocks)[0] + count = eval_store_block( + A_i, + freq, + [deriv[0] for deriv in field_derivs], + [deriv[1] for deriv in field_derivs], + u_src, + src, + count + ) block_count = 0 - if len(blocks) != 0: - if self.store_sensitivities == "disk": - Jmatrix.set_orthogonal_selection( - (np.arange(count, self.survey.nD), slice(None)), - blocks.astype(np.float32) - ) - else: - Jmatrix[count: self.survey.nD, :] = ( - blocks.astype(np.float32) - ) + # if len(blocks) != 0: + # if self.store_sensitivities == "disk": + # Jmatrix.set_orthogonal_selection( + # (np.arange(count, self.survey.nD), slice(None)), + # blocks.astype(np.float32) + # ) + # else: + # Jmatrix[count: self.survey.nD, :] = ( + # blocks.astype(np.float32) + # ) for A in Ainv: A.clean() if self.store_sensitivities == "disk": del Jmatrix - return da.from_zarr(self.sensitivity_path + f"J.zarr") + return array.from_zarr(self.sensitivity_path + f"J.zarr") else: return Jmatrix diff --git a/SimPEG/dask/simulation.py b/SimPEG/dask/simulation.py index 88beb028d9..e762334fc8 100644 --- a/SimPEG/dask/simulation.py +++ b/SimPEG/dask/simulation.py @@ -196,16 +196,26 @@ def dask_dpred(self, m=None, f=None, compute_J=False): m = self.model f, Ainv = self.fields(m, return_Ainv=compute_J) - data = Data(self.survey) + def evaluate_receiver(source, receiver, mesh, fields): + return receiver.eval(source, mesh, fields) + + row = delayed(evaluate_receiver, pure=True) + rows = [] for src in self.survey.source_list: for rx in src.receiver_list: - data[src, rx] = rx.eval(src, self.mesh, f) + rows.append(array.from_delayed( + row(src, rx, self.mesh, f), + dtype=np.float32, + shape=(rx.nD,), + )) + + data = array.hstack(rows).compute() - if compute_J: + if compute_J and self._Jmatrix is None: Jmatrix = self.compute_J(f=f, Ainv=Ainv) - return (mkvc(data), Jmatrix) + return data, Jmatrix - return mkvc(data) + return data Sim.dpred = dask_dpred From 3086e422eb065fa841a05e4ac7e9ce9a4e536ee9 Mon Sep 17 00:00:00 2001 From: fourndo Date: Sat, 3 Dec 2022 09:13:07 -0800 Subject: [PATCH 05/25] Increase number of sub-block computed --- SimPEG/dask/electromagnetics/frequency_domain/simulation.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py index ad48dc2c13..7a4b067dcd 100644 --- a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py @@ -180,7 +180,7 @@ def evaluate_receiver(source, receiver, mesh, fields, block): block_count += block.shape[1] * 2 blocks.append(delayed(evaluate_receiver, pure=True)(src, rx, self.mesh, f, block)) - if block_count >= row_chunks: + if block_count >= (row_chunks * multiprocessing.cpu_count() / 2): field_derivs = compute(blocks)[0] count = eval_store_block( A_i, @@ -195,7 +195,7 @@ def evaluate_receiver(source, receiver, mesh, fields, block): block_count = 0 # blocks, count = store_block(blocks, count) - if df_duT: + if blocks: field_derivs = compute(blocks)[0] count = eval_store_block( A_i, From 0641afacdf0df976766bce7ba52e79102030f2a8 Mon Sep 17 00:00:00 2001 From: fourndo Date: Sat, 3 Dec 2022 09:21:22 -0800 Subject: [PATCH 06/25] Add printing --- SimPEG/dask/electromagnetics/frequency_domain/simulation.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py index 7a4b067dcd..6fda04ac49 100644 --- a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py @@ -1,5 +1,6 @@ from ....electromagnetics.frequency_domain.simulation import BaseFDEMSimulation as Sim from ....utils import Zero, mkvc +from time import time import numpy as np import scipy.sparse as sp import multiprocessing @@ -166,11 +167,11 @@ def evaluate_receiver(source, receiver, mesh, fields, block): block_count = 0 for A_i, freq in zip(Ainv, self.survey.frequencies): - for src in self.survey.get_sources_by_frequency(freq): + for ss, src in enumerate(self.survey.get_sources_by_frequency(freq)): df_duT, df_dmT = [], [] blocks = [] u_src = f[src, self._solutionType] - + ct = time() for rx in src.receiver_list: v = np.eye(rx.nD, dtype=float) n_blocs = np.ceil(2 * rx.nD / row_chunks * (int(multiprocessing.cpu_count() / 2))) @@ -181,6 +182,7 @@ def evaluate_receiver(source, receiver, mesh, fields, block): blocks.append(delayed(evaluate_receiver, pure=True)(src, rx, self.mesh, f, block)) if block_count >= (row_chunks * multiprocessing.cpu_count() / 2): + print(f"{ss}: Block {count}: {time()-time}") field_derivs = compute(blocks)[0] count = eval_store_block( A_i, From da18e8747acc8da4b7b937244829cc31a1036e1b Mon Sep 17 00:00:00 2001 From: fourndo Date: Sat, 3 Dec 2022 09:29:29 -0800 Subject: [PATCH 07/25] Bump version --- SimPEG/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/SimPEG/__init__.py b/SimPEG/__init__.py index fa616489fe..b77e5a91ee 100644 --- a/SimPEG/__init__.py +++ b/SimPEG/__init__.py @@ -30,7 +30,7 @@ SolverBiCG, ) -__version__ = "0.15.1.dev6+geoapps.0.9.0" +__version__ = "0.15.1.dev6+geoapps.0.9.1" __author__ = "SimPEG Team" __license__ = "MIT" __copyright__ = "2013 - 2020, SimPEG Team, http://simpeg.xyz" From 1d3aefaed71ada6835a73bd4884e2f67edd88e6a Mon Sep 17 00:00:00 2001 From: fourndo Date: Sat, 3 Dec 2022 10:09:26 -0800 Subject: [PATCH 08/25] Debug message --- SimPEG/inversion.py | 1 + 1 file changed, 1 insertion(+) diff --git a/SimPEG/inversion.py b/SimPEG/inversion.py index 8b4cb023e6..fb50f7d3cb 100644 --- a/SimPEG/inversion.py +++ b/SimPEG/inversion.py @@ -61,6 +61,7 @@ def run(self, m0): """ self.invProb.startup(m0) + print("Starting message") self.directiveList.call("initialize") self.m = self.opt.minimize(self.invProb.evalFunction, self.invProb.model) self.directiveList.call("finish") From eeb79e5881befc38a8685016b756e48ecdef28a0 Mon Sep 17 00:00:00 2001 From: fourndo Date: Sat, 3 Dec 2022 10:44:45 -0800 Subject: [PATCH 09/25] Fix test string --- SimPEG/dask/electromagnetics/frequency_domain/simulation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py index 6fda04ac49..3a52a62465 100644 --- a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py @@ -182,7 +182,7 @@ def evaluate_receiver(source, receiver, mesh, fields, block): blocks.append(delayed(evaluate_receiver, pure=True)(src, rx, self.mesh, f, block)) if block_count >= (row_chunks * multiprocessing.cpu_count() / 2): - print(f"{ss}: Block {count}: {time()-time}") + print(f"{ss}: Block {count}: {time()-ct}") field_derivs = compute(blocks)[0] count = eval_store_block( A_i, From d597cde5682fe1d09ff722406b714d46269a61c6 Mon Sep 17 00:00:00 2001 From: fourndo Date: Sat, 3 Dec 2022 10:58:54 -0800 Subject: [PATCH 10/25] More prints --- SimPEG/dask/electromagnetics/frequency_domain/simulation.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py index 3a52a62465..6e18e7aecd 100644 --- a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py @@ -172,10 +172,11 @@ def evaluate_receiver(source, receiver, mesh, fields, block): blocks = [] u_src = f[src, self._solutionType] ct = time() + print("In loop over receivers") for rx in src.receiver_list: v = np.eye(rx.nD, dtype=float) n_blocs = np.ceil(2 * rx.nD / row_chunks * (int(multiprocessing.cpu_count() / 2))) - + print("In loop over blocks") for block in np.array_split(v, n_blocs, axis=1): block_count += block.shape[1] * 2 @@ -184,6 +185,7 @@ def evaluate_receiver(source, receiver, mesh, fields, block): if block_count >= (row_chunks * multiprocessing.cpu_count() / 2): print(f"{ss}: Block {count}: {time()-ct}") field_derivs = compute(blocks)[0] + print("Computing derivs") count = eval_store_block( A_i, freq, From 7418eaa0be734c475e6c019c94e248304458b337 Mon Sep 17 00:00:00 2001 From: fourndo Date: Sat, 3 Dec 2022 11:11:10 -0800 Subject: [PATCH 11/25] more prints --- SimPEG/dask/electromagnetics/frequency_domain/simulation.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py index 6e18e7aecd..254814ec2f 100644 --- a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py @@ -129,9 +129,13 @@ def eval_store_block(A, freq, df_duT, df_dmT, u_src, src, row_count): """ Evaluate the sensitivities for the block or data and store to zarr """ + print("Line 132") ATinvdf_duT = (A * np.hstack(df_duT)).reshape((df_duT[0].shape[0], -1)) + print("Line 134") dA_dmT = self.getADeriv(freq, u_src, ATinvdf_duT, adjoint=True) + print("Line 136") dRHS_dmT = self.getRHSDeriv(freq, src, ATinvdf_duT, adjoint=True) + print("Line 138") du_dmT = -dA_dmT if not isinstance(dRHS_dmT, Zero): du_dmT += dRHS_dmT @@ -139,7 +143,7 @@ def eval_store_block(A, freq, df_duT, df_dmT, u_src, src, row_count): du_dmT += np.hstack(df_dmT) block = np.array(du_dmT, dtype=complex).real.T - + print("Line 146") if self.store_sensitivities == "disk": Jmatrix.set_orthogonal_selection( (np.arange(row_count, row_count + block.shape[0]), slice(None)), From 34f5aa98a7be2b960cbba780caa83b5a61bc3144 Mon Sep 17 00:00:00 2001 From: fourndo Date: Sat, 3 Dec 2022 14:22:01 -0800 Subject: [PATCH 12/25] Try parallelizing more derivs --- .../frequency_domain/simulation.py | 101 ++++++++++++------ 1 file changed, 69 insertions(+), 32 deletions(-) diff --git a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py index 254814ec2f..9868faf181 100644 --- a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py @@ -115,6 +115,7 @@ def compute_J(self, f=None, Ainv=None): row_chunks = int(np.ceil( float(self.survey.nD) / np.ceil(float(m_size) * self.survey.nD * 8. * 1e-6 / self.max_chunk_size) )) + sub_threads = int(multiprocessing.cpu_count() / 2) if self.store_sensitivities == "disk": Jmatrix = zarr.open( self.sensitivity_path + f"J.zarr", @@ -125,12 +126,12 @@ def compute_J(self, f=None, Ainv=None): else: Jmatrix = np.zeros((self.survey.nD, m_size), dtype=np.float32) - def eval_store_block(A, freq, df_duT, df_dmT, u_src, src, row_count): + def eval_store_block(ATinvdf_duT, freq, df_dmT, u_src, src, row_count): """ Evaluate the sensitivities for the block or data and store to zarr """ print("Line 132") - ATinvdf_duT = (A * np.hstack(df_duT)).reshape((df_duT[0].shape[0], -1)) + print("Line 134") dA_dmT = self.getADeriv(freq, u_src, ATinvdf_duT, adjoint=True) print("Line 136") @@ -154,66 +155,102 @@ def eval_store_block(A, freq, df_duT, df_dmT, u_src, src, row_count): block.astype(np.float32) ) - row_count += block.shape[0] - return row_count + # row_count += block.shape[0] + # return row_count - def evaluate_receiver(source, receiver, mesh, fields, block): - dfduT, dfdmT = receiver.evalDeriv( + def dfduT(source, receiver, mesh, fields, block): + dfduT, _ = receiver.evalDeriv( source, mesh, fields, v=block, adjoint=True ) - return dfduT, dfdmT + return dfduT + def dfdmT(source, receiver, mesh, fields, block): + _, dfdmT = receiver.evalDeriv( + source, mesh, fields, v=block, adjoint=True + ) + return dfdmT count = 0 block_count = 0 + for A_i, freq in zip(Ainv, self.survey.frequencies): for ss, src in enumerate(self.survey.get_sources_by_frequency(freq)): df_duT, df_dmT = [], [] - blocks = [] + blocks_dfduT = [] + blocks_dfdmT = [] u_src = f[src, self._solutionType] ct = time() print("In loop over receivers") for rx in src.receiver_list: v = np.eye(rx.nD, dtype=float) - n_blocs = np.ceil(2 * rx.nD / row_chunks * (int(multiprocessing.cpu_count() / 2))) + n_blocs = np.ceil(2 * rx.nD / row_chunks * sub_threads) print("In loop over blocks") for block in np.array_split(v, n_blocs, axis=1): block_count += block.shape[1] * 2 - blocks.append(delayed(evaluate_receiver, pure=True)(src, rx, self.mesh, f, block)) + blocks_dfduT.append( + array.from_delayed( + delayed(dfduT, pure=True)(src, rx, self.mesh, f, block), + dtype=np.float32, + shape=(u_src.shape[0], block.shape[1]*2) + ) + ) - if block_count >= (row_chunks * multiprocessing.cpu_count() / 2): + if block_count >= (row_chunks * sub_threads): print(f"{ss}: Block {count}: {time()-ct}") - field_derivs = compute(blocks)[0] - print("Computing derivs") - count = eval_store_block( - A_i, + field_derivs = array.hstack(blocks_dfduT).compute() + + ATinvdf_duT = (A_i * field_derivs).reshape((blocks_dfduT.shape[0], -1)) + sub_process = [] + for sub_block in np.array_split(ATinvdf_duT, sub_threads, axis=1): + print("Computing derivs") + sub_process.append( + delayed(eval_store_block, pure=True)( + freq, + sub_block, + Zero, + u_src, + src, + count + ) + ) + count += sub_block.shape[1] + # count = eval_store_block( + # A_i, + # freq, + # [deriv[0] for deriv in field_derivs], + # [deriv[1] for deriv in field_derivs], + # u_src, + # src, + # count + # ) + compute(sub_process) + blocks_dfduT = [] + block_count = 0 + # blocks_dfduT, count = store_block(blocks_dfduT, count) + + if blocks_dfduT: + field_derivs = array.hstack(blocks_dfduT).compute() + ATinvdf_duT = (A_i * field_derivs).reshape((blocks_dfduT.shape[0], -1)) + sub_process = [] + for sub_block in np.array_split(ATinvdf_duT, sub_threads, axis=1): + print("Computing derivs") + sub_process.append( + delayed(eval_store_block, pure=True)( freq, - [deriv[0] for deriv in field_derivs], - [deriv[1] for deriv in field_derivs], + sub_block, + Zero, u_src, src, count ) - blocks = [] - block_count = 0 - # blocks, count = store_block(blocks, count) - - if blocks: - field_derivs = compute(blocks)[0] - count = eval_store_block( - A_i, - freq, - [deriv[0] for deriv in field_derivs], - [deriv[1] for deriv in field_derivs], - u_src, - src, - count - ) + ) + count += sub_block.shape[1] + compute(sub_process) block_count = 0 # if len(blocks) != 0: From 59f8f58fd7d7e285a6b4ce9f05cee0bdf6f7d826 Mon Sep 17 00:00:00 2001 From: fourndo Date: Sat, 3 Dec 2022 15:48:22 -0800 Subject: [PATCH 13/25] More parallelization --- .../frequency_domain/simulation.py | 188 +++++++++--------- 1 file changed, 97 insertions(+), 91 deletions(-) diff --git a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py index 9868faf181..8cf0293140 100644 --- a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py @@ -126,53 +126,6 @@ def compute_J(self, f=None, Ainv=None): else: Jmatrix = np.zeros((self.survey.nD, m_size), dtype=np.float32) - def eval_store_block(ATinvdf_duT, freq, df_dmT, u_src, src, row_count): - """ - Evaluate the sensitivities for the block or data and store to zarr - """ - print("Line 132") - - print("Line 134") - dA_dmT = self.getADeriv(freq, u_src, ATinvdf_duT, adjoint=True) - print("Line 136") - dRHS_dmT = self.getRHSDeriv(freq, src, ATinvdf_duT, adjoint=True) - print("Line 138") - du_dmT = -dA_dmT - if not isinstance(dRHS_dmT, Zero): - du_dmT += dRHS_dmT - if not isinstance(df_dmT[0], Zero): - du_dmT += np.hstack(df_dmT) - - block = np.array(du_dmT, dtype=complex).real.T - print("Line 146") - if self.store_sensitivities == "disk": - Jmatrix.set_orthogonal_selection( - (np.arange(row_count, row_count + block.shape[0]), slice(None)), - block.astype(np.float32) - ) - else: - Jmatrix[row_count: row_count + block.shape[0], :] = ( - block.astype(np.float32) - ) - - # row_count += block.shape[0] - # return row_count - - - def dfduT(source, receiver, mesh, fields, block): - dfduT, _ = receiver.evalDeriv( - source, mesh, fields, v=block, adjoint=True - ) - - return dfduT - - def dfdmT(source, receiver, mesh, fields, block): - _, dfdmT = receiver.evalDeriv( - source, mesh, fields, v=block, adjoint=True - ) - - return dfdmT - count = 0 block_count = 0 @@ -199,58 +152,20 @@ def dfdmT(source, receiver, mesh, fields, block): shape=(u_src.shape[0], block.shape[1]*2) ) ) + blocks_dfdmT.append( + delayed(dfdmT, pure=True)(src, rx, self.mesh, f, block), + ) if block_count >= (row_chunks * sub_threads): print(f"{ss}: Block {count}: {time()-ct}") - field_derivs = array.hstack(blocks_dfduT).compute() - - ATinvdf_duT = (A_i * field_derivs).reshape((blocks_dfduT.shape[0], -1)) - sub_process = [] - for sub_block in np.array_split(ATinvdf_duT, sub_threads, axis=1): - print("Computing derivs") - sub_process.append( - delayed(eval_store_block, pure=True)( - freq, - sub_block, - Zero, - u_src, - src, - count - ) - ) - count += sub_block.shape[1] - # count = eval_store_block( - # A_i, - # freq, - # [deriv[0] for deriv in field_derivs], - # [deriv[1] for deriv in field_derivs], - # u_src, - # src, - # count - # ) - compute(sub_process) + count = parallel_block_compute(self, Jmatrix, src, A_i, freq, blocks_dfduT, blocks_dfdmT, u_src,sub_threads, count) blocks_dfduT = [] + blocks_dfdmT = [] block_count = 0 # blocks_dfduT, count = store_block(blocks_dfduT, count) if blocks_dfduT: - field_derivs = array.hstack(blocks_dfduT).compute() - ATinvdf_duT = (A_i * field_derivs).reshape((blocks_dfduT.shape[0], -1)) - sub_process = [] - for sub_block in np.array_split(ATinvdf_duT, sub_threads, axis=1): - print("Computing derivs") - sub_process.append( - delayed(eval_store_block, pure=True)( - freq, - sub_block, - Zero, - u_src, - src, - count - ) - ) - count += sub_block.shape[1] - compute(sub_process) + count = parallel_block_compute(self, Jmatrix, src, A_i, freq, blocks_dfduT, blocks_dfdmT, u_src, sub_threads, count) block_count = 0 # if len(blocks) != 0: @@ -275,3 +190,94 @@ def dfdmT(source, receiver, mesh, fields, block): Sim.compute_J = compute_J + + +def eval_store_block(simulation, Jmatrix, ATinvdf_duT, freq, df_dmT, u_src, src, row_count): + """ + Evaluate the sensitivities for the block or data and store to zarr + """ + print("Line 132") + dA_dmT = simulation.getADeriv(freq, u_src, ATinvdf_duT, adjoint=True) + print("Line 136") + dRHS_dmT = simulation.getRHSDeriv(freq, src, ATinvdf_duT, adjoint=True) + print("Line 138") + du_dmT = -dA_dmT + if not isinstance(dRHS_dmT, Zero): + du_dmT += dRHS_dmT + if not isinstance(df_dmT, Zero): + du_dmT += df_dmT + + block = np.array(du_dmT, dtype=complex).real.T + print("Line 146") + if simulation.store_sensitivities == "disk": + Jmatrix.set_orthogonal_selection( + (np.arange(row_count, row_count + block.shape[0]), slice(None)), + block.astype(np.float32) + ) + else: + Jmatrix[row_count: row_count + block.shape[0], :] = ( + block.astype(np.float32) + ) + + # row_count += block.shape[0] + # return row_count + + +def dfduT(source, receiver, mesh, fields, block): + dfduT, _ = receiver.evalDeriv( + source, mesh, fields, v=block, adjoint=True + ) + + return dfduT + + +def dfdmT(source, receiver, mesh, fields, block): + _, dfdmT = receiver.evalDeriv( + source, mesh, fields, v=block, adjoint=True + ) + + return dfdmT + + +def parallel_block_compute(simulation, Jmatrix, src, A_i, freq, blocks_dfduT, blocks_dfdmT, u_src, sub_threads, count): + field_derivs = array.hstack(blocks_dfduT).compute() + + # Direct-solver call + ATinvdf_duT = A_i * field_derivs + + # Even split + split = np.linspace(0, (ATinvdf_duT.shape[1]) / 2, sub_threads)[1:-1].astype(int) * 2 + sub_blocks_dfduT = np.array_split(ATinvdf_duT, split, axis=1) + + if isinstance(compute(blocks_dfdmT[0])[0], Zero): + sub_blocks_dfdmt = [Zero()] * len(sub_blocks_dfduT) + else: + compute_blocks_dfdmT = array.hstack([ + array.from_delayed( + dfdmT_block, + dtype=np.float32, + shape=(u_src.shape[0], dfdmT_block.shape[1] * 2)) + for dfdmT_block in blocks_dfdmT + ]).compute() + sub_blocks_dfdmt = np.array_split(compute_blocks_dfdmT, split, axis=1) + + sub_process = [] + for sub_block_dfduT, sub_block_dfdmT in zip(sub_blocks_dfduT, sub_blocks_dfdmt): + print("Computing derivs") + sub_process.append( + delayed(eval_store_block, pure=True)( + simulation, + Jmatrix, + sub_block_dfduT, + freq, + sub_block_dfdmT, + u_src, + src, + count + ) + ) + count += int(sub_block_dfduT.shape[1] / 2) + + compute(sub_process) + + return count \ No newline at end of file From d23569fd462612b8ebc6165d528240ae26e90025 Mon Sep 17 00:00:00 2001 From: fourndo Date: Sun, 4 Dec 2022 10:23:44 -0800 Subject: [PATCH 14/25] Fix size problems, change testing messages --- .../frequency_domain/simulation.py | 43 ++++++++----------- 1 file changed, 18 insertions(+), 25 deletions(-) diff --git a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py index 8cf0293140..1fc6e66335 100644 --- a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py @@ -24,10 +24,6 @@ def fields(self, m=None, return_Ainv=False): for freq in self.survey.frequencies: A = self.getA(freq) rhs = self.getRHS(freq) - - if return_Ainv: - Ainv += [self.solver(sp.csr_matrix(A.T), **self.solver_opts)] - Ainv_solve = self.solver(sp.csr_matrix(A), **self.solver_opts) u = Ainv_solve * rhs Srcs = self.survey.get_sources_by_frequency(freq) @@ -35,6 +31,10 @@ def fields(self, m=None, return_Ainv=False): Ainv_solve.clean() + if return_Ainv: + Ainv += [self.solver(sp.csr_matrix(A.T), **self.solver_opts)] + + if return_Ainv: return f, Ainv else: @@ -115,7 +115,7 @@ def compute_J(self, f=None, Ainv=None): row_chunks = int(np.ceil( float(self.survey.nD) / np.ceil(float(m_size) * self.survey.nD * 8. * 1e-6 / self.max_chunk_size) )) - sub_threads = int(multiprocessing.cpu_count() / 2) + sub_threads = int(multiprocessing.cpu_count()) if self.store_sensitivities == "disk": Jmatrix = zarr.open( self.sensitivity_path + f"J.zarr", @@ -136,11 +136,16 @@ def compute_J(self, f=None, Ainv=None): blocks_dfduT = [] blocks_dfdmT = [] u_src = f[src, self._solutionType] + + col_chunks = int(np.ceil( + float(self.survey.nD) / np.ceil(float(u_src.shape[0]) * self.survey.nD * 8. * 1e-6 / self.max_chunk_size) + )) + ct = time() print("In loop over receivers") for rx in src.receiver_list: v = np.eye(rx.nD, dtype=float) - n_blocs = np.ceil(2 * rx.nD / row_chunks * sub_threads) + n_blocs = np.ceil(2 * rx.nD / col_chunks * sub_threads) print("In loop over blocks") for block in np.array_split(v, n_blocs, axis=1): @@ -156,29 +161,17 @@ def compute_J(self, f=None, Ainv=None): delayed(dfdmT, pure=True)(src, rx, self.mesh, f, block), ) - if block_count >= (row_chunks * sub_threads): + if block_count >= (col_chunks * sub_threads): print(f"{ss}: Block {count}: {time()-ct}") count = parallel_block_compute(self, Jmatrix, src, A_i, freq, blocks_dfduT, blocks_dfdmT, u_src,sub_threads, count) blocks_dfduT = [] blocks_dfdmT = [] block_count = 0 - # blocks_dfduT, count = store_block(blocks_dfduT, count) if blocks_dfduT: count = parallel_block_compute(self, Jmatrix, src, A_i, freq, blocks_dfduT, blocks_dfdmT, u_src, sub_threads, count) block_count = 0 - # if len(blocks) != 0: - # if self.store_sensitivities == "disk": - # Jmatrix.set_orthogonal_selection( - # (np.arange(count, self.survey.nD), slice(None)), - # blocks.astype(np.float32) - # ) - # else: - # Jmatrix[count: self.survey.nD, :] = ( - # blocks.astype(np.float32) - # ) - for A in Ainv: A.clean() @@ -196,11 +189,8 @@ def eval_store_block(simulation, Jmatrix, ATinvdf_duT, freq, df_dmT, u_src, src, """ Evaluate the sensitivities for the block or data and store to zarr """ - print("Line 132") dA_dmT = simulation.getADeriv(freq, u_src, ATinvdf_duT, adjoint=True) - print("Line 136") dRHS_dmT = simulation.getRHSDeriv(freq, src, ATinvdf_duT, adjoint=True) - print("Line 138") du_dmT = -dA_dmT if not isinstance(dRHS_dmT, Zero): du_dmT += dRHS_dmT @@ -208,7 +198,7 @@ def eval_store_block(simulation, Jmatrix, ATinvdf_duT, freq, df_dmT, u_src, src, du_dmT += df_dmT block = np.array(du_dmT, dtype=complex).real.T - print("Line 146") + if simulation.store_sensitivities == "disk": Jmatrix.set_orthogonal_selection( (np.arange(row_count, row_count + block.shape[0]), slice(None)), @@ -240,12 +230,15 @@ def dfdmT(source, receiver, mesh, fields, block): def parallel_block_compute(simulation, Jmatrix, src, A_i, freq, blocks_dfduT, blocks_dfdmT, u_src, sub_threads, count): + print("Computing derivs blocks") field_derivs = array.hstack(blocks_dfduT).compute() # Direct-solver call + print("Direct solve") ATinvdf_duT = A_i * field_derivs # Even split + print("Splitting") split = np.linspace(0, (ATinvdf_duT.shape[1]) / 2, sub_threads)[1:-1].astype(int) * 2 sub_blocks_dfduT = np.array_split(ATinvdf_duT, split, axis=1) @@ -262,8 +255,8 @@ def parallel_block_compute(simulation, Jmatrix, src, A_i, freq, blocks_dfduT, bl sub_blocks_dfdmt = np.array_split(compute_blocks_dfdmT, split, axis=1) sub_process = [] + print("Storing blocks") for sub_block_dfduT, sub_block_dfdmT in zip(sub_blocks_dfduT, sub_blocks_dfdmt): - print("Computing derivs") sub_process.append( delayed(eval_store_block, pure=True)( simulation, @@ -277,7 +270,7 @@ def parallel_block_compute(simulation, Jmatrix, src, A_i, freq, blocks_dfduT, bl ) ) count += int(sub_block_dfduT.shape[1] / 2) - + print("Compute Storing blocks") compute(sub_process) return count \ No newline at end of file From 9807cb7447d5ffdcfe5cff9c96b13b938ea6095a Mon Sep 17 00:00:00 2001 From: fourndo Date: Sun, 4 Dec 2022 12:08:42 -0800 Subject: [PATCH 15/25] Re-work of methods to reduce spin up time --- .../frequency_domain/simulation.py | 105 +++++++++--------- 1 file changed, 54 insertions(+), 51 deletions(-) diff --git a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py index 1fc6e66335..2ac4dbb4df 100644 --- a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py @@ -142,6 +142,7 @@ def compute_J(self, f=None, Ainv=None): )) ct = time() + print("In loop over receivers") for rx in src.receiver_list: v = np.eye(rx.nD, dtype=float) @@ -163,13 +164,14 @@ def compute_J(self, f=None, Ainv=None): if block_count >= (col_chunks * sub_threads): print(f"{ss}: Block {count}: {time()-ct}") - count = parallel_block_compute(self, Jmatrix, src, A_i, freq, blocks_dfduT, blocks_dfdmT, u_src,sub_threads, count) + count = parallel_block_compute(self, A_i, Jmatrix, freq, u_src, src, blocks_dfduT, blocks_dfdmT, count, sub_threads, m_size) blocks_dfduT = [] blocks_dfdmT = [] block_count = 0 if blocks_dfduT: - count = parallel_block_compute(self, Jmatrix, src, A_i, freq, blocks_dfduT, blocks_dfdmT, u_src, sub_threads, count) + count = parallel_block_compute( + self, A_i, Jmatrix, freq, u_src, src, blocks_dfduT, blocks_dfdmT, count, sub_threads, m_size) block_count = 0 for A in Ainv: @@ -185,34 +187,6 @@ def compute_J(self, f=None, Ainv=None): Sim.compute_J = compute_J -def eval_store_block(simulation, Jmatrix, ATinvdf_duT, freq, df_dmT, u_src, src, row_count): - """ - Evaluate the sensitivities for the block or data and store to zarr - """ - dA_dmT = simulation.getADeriv(freq, u_src, ATinvdf_duT, adjoint=True) - dRHS_dmT = simulation.getRHSDeriv(freq, src, ATinvdf_duT, adjoint=True) - du_dmT = -dA_dmT - if not isinstance(dRHS_dmT, Zero): - du_dmT += dRHS_dmT - if not isinstance(df_dmT, Zero): - du_dmT += df_dmT - - block = np.array(du_dmT, dtype=complex).real.T - - if simulation.store_sensitivities == "disk": - Jmatrix.set_orthogonal_selection( - (np.arange(row_count, row_count + block.shape[0]), slice(None)), - block.astype(np.float32) - ) - else: - Jmatrix[row_count: row_count + block.shape[0], :] = ( - block.astype(np.float32) - ) - - # row_count += block.shape[0] - # return row_count - - def dfduT(source, receiver, mesh, fields, block): dfduT, _ = receiver.evalDeriv( source, mesh, fields, v=block, adjoint=True @@ -229,9 +203,24 @@ def dfdmT(source, receiver, mesh, fields, block): return dfdmT -def parallel_block_compute(simulation, Jmatrix, src, A_i, freq, blocks_dfduT, blocks_dfdmT, u_src, sub_threads, count): +def eval_block(simulation, Ainv_deriv_u, frequency, deriv_m, fields, source): + """ + Evaluate the sensitivities for the block or data and store to zarr + """ + dA_dmT = simulation.getADeriv(frequency, fields, Ainv_deriv_u, adjoint=True) + dRHS_dmT = simulation.getRHSDeriv(frequency, source, Ainv_deriv_u, adjoint=True) + du_dmT = -dA_dmT + if not isinstance(dRHS_dmT, Zero): + du_dmT += dRHS_dmT + if not isinstance(deriv_m, Zero): + du_dmT += deriv_m + + return np.array(du_dmT, dtype=complex).real.T + + +def parallel_block_compute(simulation, A_i, Jmatrix, freq, u_src, src, blocks_deriv_u, blocks_deriv_m, counter, sub_threads, m_size): print("Computing derivs blocks") - field_derivs = array.hstack(blocks_dfduT).compute() + field_derivs = array.hstack(blocks_deriv_u).compute() # Direct-solver call print("Direct solve") @@ -240,37 +229,51 @@ def parallel_block_compute(simulation, Jmatrix, src, A_i, freq, blocks_dfduT, bl # Even split print("Splitting") split = np.linspace(0, (ATinvdf_duT.shape[1]) / 2, sub_threads)[1:-1].astype(int) * 2 - sub_blocks_dfduT = np.array_split(ATinvdf_duT, split, axis=1) + sub_blocks_deriv_u = np.array_split(ATinvdf_duT, split, axis=1) - if isinstance(compute(blocks_dfdmT[0])[0], Zero): - sub_blocks_dfdmt = [Zero()] * len(sub_blocks_dfduT) + if isinstance(compute(blocks_deriv_m[0])[0], Zero): + sub_blocks_dfdmt = [Zero()] * len(sub_blocks_deriv_u) else: - compute_blocks_dfdmT = array.hstack([ + compute_blocks_deriv_m = array.hstack([ array.from_delayed( dfdmT_block, dtype=np.float32, shape=(u_src.shape[0], dfdmT_block.shape[1] * 2)) - for dfdmT_block in blocks_dfdmT + for dfdmT_block in blocks_deriv_m ]).compute() - sub_blocks_dfdmt = np.array_split(compute_blocks_dfdmT, split, axis=1) + sub_blocks_dfdmt = np.array_split(compute_blocks_deriv_m, split, axis=1) sub_process = [] print("Storing blocks") - for sub_block_dfduT, sub_block_dfdmT in zip(sub_blocks_dfduT, sub_blocks_dfdmt): + for sub_block_dfduT, sub_block_dfdmT in zip(sub_blocks_deriv_u, sub_blocks_dfdmt): + row_size = int(sub_block_dfduT.shape[1] / 2) sub_process.append( - delayed(eval_store_block, pure=True)( - simulation, - Jmatrix, - sub_block_dfduT, - freq, - sub_block_dfdmT, - u_src, - src, - count + array.from_delayed( + delayed(eval_block, pure=True)( + simulation, + sub_block_dfduT, + freq, + sub_block_dfdmT, + u_src, + src + ), + dtype=np.float32, + shape=(row_size, m_size) ) ) - count += int(sub_block_dfduT.shape[1] / 2) + print("Compute Storing blocks") - compute(sub_process) + block = array.vstack(sub_process).compute() + + if simulation.store_sensitivities == "disk": + Jmatrix.set_orthogonal_selection( + (np.arange(counter, counter + block.shape[0]), slice(None)), + block.astype(np.float32) + ) + else: + Jmatrix[counter: counter + block.shape[0], :] = ( + block.astype(np.float32) + ) - return count \ No newline at end of file + counter += block.shape[0] + return counter \ No newline at end of file From 926c3815152d4a7d03c3f9cb7cf65da125d73c20 Mon Sep 17 00:00:00 2001 From: fourndo Date: Mon, 5 Dec 2022 09:25:21 -0800 Subject: [PATCH 16/25] Remove prints --- .../frequency_domain/simulation.py | 17 +++++++---------- SimPEG/dask/inverse_problem.py | 2 +- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py index 2ac4dbb4df..4ff0bf0a0b 100644 --- a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py @@ -141,13 +141,10 @@ def compute_J(self, f=None, Ainv=None): float(self.survey.nD) / np.ceil(float(u_src.shape[0]) * self.survey.nD * 8. * 1e-6 / self.max_chunk_size) )) - ct = time() - - print("In loop over receivers") for rx in src.receiver_list: v = np.eye(rx.nD, dtype=float) n_blocs = np.ceil(2 * rx.nD / col_chunks * sub_threads) - print("In loop over blocks") + for block in np.array_split(v, n_blocs, axis=1): block_count += block.shape[1] * 2 @@ -163,7 +160,7 @@ def compute_J(self, f=None, Ainv=None): ) if block_count >= (col_chunks * sub_threads): - print(f"{ss}: Block {count}: {time()-ct}") + count = parallel_block_compute(self, A_i, Jmatrix, freq, u_src, src, blocks_dfduT, blocks_dfdmT, count, sub_threads, m_size) blocks_dfduT = [] blocks_dfdmT = [] @@ -219,15 +216,15 @@ def eval_block(simulation, Ainv_deriv_u, frequency, deriv_m, fields, source): def parallel_block_compute(simulation, A_i, Jmatrix, freq, u_src, src, blocks_deriv_u, blocks_deriv_m, counter, sub_threads, m_size): - print("Computing derivs blocks") + field_derivs = array.hstack(blocks_deriv_u).compute() # Direct-solver call - print("Direct solve") + ATinvdf_duT = A_i * field_derivs # Even split - print("Splitting") + split = np.linspace(0, (ATinvdf_duT.shape[1]) / 2, sub_threads)[1:-1].astype(int) * 2 sub_blocks_deriv_u = np.array_split(ATinvdf_duT, split, axis=1) @@ -244,7 +241,7 @@ def parallel_block_compute(simulation, A_i, Jmatrix, freq, u_src, src, blocks_de sub_blocks_dfdmt = np.array_split(compute_blocks_deriv_m, split, axis=1) sub_process = [] - print("Storing blocks") + for sub_block_dfduT, sub_block_dfdmT in zip(sub_blocks_deriv_u, sub_blocks_dfdmt): row_size = int(sub_block_dfduT.shape[1] / 2) sub_process.append( @@ -262,7 +259,7 @@ def parallel_block_compute(simulation, A_i, Jmatrix, freq, u_src, src, blocks_de ) ) - print("Compute Storing blocks") + block = array.vstack(sub_process).compute() if simulation.store_sensitivities == "disk": diff --git a/SimPEG/dask/inverse_problem.py b/SimPEG/dask/inverse_problem.py index abfd922cce..83800fd84a 100644 --- a/SimPEG/dask/inverse_problem.py +++ b/SimPEG/dask/inverse_problem.py @@ -136,7 +136,7 @@ def get_dpred(self, m, f=None, compute_J=False): vec, compute_J=compute_sensitivities ))[0] - if compute_sensitivities and objfct.simulation.verbose: + if compute_sensitivities: runtime = time() - ct total = len(self.dmisfit.objfcts) From b59bd9f591cb73bbb5c80ad964e7d9903db2ba01 Mon Sep 17 00:00:00 2001 From: fourndo Date: Mon, 12 Dec 2022 15:05:32 -0800 Subject: [PATCH 17/25] Try fixing tests with single subprocess --- .../dask/electromagnetics/frequency_domain/simulation.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py index 4ff0bf0a0b..00f92ccd52 100644 --- a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py @@ -1,13 +1,11 @@ from ....electromagnetics.frequency_domain.simulation import BaseFDEMSimulation as Sim -from ....utils import Zero, mkvc -from time import time +from ....utils import Zero import numpy as np import scipy.sparse as sp import multiprocessing from dask import array, compute, delayed from dask.distributed import Future import zarr -from time import time Sim.sensitivity_path = './sensitivity/' Sim.gtgdiag = None @@ -115,7 +113,7 @@ def compute_J(self, f=None, Ainv=None): row_chunks = int(np.ceil( float(self.survey.nD) / np.ceil(float(m_size) * self.survey.nD * 8. * 1e-6 / self.max_chunk_size) )) - sub_threads = int(multiprocessing.cpu_count()) + sub_threads = 1 if self.store_sensitivities == "disk": Jmatrix = zarr.open( self.sensitivity_path + f"J.zarr", @@ -259,7 +257,6 @@ def parallel_block_compute(simulation, A_i, Jmatrix, freq, u_src, src, blocks_de ) ) - block = array.vstack(sub_process).compute() if simulation.store_sensitivities == "disk": From 24ce190ce303e0c0557ea71498b77dcd535e4b91 Mon Sep 17 00:00:00 2001 From: fourndo Date: Mon, 12 Dec 2022 15:46:31 -0800 Subject: [PATCH 18/25] add n_cpu property with default to cpu_count --- .../frequency_domain/simulation.py | 12 +++++----- SimPEG/dask/simulation.py | 24 +++++++++++++++---- 2 files changed, 26 insertions(+), 10 deletions(-) diff --git a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py index 00f92ccd52..8f9b48595a 100644 --- a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py @@ -2,7 +2,7 @@ from ....utils import Zero import numpy as np import scipy.sparse as sp -import multiprocessing + from dask import array, compute, delayed from dask.distributed import Future import zarr @@ -113,7 +113,7 @@ def compute_J(self, f=None, Ainv=None): row_chunks = int(np.ceil( float(self.survey.nD) / np.ceil(float(m_size) * self.survey.nD * 8. * 1e-6 / self.max_chunk_size) )) - sub_threads = 1 + if self.store_sensitivities == "disk": Jmatrix = zarr.open( self.sensitivity_path + f"J.zarr", @@ -141,7 +141,7 @@ def compute_J(self, f=None, Ainv=None): for rx in src.receiver_list: v = np.eye(rx.nD, dtype=float) - n_blocs = np.ceil(2 * rx.nD / col_chunks * sub_threads) + n_blocs = np.ceil(2 * rx.nD / col_chunks * self.n_cpu) for block in np.array_split(v, n_blocs, axis=1): @@ -157,16 +157,16 @@ def compute_J(self, f=None, Ainv=None): delayed(dfdmT, pure=True)(src, rx, self.mesh, f, block), ) - if block_count >= (col_chunks * sub_threads): + if block_count >= (col_chunks * self.n_cpu): - count = parallel_block_compute(self, A_i, Jmatrix, freq, u_src, src, blocks_dfduT, blocks_dfdmT, count, sub_threads, m_size) + count = parallel_block_compute(self, A_i, Jmatrix, freq, u_src, src, blocks_dfduT, blocks_dfdmT, count, self.n_cpu, m_size) blocks_dfduT = [] blocks_dfdmT = [] block_count = 0 if blocks_dfduT: count = parallel_block_compute( - self, A_i, Jmatrix, freq, u_src, src, blocks_dfduT, blocks_dfdmT, count, sub_threads, m_size) + self, A_i, Jmatrix, freq, u_src, src, blocks_dfduT, blocks_dfdmT, count, self.n_cpu, m_size) block_count = 0 for A in Ainv: diff --git a/SimPEG/dask/simulation.py b/SimPEG/dask/simulation.py index e762334fc8..84b55e9098 100644 --- a/SimPEG/dask/simulation.py +++ b/SimPEG/dask/simulation.py @@ -1,13 +1,11 @@ from ..simulation import BaseSimulation as Sim -from dask.distributed import get_client, Future, Client +from dask.distributed import get_client, Future from dask import array, delayed -from dask.delayed import Delayed +import multiprocessing import warnings from ..data import SyntheticData import numpy as np from .utils import compute -from ..utils import mkvc -from ..data import Data Sim._max_ram = 16 @@ -45,6 +43,24 @@ def max_chunk_size(self, other): Sim.max_chunk_size = max_chunk_size +@property +def n_cpu(self): + """Number of cpu's available.""" + if getattr(self, "_n_cpu", None) is None: + self._n_cpu = int(multiprocessing.cpu_count()) + return self._n_cpu + + +@n_cpu.setter +def n_cpu(self, other): + if other <= 0: + raise ValueError("n_cpu must be greater than 0") + self._n_cpu = other + + +Sim.n_cpu = n_cpu + + def make_synthetic_data( self, m, relative_error=0.05, noise_floor=0.0, f=None, add_noise=False, **kwargs ): From 5533d3328cc0845da419290d2592d237022502c7 Mon Sep 17 00:00:00 2001 From: fourndo Date: Mon, 12 Dec 2022 17:51:23 -0800 Subject: [PATCH 19/25] Fix bug in total gradient with spherical --- SimPEG/regularization/sparse.py | 56 ++++++++++++++++----------------- 1 file changed, 27 insertions(+), 29 deletions(-) diff --git a/SimPEG/regularization/sparse.py b/SimPEG/regularization/sparse.py index 5c691409c1..61ffd95670 100755 --- a/SimPEG/regularization/sparse.py +++ b/SimPEG/regularization/sparse.py @@ -313,45 +313,43 @@ def f_m(self): else: f_m = self.model - if self.space == "spherical": + if self.gradientType == "total": - cell_diff = getattr( - self.regmesh, "cellDiff{}Stencil".format(self.orientation) - ) - theta = cell_diff * (self.mapping * f_m) - dmdx = self.length_scales * utils.mat_utils.coterminal(theta) + dmdx = self.regmesh.cellDiffxStencil * (self.mapping * f_m) - else: + if self.space == "spherical": + dmdx = utils.mat_utils.coterminal(dmdx) - if self.gradientType == "total": - Ave = getattr(self.regmesh, "aveCC2F{}".format(self.orientation)) + dmdx = (self.regmesh.aveFx2CC * dmdx) ** 2.0 - dmdx = np.abs( - self.regmesh.aveFx2CC - * self.regmesh.cellDiffxStencil - * (self.mapping * f_m) - ) + if self.regmesh.dim > 1: - if self.regmesh.dim > 1: + dmdy = self.regmesh.cellDiffyStencil * (self.mapping * f_m) - dmdx += np.abs( - self.regmesh.aveFy2CC - * self.regmesh.cellDiffyStencil - * (self.mapping * f_m) - ) + if self.space == "spherical": + dmdy = utils.mat_utils.coterminal(dmdy) - if self.regmesh.dim > 2: + dmdy = (self.regmesh.aveFy2CC * dmdy) ** 2.0 + dmdx += dmdy - dmdx += np.abs( - self.regmesh.aveFz2CC - * self.regmesh.cellDiffzStencil - * (self.mapping * f_m) - ) + if self.regmesh.dim > 2: + dmdz = self.regmesh.cellDiffzStencil * (self.mapping * f_m) - dmdx = Ave * dmdx + if self.space == "spherical": + dmdz = utils.mat_utils.coterminal(dmdz) - else: - dmdx = self.cellDiffStencil * (self.mapping * f_m) + dmdz = (self.regmesh.aveFz2CC * dmdz) ** 2.0 + dmdx += dmdz + + Ave = getattr(self.regmesh, "aveCC2F{}".format(self.orientation)) + dmdx = (Ave * dmdx) ** 0.5 + + else: + dmdx = self.cellDiffStencil * (self.mapping * f_m) + if self.space == "spherical": + dmdx = utils.mat_utils.coterminal(dmdx) + + dmdx *= self.length_scales return dmdx From f2cd41bf22eafd43d66adbfc60051e2c2e20e199 Mon Sep 17 00:00:00 2001 From: fourndo Date: Mon, 12 Dec 2022 17:52:14 -0800 Subject: [PATCH 20/25] Bump version --- SimPEG/__init__.py | 2 +- docs/conf.py | 2 +- pyproject.toml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/SimPEG/__init__.py b/SimPEG/__init__.py index b77e5a91ee..c625c4e40a 100644 --- a/SimPEG/__init__.py +++ b/SimPEG/__init__.py @@ -30,7 +30,7 @@ SolverBiCG, ) -__version__ = "0.15.1.dev6+geoapps.0.9.1" +__version__ = "0.15.1.dev7+geoapps.0.10.0" __author__ = "SimPEG Team" __license__ = "MIT" __copyright__ = "2013 - 2020, SimPEG Team, http://simpeg.xyz" diff --git a/docs/conf.py b/docs/conf.py index 3ee2c2434e..0c951c9bc5 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -72,7 +72,7 @@ # The short X.Y version. version = "0.15.1" # The full version, including alpha/beta/rc tags. -release = "0.15.1.dev6+geoapps.0.9.0" +release = "0.15.1.dev7+geoapps.0.10.0" # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. diff --git a/pyproject.toml b/pyproject.toml index f34b660a01..587fad20a9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ [tool.poetry] name = "Mira-SimPEG" -version = "0.15.1.dev6" +version = "0.15.1.dev7" license = "MIT" description = "Mira Geoscience fork of SimPEG: Simulation and Parameter Estimation in Geophysics" From 5aa2cbf1199de9121082ed0a370959c28b9e08f2 Mon Sep 17 00:00:00 2001 From: fourndo Date: Tue, 13 Dec 2022 10:02:06 -0800 Subject: [PATCH 21/25] Temp removal of delay on pred --- SimPEG/dask/simulation.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/SimPEG/dask/simulation.py b/SimPEG/dask/simulation.py index 84b55e9098..9225bdf847 100644 --- a/SimPEG/dask/simulation.py +++ b/SimPEG/dask/simulation.py @@ -219,13 +219,14 @@ def evaluate_receiver(source, receiver, mesh, fields): rows = [] for src in self.survey.source_list: for rx in src.receiver_list: - rows.append(array.from_delayed( - row(src, rx, self.mesh, f), - dtype=np.float32, - shape=(rx.nD,), - )) - - data = array.hstack(rows).compute() + # rows.append(array.from_delayed( + # row(src, rx, self.mesh, f), + # dtype=np.float32, + # shape=(rx.nD,), + # )) + rows.append(rx.eval(src, self.mesh, f)) + + data = np.r_[rows].flatten() if compute_J and self._Jmatrix is None: Jmatrix = self.compute_J(f=f, Ainv=Ainv) From 880295ca8b2ce6e8c479276171427f1422235381 Mon Sep 17 00:00:00 2001 From: fourndo Date: Tue, 13 Dec 2022 10:08:54 -0800 Subject: [PATCH 22/25] Revert delayed dpred --- SimPEG/dask/simulation.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/SimPEG/dask/simulation.py b/SimPEG/dask/simulation.py index 9225bdf847..84b55e9098 100644 --- a/SimPEG/dask/simulation.py +++ b/SimPEG/dask/simulation.py @@ -219,14 +219,13 @@ def evaluate_receiver(source, receiver, mesh, fields): rows = [] for src in self.survey.source_list: for rx in src.receiver_list: - # rows.append(array.from_delayed( - # row(src, rx, self.mesh, f), - # dtype=np.float32, - # shape=(rx.nD,), - # )) - rows.append(rx.eval(src, self.mesh, f)) - - data = np.r_[rows].flatten() + rows.append(array.from_delayed( + row(src, rx, self.mesh, f), + dtype=np.float32, + shape=(rx.nD,), + )) + + data = array.hstack(rows).compute() if compute_J and self._Jmatrix is None: Jmatrix = self.compute_J(f=f, Ainv=Ainv) From 3d5a6adde9404a01c6253678ce4c5ae82b297dd8 Mon Sep 17 00:00:00 2001 From: fourndo Date: Sat, 17 Dec 2022 10:42:21 -0800 Subject: [PATCH 23/25] REmove delay on dask.simulation dpred. Move dpred from dcr to base. --- .../static/resistivity/simulation.py | 87 +++++++++---------- SimPEG/dask/inverse_problem.py | 13 +-- SimPEG/dask/simulation.py | 3 +- 3 files changed, 48 insertions(+), 55 deletions(-) diff --git a/SimPEG/dask/electromagnetics/static/resistivity/simulation.py b/SimPEG/dask/electromagnetics/static/resistivity/simulation.py index 458ab47230..5b55c62fd5 100644 --- a/SimPEG/dask/electromagnetics/static/resistivity/simulation.py +++ b/SimPEG/dask/electromagnetics/static/resistivity/simulation.py @@ -1,22 +1,21 @@ import scipy.sparse as sp +from SimPEG.dask.simulation import dask_dpred from .....electromagnetics.static.resistivity.simulation import BaseDCSimulation as Sim -from .....utils import Zero, mkvc -from .....data import Data -from ....utils import compute_chunk_sizes -import dask +from .....utils import Zero import dask.array as da from dask.distributed import Future import numpy as np import zarr -import os -import shutil + import numcodecs numcodecs.blosc.use_threads = False Sim.sensitivity_path = './sensitivity/' +Sim.dpred = dask_dpred + def dask_fields(self, m=None, return_Ainv=False): if m is not None: @@ -195,47 +194,47 @@ def compute_J(self, f=None, Ainv=None): Sim.compute_J = compute_J -# This could technically be handled by dask.simulation, but doesn't seem to register -@dask.delayed -def dask_dpred(self, m=None, f=None, compute_J=False): - """ - dpred(m, f=None) - Create the projected data from a model. - The fields, f, (if provided) will be used for the predicted data - instead of recalculating the fields (which may be expensive!). - - .. math:: - - d_\\text{pred} = P(f(m)) - - Where P is a projection of the fields onto the data space. - """ - if self.survey is None: - raise AttributeError( - "The survey has not yet been set and is required to compute " - "data. Please set the survey for the simulation: " - "simulation.survey = survey" - ) - - if f is None: - if m is None: - m = self.model - f, Ainv = self.fields(m, return_Ainv=compute_J) +# # This could technically be handled by dask.simulation, but doesn't seem to register +# @dask.delayed +# def dask_dpred(self, m=None, f=None, compute_J=False): +# """ +# dpred(m, f=None) +# Create the projected data from a model. +# The fields, f, (if provided) will be used for the predicted data +# instead of recalculating the fields (which may be expensive!). +# +# .. math:: +# +# d_\\text{pred} = P(f(m)) +# +# Where P is a projection of the fields onto the data space. +# """ +# if self.survey is None: +# raise AttributeError( +# "The survey has not yet been set and is required to compute " +# "data. Please set the survey for the simulation: " +# "simulation.survey = survey" +# ) +# +# if f is None: +# if m is None: +# m = self.model +# f, Ainv = self.fields(m, return_Ainv=compute_J) +# +# data = Data(self.survey) +# for src in self.survey.source_list: +# for rx in src.receiver_list: +# data[src, rx] = rx.eval(src, self.mesh, f) +# +# if compute_J: +# self._Jmatrix = None +# Jmatrix = self.compute_J(f=f, Ainv=Ainv) +# return mkvc(data), Jmatrix +# +# return mkvc(data) - data = Data(self.survey) - for src in self.survey.source_list: - for rx in src.receiver_list: - data[src, rx] = rx.eval(src, self.mesh, f) - if compute_J: - self._Jmatrix = None - Jmatrix = self.compute_J(f=f, Ainv=Ainv) - return mkvc(data), Jmatrix - return mkvc(data) - - -Sim.dpred = dask_dpred def dask_getSourceTerm(self): diff --git a/SimPEG/dask/inverse_problem.py b/SimPEG/dask/inverse_problem.py index 83800fd84a..990cb0ce61 100644 --- a/SimPEG/dask/inverse_problem.py +++ b/SimPEG/dask/inverse_problem.py @@ -126,15 +126,10 @@ def get_dpred(self, m, f=None, compute_J=False): else: # For locals, the future is now ct = time() - if objfct.simulation.verbose and (compute_sensitivities or objfct.simulation.store_sensitivities == "forward_only"): - with ProgressBar(): - future = da.compute(objfct.simulation.dpred( - vec, compute_J=compute_sensitivities - ))[0] - else: - future = da.compute(objfct.simulation.dpred( - vec, compute_J=compute_sensitivities - ))[0] + + future = objfct.simulation.dpred( + vec, compute_J=compute_sensitivities + ) if compute_sensitivities: runtime = time() - ct diff --git a/SimPEG/dask/simulation.py b/SimPEG/dask/simulation.py index 84b55e9098..f94e44d5f5 100644 --- a/SimPEG/dask/simulation.py +++ b/SimPEG/dask/simulation.py @@ -186,7 +186,6 @@ def Jmatrix(self): Sim.Jmatrix = Jmatrix -@delayed def dask_dpred(self, m=None, f=None, compute_J=False): """ dpred(m, f=None) @@ -213,7 +212,7 @@ def dask_dpred(self, m=None, f=None, compute_J=False): f, Ainv = self.fields(m, return_Ainv=compute_J) def evaluate_receiver(source, receiver, mesh, fields): - return receiver.eval(source, mesh, fields) + return receiver.eval(source, mesh, fields).flatten() row = delayed(evaluate_receiver, pure=True) rows = [] From 38f7c3b2482929119acb02476703a11076a73bc7 Mon Sep 17 00:00:00 2001 From: fourndo Date: Sat, 17 Dec 2022 10:55:43 -0800 Subject: [PATCH 24/25] Remove commented out code --- .../static/resistivity/simulation.py | 43 ------------------- 1 file changed, 43 deletions(-) diff --git a/SimPEG/dask/electromagnetics/static/resistivity/simulation.py b/SimPEG/dask/electromagnetics/static/resistivity/simulation.py index 5b55c62fd5..d4e1d1146c 100644 --- a/SimPEG/dask/electromagnetics/static/resistivity/simulation.py +++ b/SimPEG/dask/electromagnetics/static/resistivity/simulation.py @@ -194,49 +194,6 @@ def compute_J(self, f=None, Ainv=None): Sim.compute_J = compute_J -# # This could technically be handled by dask.simulation, but doesn't seem to register -# @dask.delayed -# def dask_dpred(self, m=None, f=None, compute_J=False): -# """ -# dpred(m, f=None) -# Create the projected data from a model. -# The fields, f, (if provided) will be used for the predicted data -# instead of recalculating the fields (which may be expensive!). -# -# .. math:: -# -# d_\\text{pred} = P(f(m)) -# -# Where P is a projection of the fields onto the data space. -# """ -# if self.survey is None: -# raise AttributeError( -# "The survey has not yet been set and is required to compute " -# "data. Please set the survey for the simulation: " -# "simulation.survey = survey" -# ) -# -# if f is None: -# if m is None: -# m = self.model -# f, Ainv = self.fields(m, return_Ainv=compute_J) -# -# data = Data(self.survey) -# for src in self.survey.source_list: -# for rx in src.receiver_list: -# data[src, rx] = rx.eval(src, self.mesh, f) -# -# if compute_J: -# self._Jmatrix = None -# Jmatrix = self.compute_J(f=f, Ainv=Ainv) -# return mkvc(data), Jmatrix -# -# return mkvc(data) - - - - - def dask_getSourceTerm(self): """ Evaluates the sources, and puts them in matrix form From 6942998f3520a21c350e9c9f2774e4274931b62d Mon Sep 17 00:00:00 2001 From: fourndo Date: Sat, 17 Dec 2022 11:26:05 -0800 Subject: [PATCH 25/25] remove delayed on 2d dpred --- .../dask/electromagnetics/static/resistivity/simulation_2d.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/SimPEG/dask/electromagnetics/static/resistivity/simulation_2d.py b/SimPEG/dask/electromagnetics/static/resistivity/simulation_2d.py index 701f25cc3b..e8568c948e 100644 --- a/SimPEG/dask/electromagnetics/static/resistivity/simulation_2d.py +++ b/SimPEG/dask/electromagnetics/static/resistivity/simulation_2d.py @@ -129,8 +129,6 @@ def compute_J(self, f=None, Ainv=None): Sim.compute_J = compute_J -# This could technically be handled by dask.simulation, but doesn't seem to register -@dask.delayed def dask_dpred(self, m=None, f=None, compute_J=False): """ dpred(m, f=None)