Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
21fb2db
Fix pole-pole behaviour
benk-mira Nov 16, 2022
31d4c3f
Better handling of alphas in stashed reg operators
fourndo Nov 30, 2022
e6493f1
Simplify computation or reg2Deriv
fourndo Nov 30, 2022
5b2531c
Parallelize loop over receiver evals
fourndo Dec 1, 2022
3086e42
Increase number of sub-block computed
fourndo Dec 3, 2022
0641afa
Add printing
fourndo Dec 3, 2022
da18e87
Bump version
fourndo Dec 3, 2022
1d3aefa
Debug message
fourndo Dec 3, 2022
eeb79e5
Fix test string
fourndo Dec 3, 2022
d597cde
More prints
fourndo Dec 3, 2022
7418eaa
more prints
fourndo Dec 3, 2022
34f5aa9
Try parallelizing more derivs
fourndo Dec 3, 2022
59f8f58
More parallelization
fourndo Dec 3, 2022
d23569f
Fix size problems, change testing messages
fourndo Dec 4, 2022
9807cb7
Re-work of methods to reduce spin up time
fourndo Dec 4, 2022
926c381
Remove prints
fourndo Dec 5, 2022
f332694
Merge pull request #30 from MiraGeoscience/GEOPY-696
domfournier Dec 5, 2022
d804dc1
Merge pull request #31 from MiraGeoscience/release/geoapps-0.9.1
domfournier Dec 5, 2022
b59bd9f
Try fixing tests with single subprocess
fourndo Dec 12, 2022
24ce190
add n_cpu property with default to cpu_count
fourndo Dec 12, 2022
5533d33
Fix bug in total gradient with spherical
fourndo Dec 13, 2022
f2cd41b
Bump version
fourndo Dec 13, 2022
5aa2cbf
Temp removal of delay on pred
fourndo Dec 13, 2022
880295c
Revert delayed dpred
fourndo Dec 13, 2022
3d5a6ad
REmove delay on dask.simulation dpred. Move dpred from dcr to base.
fourndo Dec 17, 2022
38f7c3b
Remove commented out code
fourndo Dec 17, 2022
6942998
remove delayed on 2d dpred
fourndo Dec 17, 2022
84a51e7
Merge branch 'release/v0.15.1.dev8+geoapps.0.10.0' into GEOPY-715
domfournier Jan 6, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion SimPEG/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
SolverBiCG,
)

__version__ = "0.15.1.dev7+geoapps.0.9.1"

__version__ = "0.15.1.dev8+geoapps.0.10.0"
__author__ = "SimPEG Team"
__license__ = "MIT"
__copyright__ = "2013 - 2020, SimPEG Team, http://simpeg.xyz"
196 changes: 130 additions & 66 deletions SimPEG/dask/electromagnetics/frequency_domain/simulation.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from ....electromagnetics.frequency_domain.simulation import BaseFDEMSimulation as Sim
from ....utils import Zero, mkvc
from ....utils import Zero
import numpy as np
import scipy.sparse as sp
import dask.array as da

from dask import array, compute, delayed
from dask.distributed import Future
import zarr
from time import time

Sim.sensitivity_path = './sensitivity/'
Sim.gtgdiag = None
Expand All @@ -22,17 +22,17 @@ def fields(self, m=None, return_Ainv=False):
for freq in self.survey.frequencies:
A = self.getA(freq)
rhs = self.getRHS(freq)

if return_Ainv:
Ainv += [self.solver(sp.csr_matrix(A.T), **self.solver_opts)]

Ainv_solve = self.solver(sp.csr_matrix(A), **self.solver_opts)
u = Ainv_solve * rhs
Srcs = self.survey.get_sources_by_frequency(freq)
f[Srcs, self._solutionType] = u

Ainv_solve.clean()

if return_Ainv:
Ainv += [self.solver(sp.csr_matrix(A.T), **self.solver_opts)]


if return_Ainv:
return f, Ainv
else:
Expand All @@ -56,9 +56,9 @@ def dask_getJtJdiag(self, m, W=None):
else:
W = W.diagonal()

diag = da.einsum('i,ij,ij->j', W, self.Jmatrix, self.Jmatrix)
diag = array.einsum('i,ij,ij->j', W, self.Jmatrix, self.Jmatrix)

if isinstance(diag, da.Array):
if isinstance(diag, array.Array):
diag = np.asarray(diag.compute())

self.gtgdiag = diag
Expand All @@ -80,7 +80,7 @@ def dask_Jvec(self, m, v):
if isinstance(self.Jmatrix, Future):
self.Jmatrix # Wait to finish

return da.dot(self.Jmatrix, v)
return array.dot(self.Jmatrix, v)


Sim.Jvec = dask_Jvec
Expand All @@ -98,7 +98,7 @@ def dask_Jtvec(self, m, v):
if isinstance(self.Jmatrix, Future):
self.Jmatrix # Wait to finish

return da.dot(v, self.Jmatrix)
return array.dot(v, self.Jmatrix)


