diff --git a/SimPEG/__init__.py b/SimPEG/__init__.py index 6973e41fcd..4f3aa70c5b 100644 --- a/SimPEG/__init__.py +++ b/SimPEG/__init__.py @@ -30,7 +30,8 @@ SolverBiCG, ) -__version__ = "0.15.1.dev7+geoapps.0.9.1" + +__version__ = "0.15.1.dev8+geoapps.0.10.0" __author__ = "SimPEG Team" __license__ = "MIT" __copyright__ = "2013 - 2020, SimPEG Team, http://simpeg.xyz" diff --git a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py index 558eed56ba..8f9b48595a 100644 --- a/SimPEG/dask/electromagnetics/frequency_domain/simulation.py +++ b/SimPEG/dask/electromagnetics/frequency_domain/simulation.py @@ -1,11 +1,11 @@ from ....electromagnetics.frequency_domain.simulation import BaseFDEMSimulation as Sim -from ....utils import Zero, mkvc +from ....utils import Zero import numpy as np import scipy.sparse as sp -import dask.array as da + +from dask import array, compute, delayed from dask.distributed import Future import zarr -from time import time Sim.sensitivity_path = './sensitivity/' Sim.gtgdiag = None @@ -22,10 +22,6 @@ def fields(self, m=None, return_Ainv=False): for freq in self.survey.frequencies: A = self.getA(freq) rhs = self.getRHS(freq) - - if return_Ainv: - Ainv += [self.solver(sp.csr_matrix(A.T), **self.solver_opts)] - Ainv_solve = self.solver(sp.csr_matrix(A), **self.solver_opts) u = Ainv_solve * rhs Srcs = self.survey.get_sources_by_frequency(freq) @@ -33,6 +29,10 @@ def fields(self, m=None, return_Ainv=False): Ainv_solve.clean() + if return_Ainv: + Ainv += [self.solver(sp.csr_matrix(A.T), **self.solver_opts)] + + if return_Ainv: return f, Ainv else: @@ -56,9 +56,9 @@ def dask_getJtJdiag(self, m, W=None): else: W = W.diagonal() - diag = da.einsum('i,ij,ij->j', W, self.Jmatrix, self.Jmatrix) + diag = array.einsum('i,ij,ij->j', W, self.Jmatrix, self.Jmatrix) - if isinstance(diag, da.Array): + if isinstance(diag, array.Array): diag = np.asarray(diag.compute()) self.gtgdiag = diag @@ -80,7 +80,7 @@ def dask_Jvec(self, m, v): if isinstance(self.Jmatrix, Future): self.Jmatrix # Wait to finish - return da.dot(self.Jmatrix, v) + return array.dot(self.Jmatrix, v) Sim.Jvec = dask_Jvec @@ -98,7 +98,7 @@ def dask_Jtvec(self, m, v): if isinstance(self.Jmatrix, Future): self.Jmatrix # Wait to finish - return da.dot(v, self.Jmatrix) + return array.dot(v, self.Jmatrix) Sim.Jtvec = dask_Jtvec @@ -113,6 +113,7 @@ def compute_J(self, f=None, Ainv=None): row_chunks = int(np.ceil( float(self.survey.nD) / np.ceil(float(m_size) * self.survey.nD * 8. * 1e-6 / self.max_chunk_size) )) + if self.store_sensitivities == "disk": Jmatrix = zarr.open( self.sensitivity_path + f"J.zarr", @@ -123,87 +124,150 @@ def compute_J(self, f=None, Ainv=None): else: Jmatrix = np.zeros((self.survey.nD, m_size), dtype=np.float32) - def eval_store_block(A, freq, df_duT, df_dmT, u_src, src, row_count): - """ - Evaluate the sensitivities for the block or data and store to zarr - """ - df_duT = np.hstack(df_duT) - ATinvdf_duT = (A * df_duT).reshape((dfduT.shape[0], -1)) - dA_dmT = self.getADeriv(freq, u_src, ATinvdf_duT, adjoint=True) - dRHS_dmT = self.getRHSDeriv(freq, src, ATinvdf_duT, adjoint=True) - du_dmT = -dA_dmT - if not isinstance(dRHS_dmT, Zero): - du_dmT += dRHS_dmT - if not isinstance(df_dmT[0], Zero): - du_dmT += np.hstack(df_dmT) - - block = np.array(du_dmT, dtype=complex).real.T - - if self.store_sensitivities == "disk": - Jmatrix.set_orthogonal_selection( - (np.arange(row_count, row_count + block.shape[0]), slice(None)), - block.astype(np.float32) - ) - else: - Jmatrix[row_count: row_count + block.shape[0], :] = ( - block.astype(np.float32) - ) - - row_count += block.shape[0] - return row_count - - blocks = [] count = 0 block_count = 0 + for A_i, freq in zip(Ainv, self.survey.frequencies): - for src in self.survey.get_sources_by_frequency(freq): + for ss, src in enumerate(self.survey.get_sources_by_frequency(freq)): df_duT, df_dmT = [], [] + blocks_dfduT = [] + blocks_dfdmT = [] u_src = f[src, self._solutionType] + col_chunks = int(np.ceil( + float(self.survey.nD) / np.ceil(float(u_src.shape[0]) * self.survey.nD * 8. * 1e-6 / self.max_chunk_size) + )) + for rx in src.receiver_list: v = np.eye(rx.nD, dtype=float) - n_blocs = np.ceil(2 * rx.nD / row_chunks) + n_blocs = np.ceil(2 * rx.nD / col_chunks * self.n_cpu) for block in np.array_split(v, n_blocs, axis=1): - dfduT, dfdmT = rx.evalDeriv( - src, self.mesh, f, v=block, adjoint=True + block_count += block.shape[1] * 2 + blocks_dfduT.append( + array.from_delayed( + delayed(dfduT, pure=True)(src, rx, self.mesh, f, block), + dtype=np.float32, + shape=(u_src.shape[0], block.shape[1]*2) + ) + ) + blocks_dfdmT.append( + delayed(dfdmT, pure=True)(src, rx, self.mesh, f, block), ) - df_duT += [dfduT] - df_dmT += [dfdmT] - block_count += dfduT.shape[1] + if block_count >= (col_chunks * self.n_cpu): - if block_count >= row_chunks: - count = eval_store_block(A_i, freq, df_duT, df_dmT, u_src, src, count) - df_duT, df_dmT = [], [] + count = parallel_block_compute(self, A_i, Jmatrix, freq, u_src, src, blocks_dfduT, blocks_dfdmT, count, self.n_cpu, m_size) + blocks_dfduT = [] + blocks_dfdmT = [] block_count = 0 - # blocks, count = store_block(blocks, count) - if df_duT: - count = eval_store_block(A_i, freq, df_duT, df_dmT, u_src, src, count) + if blocks_dfduT: + count = parallel_block_compute( + self, A_i, Jmatrix, freq, u_src, src, blocks_dfduT, blocks_dfdmT, count, self.n_cpu, m_size) block_count = 0 - if len(blocks) != 0: - if self.store_sensitivities == "disk": - Jmatrix.set_orthogonal_selection( - (np.arange(count, self.survey.nD), slice(None)), - blocks.astype(np.float32) - ) - else: - Jmatrix[count: self.survey.nD, :] = ( - blocks.astype(np.float32) - ) - for A in Ainv: A.clean() if self.store_sensitivities == "disk": del Jmatrix - return da.from_zarr(self.sensitivity_path + f"J.zarr") + return array.from_zarr(self.sensitivity_path + f"J.zarr") else: return Jmatrix Sim.compute_J = compute_J + + +def dfduT(source, receiver, mesh, fields, block): + dfduT, _ = receiver.evalDeriv( + source, mesh, fields, v=block, adjoint=True + ) + + return dfduT + + +def dfdmT(source, receiver, mesh, fields, block): + _, dfdmT = receiver.evalDeriv( + source, mesh, fields, v=block, adjoint=True + ) + + return dfdmT + + +def eval_block(simulation, Ainv_deriv_u, frequency, deriv_m, fields, source): + """ + Evaluate the sensitivities for the block or data and store to zarr + """ + dA_dmT = simulation.getADeriv(frequency, fields, Ainv_deriv_u, adjoint=True) + dRHS_dmT = simulation.getRHSDeriv(frequency, source, Ainv_deriv_u, adjoint=True) + du_dmT = -dA_dmT + if not isinstance(dRHS_dmT, Zero): + du_dmT += dRHS_dmT + if not isinstance(deriv_m, Zero): + du_dmT += deriv_m + + return np.array(du_dmT, dtype=complex).real.T + + +def parallel_block_compute(simulation, A_i, Jmatrix, freq, u_src, src, blocks_deriv_u, blocks_deriv_m, counter, sub_threads, m_size): + + field_derivs = array.hstack(blocks_deriv_u).compute() + + # Direct-solver call + + ATinvdf_duT = A_i * field_derivs + + # Even split + + split = np.linspace(0, (ATinvdf_duT.shape[1]) / 2, sub_threads)[1:-1].astype(int) * 2 + sub_blocks_deriv_u = np.array_split(ATinvdf_duT, split, axis=1) + + if isinstance(compute(blocks_deriv_m[0])[0], Zero): + sub_blocks_dfdmt = [Zero()] * len(sub_blocks_deriv_u) + else: + compute_blocks_deriv_m = array.hstack([ + array.from_delayed( + dfdmT_block, + dtype=np.float32, + shape=(u_src.shape[0], dfdmT_block.shape[1] * 2)) + for dfdmT_block in blocks_deriv_m + ]).compute() + sub_blocks_dfdmt = np.array_split(compute_blocks_deriv_m, split, axis=1) + + sub_process = [] + + for sub_block_dfduT, sub_block_dfdmT in zip(sub_blocks_deriv_u, sub_blocks_dfdmt): + row_size = int(sub_block_dfduT.shape[1] / 2) + sub_process.append( + array.from_delayed( + delayed(eval_block, pure=True)( + simulation, + sub_block_dfduT, + freq, + sub_block_dfdmT, + u_src, + src + ), + dtype=np.float32, + shape=(row_size, m_size) + ) + ) + + block = array.vstack(sub_process).compute() + + if simulation.store_sensitivities == "disk": + Jmatrix.set_orthogonal_selection( + (np.arange(counter, counter + block.shape[0]), slice(None)), + block.astype(np.float32) + ) + else: + Jmatrix[counter: counter + block.shape[0], :] = ( + block.astype(np.float32) + ) + + counter += block.shape[0] + return counter \ No newline at end of file diff --git a/SimPEG/dask/electromagnetics/static/resistivity/receivers.py b/SimPEG/dask/electromagnetics/static/resistivity/receivers.py index 8e4c04f68e..dc02272428 100644 --- a/SimPEG/dask/electromagnetics/static/resistivity/receivers.py +++ b/SimPEG/dask/electromagnetics/static/resistivity/receivers.py @@ -43,4 +43,4 @@ def dask_Pole_getP(self, mesh, Gloc): return P -Pole.getP = dask_Dipole_getP \ No newline at end of file +Pole.getP = dask_Pole_getP \ No newline at end of file diff --git a/SimPEG/dask/electromagnetics/static/resistivity/simulation.py b/SimPEG/dask/electromagnetics/static/resistivity/simulation.py index 458ab47230..d4e1d1146c 100644 --- a/SimPEG/dask/electromagnetics/static/resistivity/simulation.py +++ b/SimPEG/dask/electromagnetics/static/resistivity/simulation.py @@ -1,22 +1,21 @@ import scipy.sparse as sp +from SimPEG.dask.simulation import dask_dpred from .....electromagnetics.static.resistivity.simulation import BaseDCSimulation as Sim -from .....utils import Zero, mkvc -from .....data import Data -from ....utils import compute_chunk_sizes -import dask +from .....utils import Zero import dask.array as da from dask.distributed import Future import numpy as np import zarr -import os -import shutil + import numcodecs numcodecs.blosc.use_threads = False Sim.sensitivity_path = './sensitivity/' +Sim.dpred = dask_dpred + def dask_fields(self, m=None, return_Ainv=False): if m is not None: @@ -195,49 +194,6 @@ def compute_J(self, f=None, Ainv=None): Sim.compute_J = compute_J -# This could technically be handled by dask.simulation, but doesn't seem to register -@dask.delayed -def dask_dpred(self, m=None, f=None, compute_J=False): - """ - dpred(m, f=None) - Create the projected data from a model. - The fields, f, (if provided) will be used for the predicted data - instead of recalculating the fields (which may be expensive!). - - .. math:: - - d_\\text{pred} = P(f(m)) - - Where P is a projection of the fields onto the data space. - """ - if self.survey is None: - raise AttributeError( - "The survey has not yet been set and is required to compute " - "data. Please set the survey for the simulation: " - "simulation.survey = survey" - ) - - if f is None: - if m is None: - m = self.model - f, Ainv = self.fields(m, return_Ainv=compute_J) - - data = Data(self.survey) - for src in self.survey.source_list: - for rx in src.receiver_list: - data[src, rx] = rx.eval(src, self.mesh, f) - - if compute_J: - self._Jmatrix = None - Jmatrix = self.compute_J(f=f, Ainv=Ainv) - return mkvc(data), Jmatrix - - return mkvc(data) - - -Sim.dpred = dask_dpred - - def dask_getSourceTerm(self): """ Evaluates the sources, and puts them in matrix form diff --git a/SimPEG/dask/electromagnetics/static/resistivity/simulation_2d.py b/SimPEG/dask/electromagnetics/static/resistivity/simulation_2d.py index 701f25cc3b..e8568c948e 100644 --- a/SimPEG/dask/electromagnetics/static/resistivity/simulation_2d.py +++ b/SimPEG/dask/electromagnetics/static/resistivity/simulation_2d.py @@ -129,8 +129,6 @@ def compute_J(self, f=None, Ainv=None): Sim.compute_J = compute_J -# This could technically be handled by dask.simulation, but doesn't seem to register -@dask.delayed def dask_dpred(self, m=None, f=None, compute_J=False): """ dpred(m, f=None) diff --git a/SimPEG/dask/inverse_problem.py b/SimPEG/dask/inverse_problem.py index 83800fd84a..990cb0ce61 100644 --- a/SimPEG/dask/inverse_problem.py +++ b/SimPEG/dask/inverse_problem.py @@ -126,15 +126,10 @@ def get_dpred(self, m, f=None, compute_J=False): else: # For locals, the future is now ct = time() - if objfct.simulation.verbose and (compute_sensitivities or objfct.simulation.store_sensitivities == "forward_only"): - with ProgressBar(): - future = da.compute(objfct.simulation.dpred( - vec, compute_J=compute_sensitivities - ))[0] - else: - future = da.compute(objfct.simulation.dpred( - vec, compute_J=compute_sensitivities - ))[0] + + future = objfct.simulation.dpred( + vec, compute_J=compute_sensitivities + ) if compute_sensitivities: runtime = time() - ct diff --git a/SimPEG/dask/simulation.py b/SimPEG/dask/simulation.py index 88beb028d9..f94e44d5f5 100644 --- a/SimPEG/dask/simulation.py +++ b/SimPEG/dask/simulation.py @@ -1,13 +1,11 @@ from ..simulation import BaseSimulation as Sim -from dask.distributed import get_client, Future, Client +from dask.distributed import get_client, Future from dask import array, delayed -from dask.delayed import Delayed +import multiprocessing import warnings from ..data import SyntheticData import numpy as np from .utils import compute -from ..utils import mkvc -from ..data import Data Sim._max_ram = 16 @@ -45,6 +43,24 @@ def max_chunk_size(self, other): Sim.max_chunk_size = max_chunk_size +@property +def n_cpu(self): + """Number of cpu's available.""" + if getattr(self, "_n_cpu", None) is None: + self._n_cpu = int(multiprocessing.cpu_count()) + return self._n_cpu + + +@n_cpu.setter +def n_cpu(self, other): + if other <= 0: + raise ValueError("n_cpu must be greater than 0") + self._n_cpu = other + + +Sim.n_cpu = n_cpu + + def make_synthetic_data( self, m, relative_error=0.05, noise_floor=0.0, f=None, add_noise=False, **kwargs ): @@ -170,7 +186,6 @@ def Jmatrix(self): Sim.Jmatrix = Jmatrix -@delayed def dask_dpred(self, m=None, f=None, compute_J=False): """ dpred(m, f=None) @@ -196,16 +211,26 @@ def dask_dpred(self, m=None, f=None, compute_J=False): m = self.model f, Ainv = self.fields(m, return_Ainv=compute_J) - data = Data(self.survey) + def evaluate_receiver(source, receiver, mesh, fields): + return receiver.eval(source, mesh, fields).flatten() + + row = delayed(evaluate_receiver, pure=True) + rows = [] for src in self.survey.source_list: for rx in src.receiver_list: - data[src, rx] = rx.eval(src, self.mesh, f) + rows.append(array.from_delayed( + row(src, rx, self.mesh, f), + dtype=np.float32, + shape=(rx.nD,), + )) + + data = array.hstack(rows).compute() - if compute_J: + if compute_J and self._Jmatrix is None: Jmatrix = self.compute_J(f=f, Ainv=Ainv) - return (mkvc(data), Jmatrix) + return data, Jmatrix - return mkvc(data) + return data Sim.dpred = dask_dpred diff --git a/SimPEG/inversion.py b/SimPEG/inversion.py index 8b4cb023e6..fb50f7d3cb 100644 --- a/SimPEG/inversion.py +++ b/SimPEG/inversion.py @@ -61,6 +61,7 @@ def run(self, m0): """ self.invProb.startup(m0) + print("Starting message") self.directiveList.call("initialize") self.m = self.opt.minimize(self.invProb.evalFunction, self.invProb.model) self.directiveList.call("finish") diff --git a/docs/conf.py b/docs/conf.py index 2a1643c424..ed033a0318 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -72,7 +72,7 @@ # The short X.Y version. version = "0.15.1" # The full version, including alpha/beta/rc tags. -release = "0.15.1.dev7+geoapps.0.9.1" +release = "0.15.1.dev8+geoapps.0.10.0" # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages.