Skip to content

Commit

Permalink
Merge pull request #2 from AllenNeuralDynamics/feat-smartspim-zarr-compression
Browse files Browse the repository at this point in the history

Feat smartspim zarr compression
  • Loading branch information
camilolaiton authored May 30, 2024
2 parents 0b24b47 + d23d389 commit 1c3e7db
Show file tree
Hide file tree
Showing 13 changed files with 2,900 additions and 2 deletions.
13 changes: 12 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta"
name = "aind-smartspim-data-transformation"
description = "Generated from aind-library-template"
license = {text = "MIT"}
requires-python = ">=3.7"
requires-python = ">=3.9"
authors = [
{name = "Allen Institute for Neural Dynamics"}
]
Expand All @@ -17,6 +17,17 @@ readme = "README.md"
dynamic = ["version"]

dependencies = [
'aind-data-transformation>=0.0.18',
'zarr==2.18.2',
'numcodecs==0.11.0',
'dask-image==2024.5.3',
'xarray_multiscale==1.1.0',
'bokeh==2.4.2',
'pims==0.6.1',
'dask[distributed]==2024.5.1',
'ome-zarr==0.8.2',
'imagecodecs[all]==2023.3.16',
'natsort==8.4.0',
]

[project.optional-dependencies]
Expand Down
25 changes: 25 additions & 0 deletions scripts/singularity_build.def
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
Bootstrap: docker
From: python:3.10-bullseye

%setup
# Copy project directory into container
cp -R . ${SINGULARITY_ROOTFS}/aind-smartspim-data-transformation

%post
# Installing dask mpi
# Build MPICH 3.2 from source without Fortran bindings; configure/make/
# install logs are teed to c.txt / m.txt / mi.txt for build debugging.
wget https://www.mpich.org/static/downloads/3.2/mpich-3.2.tar.gz
tar xfz mpich-3.2.tar.gz
rm mpich-3.2.tar.gz
mkdir mpich-build
cd mpich-build
../mpich-3.2/configure --disable-fortran 2>&1 | tee c.txt
make 2>&1 | tee m.txt
make install 2>&1 | tee mi.txt
cd ..

# Install the project copied in %setup, then delete the source tree to
# keep the image small.
# NOTE(review): ${SINGULARITY_ROOTFS} is normally only defined during
# %setup; inside %post the copied tree lives at
# /aind-smartspim-data-transformation (the variable likely expands to
# empty here, which happens to produce the same path) — confirm.
cd ${SINGULARITY_ROOTFS}/aind-smartspim-data-transformation
pip install . --no-cache-dir
rm -rf ${SINGULARITY_ROOTFS}/aind-smartspim-data-transformation

# mpi4py links against the MPICH built above; dask_mpi provides the
# initialize() used by the SLURM deployment in dask_utils.py.
pip install mpi4py --no-cache-dir
pip install dask_mpi
3 changes: 2 additions & 1 deletion src/aind_smartspim_data_transformation/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
"""Init package"""
__version__ = "0.0.0"

__version__ = "0.0.1"
3 changes: 3 additions & 0 deletions src/aind_smartspim_data_transformation/compress/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
"""
Init internal package
"""
139 changes: 139 additions & 0 deletions src/aind_smartspim_data_transformation/compress/dask_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
"""
Module for dask utilities
"""

import logging
import os
import socket
from enum import Enum
from typing import Optional, Tuple

import distributed
import requests
from distributed import Client, LocalCluster

try:
from dask_mpi import initialize

DASK_MPI_INSTALLED = True
except ImportError:
DASK_MPI_INSTALLED = False

LOGGER = logging.getLogger(__name__)


class Deployment(Enum):
    """Supported ways of deploying the Dask cluster."""

    # Run on the current machine with a LocalCluster.
    LOCAL = "local"
    # Run on the HPC under the SLURM scheduler via dask-mpi.
    SLURM = "slurm"


def log_dashboard_address(
    client: distributed.Client, login_node_address: str = "hpc-login"
) -> None:
    """
    Logs the terminal command required to access the Dask dashboard.

    Parameters
    ----------
    client: distributed.Client
        the Client instance
    login_node_address: str
        the address of the cluster login node
    """
    # The dashboard runs on the scheduler host; resolve its hostname there.
    host = client.run_on_scheduler(socket.gethostname)
    port = client.scheduler_info()["services"]["dashboard"]
    user = os.getenv("USER")
    # Fix: the middle segment of the original message was a plain string
    # (no f-prefix), so {port}/{host}/{user} were logged literally instead
    # of being interpolated. Lazy %-style args also skip formatting when
    # INFO logging is disabled.
    LOGGER.info(
        "To access the dashboard, run the following in a terminal: "
        "ssh -L %s:%s:%s %s@%s ",
        port,
        host,
        port,
        user,
        login_node_address,
    )