Sim.Jtvec = dask_Jtvec
Expand All @@ -113,6 +113,7 @@ def compute_J(self, f=None, Ainv=None):
row_chunks = int(np.ceil(
float(self.survey.nD) / np.ceil(float(m_size) * self.survey.nD * 8. * 1e-6 / self.max_chunk_size)
))

if self.store_sensitivities == "disk":
Jmatrix = zarr.open(
self.sensitivity_path + f"J.zarr",
Expand All @@ -123,87 +124,150 @@ def compute_J(self, f=None, Ainv=None):
else:
Jmatrix = np.zeros((self.survey.nD, m_size), dtype=np.float32)

def eval_store_block(A, freq, df_duT, df_dmT, u_src, src, row_count):
"""
Evaluate the sensitivities for the block or data and store to zarr
"""
df_duT = np.hstack(df_duT)
ATinvdf_duT = (A * df_duT).reshape((dfduT.shape[0], -1))
dA_dmT = self.getADeriv(freq, u_src, ATinvdf_duT, adjoint=True)
dRHS_dmT = self.getRHSDeriv(freq, src, ATinvdf_duT, adjoint=True)
du_dmT = -dA_dmT
if not isinstance(dRHS_dmT, Zero):
du_dmT += dRHS_dmT
if not isinstance(df_dmT[0], Zero):
du_dmT += np.hstack(df_dmT)

block = np.array(du_dmT, dtype=complex).real.T

if self.store_sensitivities == "disk":
Jmatrix.set_orthogonal_selection(
(np.arange(row_count, row_count + block.shape[0]), slice(None)),
block.astype(np.float32)
)
else:
Jmatrix[row_count: row_count + block.shape[0], :] = (
block.astype(np.float32)
)

row_count += block.shape[0]
return row_count

blocks = []
count = 0
block_count = 0

for A_i, freq in zip(Ainv, self.survey.frequencies):

for src in self.survey.get_sources_by_frequency(freq):
for ss, src in enumerate(self.survey.get_sources_by_frequency(freq)):
df_duT, df_dmT = [], []
blocks_dfduT = []
blocks_dfdmT = []
u_src = f[src, self._solutionType]

col_chunks = int(np.ceil(
float(self.survey.nD) / np.ceil(float(u_src.shape[0]) * self.survey.nD * 8. * 1e-6 / self.max_chunk_size)
))

for rx in src.receiver_list:
v = np.eye(rx.nD, dtype=float)
n_blocs = np.ceil(2 * rx.nD / row_chunks)
n_blocs = np.ceil(2 * rx.nD / col_chunks * self.n_cpu)

for block in np.array_split(v, n_blocs, axis=1):

dfduT, dfdmT = rx.evalDeriv(
src, self.mesh, f, v=block, adjoint=True
block_count += block.shape[1] * 2
blocks_dfduT.append(
array.from_delayed(
delayed(dfduT, pure=True)(src, rx, self.mesh, f, block),
dtype=np.float32,
shape=(u_src.shape[0], block.shape[1]*2)
)
)
blocks_dfdmT.append(
delayed(dfdmT, pure=True)(src, rx, self.mesh, f, block),
)
df_duT += [dfduT]
df_dmT += [dfdmT]

block_count += dfduT.shape[1]
if block_count >= (col_chunks * self.n_cpu):

if block_count >= row_chunks:
count = eval_store_block(A_i, freq, df_duT, df_dmT, u_src, src, count)
df_duT, df_dmT = [], []
count = parallel_block_compute(self, A_i, Jmatrix, freq, u_src, src, blocks_dfduT, blocks_dfdmT, count, self.n_cpu, m_size)
blocks_dfduT = []
blocks_dfdmT = []
block_count = 0
# blocks, count = store_block(blocks, count)

if df_duT:
count = eval_store_block(A_i, freq, df_duT, df_dmT, u_src, src, count)
if blocks_dfduT:
count = parallel_block_compute(
self, A_i, Jmatrix, freq, u_src, src, blocks_dfduT, blocks_dfdmT, count, self.n_cpu, m_size)
block_count = 0

if len(blocks) != 0:
if self.store_sensitivities == "disk":
Jmatrix.set_orthogonal_selection(
(np.arange(count, self.survey.nD), slice(None)),
blocks.astype(np.float32)
)
else:
Jmatrix[count: self.survey.nD, :] = (
blocks.astype(np.float32)
)

for A in Ainv:
A.clean()

if self.store_sensitivities == "disk":
del Jmatrix
return da.from_zarr(self.sensitivity_path + f"J.zarr")
return array.from_zarr(self.sensitivity_path + f"J.zarr")
else:
return Jmatrix


Sim.compute_J = compute_J


def dfduT(source, receiver, mesh, fields, block):
dfduT, _ = receiver.evalDeriv(
source, mesh, fields, v=block, adjoint=True
)

return dfduT


def dfdmT(source, receiver, mesh, fields, block):
_, dfdmT = receiver.evalDeriv(
source, mesh, fields, v=block, adjoint=True
)

return dfdmT


def eval_block(simulation, Ainv_deriv_u, frequency, deriv_m, fields, source):
"""
Evaluate the sensitivities for the block or data and store to zarr
"""
dA_dmT = simulation.getADeriv(frequency, fields, Ainv_deriv_u, adjoint=True)
dRHS_dmT = simulation.getRHSDeriv(frequency, source, Ainv_deriv_u, adjoint=True)
du_dmT = -dA_dmT
if not isinstance(dRHS_dmT, Zero):
du_dmT += dRHS_dmT
if not isinstance(deriv_m, Zero):
du_dmT += deriv_m

return np.array(du_dmT, dtype=complex).real.T


def parallel_block_compute(simulation, A_i, Jmatrix, freq, u_src, src, blocks_deriv_u, blocks_deriv_m, counter, sub_threads, m_size):

field_derivs = array.hstack(blocks_deriv_u).compute()

# Direct-solver call

ATinvdf_duT = A_i * field_derivs

# Even split

split = np.linspace(0, (ATinvdf_duT.shape[1]) / 2, sub_threads)[1:-1].astype(int) * 2
sub_blocks_deriv_u = np.array_split(ATinvdf_duT, split, axis=1)

if isinstance(compute(blocks_deriv_m[0])[0], Zero):
sub_blocks_dfdmt = [Zero()] * len(sub_blocks_deriv_u)
else:
compute_blocks_deriv_m = array.hstack([
array.from_delayed(
dfdmT_block,
dtype=np.float32,
shape=(u_src.shape[0], dfdmT_block.shape[1] * 2))
for dfdmT_block in blocks_deriv_m
]).compute()
sub_blocks_dfdmt = np.array_split(compute_blocks_deriv_m, split, axis=1)

sub_process = []

for sub_block_dfduT, sub_block_dfdmT in zip(sub_blocks_deriv_u, sub_blocks_dfdmt):
row_size = int(sub_block_dfduT.shape[1] / 2)
sub_process.append(
array.from_delayed(
delayed(eval_block, pure=True)(
simulation,
sub_block_dfduT,
freq,
sub_block_dfdmT,
u_src,
src
),
dtype=np.float32,
shape=(row_size, m_size)
)
)

block = array.vstack(sub_process).compute()

if simulation.store_sensitivities == "disk":
Jmatrix.set_orthogonal_selection(
(np.arange(counter, counter + block.shape[0]), slice(None)),
block.astype(np.float32)
)
else:
Jmatrix[counter: counter + block.shape[0], :] = (
block.astype(np.float32)
)

counter += block.shape[0]
return counter
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,4 @@ def dask_Pole_getP(self, mesh, Gloc):
return P


Pole.getP = dask_Dipole_getP
Pole.getP = dask_Pole_getP
54 changes: 5 additions & 49 deletions SimPEG/dask/electromagnetics/static/resistivity/simulation.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
import scipy.sparse as sp

from SimPEG.dask.simulation import dask_dpred
from .....electromagnetics.static.resistivity.simulation import BaseDCSimulation as Sim
from .....utils import Zero, mkvc
from .....data import Data
from ....utils import compute_chunk_sizes
import dask
from .....utils import Zero
import dask.array as da
from dask.distributed import Future
import numpy as np
import zarr
import os
import shutil

import numcodecs

numcodecs.blosc.use_threads = False

Sim.sensitivity_path = './sensitivity/'

Sim.dpred = dask_dpred


def dask_fields(self, m=None, return_Ainv=False):
if m is not None:
Expand Down Expand Up @@ -195,49 +194,6 @@ def compute_J(self, f=None, Ainv=None):
Sim.compute_J = compute_J


# This could technically be handled by dask.simulation, but doesn't seem to register
@dask.delayed
def dask_dpred(self, m=None, f=None, compute_J=False):
"""
dpred(m, f=None)
Create the projected data from a model.
The fields, f, (if provided) will be used for the predicted data
instead of recalculating the fields (which may be expensive!).

.. math::

d_\\text{pred} = P(f(m))

Where P is a projection of the fields onto the data space.
"""
if self.survey is None:
raise AttributeError(
"The survey has not yet been set and is required to compute "
"data. Please set the survey for the simulation: "
"simulation.survey = survey"
)

if f is None:
if m is None:
m = self.model
f, Ainv = self.fields(m, return_Ainv=compute_J)

data = Data(self.survey)
for src in self.survey.source_list:
for rx in src.receiver_list:
data[src, rx] = rx.eval(src, self.mesh, f)

if compute_J:
self._Jmatrix = None
Jmatrix = self.compute_J(f=f, Ainv=Ainv)
return mkvc(data), Jmatrix

return mkvc(data)


Sim.dpred = dask_dpred


def dask_getSourceTerm(self):
"""
Evaluates the sources, and puts them in matrix form
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,6 @@ def compute_J(self, f=None, Ainv=None):
Sim.compute_J = compute_J


# This could technically be handled by dask.simulation, but doesn't seem to register
@dask.delayed
def dask_dpred(self, m=None, f=None, compute_J=False):
"""
dpred(m, f=None)
Expand Down
Loading