Skip to content

Commit

Permalink
fix logic in get_finished_jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
mbthornton-lbl committed Nov 26, 2024
1 parent 61f1896 commit e5bf09b
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 7 deletions.
32 changes: 25 additions & 7 deletions nmdc_automation/workflow_automation/watch_nmdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,11 +189,29 @@ def get_finished_jobs(self)->Tuple[List[WorkflowJob], List[WorkflowJob]]:
failed_jobs = []
for job in self.job_cache:
if not job.done:
last_status = job.workflow.last_status
if last_status == "Succeeded" and job.opid:
if job.workflow.last_status == "Succeeded" and job.opid:
job.done = True
successful_jobs.append(job)
elif last_status == "Failed" and job.opid:
continue
if job.workflow.last_status == "Failed" and job.workflow.failed_count >= self._MAX_FAILS:
job.done = True
failed_jobs.append(job)
continue
# check status
status = job.job.get_job_status()
if status == "Succeded":
job.workflow.last_status = status
successful_jobs.append(job)
continue
elif status == "Failed":
job.workflow.last_status = status
job.workflow.failed_count += 1
failed_jobs.append(job)
continue
else:
job.workflow.last_status = status
self.save_checkpoint()

if successful_jobs:
logger.info(f"Found {len(successful_jobs)} successful jobs.")
if failed_jobs:
Expand Down Expand Up @@ -314,10 +332,10 @@ def restore_from_checkpoint(self, state_data: Dict[str, Any] = None)-> None:
def cycle(self):
""" Perform a cycle of watching for unclaimed jobs, claiming jobs, and processing finished jobs """
self.restore_from_checkpoint()
if not self.should_skip_claim:
unclaimed_jobs = self.runtime_api_handler.get_unclaimed_jobs(self.config.allowed_workflows)
logger.info(f"Found {len(unclaimed_jobs)} unclaimed jobs.")
self.claim_jobs(unclaimed_jobs)
# if not self.should_skip_claim: - is this actually used?
unclaimed_jobs = self.runtime_api_handler.get_unclaimed_jobs(self.config.allowed_workflows)
logger.info(f"Found {len(unclaimed_jobs)} unclaimed jobs.")
self.claim_jobs(unclaimed_jobs)


logger.info(f"Checking for finished jobs.")
Expand Down
12 changes: 12 additions & 0 deletions nmdc_automation/workflow_automation/wfutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,18 @@ def config(self) -> Dict[str, Any]:
def last_status(self) -> Optional[str]:
return self.cached_state.get("last_status", None)

@last_status.setter
def last_status(self, status: str):
self.cached_state["last_status"] = status

@property
def failed_count(self) -> int:
return self.cached_state.get("failed_count", 0)

@failed_count.setter
def failed_count(self, count: int):
self.cached_state["failed_count"] = count

@property
def nmdc_jobid(self) -> Optional[str]:
return self.cached_state.get("nmdc_jobid", None)
Expand Down

0 comments on commit e5bf09b

Please sign in to comment.