Support HPRC assembly WDLs (DataBiosphere#4439)
* Drop unused WDLInputJob

* Match MiniWDL and Cromwell behavior for passing nulls to inputs with default values (see the sketch after this list)

* Add more logging to try and debug missing task inputs

* Log whole job str() when complaining about resources

* Avoid resurrecting shadowed bindings and obnoxiously long binding chains

* Quiet debugging

* Put the job identification info in the InsufficientSystemResources error as a string

* Carry namespace/path from root down the tree

* Sync on Singularity directories

* Make sure Singularity cache directories exist before we would need to lock them

* Make sure to not have two DeferredFunctionManagers

* Make sure we detect it if the coordination directory vanishes

* Drop XDG variables from Slurm job submissions

* Remove asserts where we don't actually always have the directory stored

* Expose the suspicious session Slurm assigns, and just don't use it

* Remove some environment logging

* Revise logging and make checking for missing coordination dir more heavyweight, to address review comments
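
For the null-handling bullet above, here is a minimal Python sketch (not Toil's actual WDL interpreter code; resolve_input is an invented helper name) of the behavior being matched: an explicit null supplied for an input that declares a default falls back to that default instead of overriding it.

from typing import Optional, TypeVar

T = TypeVar("T")

def resolve_input(supplied: Optional[T], default: T) -> T:
    """Pick the value to bind for an input that declares a default.

    MiniWDL and Cromwell treat an explicit null as "not provided" when the
    input has a default, so the default wins; any non-null value is kept.
    """
    return default if supplied is None else supplied

# resolve_input(None, "GRCh38") -> "GRCh38"
# resolve_input("CHM13", "GRCh38") -> "CHM13"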
adamnovak authored Apr 24, 2023
1 parent 1b96dae commit 2804edf
Showing 8 changed files with 224 additions and 90 deletions.
9 changes: 4 additions & 5 deletions src/toil/batchSystems/abstractBatchSystem.py
@@ -447,6 +447,7 @@ def workerCleanup(info: WorkerCleanupInfo) -> None:
:param WorkerCleanupInfo info: A named tuple consisting of all the relevant information
for cleaning up the worker.
"""
logger.debug('Attempting worker cleanup')
assert isinstance(info, WorkerCleanupInfo)
assert info.workflow_id is not None
workflowDir = Toil.getLocalWorkflowDir(info.workflow_id, info.work_dir)
@@ -456,9 +457,11 @@ def workerCleanup(info: WorkerCleanupInfo) -> None:
AbstractFileStore.shutdownFileStore(info.workflow_id, info.work_dir, info.coordination_dir)
if info.clean_work_dir in ('always', 'onSuccess', 'onError'):
if workflowDirContents in ([], [cacheDirName(info.workflow_id)]):
logger.debug('Deleting workflow directory %s', workflowDir)
shutil.rmtree(workflowDir, ignore_errors=True)
if coordination_dir != workflowDir:
# No more coordination to do here either.
logger.debug('Deleting coordination directory %s', coordination_dir)
shutil.rmtree(coordination_dir, ignore_errors=True)

class NodeInfo:
@@ -559,11 +562,7 @@ def __init__(self, requirer: Requirer, resource: str, available: Optional[Parsed
:param details: Any extra details about the problem that can be attached to the error.
"""

self.job_name : Optional[str] = None
if hasattr(requirer, 'jobName') and isinstance(getattr(requirer, 'jobName'), str):
# Keep the job name if any
self.job_name = cast(str, getattr(requirer, 'jobName'))

self.job_name : Optional[str] = str(requirer)
self.resource = resource
self.requested = cast(ParsedRequirement, getattr(requirer, resource))
self.available = available
5 changes: 5 additions & 0 deletions src/toil/batchSystems/cleanup_support.py
@@ -71,18 +71,23 @@ def __enter__(self) -> None:
# Set up an arena so we know who is the last worker to leave
self.arena = LastProcessStandingArena(Toil.get_toil_coordination_dir(self.workerCleanupInfo.work_dir, self.workerCleanupInfo.coordination_dir),
self.workerCleanupInfo.workflow_id + '-cleanup')
logger.debug('Entering cleanup arena')
self.arena.enter()
logger.debug('Cleanup arena entered')

# This is exactly the signature MyPy demands.
# Also, it demands we not say we can return a bool if we return False
# always, because it can be smarter about reachability if it knows what
# context managers never eat exceptions. So it decides any context manager
# that is always falsey but claims to return a bool is an error.
def __exit__(self, type: Optional[Type[BaseException]], value: Optional[BaseException], traceback: Optional[TracebackType]) -> None:
logger.debug('Leaving cleanup arena')
for _ in self.arena.leave():
# We are the last concurrent worker to finish.
# Do batch system cleanup.
logger.debug('Cleaning up worker')
BatchSystemSupport.workerCleanup(self.workerCleanupInfo)
# Now the coordination_dir is allowed to no longer exist on the node.
logger.debug('Cleanup arena left')
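
The arena used above implements a "last process standing" pattern: every worker enters, and only the last worker to leave performs node-level cleanup. A self-contained, hedged sketch of the same idea using a lock-protected counter file, not Toil's LastProcessStandingArena implementation (LastOneCleansUp and counter_path are invented names):

import fcntl
import os

class LastOneCleansUp:
    """Each concurrent worker enters; whichever worker leaves last cleans up."""

    def __init__(self, counter_path: str) -> None:
        self.counter_path = counter_path

    def _update(self, delta: int) -> int:
        fd = os.open(self.counter_path, os.O_RDWR | os.O_CREAT)
        try:
            fcntl.lockf(fd, fcntl.LOCK_EX)  # serialize read-modify-write across processes
            count = int(os.read(fd, 64) or b"0") + delta
            os.lseek(fd, 0, os.SEEK_SET)
            os.ftruncate(fd, 0)
            os.write(fd, str(count).encode())
            return count
        finally:
            fcntl.lockf(fd, fcntl.LOCK_UN)
            os.close(fd)

    def __enter__(self) -> None:
        self._update(+1)

    # Returning None (never True) keeps MyPy's reachability analysis happy,
    # as the comment in the diff above explains.
    def __exit__(self, exc_type, exc, tb) -> None:
        if self._update(-1) == 0:
            print("last worker out; doing node-level cleanup")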


20 changes: 19 additions & 1 deletion src/toil/batchSystems/slurm.py
@@ -68,7 +68,25 @@ def prepareSubmission(self,

def submitJob(self, subLine):
try:
output = call_command(subLine)
# Slurm is not quite clever enough to follow the XDG spec on
# its own. If the submission command sees e.g. XDG_RUNTIME_DIR
# in our environment, it will send it along (especially with
# --export=ALL), even though it makes a promise to the job that
# Slurm isn't going to keep. It also has a tendency to create
# /run/user/<uid> *at the start* of a job, but *not* keep it
# around for the duration of the job.
#
# So we hide the whole XDG universe from Slurm before we make
# the submission.
# Might as well hide DBUS also.
# This doesn't get us a trustworthy XDG session in Slurm, but
# it does let us see the one Slurm tries to give us.
no_session_environment = os.environ.copy()
session_names = [n for n in no_session_environment.keys() if n.startswith('XDG_') or n.startswith('DBUS_')]
for name in session_names:
del no_session_environment[name]

output = call_command(subLine, env=no_session_environment)
# sbatch prints a line like 'Submitted batch job 2954103'
result = int(output.strip().split()[-1])
logger.debug("sbatch submitted job %d", result)
9 changes: 8 additions & 1 deletion src/toil/common.py
@@ -1365,6 +1365,11 @@ def get_toil_coordination_dir(cls, config_work_dir: Optional[str], config_coordi
deleted.
"""

if 'XDG_RUNTIME_DIR' in os.environ and not os.path.exists(os.environ['XDG_RUNTIME_DIR']):
# Slurm has been observed providing this variable but not keeping
# the directory live as long as we run for.
logger.warning('XDG_RUNTIME_DIR is set to nonexistent directory %s; your environment may be out of spec!', os.environ['XDG_RUNTIME_DIR'])

# Go get a coordination directory, using a lot of short-circuiting of
# or and the fact that and returns its second argument when it
# succeeds.
@@ -1381,7 +1386,9 @@ def get_toil_coordination_dir(cls, config_work_dir: Optional[str], config_coordi
# session that has the env var set. Otherwise it might belong to a
# different set of sessions and get cleaned up out from under us
# when that session ends.
('XDG_RUNTIME_DIR' in os.environ and try_path(os.path.join(os.environ['XDG_RUNTIME_DIR'], 'toil'))) or
# We don't think Slurm XDG sessions are trustworthy, depending on
# the cluster's PAM configuration, so don't use them.
('XDG_RUNTIME_DIR' in os.environ and 'SLURM_JOBID' not in os.environ and try_path(os.path.join(os.environ['XDG_RUNTIME_DIR'], 'toil'))) or
# Try under /run/lock. It might be a temp dir style sticky directory.
try_path('/run/lock') or
# Finally, fall back on the work dir and hope it's a legit filesystem.
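
For readers unfamiliar with the idiom described above, a self-contained sketch of how the chained and/or expressions pick the first usable directory; try_path here is a simplified stand-in for the helper used by Toil, and pick_coordination_dir is an invented name covering only a few of the real candidates:

import os
from typing import Optional

def try_path(path: str) -> Optional[str]:
    # Return the path if it can be created and written, else None so the
    # next candidate in the chain is tried.
    try:
        os.makedirs(path, exist_ok=True)
        return path if os.access(path, os.W_OK) else None
    except OSError:
        return None

def pick_coordination_dir(work_dir: str) -> str:
    # `A and B` evaluates to B when A is truthy, so the environment checks
    # guard the try_path call; `or` then returns the first truthy (usable) result.
    return (
        ('XDG_RUNTIME_DIR' in os.environ
         and 'SLURM_JOBID' not in os.environ
         and try_path(os.path.join(os.environ['XDG_RUNTIME_DIR'], 'toil')))
        or try_path('/run/lock')
        or work_dir
    )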
4 changes: 4 additions & 0 deletions src/toil/deferred.py
@@ -300,6 +300,10 @@ def _runOrphanedDeferredFunctions(self):
# So skip it.
continue

# We need to make sure that we don't hold two
# DeferredFunctionManagers at once! So make sure to del yours
# when you are done with it. TODO: Make it a singleton!

fd = None

try:
53 changes: 49 additions & 4 deletions src/toil/fileStores/nonCachingFileStore.py
@@ -58,13 +58,52 @@ def __init__(
self.jobStateFile: Optional[str] = None
self.localFileMap: DefaultDict[str, List[str]] = defaultdict(list)

self.check_for_state_corruption()

@staticmethod
def check_for_coordination_corruption(coordination_dir: Optional[str]) -> None:
"""
Make sure the coordination directory hasn't been deleted unexpectedly.
Slurm has been known to delete XDG_RUNTIME_DIR out from under processes
it was promised to, so it is possible that in certain misconfigured
environments the coordination directory and everything in it could go
away unexpectedly. We are going to regularly make sure that the things
we think should exist actually exist, and we are going to abort if they
do not.
"""

if coordination_dir and not os.path.exists(coordination_dir):
raise RuntimeError(
f'The Toil coordination directory at {coordination_dir} '
f'was removed while the workflow was running! Please provide a '
f'TOIL_COORDINATION_DIR or --coordinationDir at a location that '
f'is safe from automated cleanup during the workflow run.'
)

def check_for_state_corruption(self) -> None:
"""
Make sure state tracking information hasn't been deleted unexpectedly.
"""

NonCachingFileStore.check_for_coordination_corruption(self.coordination_dir)

if self.jobStateFile and not os.path.exists(self.jobStateFile):
raise RuntimeError(
f'The job state file {self.jobStateFile} '
f'was removed while the workflow was running! Please provide a '
f'TOIL_COORDINATION_DIR or --coordinationDir at a location that '
f'is safe from automated cleanup during the workflow run.'
)

@contextmanager
def open(self, job: Job) -> Generator[None, None, None]:
jobReqs = job.disk
startingDir = os.getcwd()
self.localTempDir: str = make_public_dir(in_directory=self.localTempDir)
self._removeDeadJobs(self.coordination_dir)
self.jobStateFile = self._createJobStateFile()
self.check_for_state_corruption()
freeSpace, diskSize = getFileSystemSize(self.localTempDir)
if freeSpace <= 0.1 * diskSize:
logger.warning(f'Starting job {self.jobName} with less than 10% of disk space remaining.')
@@ -85,6 +124,7 @@ def open(self, job: Job) -> Generator[None, None, None]:
self.logToMaster(disk_usage, level=logging.DEBUG)
os.chdir(startingDir)
# Finally delete the job from the worker
self.check_for_state_corruption()
os.remove(self.jobStateFile)

def writeGlobalFile(self, localFileName: str, cleanup: bool=False) -> FileID:
Expand Down Expand Up @@ -193,6 +233,8 @@ def _removeDeadJobs(cls, coordination_dir: str, batchSystemShutdown: bool=False)
:return:
"""

cls.check_for_coordination_corruption(coordination_dir)

for jobState in cls._getAllJobStates(coordination_dir):
if not process_name_exists(coordination_dir, jobState['jobProcessName']):
# We need to have a race to pick someone to clean up.
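
The "race to pick someone to clean up" mentioned above can be run with a non-blocking file lock. A hedged sketch of the general pattern (invented names, not Toil's exact locking scheme): every surviving worker tries to take an exclusive lock, and only the winner does the cleanup.

import fcntl
import os

def try_to_clean_up(job_state_file: str) -> bool:
    fd = os.open(job_state_file, os.O_RDWR)
    try:
        try:
            # Non-blocking exclusive lock: exactly one contender succeeds.
            fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except OSError:
            return False  # another worker won the race and will clean up
        # We won: remove the dead job's directories and state here.
        fcntl.lockf(fd, fcntl.LOCK_UN)
        return True
    finally:
        os.close(fd)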
@@ -226,8 +268,8 @@ def _removeDeadJobs(cls, coordination_dir: str, batchSystemShutdown: bool=False)
fcntl.lockf(dirFD, fcntl.LOCK_UN)
os.close(dirFD)

@staticmethod
def _getAllJobStates(coordination_dir: str) -> Iterator[Dict[str, str]]:
@classmethod
def _getAllJobStates(cls, coordination_dir: str) -> Iterator[Dict[str, str]]:
"""
Generator function that deserializes and yields the job state for every job on the node,
one at a time.
@@ -237,15 +279,17 @@ def _getAllJobStates(cls, coordination_dir: str) -> Iterator[Dict[str, str]]:
:return: dict with keys (jobName, jobProcessName, jobDir)
"""
jobStateFiles = []


cls.check_for_coordination_corruption(coordination_dir)

# Note that the directory may contain files whose names are not decodable to Unicode.
# So we need to work in bytes.
for entry in os.scandir(os.fsencode(coordination_dir)):
# For each job state file in the coordination directory
if entry.name.endswith(b'.jobState'):
# This is the state of a job
jobStateFiles.append(os.fsdecode(entry.path))

for fname in jobStateFiles:
try:
yield NonCachingFileStore._readJobState(fname)
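
The bytes handling above matters because os.fsdecode and os.fsencode round-trip names that are not valid UTF-8. A small illustration (standard library behavior on a typical Linux node, not Toil-specific):

import os

raw = b'job-\xff.jobState'       # a directory entry that is not valid UTF-8
name = os.fsdecode(raw)          # decoded with surrogateescape, e.g. 'job-\udcff.jobState'
assert os.fsencode(name) == raw  # encodes back to the original bytes losslessly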
@@ -272,6 +316,7 @@ def _createJobStateFile(self) -> str:
:return: Path to the job state file
:rtype: str
"""
self.check_for_state_corruption()
jobState = {'jobProcessName': get_process_name(self.coordination_dir),
'jobName': self.jobName,
'jobDir': self.localTempDir}

