Skip to content

Commit

Permalink
remove the MPI functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
renefritze committed Sep 3, 2024
1 parent b5a4313 commit f9ec722
Show file tree
Hide file tree
Showing 9 changed files with 19 additions and 444 deletions.
69 changes: 0 additions & 69 deletions .github/workflows/test_mpi.yml

This file was deleted.

74 changes: 0 additions & 74 deletions pytimings/mpi.py

This file was deleted.

52 changes: 11 additions & 41 deletions pytimings/timer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,6 @@
import psutil

import pytimings
from pytimings.mpi import (
get_communication_wrapper,
get_communicator,
get_local_communicator,
)
from pytimings.tools import ensure_directory_exists

try:
Expand Down Expand Up @@ -153,30 +148,14 @@ def delta(self, section_name: str) -> dict[str, int]:
except KeyError:
raise NoTimerError(section_name, self) # noqa: B904

def output_files(self, output_dir: Path, csv_base: str, per_rank=False) -> None | Path:
"""creates one file local to each MPI-rank (no global averaging)
and one single rank-0 file with all combined/averaged measures
"""
communication = get_communication_wrapper()
rank = communication.rank

def output_files(self, output_dir: Path, csv_base: str) -> Path:
"""output all recorded measures to a csv file"""
output_dir = Path(output_dir)
ensure_directory_exists(output_dir)
if per_rank:
filename = output_dir / f"{csv_base}_p{rank:08d}.csv"
with open(filename, "w") as outfile: # noqa: PTH123
self.output_all_measures(outfile, get_local_communicator())
tmp_out = StringIO()
# all ranks have to participate in the data generation
self.output_all_measures(tmp_out, get_communicator())
# but only rank 0 needs to write it
if rank == 0:
a_filename = output_dir / f"{csv_base}.csv"
tmp_out.seek(0)
open(a_filename, "w").write(tmp_out.read()) # noqa: PTH123
return a_filename
outfile = output_dir / f"{csv_base}.csv"
with outfile.open("w") as out:
self.output_all_measures(out)
return outfile

def output_console(self):
"""outputs walltime only w/o MPI-rank averaging"""
Expand Down Expand Up @@ -204,24 +183,19 @@ def output_console(self):
else:
csl.print("No timings were recorded")

def output_all_measures(self, out=None, mpi_comm=None) -> None:
def output_all_measures(self, out=None) -> None:
"""output all recorded measures
Outputs average, min, max over all MPI processes associated to mpi_comm
"""
out = out or sys.stdout
mpi_comm = mpi_comm or get_communicator()
comm = get_communication_wrapper(mpi_comm)
stash = StringIO()
csv_file = csv.writer(stash, lineterminator="\n")
# header
csv_file.writerow(["section", "value"])

# threadManager().max_threads()
csv_file.writerow(["threads", 1])
csv_file.writerow(["ranks", comm.size])

weight = 1 / comm.size

for section, delta in self._commited_deltas.items():
delta = delta._asdict() # noqa: PLW2901
Expand All @@ -230,12 +204,9 @@ def output_all_measures(self, out=None, mpi_comm=None) -> None:
syst = delta[SYS_TIME]
csv_file.writerows(
[
[f"{section}_avg_usr", comm.sum(usr) * weight],
[f"{section}_max_usr", comm.max(usr)],
[f"{section}_avg_wall", comm.sum(wall) * weight],
[f"{section}_max_wall", comm.max(wall)],
[f"{section}_avg_sys", comm.sum(syst) * weight],
[f"{section}_max_sys", comm.max(syst)],
[f"{section}_usr", usr],
[f"{section}_wall", wall],
[f"{section}_sys", syst],
]
)
csv_file.writerows([[f"pytimings::data::{k}", v] for k, v in self.extra_data.items()])
Expand All @@ -247,8 +218,7 @@ def output_all_measures(self, out=None, mpi_comm=None) -> None:
)
csv_file.writerow(["pytimings::data::_version", pytimings.__version__])
stash.seek(0)
if comm.rank == 0:
shutil.copyfileobj(stash, out)
shutil.copyfileobj(stash, out)

def add_extra_data(self, data: [dict]):
"""Use this for something configuration data that makes the csv more informative.
Expand Down
8 changes: 1 addition & 7 deletions pytimings/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,5 @@ def generate_example_data(output_dir, number_of_runs=10):
busywait(number_of_runs / 10 / i)
with scoped_timing("quadratic", timings=timings):
busywait(number_of_runs / 10 / i**2)
files.append(
timings.output_files(
output_dir=output_dir,
csv_base=f"example_speedup_{i:05}",
per_rank=False,
)
)
files.append(timings.output_files(output_dir=output_dir, csv_base=f"example_speedup_{i:05}"))
return files
Loading

0 comments on commit f9ec722

Please sign in to comment.