From 86665a2713d86f331137d2e5b6ceefbee6d149e9 Mon Sep 17 00:00:00 2001 From: Adam Novak Date: Fri, 5 May 2023 10:24:26 -0700 Subject: [PATCH] Use local jobs for WDL interpreter (#4460) * Change CWL internal job system to generic local job system * Make all WDL jobs except running or attempting to run a task local * Add a non-downloading file size implementation * Satisfy MyPy * Eliminate the old stack and adopt a slightly more controlled successor phase concept * Appease MyPy * Drop a lingering use of the old stack * Get mini tests to pass * Get WDL workflow to start * Add debugging to show that jobs are missing from the registry * Fix ordering to actually look at the right job * Quiet debugging * Change log message * Stop making unwanted changes * Remove redundant and wrongly-typed check * Get docker-compose from apt instead of pip where it fights Toil deps * Ban urllib3 2.0+ until Docker module supports it * Use docker-compose in the prebake * Move Slurm tests to earlier stage * Account for how docker compose plugin names containers * Revert "Move Slurm tests to earlier stage" This reverts commit 4b824bb9c891fa8801d6433221a74e4e214daad8. --- .gitlab-ci.yml | 2 +- contrib/slurm-test/docker-compose.yml | 1 + contrib/slurm-test/slurm_test.sh | 31 +++-- contrib/toil-ci-prebake/Dockerfile | 3 +- requirements.txt | 2 + src/toil/batchSystems/local_support.py | 7 +- src/toil/batchSystems/options.py | 11 +- src/toil/common.py | 3 +- src/toil/cwl/cwltoil.py | 15 ++- src/toil/cwl/utils.py | 10 -- src/toil/job.py | 163 +++++++++++++++--------- src/toil/jobStores/abstractJobStore.py | 31 +++-- src/toil/leader.py | 75 ++++++----- src/toil/test/jobStores/jobStoreTest.py | 4 +- src/toil/test/src/jobDescriptionTest.py | 1 - src/toil/utils/toilStatus.py | 35 +++-- src/toil/wdl/wdltoil.py | 61 ++++++++- src/toil/worker.py | 31 ++--- 18 files changed, 289 insertions(+), 197 deletions(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index d19c812227..f43fcac1c8 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -164,7 +164,7 @@ slurm_test: script: - pwd - cd contrib/slurm-test/ - - pip install docker-compose + - docker compose version - ./slurm_test.sh cwl_v1.0: diff --git a/contrib/slurm-test/docker-compose.yml b/contrib/slurm-test/docker-compose.yml index 0cedd9f916..62ac112662 100644 --- a/contrib/slurm-test/docker-compose.yml +++ b/contrib/slurm-test/docker-compose.yml @@ -1,3 +1,4 @@ +# This is a v3 compose file services: slurmmaster: image: rancavil/slurm-master:19.05.5-1 diff --git a/contrib/slurm-test/slurm_test.sh b/contrib/slurm-test/slurm_test.sh index 3cbdc8e5ed..03aca6cc5f 100755 --- a/contrib/slurm-test/slurm_test.sh +++ b/contrib/slurm-test/slurm_test.sh @@ -1,22 +1,27 @@ #!/bin/bash set -e -docker-compose up -d +# With the docker compose plugin, containers are named like slurm-test-slurmmaster-1 +# If your containers are named like ${LEADER} you have the old docker-compose Python version instead. 
+# Try running with NAME_SEP=_ +NAME_SEP=${CONTAINER_NAME_SEP:--} +LEADER="slurm-test${NAME_SEP}slurmmaster${NAME_SEP}1" +docker compose up -d docker ps -docker cp toil_workflow.py slurm-test_slurmmaster_1:/home/admin -docker cp -L sort.py slurm-test_slurmmaster_1:/home/admin -docker cp fileToSort.txt slurm-test_slurmmaster_1:/home/admin -docker cp toil_workflow.py slurm-test_slurmmaster_1:/home/admin +docker cp toil_workflow.py ${LEADER}:/home/admin +docker cp -L sort.py ${LEADER}:/home/admin +docker cp fileToSort.txt ${LEADER}:/home/admin +docker cp toil_workflow.py ${LEADER}:/home/admin GIT_COMMIT=$(git rev-parse HEAD) -docker exec slurm-test_slurmmaster_1 sudo apt install python3-pip -y -docker exec slurm-test_slurmmaster_1 pip3 install "git+https://github.com/DataBiosphere/toil.git@${GIT_COMMIT}" -docker exec slurm-test_slurmmaster_1 sinfo -N -l +docker exec ${LEADER} sudo apt install python3-pip -y +docker exec ${LEADER} pip3 install "git+https://github.com/DataBiosphere/toil.git@${GIT_COMMIT}" +docker exec ${LEADER} sinfo -N -l # Test 1: A really basic workflow to check Slurm is working correctly -docker exec slurm-test_slurmmaster_1 python3 /home/admin/toil_workflow.py file:my-job-store --batchSystem slurm --disableCaching --retryCount 0 -docker cp slurm-test_slurmmaster_1:/home/admin/output.txt output_Docker.txt +docker exec ${LEADER} python3 /home/admin/toil_workflow.py file:my-job-store --batchSystem slurm --disableCaching --retryCount 0 +docker cp ${LEADER}:/home/admin/output.txt output_Docker.txt # Test 2: Make sure that "sort" workflow runs under slurm -docker exec slurm-test_slurmmaster_1 python3 /home/admin/sort.py file:my-job-store --batchSystem slurm --disableCaching --retryCount 0 -docker cp slurm-test_slurmmaster_1:/home/admin/sortedFile.txt sortedFile.txt -docker-compose stop +docker exec ${LEADER} python3 /home/admin/sort.py file:my-job-store --batchSystem slurm --disableCaching --retryCount 0 +docker cp ${LEADER}:/home/admin/sortedFile.txt sortedFile.txt +docker compose stop ./check_out.sh rm sort.py echo "Sucessfully ran workflow on slurm cluster" diff --git a/contrib/toil-ci-prebake/Dockerfile b/contrib/toil-ci-prebake/Dockerfile index 4ea6196c80..4fb4b4f583 100644 --- a/contrib/toil-ci-prebake/Dockerfile +++ b/contrib/toil-ci-prebake/Dockerfile @@ -18,7 +18,8 @@ ENV DEBIAN_FRONTEND=noninteractive RUN mkdir -p ~/.docker/cli-plugins/ RUN curl -L https://github.com/docker/buildx/releases/download/v0.6.3/buildx-v0.6.3.linux-amd64 > ~/.docker/cli-plugins/docker-buildx -RUN chmod u+x ~/.docker/cli-plugins/docker-buildx +RUN curl -L https://github.com/docker/compose/releases/download/v2.17.2/docker-compose-linux-x86_64 > ~/.docker/cli-plugins/docker-compose +RUN chmod u+x ~/.docker/cli-plugins/* RUN apt-get -q -y update && \ apt-get -q -y upgrade && \ diff --git a/requirements.txt b/requirements.txt index 0e4b9edbf6..bb0a557880 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,8 @@ dill>=0.3.2, <0.4 requests>=2, <3 docker>=3.7.2, <6 +# Work around https://github.com/docker/docker-py/issues/3113 +urllib3>=1.26.0, <2.0.0 python-dateutil psutil >= 3.0.1, <6 py-tes>=0.4.2,<1 diff --git a/src/toil/batchSystems/local_support.py b/src/toil/batchSystems/local_support.py index 9cf3b83c4d..9f2494e049 100644 --- a/src/toil/batchSystems/local_support.py +++ b/src/toil/batchSystems/local_support.py @@ -18,7 +18,6 @@ UpdatedBatchJobInfo) from toil.batchSystems.singleMachine import SingleMachineBatchSystem from toil.common import Config -from toil.cwl.utils import 
CWL_INTERNAL_JOBS from toil.job import JobDescription logger = logging.getLogger(__name__) @@ -40,11 +39,11 @@ def handleLocalJob(self, jobDesc: JobDescription) -> Optional[int]: Returns the jobID if the jobDesc has been submitted to the local queue, otherwise returns None """ - if (not self.config.runCwlInternalJobsOnWorkers - and jobDesc.jobName.startswith(CWL_INTERNAL_JOBS)): + if (not self.config.run_local_jobs_on_workers + and jobDesc.local): # Since singleMachine.py doesn't typecheck yet and MyPy is ignoring # it, it will raise errors here unless we add type annotations to - # everything we get back from it. THe easiest way to do that seems + # everything we get back from it. The easiest way to do that seems # to be to put it in a variable with a type annotation on it. That # somehow doesn't error whereas just returning the value complains # we're returning an Any. TODO: When singleMachine.py typechecks, diff --git a/src/toil/batchSystems/options.py b/src/toil/batchSystems/options.py index 0fd7cfb334..b9bff796ea 100644 --- a/src/toil/batchSystems/options.py +++ b/src/toil/batchSystems/options.py @@ -74,7 +74,7 @@ def set_batchsystem_options(batch_system: Optional[str], set_option: OptionSette set_option("coalesceStatusCalls") set_option("maxLocalJobs", int) set_option("manualMemArgs") - set_option("runCwlInternalJobsOnWorkers", bool, default=False) + set_option("run_local_jobs_on_workers", bool, default=False) set_option("statePollingWait") set_option("batch_logs_dir", env=["TOIL_BATCH_LOGS_DIR"]) @@ -124,13 +124,14 @@ def add_all_batchsystem_options(parser: Union[ArgumentParser, _ArgumentGroup]) - "Requires that TOIL_GRIDGENGINE_ARGS be set.", ) parser.add_argument( + "--runLocalJobsOnWorkers" "--runCwlInternalJobsOnWorkers", - dest="runCwlInternalJobsOnWorkers", + dest="run_local_jobs_on_workers", action="store_true", default=None, - help="Whether to run CWL internal jobs (e.g. CWLScatter) on the worker nodes " - "instead of the primary node. If false (default), then all such jobs are run on " - "the primary node. Setting this to true can speed up the pipeline for very large " + help="Whether to run jobs marked as local (e.g. CWLScatter) on the worker nodes " + "instead of the leader node. If false (default), then all such jobs are run on " + "the leader node. Setting this to true can speed up CWL pipelines for very large " "workflows with many sub-workflows and/or scatters, provided that the worker " "pool is large enough.", ) diff --git a/src/toil/common.py b/src/toil/common.py index d4a8c73c20..3be1378d2b 100644 --- a/src/toil/common.py +++ b/src/toil/common.py @@ -109,7 +109,7 @@ class Config: logRotating: bool cleanWorkDir: str maxLocalJobs: int - runCwlInternalJobsOnWorkers: bool + run_local_jobs_on_workers: bool tes_endpoint: str tes_user: str tes_password: str @@ -438,7 +438,6 @@ def check_nodestoreage_overrides(overrides: List[str]) -> bool: set_option("environment", parseSetEnv) set_option("disableChaining") set_option("disableJobStoreChecksumVerification") - set_option("runCwlInternalJobsOnWorkers") set_option("statusWait", int) set_option("disableProgress") diff --git a/src/toil/cwl/cwltoil.py b/src/toil/cwl/cwltoil.py index e8afba6fff..daffb27cc7 100644 --- a/src/toil/cwl/cwltoil.py +++ b/src/toil/cwl/cwltoil.py @@ -1863,6 +1863,7 @@ def __init__( tool_id: Optional[str] = None, parent_name: Optional[str] = None, subjob_name: Optional[str] = None, + local: Optional[bool] = None, ) -> None: """ Make a new job and set up its requirements and naming. 
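For orientation, here is a minimal, self-contained sketch (not Toil's actual classes) of the dispatch change above: the leader routes jobs by an explicit `local` flag on the job description instead of matching job names against the old `CWL_INTERNAL_JOBS` tuple. `FakeJobDescription` and `handle_local_job` are illustrative stand-ins for `toil.job.JobDescription` and `handleLocalJob`.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeJobDescription:
    """Stand-in for toil.job.JobDescription with just the two fields we need."""
    jobName: str
    local: bool = False


def handle_local_job(job: FakeJobDescription, run_local_jobs_on_workers: bool) -> Optional[int]:
    """
    Return a fake local batch system ID if the job should run on the leader,
    or None so the caller submits it to the real batch system.
    """
    if not run_local_jobs_on_workers and job.local:
        # Pretend to enqueue on the leader's single-machine batch system.
        print(f"Running {job.jobName} on the leader")
        return 1
    return None


# The old check matched job names against a tuple of CWL class names; the
# flag works for any workflow language (CWL, WDL, or plain Python jobs).
assert handle_local_job(FakeJobDescription("WDLWorkflowJob", local=True), False) == 1
assert handle_local_job(FakeJobDescription("WDLTaskJob", local=False), False) is None
```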
@@ -1905,6 +1906,7 @@ def __init__( accelerators=accelerators, unitName=unit_name, displayName=display_name, + local=local, ) @@ -1918,7 +1920,7 @@ class ResolveIndirect(CWLNamedJob): def __init__(self, cwljob: Promised[CWLObjectType], parent_name: Optional[str] = None): """Store the dictionary of promises for later resolution.""" - super().__init__(parent_name=parent_name, subjob_name="_resolve") + super().__init__(parent_name=parent_name, subjob_name="_resolve", local=True) self.cwljob = cwljob def run(self, file_store: AbstractFileStore) -> CWLObjectType: @@ -2094,7 +2096,10 @@ def __init__( ): """Store our context for later evaluation.""" super().__init__( - tool_id=tool.tool.get("id"), parent_name=parent_name, subjob_name="_wrapper" + tool_id=tool.tool.get("id"), + parent_name=parent_name, + subjob_name="_wrapper", + local=True, ) self.cwltool = remove_pickle_problems(tool) self.cwljob = cwljob @@ -2482,7 +2487,7 @@ def __init__( conditional: Union[Conditional, None], ): """Store our context for later execution.""" - super().__init__(cores=1, memory="1GiB", disk="1MiB") + super().__init__(cores=1, memory="1GiB", disk="1MiB", local=True) self.step = step self.cwljob = cwljob self.runtime_context = runtime_context @@ -2642,7 +2647,7 @@ def __init__( outputs: Promised[Union[CWLObjectType, List[CWLObjectType]]], ): """Collect our context for later gathering.""" - super().__init__(cores=1, memory="1GiB", disk="1MiB") + super().__init__(cores=1, memory="1GiB", disk="1MiB", local=True) self.step = step self.outputs = outputs @@ -2743,7 +2748,7 @@ def __init__( conditional: Union[Conditional, None] = None, ): """Gather our context for later execution.""" - super().__init__(tool_id=cwlwf.tool.get("id"), parent_name=parent_name) + super().__init__(tool_id=cwlwf.tool.get("id"), parent_name=parent_name, local=True) self.cwlwf = cwlwf self.cwljob = cwljob self.runtime_context = runtime_context diff --git a/src/toil/cwl/utils.py b/src/toil/cwl/utils.py index 37ad82576a..9058a84688 100644 --- a/src/toil/cwl/utils.py +++ b/src/toil/cwl/utils.py @@ -37,16 +37,6 @@ # Customized CWL utilities -# Define internal jobs we should avoid submitting to batch systems and logging -CWL_INTERNAL_JOBS: Tuple[str, ...] = ( - "CWLJobWrapper", - "CWLWorkflow", - "CWLScatter", - "CWLGather", - "ResolveIndirect", -) - - # What exit code do we need to bail with if we or any of the local jobs that # parse workflow files see an unsupported feature? CWL_UNSUPPORTED_REQUIREMENT_EXIT_CODE = 33 diff --git a/src/toil/job.py b/src/toil/job.py index b2934227ac..184b504265 100644 --- a/src/toil/job.py +++ b/src/toil/job.py @@ -323,7 +323,7 @@ def accelerators_fully_satisfy(candidates: Optional[List[AcceleratorRequirement] Determine if a set of accelerators satisfy a requirement. Ignores fields specified in ignore. - + :returns: True if the requirement AcceleratorRequirement is fully satisfied by the ones in the list, taken together (i.e. check all fields including count). @@ -731,6 +731,7 @@ def __init__( unitName: str = "", displayName: str = "", command: Optional[str] = None, + local: Optional[bool] = None ) -> None: """ Create a new JobDescription. @@ -750,10 +751,16 @@ def __init__( :param displayName: A human-readable name to identify this particular job instance. Ought to be the job class's name if no real user-defined name is available. + :param local: If True, the job is meant to use minimal resources but is + sensitive to execution latency, and so should be executed by the + leader. 
""" # Set requirements super().__init__(requirements) + # Set local-ness flag, which is not (yet?) a requirement + self.local: bool = local or False + # Save names, making sure they are strings and not e.g. bytes. def makeString(x: Union[str, bytes]) -> str: return x if not isinstance(x, bytes) else x.decode('utf-8', errors='replace') @@ -819,7 +826,13 @@ def makeString(x: Union[str, bytes]) -> str: # The IDs of all follow-on jobs of the described job. # Follow-ons which are done must be removed with filterSuccessors. - self.followOnIDs = set() + self.followOnIDs: Set[str] = set() + + # We keep our own children and follow-ons in a list of successor + # phases, along with any successors adopted from jobs we have chained + # from. When we finish our own children and follow-ons, we may have to + # go back and finish successors for those jobs. + self.successor_phases: List[Set[str]] = [self.followOnIDs, self.childIDs] # Dict from ServiceHostJob ID to list of child ServiceHostJobs that start after it. # All services must have an entry, if only to an empty list. @@ -867,11 +880,30 @@ def serviceHostIDsInBatches(self) -> Iterator[List[str]]: def successorsAndServiceHosts(self) -> Iterator[str]: """Get an iterator over all child, follow-on, and service job IDs.""" - return itertools.chain(self.childIDs, self.followOnIDs, self.serviceTree.keys()) - def allSuccessors(self): - """Get an iterator over all child and follow-on job IDs.""" - return itertools.chain(self.childIDs, self.followOnIDs) + return itertools.chain(self.allSuccessors(), self.serviceTree.keys()) + + def allSuccessors(self) -> Iterator[str]: + """ + Get an iterator over all child, follow-on, and chained, inherited successor job IDs. + + Follow-ons will come before children. + """ + + for phase in self.successor_phases: + for successor in phase: + yield successor + + def successors_by_phase(self) -> Iterator[Tuple[int, str]]: + """ + Get an iterator over all child/follow-on/chained inherited successor job IDs, along with their phase numbere on the stack. + + Phases ececute higher numbers to lower numbers. + """ + + for i, phase in enumerate(self.successor_phases): + for successor in phase: + yield i, successor @property def services(self): @@ -882,7 +914,7 @@ def services(self): """ return list(self.serviceTree.keys()) - def nextSuccessors(self) -> List[str]: + def nextSuccessors(self) -> Set[str]: """ Return the collection of job IDs for the successors of this job that are ready to run. @@ -896,44 +928,14 @@ def nextSuccessors(self) -> List[str]: if self.command is not None: # We ourselves need to run. So there's not nothing to do # but no successors are ready. - return [] - elif len(self.childIDs) != 0: - # Our children need to run - return self.childIDs - elif len(self.followOnIDs) != 0: - # Our follow-ons need to run - return self.followOnIDs + return set() else: - # Everything is done. - return None - - @property - def stack(self) -> Tuple[Tuple[str, ...], ...]: - """ - Get IDs of successors that need to run still. - - Batches of successors are in reverse order of the order they need to run in. - - Some successors in each batch may have already been finished. Batches may be empty. - - Exists so that code that used the old stack list immutably can work - still. New development should use nextSuccessors(), and all mutations - should use filterSuccessors() (which automatically removes completed - phases). - - :return: Batches of successors that still need to run, in reverse - order. 
An empty batch may exist under a non-empty batch, or at the top - when the job itself is not done. - :rtype: tuple(tuple(str)) - """ - result = [] - if self.command is not None or len(self.childIDs) != 0 or len(self.followOnIDs) != 0: - # Follow-ons haven't all finished yet - result.append(tuple(self.followOnIDs)) - if self.command is not None or len(self.childIDs) != 0: - # Children haven't all finished yet - result.append(tuple(self.childIDs)) - return tuple(result) + for phase in reversed(self.successor_phases): + if len(phase) > 0: + # Rightmost phase that isn't empty + return phase + # If no phase isn't empty, we're done. + return None def filterSuccessors(self, predicate: Callable[[str], bool]) -> None: """ @@ -943,8 +945,12 @@ def filterSuccessors(self, predicate: Callable[[str], bool]) -> None: Treats all other successors as complete and forgets them. """ - self.childIDs = {x for x in self.childIDs if predicate(x)} - self.followOnIDs = {x for x in self.followOnIDs if predicate(x)} + + for phase in self.successor_phases: + for successor_id in list(phase): + if not predicate(successor_id): + phase.remove(successor_id) + self.successor_phases = [p for p in self.successor_phases if len(p) > 0] def filterServiceHosts(self, predicate: Callable[[str], bool]) -> None: """ @@ -978,9 +984,10 @@ def clear_nonexistent_dependents(self, job_store: "AbstractJobStore") -> None: self.filterServiceHosts(predicate) def clear_dependents(self) -> None: - """Remove all references to child, follow-on, and associated service jobs.""" + """Remove all references to successor and service jobs.""" self.childIDs = set() self.followOnIDs = set() + self.successor_phases = [self.followOnIDs, self.childIDs] self.serviceTree = {} def is_subtree_done(self) -> bool: @@ -1000,10 +1007,22 @@ def replace(self, other: "JobDescription") -> None: Useful for chaining jobs: the chained-to job can replace the parent job. - Merges cleanup state from the job being replaced into this one. + Merges cleanup state and successors other than this job from the job + being replaced into this one. :param other: Job description to replace. """ + + # TODO: We can't join the job graphs with Job._jobGraphsJoined, is that a problem? + + # Take all the successors other than this one + old_phases = [{i for i in p if i != self.jobStoreID} for p in other.successor_phases] + # And drop empty phases + old_phases = [p for p in old_phases if len(p) > 0] + # And put in front of our existing phases + logger.debug('%s is adopting successor phases from %s of: %s', self, other, old_phases) + self.successor_phases = old_phases + self.successor_phases + # TODO: also be able to take on the successors of the other job, under # ours on the stack, somehow. @@ -1058,8 +1077,13 @@ def renameReferences(self, renames: Dict[TemporaryID, str]) -> None: :param renames: Rename operations to apply. 
""" - self.childIDs = {renames.get(old, old) for old in self.childIDs} - self.followOnIDs = {renames.get(old, old) for old in self.followOnIDs} + for phase in self.successor_phases: + items = list(phase) + for item in items: + if isinstance(item, TemporaryID) and item in renames: + # Replace each renamed item one at a time to preserve set identity + phase.remove(item) + phase.add(renames[item]) self.serviceTree = {renames.get(parent, parent): [renames.get(child, child) for child in children] for parent, children in self.serviceTree.items()} @@ -1281,7 +1305,8 @@ def restartCheckpoint(self, jobStore: "AbstractJobStore") -> List[str]: """ assert self.checkpoint is not None successorsDeleted = [] - if self.childIDs or self.followOnIDs or self.serviceTree or self.command is not None: + all_successors = list(self.allSuccessors()) + if len(all_successors) > 0 or self.serviceTree or self.command is not None: if self.command is not None: assert self.command == self.checkpoint logger.debug("Checkpoint job already has command set to run") @@ -1291,10 +1316,10 @@ def restartCheckpoint(self, jobStore: "AbstractJobStore") -> List[str]: jobStore.update_job(self) # Update immediately to ensure that checkpoint # is made before deleting any remaining successors - if self.childIDs or self.followOnIDs or self.serviceTree: + if len(all_successors) > 0 or self.serviceTree: # If the subtree of successors is not complete restart everything - logger.debug("Checkpoint job has unfinished successor jobs, deleting children: %s, followOns: %s, services: %s " % - (self.childIDs, self.followOnIDs, self.serviceTree.keys())) + logger.debug("Checkpoint job has unfinished successor jobs, deleting successors: %s, services: %s " % + (all_successors, self.serviceTree.keys())) # Delete everything on the stack, as these represent successors to clean # up as we restart the queue @@ -1337,6 +1362,7 @@ def __init__( checkpoint: Optional[bool] = False, displayName: Optional[str] = "", descriptionClass: Optional[type] = None, + local: Optional[bool] = None, ) -> None: """ Job initializer. @@ -1356,6 +1382,7 @@ def __init__( :func:`toil.job.Job.checkNewCheckpointsAreCutVertices`. :param displayName: Human-readable job type display name. :param descriptionClass: Override for the JobDescription class used to describe the job. + :param local: if the job can be run on the leader. :type memory: int or string convertible by toil.lib.conversions.human2bytes to an int :type cores: float, int, or string convertible by toil.lib.conversions.human2bytes to an int @@ -1389,7 +1416,13 @@ def __init__( # Create the JobDescription that owns all the scheduling information. # Make it with a temporary ID until we can be assigned a real one by # the JobStore. - self._description = descriptionClass(requirements, jobName, unitName=unitName, displayName=displayName) + self._description = descriptionClass( + requirements, + jobName, + unitName=unitName, + displayName=displayName, + local=local + ) # Private class variables needed to actually execute a job, in the worker. # Also needed for setting up job graph structures before saving to the JobStore. @@ -1398,6 +1431,7 @@ def __init__( # Will be shared among all jobs in a disconnected piece of the job # graph that hasn't been registered with a JobStore yet. # Make sure to initially register ourselves. + # After real IDs are assigned, this maps from real ID to the job objects. self._registry = {self._description.jobStoreID: self} # Job relationships are all stored exactly once in the JobDescription. 
@@ -1977,12 +2011,14 @@ def _getImpliedEdges(roots) -> Dict["Job", List["Job"]]: #connected by an implied edge extraEdges = {n: [] for n in nodes} for job in nodes: - for depth in range(1, len(job.description.stack)): + # Get all the nonempty successor phases + phases = [p for p in job.description.successor_phases if len(p) > 0] + for depth in range(1, len(phases)): # Add edges from all jobs in the earlier/upper subtrees to all # the roots of the later/lower subtrees - upper = job.description.stack[depth] - lower = job.description.stack[depth - 1] + upper = phases[depth] + lower = phases[depth - 1] # Find everything in the upper subtree reacheable = set() @@ -2339,7 +2375,7 @@ def getTopologicalOrderingOfJobs(self) -> List["Job"]: visited.add(job.jobStoreID) ordering.append(job) - for otherID in itertools.chain(job.description.followOnIDs, job.description.childIDs): + for otherID in job.description.allSuccessors(): if otherID in self._registry: # Stack up descendants so we process children and then follow-ons. # So stack up follow-ons deeper @@ -2511,12 +2547,19 @@ def _saveJobGraph(self, jobStore: "AbstractJobStore", saveSelf: bool = False, re # Set up to save last job first, so promises flow the right way ordering.reverse() - logger.info("Saving graph of %d jobs, %d new", len(allJobs), len(fakeToReal)) + logger.info("Saving graph of %d jobs, %d non-service, %d new", len(allJobs), len(ordering), len(fakeToReal)) # Make sure we're the root assert ordering[-1] == self # Don't verify the ordering length: it excludes service host jobs. + ordered_ids = {o.jobStoreID for o in ordering} + for j in allJobs: + # But do ensure all non-service-host jobs are in it. + if not isinstance(j, ServiceHostJob) and j.jobStoreID not in ordered_ids: + raise RuntimeError(f"{j} not found in ordering {ordering}") + + if not saveSelf: # Fulfil promises for return values (even if value is None) diff --git a/src/toil/jobStores/abstractJobStore.py b/src/toil/jobStores/abstractJobStore.py index a597f899a4..c2fd12305f 100644 --- a/src/toil/jobStores/abstractJobStore.py +++ b/src/toil/jobStores/abstractJobStore.py @@ -776,21 +776,20 @@ def get_jobs_reachable_from_root() -> Set[str]: while unprocessed_job_descriptions: new_job_descriptions_to_process = [] # Reset. for job_description in unprocessed_job_descriptions: - for jobs in job_description.stack: - for successor_jobstore_id in jobs: - if successor_jobstore_id not in reachable_from_root and haveJob(successor_jobstore_id): - successor_job_description = getJobDescription(successor_jobstore_id) - - # Add each successor job. - reachable_from_root.add( - str(successor_job_description.jobStoreID) - ) - # Add all of the successor's linked service jobs as well. - for service_jobstore_id in successor_job_description.services: - if haveJob(service_jobstore_id): - reachable_from_root.add(service_jobstore_id) - - new_job_descriptions_to_process.append(successor_job_description) + for successor_jobstore_id in job_description.allSuccessors(): + if successor_jobstore_id not in reachable_from_root and haveJob(successor_jobstore_id): + successor_job_description = getJobDescription(successor_jobstore_id) + + # Add each successor job. + reachable_from_root.add( + str(successor_job_description.jobStoreID) + ) + # Add all of the successor's linked service jobs as well. 
+ for service_jobstore_id in successor_job_description.services: + if haveJob(service_jobstore_id): + reachable_from_root.add(service_jobstore_id) + + new_job_descriptions_to_process.append(successor_job_description) unprocessed_job_descriptions = new_job_descriptions_to_process logger.debug(f"{len(reachable_from_root)} jobs reachable from root.") @@ -854,7 +853,7 @@ def get_jobs_reachable_from_root() -> Set[str]: if jobDescription.command is None: def stackSizeFn() -> int: - return sum(map(len, jobDescription.stack)) + return len(list(jobDescription.allSuccessors())) startStackSize = stackSizeFn() # Remove deleted jobs jobDescription.filterSuccessors(haveJob) diff --git a/src/toil/leader.py b/src/toil/leader.py index 7f5e9dd725..e3bfcf97ed 100644 --- a/src/toil/leader.py +++ b/src/toil/leader.py @@ -37,8 +37,7 @@ JobUpdatedMessage, QueueSizeMessage) from toil.common import Config, Toil, ToilMetrics -from toil.cwl.utils import (CWL_INTERNAL_JOBS, - CWL_UNSUPPORTED_REQUIREMENT_EXIT_CODE) +from toil.cwl.utils import CWL_UNSUPPORTED_REQUIREMENT_EXIT_CODE from toil.job import (CheckpointJobDescription, JobDescription, ServiceJobDescription, @@ -307,7 +306,7 @@ def _handledFailedSuccessor(self, successor_id: str, predecessor_id: str) -> boo :param successor_id: The successor which has failed. :param predecessor_id: The job which the successor comes after. - :returns: True if there are still active successors. + :returns: True if there are still active successors. False if all successors have failed and the job is queued to run to handle the failed successors. """ logger.debug("Successor job: %s of job: %s has failed """ @@ -415,17 +414,21 @@ def _runJobSuccessors(self, predecessor_id: str) -> None: # Grab the predecessor's JobDescription predecessor = self.toilState.get_job(predecessor_id) - assert len(predecessor.stack[-1]) > 0 + # Grap the successors + next_successors = predecessor.nextSuccessors() + + assert next_successors is not None + assert len(next_successors) > 0 logger.debug("Job: %s has %i successors to schedule", - predecessor_id, len(predecessor.stack[-1])) + predecessor_id, len(next_successors)) #Record the number of successors that must be completed before #the job can be considered again assert self.toilState.count_pending_successors(predecessor_id) == 0, 'Attempted to schedule successors of the same job twice!' - self.toilState.successors_pending(predecessor_id, len(predecessor.stack[-1])) + self.toilState.successors_pending(predecessor_id, len(next_successors)) # For each successor schedule if all predecessors have been completed successors = [] - for successor_id in predecessor.stack[-1]: + for successor_id in next_successors: try: successor = self.toilState.get_job(successor_id) except NoSuchJobException: @@ -441,7 +444,7 @@ def _runJobSuccessors(self, predecessor_id: str) -> None: def _processFailedSuccessors(self, predecessor_id: str): """ Deal with some of a job's successors having failed. - + Either fail the job, or restart it if it has retries left and is a checkpoint job. """ @@ -493,7 +496,7 @@ def _processReadyJob(self, job_id: str, result_status: int): logger.debug('Updating status of job %s with result status: %s', readyJob, result_status) - + # TODO: Filter out nonexistent successors/services now, so we can tell # if they are all done and the job needs deleting? 
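The job-store reachability walk above now iterates `allSuccessors()` directly instead of looping over the old nested `stack` structure. A condensed sketch of that traversal follows; the callbacks (`have_job`, `successors_of`, `services_of`) are hypothetical stand-ins for the job store accessors, not real Toil APIs.

```python
from collections import deque
from typing import Callable, Iterable, Set


def jobs_reachable_from_root(root_id: str,
                             have_job: Callable[[str], bool],
                             successors_of: Callable[[str], Iterable[str]],
                             services_of: Callable[[str], Iterable[str]]) -> Set[str]:
    """Breadth-first walk over successors plus their linked service jobs."""
    reachable = {root_id}
    queue = deque([root_id])
    while queue:
        job_id = queue.popleft()
        for successor_id in successors_of(job_id):
            if successor_id in reachable or not have_job(successor_id):
                continue
            reachable.add(successor_id)
            # Service host jobs hang off their client job and count as reachable.
            reachable.update(s for s in services_of(successor_id) if have_job(s))
            queue.append(successor_id)
    return reachable


# Tiny in-memory graph: root -> a -> b, where a also has one service job.
graph = {"root": ["a"], "a": ["b"], "b": []}
services = {"root": [], "a": ["a-svc"], "b": []}
exists = {"root", "a", "b", "a-svc"}
assert jobs_reachable_from_root(
    "root", exists.__contains__, graph.__getitem__, services.__getitem__
) == {"root", "a", "b", "a-svc"}
```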
@@ -544,7 +547,7 @@ def _processReadyJob(self, job_id: str, result_status: int): logger.debug("Giving job: %s to service manager to schedule its jobs", readyJob) # Use the service manager to start the services self.serviceManager.put_client(job_id) - elif len(readyJob.stack) > 0: + elif readyJob.nextSuccessors() is not None: # There are successors to run self._runJobSuccessors(job_id) elif readyJob.jobStoreID in self.toilState.servicesIssued: @@ -576,7 +579,7 @@ def _processReadyJob(self, job_id: str, result_status: int): if readyJob.remainingTryCount > 0: # add attribute to let issueJob know that this is an empty job and should be deleted logger.debug("Job: %s is empty, we are cleaning it up", readyJob) - + try: self.toilState.delete_job(readyJob.jobStoreID) except Exception as e: @@ -725,7 +728,7 @@ def _processLostJobs(self): def innerLoop(self): """ Process jobs. - + This is the leader's main loop. """ self.timeSinceJobsLastRescued = time.time() @@ -864,7 +867,7 @@ def issueJob(self, jobNode: JobDescription) -> None: # Never issue the same job multiple times simultaneously assert jobNode.jobStoreID not in self.toilState.jobs_issued, \ f"Attempted to issue {jobNode} multiple times simultaneously!" - + workerCommand = [resolveEntryPoint('_toil_worker'), jobNode.jobName, self.jobStoreLocator, @@ -899,7 +902,7 @@ def issueJob(self, jobNode: JobDescription) -> None: # len(issued_jobs_by_batch_system_id) should always be greater than or equal to preemptibleJobsIssued, # so increment this value after the job is added to the issuedJob dict self.preemptibleJobsIssued += 1 - cur_logger = logger.debug if jobNode.jobName.startswith(CWL_INTERNAL_JOBS) else logger.info + cur_logger = logger.debug if jobNode.local else logger.info cur_logger("Issued job %s with job batch system ID: " "%s and %s", jobNode, str(jobBatchSystemID), jobNode.requirements_string()) @@ -1119,7 +1122,7 @@ def process_finished_job(self, batch_system_id, result_status, wall_time=None, e Process finished jobs. Called when an attempt to run a job finishes, either successfully or otherwise. - + Takes the job out of the issued state, and then works out what to do about the fact that it succeeded or failed. @@ -1128,28 +1131,28 @@ def process_finished_job(self, batch_system_id, result_status, wall_time=None, e """ # De-issue the job. issued_job = self.removeJob(batch_system_id) - + if result_status != 0: # Show job as failed in progress (and take it from completed) self.progress_overall.update(incr=-1) self.progress_failed.update(incr=1) - + # Delegate to the vers return self.process_finished_job_description(issued_job, result_status, wall_time, exit_reason, batch_system_id) - + def process_finished_job_description(self, finished_job: JobDescription, result_status: int, wall_time: Optional[float] = None, exit_reason: Optional[BatchJobExitReason] = None, batch_system_id: Optional[int] = None) -> bool: """ Process a finished JobDescription based upon its succees or failure. - + If wall-clock time is available, informs the cluster scaler about the job finishing. - + If the job failed and a batch system ID is available, checks for and reports batch system logs. - + Checks if it succeeded and was removed, or if it failed and needs to be set up after failure, and dispatches to the appropriate function. @@ -1196,7 +1199,7 @@ def process_finished_job_description(self, finished_job: JobDescription, result_ # reduce the try count here. 
if replacement_job.logJobStoreFileID is None: logger.warning("No log file is present, despite job failing: %s", replacement_job) - + if batch_system_id is not None: # Look for any standard output/error files created by the batch system. # They will only appear if the batch system actually supports @@ -1260,22 +1263,18 @@ def successorRecursion(job_id: str) -> None: jobDesc = self.toilState.get_job(job_id) # For lists of successors - for successorList in jobDesc.stack: - - # For each successor in list of successors - for successorID in successorList: - - # If successor not already visited - if successorID not in alreadySeenSuccessors: - - # Add to set of successors - successors.add(successorID) - alreadySeenSuccessors.add(successorID) - - # Recurse if job exists - # (job may not exist if already completed) - if self.toilState.job_exists(successorID): - successorRecursion(successorID) + for successorID in jobDesc.allSuccessors(): + # If successor not already visited + if successorID not in alreadySeenSuccessors: + + # Add to set of successors + successors.add(successorID) + alreadySeenSuccessors.add(successorID) + + # Recurse if job exists + # (job may not exist if already completed) + if self.toilState.job_exists(successorID): + successorRecursion(successorID) successorRecursion(job_id) # Recurse from passed job diff --git a/src/toil/test/jobStores/jobStoreTest.py b/src/toil/test/jobStores/jobStoreTest.py index 500fac8988..c69d1af153 100644 --- a/src/toil/test/jobStores/jobStoreTest.py +++ b/src/toil/test/jobStores/jobStoreTest.py @@ -277,11 +277,11 @@ def testUpdateBehavior(self): # Check equivalence between jobstore1 and jobstore2. # While job1 and job2 share a jobStoreID, job1 has not been "refreshed" to show the newly added child jobs. - self.assertNotEqual([sorted(x) for x in job2.stack], [sorted(x) for x in job1.stack]) + self.assertNotEqual(sorted(job2.allSuccessors()), sorted(job1.allSuccessors())) # Reload parent job on jobstore, "refreshing" the job. job1 = jobstore1.load_job(job1.jobStoreID) - self.assertEqual([sorted(x) for x in job2.stack], [sorted(x) for x in job1.stack]) + self.assertEqual(sorted(job2.allSuccessors()), sorted(job1.allSuccessors())) # Jobs still shouldn't *actually* be equal, even if their contents are the same. 
self.assertNotEqual(job2, job1) diff --git a/src/toil/test/src/jobDescriptionTest.py b/src/toil/test/src/jobDescriptionTest.py index f55526f09b..9b0a5779b6 100644 --- a/src/toil/test/src/jobDescriptionTest.py +++ b/src/toil/test/src/jobDescriptionTest.py @@ -63,7 +63,6 @@ def testJobDescription(self): self.assertEqual(list(j.serviceHostIDsInBatches()), []) self.assertEqual(list(j.services), []) self.assertEqual(list(j.nextSuccessors()), []) - self.assertEqual(sum(len(level) for level in j.stack), 0) self.assertEqual(j.predecessorsFinished, set()) self.assertEqual(j.logJobStoreFileID, None) diff --git a/src/toil/utils/toilStatus.py b/src/toil/utils/toilStatus.py index 45617555ea..a0ad8ee0fd 100644 --- a/src/toil/utils/toilStatus.py +++ b/src/toil/utils/toilStatus.py @@ -65,18 +65,17 @@ def print_dot_chart(self) -> None: # Print the edges for job in set(self.jobsToReport): - for level, jobList in enumerate(job.stack): - for childJob in jobList: - # Check, b/c successor may be finished / not in the set of jobs - if childJob in jobsToNodeNames: - print( - '%s -> %s [label="%i"];' - % ( - jobsToNodeNames[str(job.jobStoreID)], - jobsToNodeNames[childJob], - level, - ) + for level, childJob in job.successors_by_phase(): + # Check, b/c successor may be finished / not in the set of jobs + if childJob in jobsToNodeNames: + print( + '%s -> %s [label="%i"];' + % ( + jobsToNodeNames[str(job.jobStoreID)], + jobsToNodeNames[childJob], + level, ) + ) print("}") def printJobLog(self) -> None: @@ -94,9 +93,8 @@ def printJobChildren(self) -> None: """Takes a list of jobs, and prints their successors.""" for job in self.jobsToReport: children = "CHILDREN_OF_JOB:%s " % job - for level, jobList in enumerate(job.stack): - for childJob in jobList: - children += "\t(CHILD_JOB:%s,PRECEDENCE:%i)" % (childJob, level) + for level, childJob in job.successors_by_phase(): + children += "\t(CHILD_JOB:%s,PRECEDENCE:%i)" % (childJob, level) print(children) def printAggregateJobStats(self, properties: List[str], childNumber: int) -> None: @@ -131,7 +129,7 @@ def report_on_jobs(self) -> Dict[str, Any]: if job.logJobStoreFileID is not None: hasLogFile.append(job) - childNumber = reduce(lambda x, y: x + y, list(map(len, job.stack)) + [0]) + childNumber = len(list(job.allSuccessors())) if childNumber > 0: # Total number of successors > 0 hasChildren.append(job) properties.add("HAS_CHILDREN") @@ -304,10 +302,9 @@ def traverseJobGraph( foundJobStoreIDs.add(str(rootJob.jobStoreID)) jobsToReport.append(rootJob) # Traverse jobs in stack - for jobs in rootJob.stack: - for successorJobStoreID in jobs: - if successorJobStoreID not in foundJobStoreIDs and self.jobStore.job_exists(successorJobStoreID): - self.traverseJobGraph(self.jobStore.load_job(successorJobStoreID), jobsToReport, foundJobStoreIDs) + for successorJobStoreID in rootJob.allSuccessors(): + if successorJobStoreID not in foundJobStoreIDs and self.jobStore.job_exists(successorJobStoreID): + self.traverseJobGraph(self.jobStore.load_job(successorJobStoreID), jobsToReport, foundJobStoreIDs) # Traverse service jobs for jobs in rootJob.services: diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index fee68cd1b3..e2521bc661 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -36,6 +36,7 @@ from urllib.parse import urlsplit, urljoin, quote, unquote import WDL +from WDL._util import byte_size_units from WDL.runtime.task_container import TaskContainer from WDL.runtime.backend.singularity import SingularityContainer from WDL.runtime.backend.docker_swarm 
import SwarmContainer @@ -328,6 +329,49 @@ def unpack_toil_uri(toil_uri: str) -> Tuple[FileID, str]: return file_id, file_basename +class NonDownloadingSize(WDL.StdLib._Size): + """ + WDL size() implementatiuon that avoids downloading files. + + MiniWDL's default size() implementation downloads the whole file to get its + size. We want to be able to get file sizes from code running on the leader, + where there may not be space to download the whole file. So we override the + fancy class that implements it so that we can handle sizes for FileIDs + using the FileID's stored size info. + """ + + def _call_eager(self, expr: "WDL.Expr.Apply", arguments: List[WDL.Value.Base]) -> WDL.Value.Base: + """ + Replacement evaluation implementation that avoids downloads. + """ + + # Get all the URIs of files that actually are set. + file_uris: List[str] = [f.value for f in arguments[0].coerce(WDL.Type.Array(WDL.Type.File(optional=True))).value if not isinstance(f, WDL.Value.Null)] + + total_size = 0.0 + for uri in file_uris: + # Sum up the sizes of all the files, if any. + if uri.startswith(TOIL_URI_SCHEME): + # This is a Toil File ID we encoded; we have the size + # available. + file_id, _ = unpack_toil_uri(uri) + # Use the encoded size + total_size += file_id.size + else: + # We need to fetch it and get its size. + total_size += os.path.getsize(self.stdlib._devirtualize_filename(uri)) + + if len(arguments) > 1: + # Need to convert units. See + # + unit_name: str = arguments[1].coerce(WDL.Type.String()).value + if unit_name not in byte_size_units: + raise WDL.Error.EvalError(expr, "size(): invalid unit " + unit_name) + # Divide down to the right unit + total_size /= float(byte_size_units[unit_name]) + + # Return the result as a WDL float value + return WDL.Value.Float(total_size) class ToilWDLStdLibBase(WDL.StdLib.Base): """ @@ -345,6 +389,10 @@ def __init__(self, file_store: AbstractFileStore): # Set up miniwdl's implementation (which may be WDL.StdLib.TaskOutputs) super().__init__(wdl_version, write_dir) + # Replace the MiniWDL size() implementation with one that doesn't need + # to always download the file. + self.size = NonDownloadingSize(self) + # Keep the file store around so we can access files. self._file_store = file_store @@ -372,7 +420,6 @@ def _devirtualize_filename(self, filename: str) -> str: # TODO: Support people doing path operations (join, split, get parent directory) on the virtualized filenames. # TODO: For task inputs, we are supposed to make sure to put things in the same directory if they came from the same directory. See - if filename.startswith(TOIL_URI_SCHEME): # This is a reference to the Toil filestore. # Deserialize the FileID @@ -809,6 +856,11 @@ def __init__(self, **kwargs: Any) -> None: in the constructor because it needs to happen in the leader and the worker before a job body containing MiniWDL structures can be saved. """ + + # Default everything to being a local job + if 'local' not in kwargs: + kwargs['local'] = True + super().__init__(**kwargs) # The jobs can't pickle under the default Python recursion limit of @@ -847,7 +899,10 @@ def __init__(self, task: WDL.Tree.Task, prev_node_results: Sequence[Promised[WDL The caller has alredy added the task's own name. """ - super().__init__(unitName=namespace, displayName=namespace, **kwargs) + # This job should not be local because it represents a real workflow task. + # TODO: Instead of re-scheduling with more resources, add a local + # "wrapper" job like CWL uses to determine the actual requirements. 
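NonDownloadingSize works by summing sizes from FileID metadata where available and only touching the filesystem for ordinary paths. A simplified, self-contained sketch of that idea is below; `recorded_size` and `BYTE_SIZE_UNITS` are stand-ins for unpacking the FileID from a `toilfile:` URI and for `WDL._util.byte_size_units`, and this is not MiniWDL's actual `StdLib._Size` API.

```python
import os
from typing import Callable, Dict, Optional, Sequence

# Stand-in for WDL._util.byte_size_units (decimal and binary prefixes).
BYTE_SIZE_UNITS: Dict[str, float] = {
    "B": 1, "KB": 1000, "MB": 1000 ** 2, "GB": 1000 ** 3,
    "KiB": 1024, "MiB": 1024 ** 2, "GiB": 1024 ** 3,
}


def size_without_download(uris: Sequence[str],
                          recorded_size: Callable[[str], Optional[int]],
                          unit: str = "B") -> float:
    """Sum file sizes, preferring recorded metadata over reading the files."""
    total = 0.0
    for uri in uris:
        known = recorded_size(uri)
        if known is not None:
            # The job store already told us how big this file is.
            total += known
        else:
            # No metadata, so this really is a path we have to stat locally.
            total += os.path.getsize(uri)
    if unit not in BYTE_SIZE_UNITS:
        raise ValueError(f"size(): invalid unit {unit}")
    return total / BYTE_SIZE_UNITS[unit]


# Example: two "toilfile:" style references with known sizes, no downloads.
sizes = {"toilfile:abc": 2 * 1024 ** 2, "toilfile:def": 1024 ** 2}
assert size_without_download(sizes.keys(), sizes.get, "MiB") == 3.0
```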
+ super().__init__(unitName=namespace, displayName=namespace, local=False, **kwargs) logger.info("Preparing to run task %s as %s", task.name, namespace) @@ -1828,7 +1883,7 @@ def run(self, file_store: AbstractFileStore) -> WDLBindings: """ super().run(file_store) - # Evaluate all the outputs in the noirmal, non-task-outputs library context + # Evaluate all the outputs in the normal, non-task-outputs library context standard_library = ToilWDLStdLibBase(file_store) output_bindings: WDL.Env.Bindings[WDL.Value.Base] = WDL.Env.Bindings() for output_decl in self._outputs: diff --git a/src/toil/worker.py b/src/toil/worker.py index 7fa4ca173b..d906284045 100644 --- a/src/toil/worker.py +++ b/src/toil/worker.py @@ -31,8 +31,7 @@ from toil import logProcessContext from toil.common import Config, Toil, safeUnpickleFromStream -from toil.cwl.utils import (CWL_INTERNAL_JOBS, - CWL_UNSUPPORTED_REQUIREMENT_EXCEPTION, +from toil.cwl.utils import (CWL_UNSUPPORTED_REQUIREMENT_EXCEPTION, CWL_UNSUPPORTED_REQUIREMENT_EXIT_CODE) from toil.deferred import DeferredFunctionManager from toil.fileStores.abstractFileStore import AbstractFileStore @@ -63,22 +62,14 @@ def nextChainable(predecessor: JobDescription, jobStore: AbstractJobStore, confi :param config: The configuration for the current run. """ #If no more jobs to run or services not finished, quit - if len(predecessor.stack) == 0 or len(predecessor.services) > 0 or (isinstance(predecessor, CheckpointJobDescription) and predecessor.checkpoint != None): - logger.debug("Stopping running chain of jobs: length of stack: %s, services: %s, checkpoint: %s", - len(predecessor.stack), len(predecessor.services), (isinstance(predecessor, CheckpointJobDescription) and predecessor.checkpoint != None)) + if predecessor.nextSuccessors() is None or len(predecessor.services) > 0 or (isinstance(predecessor, CheckpointJobDescription) and predecessor.checkpoint != None): + logger.debug("Stopping running chain of jobs: no successors: %s, services: %s, checkpoint: %s", + predecessor.nextSuccessors() is None, len(predecessor.services), (isinstance(predecessor, CheckpointJobDescription) and predecessor.checkpoint != None)) return None - if len(predecessor.stack) > 1 and len(predecessor.stack[-1]) > 0 and len(predecessor.stack[-2]) > 0: - # TODO: Without a real stack list we can freely mutate, we can't chain - # to a child, which may branch, and then go back and do the follow-ons - # of the original job. - # TODO: Go back to a free-form stack list and require some kind of - # stack build phase? - logger.debug("Stopping running chain of jobs because job has both children and follow-ons") - return None #Get the next set of jobs to run - jobs = predecessor.nextSuccessors() + jobs = list(predecessor.nextSuccessors()) if len(jobs) == 0: # If there are no jobs, we might just not have any children. logger.debug("Stopping running chain of jobs because job has no ready children or follow-ons") @@ -87,11 +78,14 @@ def nextChainable(predecessor: JobDescription, jobStore: AbstractJobStore, confi #If there are 2 or more jobs to run in parallel we quit if len(jobs) >= 2: logger.debug("No more jobs can run in series by this worker," - " it's got %i children", len(jobs)-1) + " it's got %i successors", len(jobs)) + logger.debug("Two distinct successors are %s and %s", jobs[0], jobs[1]) return None # Grab the only job that should be there. 
- successorID = next(iter(jobs)) + successorID = jobs[0] + + logger.debug("%s would chain to ID %s", predecessor, successorID) # Load the successor JobDescription successor = jobStore.load_job(successorID) @@ -487,6 +481,9 @@ def blockFn() -> bool: # after the commit process we just kicked off, and aren't committed # early or partially. jobDesc = copy.deepcopy(jobDesc) + # Bump its version since saving will do that too and we don't want duplicate versions. + jobDesc.pre_update_hook() + logger.debug("Starting the next job") @@ -603,7 +600,7 @@ def blockFn() -> bool: # Commit log file reference back to JobStore jobStore.update_job(jobDesc) - elif ((debugging or (config.writeLogsFromAllJobs and not jobName.startswith(CWL_INTERNAL_JOBS))) + elif ((debugging or (config.writeLogsFromAllJobs and not jobDesc.local)) and redirectOutputToLogFile): # write log messages with open(tempWorkerLogPath, 'rb') as logFile: if os.path.getsize(tempWorkerLogPath) > logFileByteReportLimit != 0:
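Taken together, the patch lets any job opt into leader-side execution via the `local` argument added to `Job.__init__`. A minimal usage sketch, assuming a Toil build that includes this change (the job class name and resource figures are made up for illustration):

```python
from toil.common import Toil
from toil.job import Job


class CoordinatorJob(Job):
    """Cheap, latency-sensitive bookkeeping job, so ask to run it on the leader."""

    def __init__(self) -> None:
        super().__init__(memory="32M", cores=1, disk="16M", local=True)

    def run(self, file_store):
        # Nothing heavy happens here; the point is only the local=True flag above.
        return "coordinated"


if __name__ == "__main__":
    parser = Job.Runner.getDefaultArgumentParser()
    options = parser.parse_args()
    # Passing --runLocalJobsOnWorkers would instead send local-flagged jobs to
    # the batch system like any other job.
    with Toil(options) as workflow:
        print(workflow.start(CoordinatorJob()))
```

This mirrors what the WDL jobs above do implicitly: `WDLBaseJob` defaults `local=True` for interpreter bookkeeping, while `WDLTaskJob` sets `local=False` because it runs real task commands.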