Skip to content

Commit

Permalink
forward solver works in serial and with source parallelisation
Browse files Browse the repository at this point in the history
  • Loading branch information
Ig-dolci committed Jan 3, 2024
1 parent 9c63d6b commit f18c01e
Show file tree
Hide file tree
Showing 18 changed files with 108 additions and 224 deletions.
68 changes: 52 additions & 16 deletions demos/with_automatic_differentiation/run_forward.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from firedrake import *
import spyro
from spyro.io import front_ensemble_run_forward
import matplotlib.pyplot as plt
import numpy as np

Expand All @@ -16,7 +17,10 @@
}

model["parallelism"] = {
"type": "spatial", # options: automatic (same number of cores for evey processor) or spatial
# options: shots_parallelism (same number of cores for every processor)
# or automatic (same number of cores for every processor) or
# or spatial.
"type": "none",
}

# Define the domain size without the ABL.
Expand Down Expand Up @@ -59,13 +63,16 @@
"fspool": 1, # how frequently to save solution to RAM
}

comm = spyro.utils.mpi_init(model)
mesh = UnitSquareMesh(100, 100) # to test FWI, mesh aligned with interface
comm, spatial_comm = spyro.utils.mpi_init(model)
if model["parallelism"]["type"] == "shots_parallelism":
# Only shots parallelism.
mesh = UnitSquareMesh(50, 50, comm=spatial_comm)
else:
mesh = UnitSquareMesh(50, 50)

element = spyro.domains.space.FE_method(
mesh, model["opts"]["method"], model["opts"]["degree"]
)

V = FunctionSpace(mesh, element)


Expand All @@ -89,19 +96,48 @@ def make_vp_circle(vp_guess=False, plot_vp=False):
tf=model["timeaxis"]["tf"],
freq=model["acquisition"]["frequency"],
)

# True acoustic velocity model
vp_exact = make_vp_circle(plot_vp=True)
if comm.comm.rank == 0:
for sn in range(len(model["acquisition"]["source_pos"])):
receiver_data = spyro.solvers.forward_AD(model, mesh, comm, vp_exact,
wavelet, debug=True,
source_number=sn)

# --- Plot the receiver data --- #
data = []
for _, rec in enumerate(receiver_data):
data.append(rec.dat.data_ro[:])

spyro.plots.plot_shots(model, comm, data, vmax=1e-08, vmin=-1e-08)
def run_forward(source_number):
"""Execute a acoustic wave equation.
Parameters
----------
source_number: `int`, optional
The source number defined by the user.
Notes
-----
The forward solver (`forward_AD`) is implemented in spyro using firedrake's
functions that can be annotated by the algorithimic differentiation (AD).
This is because spyro is capable of executing Full Waveform Inversion (FWI),
which needs the computation of the gradient of the objective function with
respect to the velocity model through (AD).
"""
receiver_data = spyro.solvers.forward_AD(model, mesh, comm, vp_exact,
wavelet, debug=True,
source_number=source_number)
# --- Plot the receiver data --- #
data = []
for _, rec in enumerate(receiver_data):
data.append(rec.dat.data_ro[:])

spyro.plots.plot_shots(model, comm, data, vmax=1e-08, vmin=-1e-08)


