diff --git a/.coveragerc b/.coveragerc
index 61ffa067..fd7affab 100644
--- a/.coveragerc
+++ b/.coveragerc
@@ -1,4 +1,5 @@
 # Configuration file for Python coverage tests
 [run]
+disable_warnings = include-ignored
 include = dask_cuda/*
 omit = dask_cuda/tests/*
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 27528dfc..a2b60871 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -4,7 +4,7 @@ repos:
         hooks:
               - id: isort
       - repo: https://github.com/ambv/black
-        rev: 19.10b0
+        rev: 22.3.0
         hooks:
               - id: black
       - repo: https://gitlab.com/pycqa/flake8
diff --git a/CHANGELOG.md b/CHANGELOG.md
index e992e428..4b5dcaf2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,39 @@
+# dask-cuda 22.06.00 (7 Jun 2022)
+
+## 🚨 Breaking Changes
+
+- Upgrade `numba` pinning to be in-line with rest of rapids ([#912](https://github.com/rapidsai/dask-cuda/pull/912)) [@galipremsagar](https://github.com/galipremsagar)
+
+## 🐛 Bug Fixes
+
+- Reduce `test_cudf_cluster_device_spill` test and speed it up ([#918](https://github.com/rapidsai/dask-cuda/pull/918)) [@pentschev](https://github.com/pentschev)
+- Update ImportError tests with --pre-import ([#914](https://github.com/rapidsai/dask-cuda/pull/914)) [@pentschev](https://github.com/pentschev)
+- Add xfail mark to `test_pre_import_not_found` ([#908](https://github.com/rapidsai/dask-cuda/pull/908)) [@pentschev](https://github.com/pentschev)
+- Increase spill tests timeout to 30 seconds ([#901](https://github.com/rapidsai/dask-cuda/pull/901)) [@pentschev](https://github.com/pentschev)
+- Fix errors related with `distributed.worker.memory.terminate` ([#900](https://github.com/rapidsai/dask-cuda/pull/900)) [@pentschev](https://github.com/pentschev)
+- Skip tests on import error for some optional packages ([#899](https://github.com/rapidsai/dask-cuda/pull/899)) [@pentschev](https://github.com/pentschev)
+- Update auto host_memory computation when threads per worker > 1 ([#896](https://github.com/rapidsai/dask-cuda/pull/896)) [@ayushdg](https://github.com/ayushdg)
+- Update black to 22.3.0 ([#889](https://github.com/rapidsai/dask-cuda/pull/889)) [@charlesbluca](https://github.com/charlesbluca)
+- Remove legacy `check_python_3` ([#886](https://github.com/rapidsai/dask-cuda/pull/886)) [@pentschev](https://github.com/pentschev)
+
+## 📖 Documentation
+
+- Add documentation for `RAPIDS_NO_INITIALIZE` ([#898](https://github.com/rapidsai/dask-cuda/pull/898)) [@charlesbluca](https://github.com/charlesbluca)
+- Use upstream warning functions for CUDA initialization ([#894](https://github.com/rapidsai/dask-cuda/pull/894)) [@charlesbluca](https://github.com/charlesbluca)
+
+## 🛠️ Improvements
+
+- Pin `dask` and `distributed` for release ([#922](https://github.com/rapidsai/dask-cuda/pull/922)) [@galipremsagar](https://github.com/galipremsagar)
+- Pin `dask` & `distributed` for release ([#916](https://github.com/rapidsai/dask-cuda/pull/916)) [@galipremsagar](https://github.com/galipremsagar)
+- Upgrade `numba` pinning to be in-line with rest of rapids ([#912](https://github.com/rapidsai/dask-cuda/pull/912)) [@galipremsagar](https://github.com/galipremsagar)
+- Removing test of `cudf.merge_sorted()` ([#905](https://github.com/rapidsai/dask-cuda/pull/905)) [@madsbk](https://github.com/madsbk)
+- Disable `include-ignored` coverage warnings ([#903](https://github.com/rapidsai/dask-cuda/pull/903)) [@pentschev](https://github.com/pentschev)
+- Fix ci/local script ([#902](https://github.com/rapidsai/dask-cuda/pull/902)) [@Ethyling](https://github.com/Ethyling)
+- Use conda to build python packages during GPU tests ([#897](https://github.com/rapidsai/dask-cuda/pull/897)) [@Ethyling](https://github.com/Ethyling)
+- Pull `requirements.txt` into Conda recipe ([#893](https://github.com/rapidsai/dask-cuda/pull/893)) [@jakirkham](https://github.com/jakirkham)
+- Unpin `dask` & `distributed` for development ([#892](https://github.com/rapidsai/dask-cuda/pull/892)) [@galipremsagar](https://github.com/galipremsagar)
+- Build packages using mambabuild ([#846](https://github.com/rapidsai/dask-cuda/pull/846)) [@Ethyling](https://github.com/Ethyling)
+
 # dask-cuda 22.04.00 (6 Apr 2022)
 
 ## 🚨 Breaking Changes
diff --git a/ci/cpu/build.sh b/ci/cpu/build.sh
index 33ebb855..d2450cfe 100755
--- a/ci/cpu/build.sh
+++ b/ci/cpu/build.sh
@@ -1,5 +1,5 @@
 #!/usr/bin/env bash
-# Copyright (c) 2019, NVIDIA CORPORATION.
+# Copyright (c) 2019-2022, NVIDIA CORPORATION.
 ################################################################################
 # dask-cuda cpu build
 ################################################################################
@@ -68,8 +68,11 @@ pip install git+https://github.com/dask/distributed.git@main
 # BUILD - Package builds
 ################################################################################
 
-gpuci_logger "Build conda pkg for libcudf"
-gpuci_conda_retry build conda/recipes/dask-cuda --python=${PYTHON}
+# FIXME: Move boa install to gpuci/rapidsai
+gpuci_mamba_retry install -c conda-forge boa
+
+gpuci_logger "Build conda pkg for dask-cuda"
+gpuci_conda_retry mambabuild conda/recipes/dask-cuda --python=${PYTHON}
 
 rm -rf dist/
 python setup.py sdist bdist_wheel
diff --git a/ci/gpu/build.sh b/ci/gpu/build.sh
index 1894a78c..48396f93 100755
--- a/ci/gpu/build.sh
+++ b/ci/gpu/build.sh
@@ -26,15 +26,19 @@ cd "$WORKSPACE"
 export GIT_DESCRIBE_TAG=`git describe --tags`
 export MINOR_VERSION=`echo $GIT_DESCRIBE_TAG | grep -o -E '([0-9]+\.[0-9]+)'`
 export UCX_PATH=$CONDA_PREFIX
-export UCXPY_VERSION=0.25.*
+export UCXPY_VERSION=0.26.*
+unset GIT_DESCRIBE_TAG
 
 # Enable NumPy's __array_function__ protocol (needed for NumPy 1.16.x,
 # will possibly be enabled by default starting on 1.17)
 export NUMPY_EXPERIMENTAL_ARRAY_FUNCTION=1
 
-# Install dask and distributed from master branch. Usually needed during
+# Install dask and distributed from main branch. Usually needed during
 # development time and disabled before a new dask-cuda release.
-export INSTALL_DASK_MASTER=0
+export INSTALL_DASK_MAIN=0
+
+# Dask version to install when `INSTALL_DASK_MAIN=0`
+export DASK_STABLE_VERSION="2022.05.2"
 
 ################################################################################
 # SETUP - Check environment
@@ -49,43 +53,38 @@ nvidia-smi
 gpuci_logger "Activate conda env"
 . /opt/conda/etc/profile.d/conda.sh
 conda activate rapids
+
 conda info
 conda config --show-sources
 conda list --show-channel-urls
 
-# Fixing Numpy version to avoid RuntimeWarning: numpy.ufunc size changed, may
-# indicate binary incompatibility. Expected 192 from C header, got 216 from PyObject.
-# Also installing cucim in order to test GDS spilling
-gpuci_mamba_retry install "cudatoolkit=$CUDA_REL" \
-                          "cudf=${MINOR_VERSION}" "dask-cudf=${MINOR_VERSION}" \
-                          "ucx-py=${UCXPY_VERSION}" "ucx-proc=*=gpu" \
-                          "rapids-build-env=$MINOR_VERSION.*" \
-                          "cucim"
-
+# Installing cucim in order to test GDS spilling
 # Pin pytest-asyncio because latest versions modify the default asyncio
 # `event_loop_policy`. See https://github.com/dask/distributed/pull/4212 .
-gpuci_mamba_retry install "pytest-asyncio=<0.14.0"
-
-# https://docs.rapids.ai/maintainers/depmgmt/
-# gpuci_mamba_retry remove -f rapids-build-env
-# gpuci_mamba_retry install "your-pkg=1.0.0"
-
-conda info
-conda config --show-sources
-conda list --show-channel-urls
-
-# Install the main version of dask and distributed
-if [[ "${INSTALL_DASK_MASTER}" == 1 ]]; then
-    gpuci_logger "pip install git+https://github.com/dask/distributed.git@main --upgrade"
-    pip install "git+https://github.com/dask/distributed.git@main" --upgrade
-    gpuci_logger "pip install git+https://github.com/dask/dask.git@main --upgrade"
-    pip install "git+https://github.com/dask/dask.git@main" --upgrade
+gpuci_mamba_retry install "cudf=${MINOR_VERSION}" \
+                          "dask-cudf=${MINOR_VERSION}" \
+                          "ucx-py=${UCXPY_VERSION}" \
+                          "ucx-proc=*=gpu" \
+                          "cucim" \
+                          "pytest-asyncio=<0.14.0"
+
+# Install latest nightly version for dask and distributed if needed
+if [[ "${INSTALL_DASK_MAIN}" == 1 ]]; then
+    gpuci_logger "Installing dask and distributed from dask nightly channel"
+    gpuci_mamba_retry install -c dask/label/dev \
+        "dask/label/dev::dask" \
+        "dask/label/dev::distributed"
+else
+    gpuci_logger "gpuci_mamba_retry install conda-forge::dask==${DASK_STABLE_VERSION} conda-forge::distributed==${DASK_STABLE_VERSION} conda-forge::dask-core==${DASK_STABLE_VERSION} --force-reinstall"
+    gpuci_mamba_retry install conda-forge::dask==${DASK_STABLE_VERSION} conda-forge::distributed==${DASK_STABLE_VERSION} conda-forge::dask-core==${DASK_STABLE_VERSION} --force-reinstall
 fi
+
 gpuci_logger "Check versions"
 python --version
 $CC --version
 $CXX --version
+
 conda info
 conda config --show-sources
 conda list --show-channel-urls
@@ -94,9 +93,14 @@ conda list --show-channel-urls
 # BUILD - Build dask-cuda
 ################################################################################
 
-gpuci_logger "Build dask-cuda"
-cd "$WORKSPACE"
-python -m pip install -e .
+# TODO: Move boa install to gpuci/rapidsai
+gpuci_mamba_retry install boa
+
+gpuci_logger "Build and install dask-cuda"
+cd "${WORKSPACE}"
+CONDA_BLD_DIR="${WORKSPACE}/.conda-bld"
+gpuci_conda_retry mambabuild --croot "${CONDA_BLD_DIR}" conda/recipes/dask-cuda --python="${PYTHON}"
+gpuci_mamba_retry install -c "${CONDA_BLD_DIR}" dask-cuda
 
 ################################################################################
 # TEST - Run pytests for ucx-py
@@ -118,4 +122,3 @@ fi
 if [ -n "${CODECOV_TOKEN}" ]; then
     codecov -t $CODECOV_TOKEN
 fi
-
diff --git a/ci/local/build.sh b/ci/local/build.sh
index 0e6c7a97..20d867e8 100755
--- a/ci/local/build.sh
+++ b/ci/local/build.sh
@@ -131,7 +131,7 @@ DOCKER_MAJOR=$(docker -v|sed 's/[^[0-9]*\([0-9]*\).*/\1/')
 GPU_OPTS="--gpus device=${NVIDIA_VISIBLE_DEVICES}"
 if [ "$DOCKER_MAJOR" -lt 19 ]
 then
-    GPU_OPTS="--runtime=nvidia -e NVIDIA_VISIBLE_DEVICES='${NVIDIA_VISIBLE_DEVICES}'"
+    GPU_OPTS="--runtime=nvidia -e NVIDIA_VISIBLE_DEVICES=${NVIDIA_VISIBLE_DEVICES}"
 fi
 
 docker run --rm -it ${GPU_OPTS} \
diff --git a/conda/recipes/dask-cuda/meta.yaml b/conda/recipes/dask-cuda/meta.yaml
index b8a80e9c..167fe847 100644
--- a/conda/recipes/dask-cuda/meta.yaml
+++ b/conda/recipes/dask-cuda/meta.yaml
@@ -2,6 +2,8 @@
 # Usage:
 #   conda build -c conda-forge .
+{% set data = load_setup_py_data() %} + {% set version = environ.get('GIT_DESCRIBE_TAG', '0.0.0.dev').lstrip('v') + environ.get('VERSION_SUFFIX', '') %} {% set git_revision_count=environ.get('GIT_DESCRIBE_NUMBER', 0) %} {% set py_version=environ.get('CONDA_PY', 36) %} @@ -10,7 +12,7 @@ package: version: {{ version }} source: - path: ../../.. + git_url: ../../.. build: number: {{ git_revision_count }} @@ -27,12 +29,9 @@ requirements: - setuptools run: - python - - dask==2022.03.0 - - distributed==2022.03.0 - - pynvml>=11.0.0 - - numpy>=1.16.0 - - numba>=0.53.1 - - click==8.0.4 + {% for r in data.get("install_requires", []) %} + - {{ r }} + {% endfor %} test: imports: diff --git a/dask_cuda/__init__.py b/dask_cuda/__init__.py index df0865e3..ed8e6ae9 100644 --- a/dask_cuda/__init__.py +++ b/dask_cuda/__init__.py @@ -19,8 +19,10 @@ # Monkey patching Dask to make use of explicit-comms when `DASK_EXPLICIT_COMMS=True` -dask.dataframe.shuffle.rearrange_by_column_tasks = get_rearrange_by_column_tasks_wrapper( - dask.dataframe.shuffle.rearrange_by_column_tasks +dask.dataframe.shuffle.rearrange_by_column_tasks = ( + get_rearrange_by_column_tasks_wrapper( + dask.dataframe.shuffle.rearrange_by_column_tasks + ) ) diff --git a/dask_cuda/benchmarks/local_cudf_merge.py b/dask_cuda/benchmarks/local_cudf_merge.py index 74429518..678e1033 100644 --- a/dask_cuda/benchmarks/local_cudf_merge.py +++ b/dask_cuda/benchmarks/local_cudf_merge.py @@ -36,7 +36,7 @@ def generate_chunk(i_chunk, local_size, num_chunks, chunk_type, frac_match, gpu) import numpy as xp import pandas as xdf - xp.random.seed(2 ** 32 - 1) + xp.random.seed(2**32 - 1) chunk_type = chunk_type or "build" frac_match = frac_match or 1.0 @@ -258,7 +258,10 @@ def main(args): for (w1, w2), v in bandwidths.items() } total_nbytes = { - (scheduler_workers[w1].name, scheduler_workers[w2].name,): format_bytes(sum(nb)) + ( + scheduler_workers[w1].name, + scheduler_workers[w2].name, + ): format_bytes(sum(nb)) for (w1, w2), nb in total_nbytes.items() } @@ -379,21 +382,30 @@ def main(args): def parse_args(): special_args = [ { - "name": ["-b", "--backend",], + "name": [ + "-b", + "--backend", + ], "choices": ["dask", "explicit-comms"], "default": "dask", "type": str, "help": "The backend to use.", }, { - "name": ["-t", "--type",], + "name": [ + "-t", + "--type", + ], "choices": ["cpu", "gpu"], "default": "gpu", "type": str, "help": "Do merge with GPU or CPU dataframes", }, { - "name": ["-c", "--chunk-size",], + "name": [ + "-c", + "--chunk-size", + ], "default": 1_000_000, "metavar": "n", "type": int, @@ -444,9 +456,17 @@ def parse_args(): "action": "store_true", "help": "Write output as markdown", }, - {"name": "--runs", "default": 3, "type": int, "help": "Number of runs",}, { - "name": ["-s", "--set-index",], + "name": "--runs", + "default": 3, + "type": int, + "help": "Number of runs", + }, + { + "name": [ + "-s", + "--set-index", + ], "action": "store_true", "help": "Call set_index on the key column to sort the joined dataframe.", }, diff --git a/dask_cuda/benchmarks/local_cudf_shuffle.py b/dask_cuda/benchmarks/local_cudf_shuffle.py index f2c812d0..8d7461a7 100644 --- a/dask_cuda/benchmarks/local_cudf_shuffle.py +++ b/dask_cuda/benchmarks/local_cudf_shuffle.py @@ -138,7 +138,10 @@ def main(args): for (w1, w2), v in bandwidths.items() } total_nbytes = { - (scheduler_workers[w1].name, scheduler_workers[w2].name,): format_bytes(sum(nb)) + ( + scheduler_workers[w1].name, + scheduler_workers[w2].name, + ): format_bytes(sum(nb)) for (w1, w2), nb in 
total_nbytes.items() } @@ -251,14 +254,20 @@ def parse_args(): "help": "Number of input partitions (default '100')", }, { - "name": ["-b", "--backend",], + "name": [ + "-b", + "--backend", + ], "choices": ["dask", "explicit-comms"], "default": "dask", "type": str, "help": "The backend to use.", }, { - "name": ["-t", "--type",], + "name": [ + "-t", + "--type", + ], "choices": ["cpu", "gpu"], "default": "gpu", "type": str, @@ -276,7 +285,12 @@ def parse_args(): "action": "store_true", "help": "Write output as markdown", }, - {"name": "--runs", "default": 3, "type": int, "help": "Number of runs",}, + { + "name": "--runs", + "default": 3, + "type": int, + "help": "Number of runs", + }, ] return parse_benchmark_args( diff --git a/dask_cuda/benchmarks/local_cupy.py b/dask_cuda/benchmarks/local_cupy.py index a4bbc341..baa1d551 100644 --- a/dask_cuda/benchmarks/local_cupy.py +++ b/dask_cuda/benchmarks/local_cupy.py @@ -229,9 +229,10 @@ async def run(args): for (w1, w2), v in bandwidths.items() } total_nbytes = { - (scheduler_workers[w1].name, scheduler_workers[w2].name,): format_bytes( - sum(nb) - ) + ( + scheduler_workers[w1].name, + scheduler_workers[w2].name, + ): format_bytes(sum(nb)) for (w1, w2), nb in total_nbytes.items() } @@ -318,7 +319,10 @@ async def run(args): def parse_args(): special_args = [ { - "name": ["-s", "--size",], + "name": [ + "-s", + "--size", + ], "default": "10000", "metavar": "n", "type": int, @@ -326,27 +330,39 @@ def parse_args(): "the second dimension is given by --second-size.", }, { - "name": ["-2", "--second-size",], + "name": [ + "-2", + "--second-size", + ], "default": "1000", "type": int, "help": "The second dimension size for 'svd' operation (default 1000).", }, { - "name": ["-t", "--type",], + "name": [ + "-t", + "--type", + ], "choices": ["cpu", "gpu"], "default": "gpu", "type": str, "help": "Do merge with GPU or CPU dataframes.", }, { - "name": ["-o", "--operation",], + "name": [ + "-o", + "--operation", + ], "default": "transpose_sum", "type": str, "help": "The operation to run, valid options are: " "'transpose_sum' (default), 'dot', 'fft', 'svd', 'sum', 'mean', 'slice'.", }, { - "name": ["-c", "--chunk-size",], + "name": [ + "-c", + "--chunk-size", + ], "default": "2500", "type": int, "help": "Chunk size (default 2500).", diff --git a/dask_cuda/benchmarks/local_cupy_map_overlap.py b/dask_cuda/benchmarks/local_cupy_map_overlap.py index 077b212f..87904e03 100644 --- a/dask_cuda/benchmarks/local_cupy_map_overlap.py +++ b/dask_cuda/benchmarks/local_cupy_map_overlap.py @@ -118,9 +118,10 @@ async def run(args): for (w1, w2), v in bandwidths.items() } total_nbytes = { - (scheduler_workers[w1].name, scheduler_workers[w2].name,): format_bytes( - sum(nb) - ) + ( + scheduler_workers[w1].name, + scheduler_workers[w2].name, + ): format_bytes(sum(nb)) for (w1, w2), nb in total_nbytes.items() } @@ -198,28 +199,40 @@ async def run(args): def parse_args(): special_args = [ { - "name": ["-s", "--size",], + "name": [ + "-s", + "--size", + ], "default": "10000", "metavar": "n", "type": int, "help": "The size n in n^2 (default 10000)", }, { - "name": ["-t", "--type",], + "name": [ + "-t", + "--type", + ], "choices": ["cpu", "gpu"], "default": "gpu", "type": str, "help": "Use GPU or CPU arrays", }, { - "name": ["-c", "--chunk-size",], + "name": [ + "-c", + "--chunk-size", + ], "default": "128 MiB", "metavar": "nbytes", "type": str, "help": "Chunk size (default '128 MiB')", }, { - "name": ["-k", "--kernel-size",], + "name": [ + "-k", + "--kernel-size", + ], "default": "1", 
"metavar": "k", "type": int, @@ -232,7 +245,12 @@ def parse_args(): "type": parse_bytes, "help": "Ignore messages smaller than this (default '1 MB')", }, - {"name": "--runs", "default": 3, "type": int, "help": "Number of runs",}, + { + "name": "--runs", + "default": 3, + "type": int, + "help": "Number of runs", + }, ] return parse_benchmark_args( diff --git a/dask_cuda/benchmarks/utils.py b/dask_cuda/benchmarks/utils.py index 374fc8e4..d807427c 100644 --- a/dask_cuda/benchmarks/utils.py +++ b/dask_cuda/benchmarks/utils.py @@ -62,7 +62,9 @@ def parse_benchmark_args(description="Generic dask-cuda Benchmark", args_list=[] "Logging is only enabled if RMM memory pool is enabled.", ) parser.add_argument( - "--all-to-all", action="store_true", help="Run all-to-all before computation", + "--all-to-all", + action="store_true", + help="Run all-to-all before computation", ) parser.add_argument( "--enable-tcp-over-ucx", @@ -241,7 +243,10 @@ def get_scheduler_workers(dask_scheduler=None): def setup_memory_pool( - dask_worker=None, pool_size=None, disable_pool=False, log_directory=None, + dask_worker=None, + pool_size=None, + disable_pool=False, + log_directory=None, ): import cupy diff --git a/dask_cuda/cli/dask_cuda_worker.py b/dask_cuda/cli/dask_cuda_worker.py index 01a8b729..42f3163d 100755 --- a/dask_cuda/cli/dask_cuda_worker.py +++ b/dask_cuda/cli/dask_cuda_worker.py @@ -5,7 +5,7 @@ import click from tornado.ioloop import IOLoop, TimeoutError -from distributed.cli.utils import check_python_3, install_signal_handlers +from distributed.cli.utils import install_signal_handlers from distributed.preloading import validate_preload_argv from distributed.security import Security from distributed.utils import import_term @@ -126,7 +126,10 @@ allows querying the amount of memory allocated by RMM.""", ) @click.option( - "--pid-file", type=str, default="", help="File to write the process PID.", + "--pid-file", + type=str, + default="", + help="File to write the process PID.", ) @click.option( "--resources", @@ -314,7 +317,9 @@ def main( ): if tls_ca_file and tls_cert and tls_key: security = Security( - tls_ca_file=tls_ca_file, tls_worker_cert=tls_cert, tls_worker_key=tls_key, + tls_ca_file=tls_ca_file, + tls_worker_cert=tls_cert, + tls_worker_key=tls_key, ) else: security = None @@ -384,7 +389,6 @@ async def run(): def go(): - check_python_3() main() diff --git a/dask_cuda/cuda_worker.py b/dask_cuda/cuda_worker.py index 10fc6493..f97056b9 100644 --- a/dask_cuda/cuda_worker.py +++ b/dask_cuda/cuda_worker.py @@ -85,7 +85,10 @@ def __init__( if nthreads < 1: raise ValueError("nthreads must be higher than 0.") - memory_limit = parse_memory_limit(memory_limit, nthreads, total_cores=nprocs) + # Set nthreads=1 when parsing mem_limit since it only depends on nprocs + memory_limit = parse_memory_limit( + memory_limit=memory_limit, nthreads=1, total_cores=nprocs + ) if pid_file: with open(pid_file, "w") as f: diff --git a/dask_cuda/explicit_comms/comms.py b/dask_cuda/explicit_comms/comms.py index 55e10d19..f7726781 100644 --- a/dask_cuda/explicit_comms/comms.py +++ b/dask_cuda/explicit_comms/comms.py @@ -123,7 +123,7 @@ async def server_handler(ep): async def _create_endpoints(session_state, peers): - """ Each worker creates a UCX endpoint to all workers with greater rank""" + """Each worker creates a UCX endpoint to all workers with greater rank""" assert session_state["loop"] is asyncio.get_event_loop() myrank = session_state["rank"] @@ -191,16 +191,16 @@ def submit(self, worker, coroutine, *args, wait=False): """Run 
a coroutine on a single worker The coroutine is given the worker's state dict as the first argument - and *args as the following arguments. + and ``*args`` as the following arguments. Parameters ---------- worker: str - Worker to run the `coroutine` + Worker to run the ``coroutine`` coroutine: coroutine The function to run on the worker *args: - Arguments for `coroutine` + Arguments for ``coroutine`` wait: boolean, optional If True, waits for the coroutine to finished before returning. @@ -224,14 +224,14 @@ def run(self, coroutine, *args, workers=None, lock_workers=False): """Run a coroutine on multiple workers The coroutine is given the worker's state dict as the first argument - and *args as the following arguments. + and ``*args`` as the following arguments. Parameters ---------- coroutine: coroutine The function to run on each worker *args: - Arguments for `coroutine` + Arguments for ``coroutine`` workers: list, optional List of workers. Default is all workers lock_workers: bool, optional diff --git a/dask_cuda/initialize.py b/dask_cuda/initialize.py index 5d7b0952..bda4d08b 100644 --- a/dask_cuda/initialize.py +++ b/dask_cuda/initialize.py @@ -1,6 +1,5 @@ import logging import os -import warnings import click import numba.cuda @@ -41,28 +40,17 @@ def _create_cuda_context(): ) ctx = has_cuda_context() if ctx is not False and distributed.comm.ucx.cuda_context_created is False: - warnings.warn( - f"A CUDA context for device {ctx} already exists on process ID " - f"{os.getpid()}. This is often the result of a CUDA-enabled library " - "calling a CUDA runtime function before Dask-CUDA can spawn worker " - "processes. Please make sure any such function calls don't happen at " - "import time or in the global scope of a program." - ) + distributed.comm.ucx._warn_existing_cuda_context(ctx, os.getpid()) _create_cuda_context_handler() if distributed.comm.ucx.cuda_context_created is False: ctx = has_cuda_context() if ctx is not False and ctx != cuda_visible_device: - warnings.warn( - f"Worker with process ID {os.getpid()} should have a CUDA context " - f"assigned to device {cuda_visible_device}, but instead the CUDA " - f"context is on device {ctx}. This is often the result of a " - "CUDA-enabled library calling a CUDA runtime function before " - "Dask-CUDA can spawn worker processes. Please make sure any such " - "function calls don't happen at import time or in the global scope " - "of a program." 
+ distributed.comm.ucx._warn_cuda_context_wrong_device( + cuda_visible_device, ctx, os.getpid() ) + except Exception: logger.error("Unable to start CUDA Context", exc_info=True) diff --git a/dask_cuda/local_cuda_cluster.py b/dask_cuda/local_cuda_cluster.py index 51e118f7..d2a0022b 100644 --- a/dask_cuda/local_cuda_cluster.py +++ b/dask_cuda/local_cuda_cluster.py @@ -230,8 +230,9 @@ def __init__( n_workers = len(CUDA_VISIBLE_DEVICES) if n_workers < 1: raise ValueError("Number of workers cannot be less than 1.") + # Set nthreads=1 when parsing mem_limit since it only depends on n_workers self.memory_limit = parse_memory_limit( - memory_limit, threads_per_worker, n_workers + memory_limit=memory_limit, nthreads=1, total_cores=n_workers ) self.device_memory_limit = parse_device_memory_limit( device_memory_limit, device_index=nvml_device_index(0, CUDA_VISIBLE_DEVICES) @@ -378,7 +379,9 @@ def new_worker_spec(self): visible_devices = cuda_visible_devices(worker_count, self.cuda_visible_devices) spec["options"].update( { - "env": {"CUDA_VISIBLE_DEVICES": visible_devices,}, + "env": { + "CUDA_VISIBLE_DEVICES": visible_devices, + }, "plugins": { CPUAffinity( get_cpu_affinity(nvml_device_index(0, visible_devices)) diff --git a/dask_cuda/proxy_object.py b/dask_cuda/proxy_object.py index 74639fc3..f237a59d 100644 --- a/dask_cuda/proxy_object.py +++ b/dask_cuda/proxy_object.py @@ -378,7 +378,9 @@ def __del__(self): pxy.manager.remove(self) def _pxy_serialize( - self, serializers: Iterable[str], proxy_detail: ProxyDetail = None, + self, + serializers: Iterable[str], + proxy_detail: ProxyDetail = None, ) -> None: """Inplace serialization of the proxied object using the `serializers` diff --git a/dask_cuda/tests/test_dask_cuda_worker.py b/dask_cuda/tests/test_dask_cuda_worker.py index edbd9ae0..6f653a36 100644 --- a/dask_cuda/tests/test_dask_cuda_worker.py +++ b/dask_cuda/tests/test_dask_cuda_worker.py @@ -13,8 +13,6 @@ from distributed.utils_test import loop # noqa: F401 from distributed.utils_test import popen -import rmm - from dask_cuda.utils import ( get_gpu_count_mig, get_gpu_uuid_from_index, @@ -22,9 +20,6 @@ wait_workers, ) -_driver_version = rmm._cuda.gpu.driverGetVersion() -_runtime_version = rmm._cuda.gpu.runtimeGetVersion() - @patch.dict(os.environ, {"CUDA_VISIBLE_DEVICES": "0,3,7,8"}) def test_cuda_visible_devices_and_memory_limit_and_nthreads(loop): # noqa: F811 @@ -59,7 +54,7 @@ def get_visible_devices(): workers = client.scheduler_info()["workers"] for w in workers.values(): - assert w["memory_limit"] == MEMORY_LIMIT // len(workers) * nthreads + assert w["memory_limit"] == MEMORY_LIMIT // len(workers) assert len(expected) == 0 @@ -111,12 +106,14 @@ def test_rmm_managed(loop): # noqa: F811 assert v is rmm.mr.ManagedMemoryResource -@pytest.mark.skipif( - _driver_version < 11020 or _runtime_version < 11020, - reason="cudaMallocAsync not supported", -) def test_rmm_async(loop): # noqa: F811 rmm = pytest.importorskip("rmm") + + driver_version = rmm._cuda.gpu.driverGetVersion() + runtime_version = rmm._cuda.gpu.runtimeGetVersion() + if driver_version < 11020 or runtime_version < 11020: + pytest.skip("cudaMallocAsync not supported") + with popen(["dask-scheduler", "--port", "9369", "--no-dashboard"]): with popen( [ @@ -206,7 +203,12 @@ def test_pre_import(loop): # noqa: F811 with popen(["dask-scheduler", "--port", "9369", "--no-dashboard"]): with popen( - ["dask-cuda-worker", "127.0.0.1:9369", "--pre-import", module,] + [ + "dask-cuda-worker", + "127.0.0.1:9369", + "--pre-import", + module, + ] ): 
with Client("127.0.0.1:9369", loop=loop) as client: assert wait_workers(client, n_gpus=get_n_gpus()) @@ -215,6 +217,7 @@ def test_pre_import(loop): # noqa: F811 assert all(imported) +@pytest.mark.timeout(20) @patch.dict(os.environ, {"CUDA_VISIBLE_DEVICES": "0"}) def test_pre_import_not_found(): with popen(["dask-scheduler", "--port", "9369", "--no-dashboard"]): diff --git a/dask_cuda/tests/test_device_host_file.py b/dask_cuda/tests/test_device_host_file.py index 4da22626..fe627cd4 100644 --- a/dask_cuda/tests/test_device_host_file.py +++ b/dask_cuda/tests/test_device_host_file.py @@ -41,7 +41,9 @@ def test_device_host_file_short( tmpdir = tmp_path / "storage" tmpdir.mkdir() dhf = DeviceHostFile( - device_memory_limit=1024 * 16, memory_limit=1024 * 16, local_directory=tmpdir, + device_memory_limit=1024 * 16, + memory_limit=1024 * 16, + local_directory=tmpdir, ) host = [ @@ -77,7 +79,9 @@ def test_device_host_file_step_by_step(tmp_path): tmpdir = tmp_path / "storage" tmpdir.mkdir() dhf = DeviceHostFile( - device_memory_limit=1024 * 16, memory_limit=1024 * 16, local_directory=tmpdir, + device_memory_limit=1024 * 16, + memory_limit=1024 * 16, + local_directory=tmpdir, ) a = np.random.random(1000) diff --git a/dask_cuda/tests/test_explicit_comms.py b/dask_cuda/tests/test_explicit_comms.py index 4199143c..dd92e2a6 100644 --- a/dask_cuda/tests/test_explicit_comms.py +++ b/dask_cuda/tests/test_explicit_comms.py @@ -32,7 +32,9 @@ async def my_rank(state, arg): def _test_local_cluster(protocol): dask.config.update( dask.config.global_config, - {"distributed.comm.ucx": get_ucx_config(enable_tcp_over_ucx=True),}, + { + "distributed.comm.ucx": get_ucx_config(enable_tcp_over_ucx=True), + }, priority="new", ) @@ -102,7 +104,9 @@ def _test_dataframe_shuffle(backend, protocol, n_workers): else: dask.config.update( dask.config.global_config, - {"distributed.comm.ucx": get_ucx_config(enable_tcp_over_ucx=True),}, + { + "distributed.comm.ucx": get_ucx_config(enable_tcp_over_ucx=True), + }, priority="new", ) @@ -202,7 +206,9 @@ def _test_dataframe_shuffle_merge(backend, protocol, n_workers): dask.config.update( dask.config.global_config, - {"distributed.comm.ucx": get_ucx_config(enable_tcp_over_ucx=True),}, + { + "distributed.comm.ucx": get_ucx_config(enable_tcp_over_ucx=True), + }, priority="new", ) diff --git a/dask_cuda/tests/test_gds.py b/dask_cuda/tests/test_gds.py index 46ef5a3a..257e6f59 100644 --- a/dask_cuda/tests/test_gds.py +++ b/dask_cuda/tests/test_gds.py @@ -11,7 +11,9 @@ if ProxifyHostFile._spill_to_disk is None: tmpdir = tempfile.TemporaryDirectory() ProxifyHostFile( - local_directory=tmpdir.name, device_memory_limit=1024, memory_limit=1024, + local_directory=tmpdir.name, + device_memory_limit=1024, + memory_limit=1024, ) diff --git a/dask_cuda/tests/test_local_cuda_cluster.py b/dask_cuda/tests/test_local_cuda_cluster.py index 42dddcc3..fc835281 100644 --- a/dask_cuda/tests/test_local_cuda_cluster.py +++ b/dask_cuda/tests/test_local_cuda_cluster.py @@ -7,17 +7,12 @@ from dask.distributed import Client from distributed.system import MEMORY_LIMIT -from distributed.utils_test import gen_test - -import rmm +from distributed.utils_test import gen_test, raises_with_cause from dask_cuda import CUDAWorker, LocalCUDACluster, utils from dask_cuda.initialize import initialize from dask_cuda.utils import MockWorker, get_gpu_count_mig, get_gpu_uuid_from_index -_driver_version = rmm._cuda.gpu.driverGetVersion() -_runtime_version = rmm._cuda.gpu.runtimeGetVersion() - @gen_test(timeout=20) async def 
test_local_cuda_cluster(): @@ -123,9 +118,11 @@ async def test_n_workers(): @gen_test(timeout=20) -async def test_threads_per_worker(): +async def test_threads_per_worker_and_memory_limit(): async with LocalCUDACluster(threads_per_worker=4, asynchronous=True) as cluster: assert all(ws.nthreads == 4 for ws in cluster.scheduler.workers.values()) + full_mem = sum(w.memory_manager.memory_limit for w in cluster.workers.values()) + assert full_mem >= MEMORY_LIMIT - 1024 and full_mem < MEMORY_LIMIT + 1024 @gen_test(timeout=20) @@ -147,7 +144,10 @@ async def test_all_to_all(): async def test_rmm_pool(): rmm = pytest.importorskip("rmm") - async with LocalCUDACluster(rmm_pool_size="2GB", asynchronous=True,) as cluster: + async with LocalCUDACluster( + rmm_pool_size="2GB", + asynchronous=True, + ) as cluster: async with Client(cluster, asynchronous=True) as client: memory_resource_type = await client.run( rmm.mr.get_current_device_resource_type @@ -167,7 +167,10 @@ async def test_rmm_maximum_poolsize_without_poolsize_error(): async def test_rmm_managed(): rmm = pytest.importorskip("rmm") - async with LocalCUDACluster(rmm_managed_memory=True, asynchronous=True,) as cluster: + async with LocalCUDACluster( + rmm_managed_memory=True, + asynchronous=True, + ) as cluster: async with Client(cluster, asynchronous=True) as client: memory_resource_type = await client.run( rmm.mr.get_current_device_resource_type @@ -176,15 +179,19 @@ async def test_rmm_managed(): assert v is rmm.mr.ManagedMemoryResource -@pytest.mark.skipif( - _driver_version < 11020 or _runtime_version < 11020, - reason="cudaMallocAsync not supported", -) @gen_test(timeout=20) async def test_rmm_async(): rmm = pytest.importorskip("rmm") - async with LocalCUDACluster(rmm_async=True, asynchronous=True,) as cluster: + driver_version = rmm._cuda.gpu.driverGetVersion() + runtime_version = rmm._cuda.gpu.runtimeGetVersion() + if driver_version < 11020 or runtime_version < 11020: + pytest.skip("cudaMallocAsync not supported") + + async with LocalCUDACluster( + rmm_async=True, + asynchronous=True, + ) as cluster: async with Client(cluster, asynchronous=True) as client: memory_resource_type = await client.run( rmm.mr.get_current_device_resource_type @@ -198,7 +205,9 @@ async def test_rmm_logging(): rmm = pytest.importorskip("rmm") async with LocalCUDACluster( - rmm_pool_size="2GB", rmm_log_directory=".", asynchronous=True, + rmm_pool_size="2GB", + rmm_log_directory=".", + asynchronous=True, ) as cluster: async with Client(cluster, asynchronous=True) as client: memory_resource_type = await client.run( @@ -222,7 +231,9 @@ async def test_pre_import(): pytest.skip("No module found that isn't already loaded") async with LocalCUDACluster( - n_workers=1, pre_import=module, asynchronous=True, + n_workers=1, + pre_import=module, + asynchronous=True, ) as cluster: async with Client(cluster, asynchronous=True) as client: imported = await client.run(lambda: module in sys.modules) @@ -232,16 +243,21 @@ async def test_pre_import(): # Intentionally not using @gen_test to skip cleanup checks async def test_pre_import_not_found(): - with pytest.raises(ModuleNotFoundError): + with raises_with_cause(RuntimeError, None, ImportError, None): await LocalCUDACluster( - n_workers=1, pre_import="my_module", asynchronous=True, + n_workers=1, + pre_import="my_module", + asynchronous=True, ) @gen_test(timeout=20) async def test_cluster_worker(): async with LocalCUDACluster( - scheduler_port=0, asynchronous=True, device_memory_limit=1, n_workers=1, + scheduler_port=0, + 
asynchronous=True, + device_memory_limit=1, + n_workers=1, ) as cluster: assert len(cluster.workers) == 1 async with Client(cluster, asynchronous=True) as client: @@ -283,7 +299,9 @@ async def test_gpu_uuid(): gpu_uuid = get_gpu_uuid_from_index(0) async with LocalCUDACluster( - CUDA_VISIBLE_DEVICES=gpu_uuid, scheduler_port=0, asynchronous=True, + CUDA_VISIBLE_DEVICES=gpu_uuid, + scheduler_port=0, + asynchronous=True, ) as cluster: assert len(cluster.workers) == 1 async with Client(cluster, asynchronous=True) as client: diff --git a/dask_cuda/tests/test_proxify_host_file.py b/dask_cuda/tests/test_proxify_host_file.py index cf5718a2..1edcab09 100644 --- a/dask_cuda/tests/test_proxify_host_file.py +++ b/dask_cuda/tests/test_proxify_host_file.py @@ -37,7 +37,9 @@ # Hold on to `tmpdir` to keep dir alive until exit tmpdir = tempfile.TemporaryDirectory() ProxifyHostFile( - local_directory=tmpdir.name, device_memory_limit=1024, memory_limit=1024, + local_directory=tmpdir.name, + device_memory_limit=1024, + memory_limit=1024, ) assert ProxifyHostFile._spill_to_disk is not None @@ -399,18 +401,18 @@ def is_proxy_object(x): assert all(res) # Only proxy objects -@gen_test(timeout=30) +@gen_test(timeout=60) async def test_worker_force_spill_to_disk(): - """Test Dask triggering CPU-to-Disk spilling """ + """Test Dask triggering CPU-to-Disk spilling""" cudf = pytest.importorskip("cudf") - with dask.config.set({"distributed.worker.memory.terminate": None}): + with dask.config.set({"distributed.worker.memory.terminate": False}): async with dask_cuda.LocalCUDACluster( n_workers=1, device_memory_limit="1MB", jit_unspill=True, asynchronous=True ) as cluster: async with Client(cluster, asynchronous=True) as client: # Create a df that are spilled to host memory immediately - df = cudf.DataFrame({"key": np.arange(10 ** 8)}) + df = cudf.DataFrame({"key": np.arange(10**8)}) ddf = dask.dataframe.from_pandas(df, npartitions=1).persist() await ddf @@ -420,12 +422,12 @@ async def f(): # Set a host memory limit that triggers spilling to disk w.memory_manager.memory_pause_fraction = False memory = w.monitor.proc.memory_info().rss - w.memory_manager.memory_limit = memory - 10 ** 8 + w.memory_manager.memory_limit = memory - 10**8 w.memory_manager.memory_target_fraction = 1 print(w.memory_manager.data) await w.memory_manager.memory_monitor(w) # Check that host memory are freed - assert w.monitor.proc.memory_info().rss < memory - 10 ** 7 + assert w.monitor.proc.memory_info().rss < memory - 10**7 w.memory_manager.memory_limit = memory * 10 # Un-limit await client.submit(f) diff --git a/dask_cuda/tests/test_proxy.py b/dask_cuda/tests/test_proxy.py index b5c53e26..0c1f9a7f 100644 --- a/dask_cuda/tests/test_proxy.py +++ b/dask_cuda/tests/test_proxy.py @@ -16,8 +16,6 @@ from distributed import Client from distributed.protocol.serialize import deserialize, serialize -import dask_cudf - import dask_cuda from dask_cuda import proxy_object from dask_cuda.disk_io import SpillToDiskFile @@ -29,7 +27,9 @@ if ProxifyHostFile._spill_to_disk is None: tmpdir = tempfile.TemporaryDirectory() ProxifyHostFile( - local_directory=tmpdir.name, device_memory_limit=1024, memory_limit=1024, + local_directory=tmpdir.name, + device_memory_limit=1024, + memory_limit=1024, ) @@ -284,6 +284,7 @@ def test_fixed_attribute_name(): def test_spilling_local_cuda_cluster(jit_unspill): """Testing spilling of a proxied cudf dataframe in a local cuda cluster""" cudf = pytest.importorskip("cudf") + dask_cudf = pytest.importorskip("dask_cudf") def task(x): assert 
isinstance(x, cudf.DataFrame) @@ -312,7 +313,7 @@ def task(x): assert_frame_equal(got.to_pandas(), df.to_pandas()) -@pytest.mark.parametrize("obj", [bytearray(10), bytearray(10 ** 6)]) +@pytest.mark.parametrize("obj", [bytearray(10), bytearray(10**6)]) def test_serializing_to_disk(obj): """Check serializing to disk""" @@ -356,7 +357,7 @@ def test_multiple_deserializations(serializer): assert not os.path.exists(file_path) -@pytest.mark.parametrize("size", [10, 10 ** 4]) +@pytest.mark.parametrize("size", [10, 10**4]) @pytest.mark.parametrize( "serializers", [None, ["dask"], ["cuda", "dask"], ["pickle"], ["disk"]] ) @@ -509,6 +510,7 @@ def test_pandas(): def test_from_cudf_of_proxy_object(): """Check from_cudf() of a proxy object""" cudf = pytest.importorskip("cudf") + dask_cudf = pytest.importorskip("dask_cudf") df = proxy_object.asproxy(cudf.DataFrame({"a": range(10)})) assert has_parallel_type(df) @@ -574,15 +576,6 @@ def test_einsum_of_proxied_cupy_arrays(): assert all(res1.flatten() == res2.flatten()) -def test_merge_sorted_of_proxied_cudf_dataframes(): - cudf = pytest.importorskip("cudf") - - dfs = [cudf.DataFrame({"a": range(10)}), cudf.DataFrame({"b": range(10)})] - got = cudf.merge_sorted(proxify_device_objects(dfs, {}, [])) - expected = cudf.merge_sorted(dfs) - assert_frame_equal(got.to_pandas(), expected.to_pandas()) - - @pytest.mark.parametrize( "np_func", [np.less, np.less_equal, np.greater, np.greater_equal, np.equal] ) diff --git a/dask_cuda/tests/test_spill.py b/dask_cuda/tests/test_spill.py index 1978a57e..73f211d1 100644 --- a/dask_cuda/tests/test_spill.py +++ b/dask_cuda/tests/test_spill.py @@ -118,7 +118,7 @@ def delayed_worker_assert(total_size, device_chunk_overhead, serialized_chunk_ov }, ], ) -@gen_test(timeout=20) +@gen_test(timeout=30) async def test_cupy_cluster_device_spill(params): cupy = pytest.importorskip("cupy") with dask.config.set({"distributed.worker.memory.terminate": False}): @@ -168,16 +168,16 @@ async def test_cupy_cluster_device_spill(params): "params", [ { - "device_memory_limit": int(200e6), - "memory_limit": int(4000e6), + "device_memory_limit": int(50e6), + "memory_limit": int(1000e6), "host_target": False, "host_spill": False, "host_pause": False, "spills_to_disk": False, }, { - "device_memory_limit": int(200e6), - "memory_limit": int(200e6), + "device_memory_limit": int(50e6), + "memory_limit": int(50e6), "host_target": False, "host_spill": False, "host_pause": False, @@ -186,15 +186,15 @@ async def test_cupy_cluster_device_spill(params): { # This test setup differs from the one above as Distributed worker # pausing is enabled and thus triggers `DeviceHostFile.evict()` - "device_memory_limit": int(200e6), - "memory_limit": int(200e6), + "device_memory_limit": int(50e6), + "memory_limit": int(50e6), "host_target": None, "host_spill": None, "host_pause": False, "spills_to_disk": True, }, { - "device_memory_limit": int(200e6), + "device_memory_limit": int(50e6), "memory_limit": None, "host_target": False, "host_spill": False, @@ -203,7 +203,7 @@ async def test_cupy_cluster_device_spill(params): }, ], ) -@gen_test(timeout=20) +@gen_test(timeout=30) async def test_cudf_cluster_device_spill(params): cudf = pytest.importorskip("cudf") @@ -228,7 +228,7 @@ async def test_cudf_cluster_device_spill(params): # The same error above happens when spilling datetime64 to disk cdf = ( dask.datasets.timeseries( - dtypes={"x": int, "y": float}, freq="100ms" + dtypes={"x": int, "y": float}, freq="400ms" ) .reset_index(drop=True) .map_partitions(cudf.from_pandas) diff 
--git a/dask_cuda/tests/test_utils.py b/dask_cuda/tests/test_utils.py
index 24cc271a..30ce6c4b 100644
--- a/dask_cuda/tests/test_utils.py
+++ b/dask_cuda/tests/test_utils.py
@@ -82,7 +82,10 @@ def test_get_device_total_memory():
 def test_get_preload_options_default():
     pytest.importorskip("ucp")
 
-    opts = get_preload_options(protocol="ucx", create_cuda_context=True,)
+    opts = get_preload_options(
+        protocol="ucx",
+        create_cuda_context=True,
+    )
 
     assert "preload" in opts
     assert opts["preload"] == ["dask_cuda.initialize"]
diff --git a/dask_cuda/utils.py b/dask_cuda/utils.py
index 61448482..f5be2b24 100644
--- a/dask_cuda/utils.py
+++ b/dask_cuda/utils.py
@@ -236,7 +236,8 @@ def get_cpu_affinity(device_index=None):
         handle = pynvml.nvmlDeviceGetHandleByIndex(device_index)
         # Result is a list of 64-bit integers, thus ceil(get_cpu_count() / 64)
         affinity = pynvml.nvmlDeviceGetCpuAffinity(
-            handle, math.ceil(get_cpu_count() / 64),
+            handle,
+            math.ceil(get_cpu_count() / 64),
         )
         return unpack_bitmask(affinity)
     except pynvml.NVMLError:
diff --git a/dask_cuda/worker_spec.py b/dask_cuda/worker_spec.py
index 9c93197d..6a61fa8f 100644
--- a/dask_cuda/worker_spec.py
+++ b/dask_cuda/worker_spec.py
@@ -20,7 +20,7 @@ def worker_spec(
     enable_nvlink=False,
     **kwargs
 ):
-    """ Create a Spec for a CUDA worker.
+    """Create a Spec for a CUDA worker.
 
     The Spec created by this function can be used as a recipe for CUDA workers
     that can be passed to a SpecCluster.
diff --git a/docs/source/examples/ucx.rst b/docs/source/examples/ucx.rst
index a7789fb1..b9a36777 100644
--- a/docs/source/examples/ucx.rst
+++ b/docs/source/examples/ucx.rst
@@ -49,7 +49,7 @@ To connect a client to a cluster with all supported transports and an RMM pool:
     client = Client(cluster)
 
 dask-cuda-worker with Automatic Configuration
----------------------------------------------
+---------------------------------------------
 
 When using ``dask-cuda-worker`` with UCX communication and automatic configuration, the scheduler, workers, and client must all be started manually, but without specifying any UCX transports explicitly.
 This is only supported in Dask-CUDA 22.02 and newer and requires UCX >= 1.11.1.
diff --git a/docs/source/ucx.rst b/docs/source/ucx.rst
index 10bdc26d..7746f034 100644
--- a/docs/source/ucx.rst
+++ b/docs/source/ucx.rst
@@ -20,6 +20,15 @@ When using UCX, each NVLink and InfiniBand memory buffer must create a mapping b
 For this reason, it is strongly recommended to use `RAPIDS Memory Manager (RMM) `_ to allocate a memory pool that is only prone to a single mapping operation, which all subsequent transfers may rely upon.
 A memory pool also prevents the Dask scheduler from deserializing CUDA data, which will cause a crash.
 
+.. warning::
+    Dask-CUDA must create worker CUDA contexts during cluster initialization, and properly ordering that task is critical for correct UCX configuration.
+    If a CUDA context already exists for this process at the time of cluster initialization, unexpected behavior can occur.
+    To avoid this, it is advised to initialize any UCX-enabled clusters before doing operations that would result in a CUDA context being created.
+    Depending on the library, even an import can force CUDA context creation.
+
+    For some RAPIDS libraries (e.g. cuDF), setting ``RAPIDS_NO_INITIALIZE=1`` at runtime will delay or disable their CUDA context creation, allowing for improved compatibility with UCX-enabled clusters and preventing runtime warnings.
+
+
 Configuration
 -------------
 
diff --git a/examples/ucx/client_initialize.py b/examples/ucx/client_initialize.py
index ffd5935b..e958ebce 100644
--- a/examples/ucx/client_initialize.py
+++ b/examples/ucx/client_initialize.py
@@ -9,7 +9,9 @@
 
 @click.command(context_settings=dict(ignore_unknown_options=True))
 @click.argument(
-    "address", required=True, type=str,
+    "address",
+    required=True,
+    type=str,
 )
 @click.option(
     "--enable-nvlink/--disable-nvlink",
@@ -27,7 +29,10 @@
     help="Enable RDMA connection manager, requires --enable-infiniband",
 )
 def main(
-    address, enable_nvlink, enable_infiniband, enable_rdmacm,
+    address,
+    enable_nvlink,
+    enable_infiniband,
+    enable_rdmacm,
 ):
 
     # set up environment
diff --git a/examples/ucx/local_cuda_cluster.py b/examples/ucx/local_cuda_cluster.py
index c10bb525..ec953bee 100644
--- a/examples/ucx/local_cuda_cluster.py
+++ b/examples/ucx/local_cuda_cluster.py
@@ -40,7 +40,11 @@
     "an integer (bytes) or string (like 5GB or 5000M).",
 )
 def main(
-    enable_nvlink, enable_infiniband, enable_rdmacm, interface, rmm_pool_size,
+    enable_nvlink,
+    enable_infiniband,
+    enable_rdmacm,
+    interface,
+    rmm_pool_size,
 ):
 
     if (enable_infiniband or enable_nvlink) and not interface:
diff --git a/requirements.txt b/requirements.txt
index a5ec2e40..32325e5a 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,6 +1,6 @@
-dask==2022.03.0
-distributed==2022.03.0
+dask==2022.05.2
+distributed==2022.05.2
 pynvml>=11.0.0
 numpy>=1.16.0
-numba>=0.53.1
+numba>=0.54
 click==8.0.4
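The ``docs/source/ucx.rst`` warning added above recommends creating UCX-enabled clusters before any CUDA context exists and points to ``RAPIDS_NO_INITIALIZE`` for RAPIDS libraries such as cuDF. The following is a minimal illustrative sketch of that ordering, not part of the patch itself; it assumes cuDF, UCX-Py, and Dask-CUDA are installed on a machine with at least one GPU.

```python
# Sketch only: keep cuDF from creating a CUDA context before Dask-CUDA
# spawns its UCX-enabled workers.
import os

os.environ["RAPIDS_NO_INITIALIZE"] = "1"  # must be set before cudf is imported

import cudf  # noqa: E402  (no CUDA context is created at import time now)
from dask.distributed import Client

from dask_cuda import LocalCUDACluster

if __name__ == "__main__":
    # Workers create their own CUDA contexts during cluster startup.
    cluster = LocalCUDACluster(protocol="ucx", enable_tcp_over_ucx=True)
    client = Client(cluster)

    # Touching the GPU from the client process is fine once the cluster is up.
    print(cudf.Series(range(10)).sum())
```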