Skip to content

Commit

Permalink
variable name change
Browse files Browse the repository at this point in the history
  • Loading branch information
Olender committed Jul 1, 2024
1 parent ba3da8d commit 8c3ffca
Show file tree
Hide file tree
Showing 8 changed files with 151 additions and 35 deletions.
28 changes: 14 additions & 14 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,20 @@
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
// {
// "name": "Python Attach 0",
// "type": "python",
// "request": "attach",
// "port": 3000,
// "host": "localhost",
// },
// {
// "name": "Python Attach 1",
// "type": "python",
// "request": "attach",
// "port": 3001,
// "host": "localhost"
// },
{
"name": "Python Attach 0",
"type": "python",
"request": "attach",
"port": 3000,
"host": "localhost",
},
{
"name": "Python Attach 1",
"type": "python",
"request": "attach",
"port": 3001,
"host": "localhost"
},
{
"name": "Python Debugger: Current File",
"type": "debugpy",
Expand Down
11 changes: 6 additions & 5 deletions spyro/io/basicio.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,11 @@ def wrapper(*args, **kwargs):
u, u_r = func(*args, **dict(kwargs, source_nums=[snum]))
return u, u_r
elif args[0].parallelism_type == "custom":
shots_per_core_list = args[0].shots_per_core
shot_ids_per_propagation_list = args[0].shot_ids_per_propagation
_comm = args[0].comm
for id_shots, shots_in_core in enumerate(shots_per_core_list):
if is_owner(_comm, id_shots):
u, u_r = func(*args, **dict(kwargs, source_nums=shots_in_core))
for shot_ids_in_propagation in shot_ids_per_propagation_list:
if is_owner(_comm, shot_ids_in_propagation):
u, u_r = func(*args, **dict(kwargs, source_nums=shot_ids_in_propagation))
return u, u_r

return wrapper
Expand Down Expand Up @@ -313,7 +313,8 @@ def is_owner(ens_comm, rank):
`True` if `rank` owns this shot
"""
return ens_comm.ensemble_comm.rank == (rank % ens_comm.ensemble_comm.size)
owner = ens_comm.ensemble_comm.rank == (rank % ens_comm.ensemble_comm.size)
return owner


def _check_units(c):
Expand Down
11 changes: 8 additions & 3 deletions spyro/io/model_parameters.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import numpy as np
from mpi4py import MPI
import warnings
from .. import io
from .. import utils
Expand Down Expand Up @@ -542,11 +543,15 @@ def _sanitize_comm(self, comm):
else:
warnings.warn("No paralellism type listed. Assuming automatic")
self.parallelism_type = "automatic"

if self.parallelism_type == "custom":
self.shots_per_core = dictionary["parallelism"]["shots_per_core"]
self.shot_ids_per_propagation = dictionary["parallelism"]["shot_ids_per_propagation"]
else:
self.shots_per_core = 1
shot_ids_per_propagation = []
available_cores = COMM_WORLD.size
num_cores_per_propagation = available_cores / self.number_of_sources
for shot in range(self.number_of_sources):


if comm is None:
self.comm = utils.mpi_init(self)
Expand Down
1 change: 1 addition & 0 deletions spyro/solvers/acoustic_wave.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def forward_solve(self):
self.c = self.initial_velocity_model
self.matrix_building()
self.wave_propagator()
self.comm.comm.barrier()

def force_rebuild_function_space(self):
if self.mesh is None:
Expand Down
12 changes: 6 additions & 6 deletions spyro/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,19 +88,19 @@ def mpi_init(model):
available_cores = COMM_WORLD.size # noqa: F405
print(f"Parallelism type: {model.parallelism_type}", flush=True)
if model.parallelism_type == "automatic":
num_cores_per_shot = available_cores / model.number_of_sources
num_cores_per_propagation = available_cores / model.number_of_sources
if available_cores % model.number_of_sources != 0:
raise ValueError(
"Available cores cannot be divided between sources equally."
)
elif model.parallelism_type == "spatial":
num_cores_per_shot = available_cores
num_cores_per_propagation = available_cores
elif model.parallelism_type == "custom":
shots_per_core = model.shots_per_core
num_max_shots_per_core = max(len(sublist) for sublist in shots_per_core)
num_cores_per_shot = len(shots_per_core)
shot_ids_per_propagation = model.shot_ids_per_propagation
num_max_shots_per_core = max(len(sublist) for sublist in shot_ids_per_propagation)
num_cores_per_propagation = len(shot_ids_per_propagation)

comm_ens = Ensemble(COMM_WORLD, num_cores_per_shot) # noqa: F405
comm_ens = Ensemble(COMM_WORLD, num_cores_per_propagation) # noqa: F405
return comm_ens


Expand Down
13 changes: 6 additions & 7 deletions temp_forward_shot.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# from mpi4py.MPI import COMM_WORLD
# import debugpy
# debugpy.listen(3000 + COMM_WORLD.rank)
# debugpy.wait_for_client()
from mpi4py.MPI import COMM_WORLD
import debugpy
debugpy.listen(3000 + COMM_WORLD.rank)
debugpy.wait_for_client()
import spyro
import numpy as np
import math
Expand All @@ -24,8 +24,7 @@ def run_forward(dt):
# spyro however supports both spatial parallelism and "shot" parallelism.
dictionary["parallelism"] = {
"type": "custom", # options: automatic (same number of cores for evey processor) or spatial
"seperate_shots": True,
"shots_per_core": [[0, 1]],
"shot_ids_per_propagation": [[0, 1, 2]],
}

# Define the domain size without the PML. Here we'll assume a 1.00 x 1.00 km
Expand All @@ -40,7 +39,7 @@ def run_forward(dt):
}
dictionary["acquisition"] = {
"source_type": "ricker",
"source_locations": spyro.create_transect((-0.55, 0.7), (-0.55, 1.3), 2),
"source_locations": spyro.create_transect((-0.55, 0.7), (-0.55, 1.3), 3),
"frequency": 5.0,
"delay": 0.2,
"delay_type": "time",
Expand Down
110 changes: 110 additions & 0 deletions temp_test_parallel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
from mpi4py.MPI import COMM_WORLD
import debugpy
debugpy.listen(3000 + COMM_WORLD.rank)
debugpy.wait_for_client()
from mpi4py.MPI import COMM_WORLD
from mpi4py import MPI
import numpy as np
import firedrake as fire
import spyro
import warnings
warnings.filterwarnings("ignore")


def error_calc(p_numerical, p_analytical, nt):
norm = np.linalg.norm(p_numerical, 2) / np.sqrt(nt)
error_time = np.linalg.norm(p_analytical - p_numerical, 2) / np.sqrt(nt)
div_error_time = error_time / norm
return div_error_time


def test_forward_3_shots():
final_time = 1.0

dictionary = {}
dictionary["options"] = {
"cell_type": "Q", # simplexes such as triangles or tetrahedra (T) or quadrilaterals (Q)
"variant": "lumped", # lumped, equispaced or DG, default is lumped
"degree": 4, # p order
"dimension": 2, # dimension
}
dictionary["parallelism"] = {
"type": "automatic", # options: automatic (same number of cores for evey processor) or spatial
}
dictionary["mesh"] = {
"Lz": 3.0, # depth in km - always positive # Como ver isso sem ler a malha?
"Lx": 3.0, # width in km - always positive
"Ly": 0.0, # thickness in km - always positive
"mesh_file": None,
"mesh_type": "firedrake_mesh",
}
dictionary["acquisition"] = {
"source_type": "ricker",
"source_locations": [(-1.1, 1.2), (-1.1, 1.5), (-1.1, 1.8)],
"frequency": 5.0,
"delay": 0.2,
"delay_type": "time",
"receiver_locations": spyro.create_transect((-1.3, 1.2), (-1.3, 1.8), 301),
}
dictionary["time_axis"] = {
"initial_time": 0.0, # Initial time for event
"final_time": final_time, # Final time for event
"dt": 0.001, # timestep size
"amplitude": 1, # the Ricker has an amplitude of 1.
"output_frequency": 100, # how frequently to output solution to pvds - Perguntar Daiane ''post_processing_frequnecy'
"gradient_sampling_frequency": 1,
}
dictionary["visualization"] = {
"forward_output": False,
"forward_output_filename": "results/forward_output.pvd",
"fwi_velocity_model_output": False,
"velocity_model_filename": None,
"gradient_output": False,
"gradient_filename": None,
}

Wave_obj = spyro.AcousticWave(dictionary=dictionary)
Wave_obj.set_mesh(mesh_parameters={"dx": 0.1})

mesh_z = Wave_obj.mesh_z
cond = fire.conditional(mesh_z < -1.5, 3.5, 1.5)
Wave_obj.set_initial_velocity_model(conditional=cond, output=True)

Wave_obj.forward_solve()

comm = Wave_obj.comm

arr = Wave_obj.receivers_output

if comm.ensemble_comm.rank == 0:
analytical_p = spyro.utils.nodal_homogeneous_analytical(
Wave_obj, 0.2, 1.5, n_extra=100
)
else:
analytical_p = None
analytical_p = comm.ensemble_comm.bcast(analytical_p, root=0)

# Checking if error before reflection matches
if comm.ensemble_comm.rank == 0:
rec_id = 0
elif comm.ensemble_comm.rank == 1:
rec_id = 150
elif comm.ensemble_comm.rank == 2:
rec_id = 300

arr0 = arr[:, rec_id]
arr0 = arr0.flatten()

error = error_calc(arr0[:430], analytical_p[:430], 430)
if comm.comm.rank == 0:
print(f"Error for shot {Wave_obj.current_sources} is {error} and test has passed equals {np.abs(error) < 0.01}", flush=True)
error_all = COMM_WORLD.allreduce(error, op=MPI.SUM)
error_all /= 3

test = np.abs(error_all) < 0.01

assert test


if __name__ == "__main__":
test_forward_3_shots()
Binary file removed velocity_models/tutorial
Binary file not shown.

0 comments on commit 8c3ffca

Please sign in to comment.