Skip to content

Commit

Permalink
Merge pull request #305 from microbiomedata/304-watcherclaim_jobs-failing-with-a-keyerror
Browse files Browse the repository at this point in the history

fix mis-handled claim_jobs
  • Loading branch information
mbthornton-lbl authored Nov 26, 2024
2 parents 2aec24f + 13257a9 commit eedecdb
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 18 deletions.
2 changes: 1 addition & 1 deletion nmdc_automation/run_process/run_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def watcher(ctx, site_configuration_file):
level=logging_level, format="%(asctime)s %(levelname)s: %(message)s"
)
logger = logging.getLogger(__name__)
logger.info(f"Config file: {site_configuration_file}")
logger.info(f"Initializing Watcher: config file: {site_configuration_file}")
ctx.obj = Watcher(site_configuration_file)


Expand Down
14 changes: 8 additions & 6 deletions nmdc_automation/workflow_automation/watch_nmdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def __init__(self, config: SiteConfig, state_file: Union[str, Path] = None):
self._state_file = None
# set state file
if state_file:
logger.info(f"Using state file: {state_file}")
logger.info(f"Initializing FileHandler with state file: {state_file}")
self._state_file = Path(state_file)
elif self.config.agent_state:
logger.info(f"Using state file from config: {self.config.agent_state}")
Expand Down Expand Up @@ -64,7 +64,6 @@ def state_file(self, value) -> None:

def read_state(self) -> Optional[Dict[str, Any]]:
""" Read the state file and return the data """
logging.info(f"Reading state from {self.state_file}")
with open(self.state_file, "r") as f:
state = loads(f.read())
return state
Expand Down Expand Up @@ -137,7 +136,7 @@ def restore_from_state(self) -> None:
""" Restore jobs from state data """
new_jobs = self.get_new_workflow_jobs_from_state()
if new_jobs:
logger.info(f"Restoring {len(new_jobs)} jobs from state.")
logger.info(f"Adding {len(new_jobs)} new jobs from state file.")
self.job_cache.extend(new_jobs)

def get_new_workflow_jobs_from_state(self) -> List[WorkflowJob]:
Expand All @@ -151,10 +150,10 @@ def get_new_workflow_jobs_from_state(self) -> List[WorkflowJob]:
# already in cache
continue
wf_job = WorkflowJob(self.config, workflow_state=job)
logger.debug(f"New workflow job: {wf_job.opid} from state.")
logger.info(f"New Job from State: {wf_job.workflow_execution_id}, {wf_job.workflow.nmdc_jobid}")
logger.info(f"Last Status: {wf_job.workflow.last_status}")
job_cache_ids.append(wf_job.opid)
wf_job_list.append(wf_job)
logging.info(f"Restored {len(wf_job_list)} jobs from state")
return wf_job_list

def find_job_by_opid(self, opid) -> Optional[WorkflowJob]:
Expand Down Expand Up @@ -319,7 +318,10 @@ def cycle(self):
logger.info(f"Found {len(unclaimed_jobs)} unclaimed jobs.")
self.claim_jobs(unclaimed_jobs)


logger.info(f"Checking for finished jobs.")
successful_jobs, failed_jobs = self.job_manager.get_finished_jobs()
logger.debug(f"Found {len(successful_jobs)} successful jobs and {len(failed_jobs)} failed jobs.")
for job in successful_jobs:
job_database = self.job_manager.process_successful_job(job)
# sanity checks
Expand Down Expand Up @@ -367,7 +369,7 @@ def claim_jobs(self, unclaimed_jobs: List[WorkflowJob] = None) -> None:
for job in unclaimed_jobs:
logger.info(f"Claiming job {job.workflow.nmdc_jobid}")
claim = self.runtime_api_handler.claim_job(job.workflow.nmdc_jobid)
opid = claim["detail"]["id"]
opid = claim["id"]
new_job = self.job_manager.prepare_and_cache_new_job(job, opid)
if new_job:
new_job.job.submit_job()
Expand Down
30 changes: 25 additions & 5 deletions nmdc_automation/workflow_automation/wfutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,12 +186,20 @@ def submit_job(self, force: bool = False) -> Optional[str]:

def get_job_status(self) -> str:
""" Get the status of a job from Cromwell """
if not self.job_id:
if not self.workflow.cromwell_jobid:
return "Unknown"
status_url = f"{self.service_url}/{self.job_id}/status"
response = requests.get(status_url)
response.raise_for_status()
return response.json().get("status", "Unknown")
status_url = f"{self.service_url}/{self.workflow.cromwell_jobid}/status"
# There can be a delay between submitting a job and it
# being available in Cromwell so handle 404 errors
logging.debug(f"Getting job status from {status_url}")
try:
response = requests.get(status_url)
response.raise_for_status()
return response.json().get("status", "Unknown")
except requests.exceptions.HTTPError as e:
if e.response.status_code == 404:
return "Unknown"
raise e

def get_job_metadata(self) -> Dict[str, Any]:
""" Get metadata for a job from Cromwell """
Expand Down Expand Up @@ -258,6 +266,18 @@ def config(self) -> Dict[str, Any]:
# for backward compatibility we need to check for both keys
return self.cached_state.get("conf", self.cached_state.get("config", {}))

@property
def last_status(self) -> Optional[str]:
return self.cached_state.get("last_status", None)

@property
def nmdc_jobid(self) -> Optional[str]:
return self.cached_state.get("nmdc_jobid", None)

@property
def cromwell_jobid(self) -> Optional[str]:
return self.cached_state.get("cromwell_jobid", None)

@property
def execution_template(self) -> Dict[str, str]:
# for backward compatibility we need to check for both keys
Expand Down
24 changes: 18 additions & 6 deletions tests/test_watch_nmdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,6 @@ def test_job_manager_prepare_and_cache_new_job_force(site_config, initial_state_


def test_job_manager_get_finished_jobs(site_config, initial_state_file, fixtures_dir):
# Mock the URL and response

# Arrange - initial state has 1 failure and is not done
fh = FileHandler(site_config, initial_state_file)
Expand All @@ -287,11 +286,24 @@ def test_job_manager_get_finished_jobs(site_config, initial_state_file, fixtures
# sanity check
assert len(jm.job_cache) == 3

# Act
successful_jobs, failed_jobs = jm.get_finished_jobs()
# Assert
assert successful_jobs
assert failed_jobs
# Mock requests for job status
with requests_mock.Mocker() as m:
# Mock the successful job status
m.get(
"http://localhost:8088/api/workflows/v1/9492a397-eb30-472b-9d3b-abc123456789/status",
json={"status": "Succeeded"}
)
# Mock the failed job status
m.get(
"http://localhost:8088/api/workflows/v1/12345678-abcd-efgh-ijkl-9876543210/status",
json={"status": "Failed"}
)

# Act
successful_jobs, failed_jobs = jm.get_finished_jobs()
# Assert
assert successful_jobs
assert failed_jobs
# cleanup
jm.job_cache = []

Expand Down

0 comments on commit eedecdb

Please sign in to comment.