Use local jobs for WDL interpreter (DataBiosphere#4460)
* Change CWL internal job system to generic local job system

* Make all WDL jobs except running or attempting to run a task local

* Add a non-downloading file size implementation

* Satisfy MyPy

* Eliminate the old stack and adopt a slightly more controlled successor phase concept

* Appease MyPy

* Drop a lingering use of the old stack

* Get mini tests to pass

* Get WDL workflow to start

* Add debugging to show that jobs are missing from the registry

* Fix ordering to actually look at the right job

* Quiet debugging

* Change log message

* Stop making unwanted changes

* Remove redundant and wrongly-typed check

* Get docker-compose from apt instead of pip where it fights Toil deps

* Ban urllib3 2.0+ until Docker module supports it

* Use docker-compose in the prebake

* Move Slurm tests to earlier stage

* Account for how docker compose plugin names containers

* Revert "Move Slurm tests to earlier stage"

This reverts commit 4b824bb.
adamnovak authored May 5, 2023
1 parent 356261d commit 86665a2
Showing 18 changed files with 289 additions and 197 deletions.
2 changes: 1 addition & 1 deletion .gitlab-ci.yml
@@ -164,7 +164,7 @@ slurm_test:
   script:
     - pwd
     - cd contrib/slurm-test/
-    - pip install docker-compose
+    - docker compose version
     - ./slurm_test.sh
 
 cwl_v1.0:
1 change: 1 addition & 0 deletions contrib/slurm-test/docker-compose.yml
@@ -1,3 +1,4 @@
+# This is a v3 compose file
 services:
   slurmmaster:
     image: rancavil/slurm-master:19.05.5-1
31 changes: 18 additions & 13 deletions contrib/slurm-test/slurm_test.sh
@@ -1,22 +1,27 @@
 #!/bin/bash
 set -e
-docker-compose up -d
+# With the docker compose plugin, containers are named like slurm-test-slurmmaster-1
+# If your containers are named like ${LEADER} you have the old docker-compose Python version instead.
+# Try running with NAME_SEP=_
+NAME_SEP=${CONTAINER_NAME_SEP:--}
+LEADER="slurm-test${NAME_SEP}slurmmaster${NAME_SEP}1"
+docker compose up -d
 docker ps
-docker cp toil_workflow.py slurm-test_slurmmaster_1:/home/admin
-docker cp -L sort.py slurm-test_slurmmaster_1:/home/admin
-docker cp fileToSort.txt slurm-test_slurmmaster_1:/home/admin
-docker cp toil_workflow.py slurm-test_slurmmaster_1:/home/admin
+docker cp toil_workflow.py ${LEADER}:/home/admin
+docker cp -L sort.py ${LEADER}:/home/admin
+docker cp fileToSort.txt ${LEADER}:/home/admin
+docker cp toil_workflow.py ${LEADER}:/home/admin
 GIT_COMMIT=$(git rev-parse HEAD)
-docker exec slurm-test_slurmmaster_1 sudo apt install python3-pip -y
-docker exec slurm-test_slurmmaster_1 pip3 install "git+https://github.com/DataBiosphere/toil.git@${GIT_COMMIT}"
-docker exec slurm-test_slurmmaster_1 sinfo -N -l
+docker exec ${LEADER} sudo apt install python3-pip -y
+docker exec ${LEADER} pip3 install "git+https://github.com/DataBiosphere/toil.git@${GIT_COMMIT}"
+docker exec ${LEADER} sinfo -N -l
 # Test 1: A really basic workflow to check Slurm is working correctly
-docker exec slurm-test_slurmmaster_1 python3 /home/admin/toil_workflow.py file:my-job-store --batchSystem slurm --disableCaching --retryCount 0
-docker cp slurm-test_slurmmaster_1:/home/admin/output.txt output_Docker.txt
+docker exec ${LEADER} python3 /home/admin/toil_workflow.py file:my-job-store --batchSystem slurm --disableCaching --retryCount 0
+docker cp ${LEADER}:/home/admin/output.txt output_Docker.txt
 # Test 2: Make sure that "sort" workflow runs under slurm
-docker exec slurm-test_slurmmaster_1 python3 /home/admin/sort.py file:my-job-store --batchSystem slurm --disableCaching --retryCount 0
-docker cp slurm-test_slurmmaster_1:/home/admin/sortedFile.txt sortedFile.txt
-docker-compose stop
+docker exec ${LEADER} python3 /home/admin/sort.py file:my-job-store --batchSystem slurm --disableCaching --retryCount 0
+docker cp ${LEADER}:/home/admin/sortedFile.txt sortedFile.txt
+docker compose stop
 ./check_out.sh
 rm sort.py
 echo "Sucessfully ran workflow on slurm cluster"
3 changes: 2 additions & 1 deletion contrib/toil-ci-prebake/Dockerfile
@@ -18,7 +18,8 @@ ENV DEBIAN_FRONTEND=noninteractive
 
 RUN mkdir -p ~/.docker/cli-plugins/
 RUN curl -L https://github.com/docker/buildx/releases/download/v0.6.3/buildx-v0.6.3.linux-amd64 > ~/.docker/cli-plugins/docker-buildx
-RUN chmod u+x ~/.docker/cli-plugins/docker-buildx
+RUN curl -L https://github.com/docker/compose/releases/download/v2.17.2/docker-compose-linux-x86_64 > ~/.docker/cli-plugins/docker-compose
+RUN chmod u+x ~/.docker/cli-plugins/*
 
 RUN apt-get -q -y update && \
     apt-get -q -y upgrade && \
2 changes: 2 additions & 0 deletions requirements.txt
@@ -1,6 +1,8 @@
 dill>=0.3.2, <0.4
 requests>=2, <3
 docker>=3.7.2, <6
+# Work around https://github.com/docker/docker-py/issues/3113
+urllib3>=1.26.0, <2.0.0
 python-dateutil
 psutil >= 3.0.1, <6
 py-tes>=0.4.2,<1
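
The pin admits the whole urllib3 1.26+ series while excluding 2.x, which docker-py cannot yet handle. An illustrative check of the specifier's semantics using the packaging library (not part of the diff):

from packaging.specifiers import SpecifierSet

# Same constraint as the requirements.txt line above.
pin = SpecifierSet(">=1.26.0,<2.0.0")
assert "1.26.15" in pin    # 1.26-series releases stay allowed
assert "2.0.0" not in pin  # the 2.x line is excluded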
7 changes: 3 additions & 4 deletions src/toil/batchSystems/local_support.py
@@ -18,7 +18,6 @@
                                        UpdatedBatchJobInfo)
 from toil.batchSystems.singleMachine import SingleMachineBatchSystem
 from toil.common import Config
-from toil.cwl.utils import CWL_INTERNAL_JOBS
 from toil.job import JobDescription
 
 logger = logging.getLogger(__name__)
@@ -40,11 +39,11 @@ def handleLocalJob(self, jobDesc: JobDescription) -> Optional[int]:
         Returns the jobID if the jobDesc has been submitted to the local queue,
         otherwise returns None
         """
-        if (not self.config.runCwlInternalJobsOnWorkers
-                and jobDesc.jobName.startswith(CWL_INTERNAL_JOBS)):
+        if (not self.config.run_local_jobs_on_workers
+                and jobDesc.local):
             # Since singleMachine.py doesn't typecheck yet and MyPy is ignoring
             # it, it will raise errors here unless we add type annotations to
-            # everything we get back from it. THe easiest way to do that seems
+            # everything we get back from it. The easiest way to do that seems
             # to be to put it in a variable with a type annotation on it. That
             # somehow doesn't error whereas just returning the value complains
             # we're returning an Any. TODO: When singleMachine.py typechecks,
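
The rewritten check swaps CWL-specific name matching for the generic per-job flag. A simplified, self-contained sketch of the dispatch rule, assuming config and job-description objects shaped like those in the diff (submit_locally is a hypothetical stand-in for the real local queue):

from typing import Optional

def submit_locally(job_desc) -> int:
    # Stand-in for queueing the job on the leader's single-machine batch system.
    print(f"Running {job_desc.jobName} on the leader")
    return 1

def handle_local_job(config, job_desc) -> Optional[int]:
    """Return a local job ID if the job should run on the leader, else None."""
    if not config.run_local_jobs_on_workers and job_desc.local:
        # Jobs marked local (promise resolution, scatter/gather bookkeeping)
        # stay on the leader instead of round-tripping through the batch system.
        return submit_locally(job_desc)
    return None  # caller submits to the real batch system (e.g. Slurm)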
11 changes: 6 additions & 5 deletions src/toil/batchSystems/options.py
@@ -74,7 +74,7 @@ def set_batchsystem_options(batch_system: Optional[str], set_option: OptionSette
     set_option("coalesceStatusCalls")
     set_option("maxLocalJobs", int)
     set_option("manualMemArgs")
-    set_option("runCwlInternalJobsOnWorkers", bool, default=False)
+    set_option("run_local_jobs_on_workers", bool, default=False)
     set_option("statePollingWait")
     set_option("batch_logs_dir", env=["TOIL_BATCH_LOGS_DIR"])
 
@@ -124,13 +124,14 @@ def add_all_batchsystem_options(parser: Union[ArgumentParser, _ArgumentGroup]) -
         "Requires that TOIL_GRIDGENGINE_ARGS be set.",
     )
     parser.add_argument(
+        "--runLocalJobsOnWorkers"
         "--runCwlInternalJobsOnWorkers",
-        dest="runCwlInternalJobsOnWorkers",
+        dest="run_local_jobs_on_workers",
         action="store_true",
         default=None,
-        help="Whether to run CWL internal jobs (e.g. CWLScatter) on the worker nodes "
-        "instead of the primary node. If false (default), then all such jobs are run on "
-        "the primary node. Setting this to true can speed up the pipeline for very large "
+        help="Whether to run jobs marked as local (e.g. CWLScatter) on the worker nodes "
+        "instead of the leader node. If false (default), then all such jobs are run on "
+        "the leader node. Setting this to true can speed up CWL pipelines for very large "
         "workflows with many sub-workflows and/or scatters, provided that the worker "
         "pool is large enough.",
     )
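
The argument definition keeps the old CWL-specific spelling alongside the new generic one. A self-contained argparse sketch of that aliasing pattern; note that argparse only recognizes both spellings when each option string is passed as its own comma-separated argument:

import argparse

parser = argparse.ArgumentParser()
parser.add_argument(
    "--runLocalJobsOnWorkers",
    "--runCwlInternalJobsOnWorkers",  # deprecated spelling kept as an alias
    dest="run_local_jobs_on_workers",
    action="store_true",
    default=None,
    help="Run jobs marked as local on the workers instead of the leader.",
)

# Either spelling sets the same destination:
args = parser.parse_args(["--runCwlInternalJobsOnWorkers"])
assert args.run_local_jobs_on_workers is True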
3 changes: 1 addition & 2 deletions src/toil/common.py
@@ -109,7 +109,7 @@ class Config:
     logRotating: bool
     cleanWorkDir: str
     maxLocalJobs: int
-    runCwlInternalJobsOnWorkers: bool
+    run_local_jobs_on_workers: bool
     tes_endpoint: str
     tes_user: str
     tes_password: str
@@ -438,7 +438,6 @@ def check_nodestoreage_overrides(overrides: List[str]) -> bool:
         set_option("environment", parseSetEnv)
         set_option("disableChaining")
         set_option("disableJobStoreChecksumVerification")
-        set_option("runCwlInternalJobsOnWorkers")
         set_option("statusWait", int)
         set_option("disableProgress")
 
15 changes: 10 additions & 5 deletions src/toil/cwl/cwltoil.py
@@ -1863,6 +1863,7 @@
         tool_id: Optional[str] = None,
         parent_name: Optional[str] = None,
         subjob_name: Optional[str] = None,
+        local: Optional[bool] = None,
     ) -> None:
         """
         Make a new job and set up its requirements and naming.
@@ -1905,6 +1906,7 @@ def __init__(
             accelerators=accelerators,
             unitName=unit_name,
             displayName=display_name,
+            local=local,
         )
 
 
@@ -1918,7 +1920,7 @@ class ResolveIndirect(CWLNamedJob):
 
     def __init__(self, cwljob: Promised[CWLObjectType], parent_name: Optional[str] = None):
         """Store the dictionary of promises for later resolution."""
-        super().__init__(parent_name=parent_name, subjob_name="_resolve")
+        super().__init__(parent_name=parent_name, subjob_name="_resolve", local=True)
         self.cwljob = cwljob
 
     def run(self, file_store: AbstractFileStore) -> CWLObjectType:
@@ -2094,7 +2096,10 @@ def __init__(
     ):
         """Store our context for later evaluation."""
         super().__init__(
-            tool_id=tool.tool.get("id"), parent_name=parent_name, subjob_name="_wrapper"
+            tool_id=tool.tool.get("id"),
+            parent_name=parent_name,
+            subjob_name="_wrapper",
+            local=True,
         )
         self.cwltool = remove_pickle_problems(tool)
         self.cwljob = cwljob
@@ -2482,7 +2487,7 @@ def __init__(
         conditional: Union[Conditional, None],
     ):
         """Store our context for later execution."""
-        super().__init__(cores=1, memory="1GiB", disk="1MiB")
+        super().__init__(cores=1, memory="1GiB", disk="1MiB", local=True)
         self.step = step
         self.cwljob = cwljob
         self.runtime_context = runtime_context
@@ -2642,7 +2647,7 @@ def __init__(
         outputs: Promised[Union[CWLObjectType, List[CWLObjectType]]],
     ):
         """Collect our context for later gathering."""
-        super().__init__(cores=1, memory="1GiB", disk="1MiB")
+        super().__init__(cores=1, memory="1GiB", disk="1MiB", local=True)
         self.step = step
         self.outputs = outputs
 
@@ -2743,7 +2748,7 @@ def __init__(
         conditional: Union[Conditional, None] = None,
     ):
         """Gather our context for later execution."""
-        super().__init__(tool_id=cwlwf.tool.get("id"), parent_name=parent_name)
+        super().__init__(tool_id=cwlwf.tool.get("id"), parent_name=parent_name, local=True)
         self.cwlwf = cwlwf
         self.cwljob = cwljob
         self.runtime_context = runtime_context
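
Every call site above follows one pattern: orchestration-only jobs opt in to leader execution via the constructor. A minimal sketch of that pattern against Toil's Job API, assuming Job.__init__ accepts the local keyword this commit threads through (the example class is hypothetical, not from the diff):

from toil.job import Job

class PromiseResolver(Job):
    """Bookkeeping job that only rearranges promises, so it can stay on the leader."""

    def __init__(self) -> None:
        # local=True marks this as cheap coordination work that should not be
        # dispatched through the batch system.
        super().__init__(cores=1, memory="1GiB", disk="1MiB", local=True)

    def run(self, file_store):
        return None  # a real job would resolve and return its promises here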
10 changes: 0 additions & 10 deletions src/toil/cwl/utils.py
@@ -37,16 +37,6 @@
 
 # Customized CWL utilities
 
-# Define internal jobs we should avoid submitting to batch systems and logging
-CWL_INTERNAL_JOBS: Tuple[str, ...] = (
-    "CWLJobWrapper",
-    "CWLWorkflow",
-    "CWLScatter",
-    "CWLGather",
-    "ResolveIndirect",
-)
-
-
 # What exit code do we need to bail with if we or any of the local jobs that
 # parse workflow files see an unsupported feature?
 CWL_UNSUPPORTED_REQUIREMENT_EXIT_CODE = 33
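
For contrast, a standalone re-enactment of the mechanism this deletion retires: batch-system code treated a job as internal when its name started with one of these strings, which the per-job local flag now replaces (str.startswith accepts a tuple of prefixes):

CWL_INTERNAL_JOBS = (
    "CWLJobWrapper",
    "CWLWorkflow",
    "CWLScatter",
    "CWLGather",
    "ResolveIndirect",
)

assert "CWLScatter0".startswith(CWL_INTERNAL_JOBS)     # old rule: run on the leader
assert not "WDLTaskJob".startswith(CWL_INTERNAL_JOBS)  # old rule: send to batch system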
(Diffs for the remaining 8 changed files are not shown.)