# Processor number.
rank = comm.ensemble_comm.rank
# Number of processors used in the simulation.
size = comm.ensemble_comm.size
if size == 1:
for sn in range(len(model["acquisition"]["source_pos"])):
run_forward(sn)
elif size == len(model["acquisition"]["source_pos"]):
# Only run the forward simulation for the source number that matches the
# processor number.
run_forward(rank)
else:
raise NotImplementedError("Only implemented for 1 processor")
raise NotImplementedError("`size` must be 1 or equal to `num_sources`."
"Different values are not supported yet.")
2 changes: 1 addition & 1 deletion demos/without_automatic_differentiation/run_forward.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
"nspool": 100, # how frequently to output solution to pvds
"fspool": 99999, # how frequently to save solution to RAM
}
comm = spyro.utils.mpi_init(model)
comm, _ = spyro.utils.mpi_init(model)
mesh, V = spyro.io.read_mesh(model, comm)
vp = spyro.io.interpolate(model, mesh, V, guess=False)
sources = spyro.Sources(model, mesh, V, comm)
Expand Down
2 changes: 1 addition & 1 deletion demos/without_automatic_differentiation/run_fwi.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
"nspool": 1000, # how frequently to output solution to pvds
"fspool": 10, # how frequently to save solution to RAM
}
comm = spyro.utils.mpi_init(model)
comm, _ = spyro.utils.mpi_init(model)
# if comm.comm.rank == 0 and comm.ensemble_comm.rank == 0:
# fil = open("FUNCTIONAL_FWI_P5.txt", "w")
mesh, V = spyro.io.read_mesh(model, comm)
Expand Down
2 changes: 0 additions & 2 deletions spyro/io/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from .io import (
interpolate,
ensemble_forward,
ensemble_forward_ad,
ensemble_forward_elastic_waves,
ensemble_gradient,
ensemble_gradient_elastic_waves,
Expand All @@ -26,7 +25,6 @@
"read_mesh",
"interpolate",
"ensemble_forward",
"ensemble_forward_ad",
"ensemble_forward_elastic_waves",
"ensemble_gradient",
"ensemble_gradient_elastic_waves",
Expand Down
19 changes: 0 additions & 19 deletions spyro/io/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,25 +115,6 @@ def wrapper(*args, **kwargs):
return wrapper


def ensemble_forward_ad(func):
"""Decorator for forward_ad to distribute shots for ensemble parallelism"""

def wrapper(*args, **kwargs):
acq = args[0].get("acquisition")
num = len(acq["source_pos"])
fwi = kwargs.get("fwi")
_comm = args[2]
for snum in range(num):
if is_owner(_comm, snum):
if fwi:
u_r, J = func(*args, **dict(kwargs, source_num=snum))
return u_r, J
else:
u_r = func(*args, **dict(kwargs, source_num=snum))

return wrapper


def ensemble_forward_elastic_waves(func):
"""Decorator for forward elastic waves to distribute shots for ensemble parallelism"""

Expand Down
1 change: 0 additions & 1 deletion spyro/solvers/forward_AD.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
set_log_level(ERROR)


# @ensemble_forward
def forward(model, mesh, comm, c, wavelet, source_number=0, fwi=False, **kwargs):
"""Secord-order in time fully-explicit scheme.
Expand Down
47 changes: 40 additions & 7 deletions spyro/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,24 +89,37 @@ def mysize(COMM=COMM_SELF):
return COMM.Get_size()


def mpi_init(model):
def mpi_init(model, spatial_core_parallelism=None):
"""Initialize computing environment"""
# rank = myrank()
# size = mysize()
available_cores = COMM_WORLD.size
if model["parallelism"]["type"] == "automatic":
num_cores_per_shot = available_cores / len(model["acquisition"]["source_pos"])
if available_cores % len(model["acquisition"]["source_pos"]) != 0:
if (
model["parallelism"]["type"] == "shots_parallelism"
or model["parallelism"]["type"] == "automatic"
):

num_cores_per_shot = COMM_WORLD.size / len(model["acquisition"]["source_pos"])
if COMM_WORLD.size % len(model["acquisition"]["source_pos"]) != 0:
raise ValueError(
"Available cores cannot be divided between sources equally."
)
elif model["parallelism"]["type"] == "spatial":
num_cores_per_shot = available_cores
# Parallellism is only over spatial domain. No shots parallelism.
num_cores_per_shot = COMM_WORLD.size
elif model["parallelism"]["type"] == "custom":
raise ValueError("Custom parallelism not yet implemented")
elif model["parallelism"]["type"] == "none":
num_cores_per_shot = 1

if model["parallelism"]["type"] == "shots_parallelism":
# Parrallellism is over shots. No spatial parallelism.
spatial_comm = COMM_WORLD.Split(
COMM_WORLD.rank % len(model["acquisition"]["source_pos"])
)
else:
spatial_comm = None
comm_ens = Ensemble(COMM_WORLD, num_cores_per_shot)
return comm_ens
return comm_ens, spatial_comm


def communicate(array, my_ensemble):
Expand Down Expand Up @@ -137,6 +150,26 @@ def communicate(array, my_ensemble):
return array_reduced


def communicate_receiver_vom(array, comm):
"""Gather all coordinates from all ranks.
Parameters
----------
array: numpy array
Array of data to gather across all ranks.
comm: Firedrake.comm
Firedrake ensemble communicator.
Returns
-------
array: numpy array
Array of data gathered across all ranks.
"""
array = array.copy()
array = comm.allgather(array)
array = np.concatenate(array)
return array

def analytical_solution_for_pressure_based_on_MMS(model, mesh, time):
degree = model["opts"]["degree"]
V = FunctionSpace(mesh, "CG", degree)
Expand Down
2 changes: 1 addition & 1 deletion test/not_a_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def test_gradient_talyor_remainder_v2():
from ROL.firedrake_vector import FiredrakeVector as FeVector
import ROL

comm = spyro.utils.mpi_init(model)
comm, _ = spyro.utils.mpi_init(model)

mesh, V = spyro.io.read_mesh(model, comm)

Expand Down
2 changes: 1 addition & 1 deletion test/test_MMS.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def run_solve(timestep_method, method, model, mesh, expr):
elif method == "KMV":
variant = "KMV"

comm = spyro.utils.mpi_init(testmodel)
comm, _ = spyro.utils.mpi_init(testmodel)

element = FiniteElement(method, mesh.ufl_cell(), degree=1, variant=variant)
V = FunctionSpace(mesh, element)
Expand Down
2 changes: 1 addition & 1 deletion test/test_gradient.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def test_gradient_pml():

def _test_gradient(options, pml=False):

comm = spyro.utils.mpi_init(options)
comm, _ = spyro.utils.mpi_init(options)

mesh, V = spyro.io.read_mesh(options, comm)

Expand Down
2 changes: 1 addition & 1 deletion test/test_gradient_3d.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def test_gradient_3d():

def _test_gradient(options, pml=False):

comm = spyro.utils.mpi_init(options)
comm, _ = spyro.utils.mpi_init(options)

mesh, V = spyro.io.read_mesh(options, comm)

Expand Down
14 changes: 7 additions & 7 deletions test/test_newat.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def triangle_area(p1, p2, p3):

def test_correct_receiver_location_generation2D():
"""Tests if receiver locations where generated correctly"""
comm = spyro.utils.mpi_init(model)
comm, _ = spyro.utils.mpi_init(model)
mesh, V = spyro.io.read_mesh(model, comm)

receivers = spyro.create_transect((-0.1, 0.3), (-0.1, 0.9), 3)
Expand All @@ -30,7 +30,7 @@ def test_correct_receiver_location_generation2D():

def test_correct_receiver_to_cell_location2D():
"""Tests if the receivers where located in the correct cell"""
comm = spyro.utils.mpi_init(model)
comm, _ = spyro.utils.mpi_init(model)
model["opts"]["degree"] = 3
mesh, V = spyro.io.read_mesh(model, comm)

Expand Down Expand Up @@ -89,7 +89,7 @@ def test_correct_receiver_to_cell_location2D():


def test_correct_at_value2D():
comm = spyro.utils.mpi_init(model)
comm, _ = spyro.utils.mpi_init(model)
model["opts"]["degree"] = 3
mesh, V = spyro.io.read_mesh(model, comm)
pz = -0.1
Expand Down Expand Up @@ -122,7 +122,7 @@ def test_correct_at_value2D():

def test_correct_at_value2D_quad():
model_quad = deepcopy(model)
comm = spyro.utils.mpi_init(model_quad)
comm, _ = spyro.utils.mpi_init(model_quad)
model_quad["opts"]["degree"] = 3
model_quad["opts"]["degree"] = 3
mesh, V = spyro.io.read_mesh(model_quad, comm)
Expand Down Expand Up @@ -174,7 +174,7 @@ def test_correct_receiver_location_generation3D():
"""Tests if receiver locations where generated correctly"""

test_model = deepcopy(model3D)
comm = spyro.utils.mpi_init(test_model)
comm, _ = spyro.utils.mpi_init(test_model)
mesh, V = spyro.io.read_mesh(test_model, comm)
test_model["acquisition"]["num_receivers"] = 3
receivers = spyro.create_transect((-0.05, 0.3, 0.5), (-0.05, 0.9, 0.5), 3)
Expand All @@ -189,7 +189,7 @@ def test_correct_receiver_to_cell_location3D():
"""Tests if the receivers where located in the correct cell"""

test_model1 = deepcopy(model3D)
comm = spyro.utils.mpi_init(test_model1)
comm, _ = spyro.utils.mpi_init(test_model1)
mesh, V = spyro.io.read_mesh(test_model1, comm)
rec = spyro.create_transect((-0.05, 0.1, 0.5), (-0.05, 0.9, 0.5), 3)
test_model1["acquisition"]["receiver_locations"] = rec
Expand Down Expand Up @@ -263,7 +263,7 @@ def test_correct_at_value3D():
test_model2 = deepcopy(model3D)
test_model2["acquisition"]["num_receivers"] = 3
test_model2["opts"]["degree"] = 3
comm = spyro.utils.mpi_init(test_model2)
comm, _ = spyro.utils.mpi_init(test_model2)
mesh, V = spyro.io.read_mesh(test_model2, comm)
x_start = 0.09153949331982138
x_end = 0.09153949331982138
Expand Down
2 changes: 1 addition & 1 deletion test/test_parallel_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
@pytest.mark.skip(reason="no way of currently testing this")
def test_parallel_source():

comm = spyro.utils.mpi_init(options)
comm, _ = spyro.utils.mpi_init(options)

mesh, V = spyro.io.read_mesh(options, comm)

Expand Down
2 changes: 1 addition & 1 deletion test/test_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def test_ricker_varies_in_time():
# modelRicker["opts"]["method"] = "CG"
# modelRicker["opts"]["degree"] = 2
# modelRicker["opts"]["dimension"] = 2
# comm = spyro.utils.mpi_init(modelRicker)
# comm, _ = spyro.utils.mpi_init(modelRicker)
# mesh = fire.UnitSquareMesh(10, 10)
# element = fire.FiniteElement("CG", mesh.ufl_cell(), 2, variant="equispaced")
# V = fire.FunctionSpace(mesh, element)
Expand Down
2 changes: 1 addition & 1 deletion test/test_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def test_mesh_generation_for_grid_calc():
model = spyro.tools.create_model_for_grid_point_calculation(
grid_point_calculator_parameters, degree_reference
)
comm = spyro.utils.mpi_init(model)
comm, _ = spyro.utils.mpi_init(model)
for G in Gs:
model["mesh"]["meshfile"] = "meshes/2Dhomogeneous" + str(G) + ".msh"
model = spyro.tools.generate_mesh(model, G, comm)
Expand Down
Empty file removed test_ad/__init__.py
Empty file.
Loading

0 comments on commit f18c01e

Please sign in to comment.