Skip to content

Commit

Permalink
Merge branch 'main' into 312-jsondecode-error-in-watchercycle
Browse files Browse the repository at this point in the history
  • Loading branch information
mbthornton-lbl committed Dec 4, 2024
2 parents c382616 + 270afce commit e61ba73
Show file tree
Hide file tree
Showing 11 changed files with 173 additions and 76 deletions.
11 changes: 11 additions & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
@@ -1,2 +1,13 @@
[run]
branch = True
omit =
*/site-packages/*
*/distutils/*
*/tests/*
*/test/*
*/__init__.py
*/__main__.py
*/setup.py
*/re_iding/*
*/examples/*
*/nmdc_common/*
8 changes: 4 additions & 4 deletions badges/coverage.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
8 changes: 4 additions & 4 deletions badges/tests.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
64 changes: 48 additions & 16 deletions nmdc_automation/workflow_automation/watch_nmdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,21 +184,47 @@ def prepare_and_cache_new_job(self, new_job: WorkflowJob, opid: str, force=False
return new_job

def get_finished_jobs(self)->Tuple[List[WorkflowJob], List[WorkflowJob]]:
    """
    Get finished jobs.

    Returns a tuple of (successful_jobs, failed_jobs).
    A job is considered finished if its cached workflow state has a last
    status of "Succeeded", or "Failed" with at least self._MAX_FAILS
    recorded failures. Jobs not yet finished are polled via the job runner
    and their cached state is updated accordingly.
    A checkpoint is saved after checking for finished jobs.
    """
    successful_jobs = []
    failed_jobs = []
    for job in self.job_cache:
        if job.done:
            continue
        # Already recorded as finished in cached workflow state.
        if job.workflow.last_status == "Succeeded" and job.opid:
            successful_jobs.append(job)
            continue
        if job.workflow.last_status == "Failed" and job.workflow.failed_count >= self._MAX_FAILS:
            failed_jobs.append(job)
            continue
        # Poll the job runner for the current status.
        status = job.job.get_job_status()
        # BUG FIX: was compared against the misspelled string "Succeded",
        # so polled-successful jobs were never detected here.
        if status == "Succeeded":
            job.workflow.last_status = status
            successful_jobs.append(job)
        elif status == "Failed":
            job.workflow.last_status = status
            job.workflow.failed_count += 1
            failed_jobs.append(job)
        else:
            # Still pending / running — record the latest status only.
            job.workflow.last_status = status
            logger.debug(f"Job {job.opid} status: {status}")
    # Persist updated job state after the scan.
    self.save_checkpoint()
    if successful_jobs:
        logger.info(f"Found {len(successful_jobs)} successful jobs.")
    if failed_jobs:
        logger.info(f"Found {len(failed_jobs)} failed jobs.")
    return successful_jobs, failed_jobs

def process_successful_job(self, job: WorkflowJob) -> Database:
""" Process a successful job and return a Database object """
Expand Down Expand Up @@ -242,7 +268,7 @@ def process_successful_job(self, job: WorkflowJob) -> Database:
self.save_checkpoint()
return database

def process_failed_job(self, job: WorkflowJob) -> None:
def process_failed_job(self, job: WorkflowJob) -> Optional[str]:
""" Process a failed job """
if job.workflow.state.get("failed_count", 0) >= self._MAX_FAILS:
logger.error(f"Job {job.opid} failed {self._MAX_FAILS} times. Skipping.")
Expand All @@ -252,8 +278,9 @@ def process_failed_job(self, job: WorkflowJob) -> None:
job.workflow.state["failed_count"] = job.workflow.state.get("failed_count", 0) + 1
job.workflow.state["last_status"] = job.job_status
self.save_checkpoint()
logger.error(f"Job {job.opid} failed {job.workflow.state['failed_count']} times. Retrying.")
job.job.submit_job()
logger.warning(f"Job {job.opid} failed {job.workflow.state['failed_count']} times. Retrying.")
jobid = job.job.submit_job()
return jobid


class RuntimeApiHandler:
Expand Down Expand Up @@ -292,7 +319,7 @@ def update_operation(self, opid, done, meta):
class Watcher:
""" Watcher class for monitoring and managing jobs """
def __init__(self, site_configuration_file: Union[str, Path], state_file: Union[str, Path] = None):
self._POLL = 20
self._POLL_INTERVAL_SEC = 60
self._MAX_FAILS = 2
self.should_skip_claim = False
self.config = SiteConfig(site_configuration_file)
Expand All @@ -313,16 +340,19 @@ def restore_from_checkpoint(self, state_data: Dict[str, Any] = None)-> None:
def cycle(self):
""" Perform a cycle of watching for unclaimed jobs, claiming jobs, and processing finished jobs """
self.restore_from_checkpoint()
if not self.should_skip_claim:
unclaimed_jobs = self.runtime_api_handler.get_unclaimed_jobs(self.config.allowed_workflows)
# if not self.should_skip_claim: - is this actually used?
unclaimed_jobs = self.runtime_api_handler.get_unclaimed_jobs(self.config.allowed_workflows)
if unclaimed_jobs:
logger.info(f"Found {len(unclaimed_jobs)} unclaimed jobs.")
self.claim_jobs(unclaimed_jobs)
self.claim_jobs(unclaimed_jobs)


logger.info(f"Checking for finished jobs.")
logger.debug(f"Checking for finished jobs.")
successful_jobs, failed_jobs = self.job_manager.get_finished_jobs()
logger.debug(f"Found {len(successful_jobs)} successful jobs and {len(failed_jobs)} failed jobs.")
if not successful_jobs and not failed_jobs:
logger.debug("No finished jobs found.")
for job in successful_jobs:
logger.info(f"Processing successful job: {job.opid}, {job.workflow_execution_id}")
job_database = self.job_manager.process_successful_job(job)
# sanity checks
if not job_database.data_object_set:
Expand All @@ -349,20 +379,22 @@ def cycle(self):
resp = self.runtime_api_handler.update_operation(
job.opid, done=True, meta=job.job.metadata
)
logging.info(f"Updated operation {job.opid} response id: {resp}")
logging.info(f"Updated operation {job.opid} response id: {resp['id']}")

for job in failed_jobs:
logger.info(f"Processing failed job: {job.opid}, {job.workflow_execution_id}")
self.job_manager.process_failed_job(job)

def watch(self):
    """
    Maintain a polling loop to 'cycle' through job claims and processing.

    Each iteration calls self.cycle(); recoverable errors are logged with
    a traceback and the loop continues after sleeping for
    self._POLL_INTERVAL_SEC seconds.
    """
    logger.info("Entering polling loop")
    while True:
        try:
            self.cycle()
        except (IOError, ValueError, TypeError, AttributeError) as e:
            # logger.exception already records the traceback; no exc_info needed.
            logger.exception(f"Error occurred during cycle: {e}")
        sleep(self._POLL_INTERVAL_SEC)

def claim_jobs(self, unclaimed_jobs: List[WorkflowJob] = None) -> None:
""" Claim unclaimed jobs, prepare them, and submit them. Write a checkpoint after claiming jobs. """
Expand Down
20 changes: 16 additions & 4 deletions nmdc_automation/workflow_automation/wfutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ class CromwellRunner(JobRunnerABC):
"""Job runner for Cromwell"""
LABEL_SUBMITTER_VALUE = "nmdcda"
LABEL_PARAMETERS = ["release", "wdl", "git_repo"]
NO_SUBMIT_STATES = ["Submitted", # job is already submitted but not running
# States that indicate a job is in some active state and does not need to be submitted
NO_SUBMIT_STATES = [
"Submitted", # job is already submitted but not running
"Running", # job is already running
"Failed", # job failed
"Succeeded", # job succeeded
"Aborted", # job was aborted and did not finish
"Aborting" # job is in the process of being aborted
"On Hold", # job is on hold and not running. It can be manually resumed later
]
Expand Down Expand Up @@ -152,7 +152,7 @@ def submit_job(self, force: bool = False) -> Optional[str]:
:param force: if True, submit the job even if it is in a state that does not require submission
:return: the job id
"""
status = self.get_job_status()
status = self.workflow.last_status
if status in self.NO_SUBMIT_STATES and not force:
logging.info(f"Job {self.job_id} in state {status}, skipping submission")
return
Expand Down Expand Up @@ -270,6 +270,18 @@ def config(self) -> Dict[str, Any]:
def last_status(self) -> Optional[str]:
return self.cached_state.get("last_status", None)

@last_status.setter
def last_status(self, status: str):
    """Record the most recent job status in the cached workflow state."""
    self.cached_state["last_status"] = status

@property
def failed_count(self) -> int:
    """Number of recorded failures for this job; 0 if none recorded yet."""
    return self.cached_state.get("failed_count", 0)

@failed_count.setter
def failed_count(self, count: int):
    """Store the updated failure count in the cached workflow state."""
    self.cached_state["failed_count"] = count

@property
def nmdc_jobid(self) -> Optional[str]:
return self.cached_state.get("nmdc_jobid", None)
Expand Down
Loading

0 comments on commit e61ba73

Please sign in to comment.