def get_deployment() -> str:
    """
    Determine the deployment type from the environment.

    Returns
    -------
    str
        ``Deployment.SLURM.value`` when the ``SLURM_JOBID`` environment
        variable is set (i.e. running on the Allen HPC under SLURM),
        otherwise ``Deployment.LOCAL.value``.
    """
    # SLURM sets SLURM_JOBID for every job; its presence identifies
    # an HPC run.
    if os.getenv("SLURM_JOBID") is not None:
        return Deployment.SLURM.value
    return Deployment.LOCAL.value


def get_client(
    deployment: str = Deployment.LOCAL.value,
    worker_options: Optional[dict] = None,
    n_workers: int = 1,
    processes=True,
) -> Tuple[distributed.Client, int]:
    """
    Create a distributed Client.

    Parameters
    ----------
    deployment: str
        the type of deployment. Either "local" or "slurm"
    worker_options: Optional[dict]
        a dictionary of options to pass to the worker class
        (only applies to the "slurm" deployment)
    n_workers: int
        the number of workers (only applies to the "local" deployment)
    processes: bool
        whether the local cluster uses separate processes (True) or
        threads (only applies to the "local" deployment)

    Returns
    -------
    Tuple[distributed.Client, int]
        the distributed Client and number of workers

    Raises
    ------
    ImportError
        if the "slurm" deployment is requested but dask-mpi is missing
    Exception
        if a required SLURM environment variable is not set
    NotImplementedError
        if ``deployment`` is neither "local" nor "slurm"
    """
    if deployment == Deployment.SLURM.value:
        if not DASK_MPI_INSTALLED:
            raise ImportError(
                "dask-mpi must be installed to use the SLURM deployment"
            )
        if worker_options is None:
            worker_options = {}
        slurm_job_id = os.getenv("SLURM_JOBID")
        if slurm_job_id is None:
            # Fix: the original message concatenated to "...set.Are you...";
            # add the missing separator space.
            raise Exception(
                "SLURM_JOBID environment variable is not set. "
                "Are you running under SLURM?"
            )
        initialize(
            nthreads=int(os.getenv("SLURM_CPUS_PER_TASK", 1)),
            local_directory=f"/scratch/fast/{slurm_job_id}",
            worker_class="distributed.nanny.Nanny",
            worker_options=worker_options,
        )
        client = Client()
        log_dashboard_address(client)
        # Fix: int(os.getenv(...)) raised an opaque TypeError when
        # SLURM_NTASKS was unset; fail with an explicit message instead.
        slurm_ntasks = os.getenv("SLURM_NTASKS")
        if slurm_ntasks is None:
            raise Exception(
                "SLURM_NTASKS environment variable is not set. "
                "Are you running under SLURM?"
            )
        n_workers = int(slurm_ntasks)
    elif deployment == Deployment.LOCAL.value:
        client = Client(
            LocalCluster(
                n_workers=n_workers, processes=processes, threads_per_worker=1
            )
        )
    else:
        # Fix: bare NotImplementedError gave no hint about the bad value.
        raise NotImplementedError(f"Unknown deployment: {deployment}")
    return client, n_workers


def cancel_slurm_job(
    job_id: str, api_url: str, headers: dict
) -> requests.Response:
    """
    Attempt to release resources and cancel the job via the SLURM REST API.

    Workaround for https://github.com/dask/dask-mpi/issues/87

    Parameters
    ----------
    job_id: str
        the SLURM job ID
    api_url: str
        the URL of the SLURM REST API.
        E.g., "http://myhost:80/api/slurm/v0.0.36"
    headers: dict
        HTTP headers sent with the DELETE request

    Returns
    -------
    requests.Response
        the response to the DELETE request; no exception is raised here,
        so the caller is responsible for checking its status
    """
    return requests.delete(f"{api_url}/job/{job_id}", headers=headers)
Loading

0 comments on commit 1c3e7db

Please sign in to comment.