Skip to content

Commit

Permalink
updating closing slurm client
Browse files Browse the repository at this point in the history
  • Loading branch information
camilolaiton committed Jun 5, 2024
1 parent 76f58c4 commit 6fdf674
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 1 deletion.
35 changes: 35 additions & 0 deletions src/aind_smartspim_data_transformation/compress/dask_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,38 @@ def cancel_slurm_job(
response = requests.delete(endpoint, headers=headers)

return response


def _cleanup(deployment: str) -> None:
    """
    Clean up any resources that were created during the job.

    For a SLURM deployment, the job id is read from the ``SLURM_JOBID``
    environment variable and ``cancel_slurm_job`` is called for it using
    the ``HPC_*`` environment variables. Any other deployment value, or a
    missing job id, makes this a no-op. Cleanup is best effort: every
    failure is logged and nothing is raised to the caller.

    Parameters
    ----------
    deployment : str
        The type of deployment. Either "local" or "slurm"
    """
    if deployment != Deployment.SLURM.value:
        return

    job_id = os.getenv("SLURM_JOBID")
    if job_id is None:
        return

    try:
        # Environment lookups are kept in one place so that a single
        # missing variable aborts the cleanup with a clear message.
        api_url = (
            f"http://{os.environ['HPC_HOST']}"
            f":{os.environ['HPC_PORT']}"
            f"/{os.environ['HPC_API_ENDPOINT']}"
        )
        headers = {
            "X-SLURM-USER-NAME": os.environ["HPC_USERNAME"],
            "X-SLURM-USER-PASSWORD": os.environ["HPC_PASSWORD"],
            "X-SLURM-USER-TOKEN": os.environ["HPC_TOKEN"],
        }
    except KeyError as ke:
        logging.error(f"Failed to get SLURM env vars to cleanup: {ke}")
        return

    logging.info(f"Cancelling SLURM job {job_id}")
    try:
        response = cancel_slurm_job(job_id, api_url, headers)
    except requests.RequestException as err:
        # A network failure while cancelling must not turn an otherwise
        # finished job into a crash; report it and stop.
        logging.error(f"Failed to cancel SLURM job {job_id}: {err}")
        return

    if response.status_code != 200:
        logging.error(
            f"Failed to cancel SLURM job {job_id}: {response.text}"
        )
    else:
        # This might not run if the job is cancelled
        logging.info(f"Cancelled SLURM job {job_id}")
7 changes: 6 additions & 1 deletion src/aind_smartspim_data_transformation/smartspim_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
JobResponse,
get_parser,
)
from distributed.utils import silence_logging_cmgr
from numcodecs.blosc import Blosc
from pydantic import Field

from aind_smartspim_data_transformation.compress.dask_utils import (
_cleanup,
get_client,
get_deployment,
)
Expand Down Expand Up @@ -159,7 +161,10 @@ def _compress_and_write_channels(
)

# Closing client
client.shutdown()
with silence_logging_cmgr(logging.CRITICAL):
client.shutdown()

_cleanup(deployment)

def _compress_raw_data(self) -> None:
"""Compresses smartspim data"""
Expand Down

0 comments on commit 6fdf674

Please sign in to comment.