diff --git a/.coveragerc b/.coveragerc index 398ff08a..d38fcc4a 100644 --- a/.coveragerc +++ b/.coveragerc @@ -1,2 +1,13 @@ [run] branch = True +omit = + */site-packages/* + */distutils/* + */tests/* + */test/* + */__init__.py + */__main__.py + */setup.py + */re_iding/* + */examples/* + */nmdc_common/* diff --git a/badges/coverage.svg b/badges/coverage.svg index 24c797f7..07ecfe02 100644 --- a/badges/coverage.svg +++ b/badges/coverage.svg @@ -5,7 +5,7 @@ width="92.5" height="20" role="img" - aria-label="coverage: 63%" + aria-label="coverage: 67%" > - coverage: 63% + coverage: 67% @@ -42,8 +42,8 @@ - 63% - 63% + 67% + 67% diff --git a/badges/tests.svg b/badges/tests.svg index 1227c608..b85ca4de 100644 --- a/badges/tests.svg +++ b/badges/tests.svg @@ -5,7 +5,7 @@ width="62.5" height="20" role="img" - aria-label="tests: 26" + aria-label="tests: 95" > - tests: 26 + tests: 95 @@ -42,8 +42,8 @@ - 26 - 26 + 95 + 95 diff --git a/nmdc_automation/workflow_automation/watch_nmdc.py b/nmdc_automation/workflow_automation/watch_nmdc.py index 439479bd..f4469801 100644 --- a/nmdc_automation/workflow_automation/watch_nmdc.py +++ b/nmdc_automation/workflow_automation/watch_nmdc.py @@ -184,21 +184,47 @@ def prepare_and_cache_new_job(self, new_job: WorkflowJob, opid: str, force=False return new_job def get_finished_jobs(self)->Tuple[List[WorkflowJob], List[WorkflowJob]]: - """ Get finished jobs """ + """ + Get finished jobs. + Returns a tuple of successful jobs and failed jobs. + Jobs are considered finished if they have a last status of "Succeeded" or "Failed", + or if they have reached the maximum number of failures. + + Unfinished jobs are checked for status and updated if needed. + A checkpoint is saved after checking for finished jobs. + """ successful_jobs = [] failed_jobs = [] for job in self.job_cache: if not job.done: - status = job.job_status - if status == "Succeeded" and job.opid: + if job.workflow.last_status == "Succeeded" and job.opid: + successful_jobs.append(job) + continue + if job.workflow.last_status == "Failed" and job.workflow.failed_count >= self._MAX_FAILS: + failed_jobs.append(job) + continue + # check status + status = job.job.get_job_status() + + if status == "Succeeded": + job.workflow.last_status = status successful_jobs.append(job) - elif status == "Failed" and job.opid: + continue + elif status == "Failed": + job.workflow.last_status = status + job.workflow.failed_count += 1 failed_jobs.append(job) + continue + else: + job.workflow.last_status = status + logger.debug(f"Job {job.opid} status: {status}") + self.save_checkpoint() + if successful_jobs: logger.info(f"Found {len(successful_jobs)} successful jobs.") if failed_jobs: logger.info(f"Found {len(failed_jobs)} failed jobs.") - return (successful_jobs, failed_jobs) + return successful_jobs, failed_jobs def process_successful_job(self, job: WorkflowJob) -> Database: """ Process a successful job and return a Database object """ @@ -242,7 +268,7 @@ def process_successful_job(self, job: WorkflowJob) -> Database: self.save_checkpoint() return database - def process_failed_job(self, job: WorkflowJob) -> None: + def process_failed_job(self, job: WorkflowJob) -> Optional[str]: """ Process a failed job """ if job.workflow.state.get("failed_count", 0) >= self._MAX_FAILS: logger.error(f"Job {job.opid} failed {self._MAX_FAILS} times. 
Skipping.") @@ -252,8 +278,9 @@ def process_failed_job(self, job: WorkflowJob) -> None: job.workflow.state["failed_count"] = job.workflow.state.get("failed_count", 0) + 1 job.workflow.state["last_status"] = job.job_status self.save_checkpoint() - logger.error(f"Job {job.opid} failed {job.workflow.state['failed_count']} times. Retrying.") - job.job.submit_job() + logger.warning(f"Job {job.opid} failed {job.workflow.state['failed_count']} times. Retrying.") + jobid = job.job.submit_job() + return jobid class RuntimeApiHandler: @@ -292,7 +319,7 @@ def update_operation(self, opid, done, meta): class Watcher: """ Watcher class for monitoring and managing jobs """ def __init__(self, site_configuration_file: Union[str, Path], state_file: Union[str, Path] = None): - self._POLL = 20 + self._POLL_INTERVAL_SEC = 60 self._MAX_FAILS = 2 self.should_skip_claim = False self.config = SiteConfig(site_configuration_file) @@ -313,16 +340,19 @@ def restore_from_checkpoint(self, state_data: Dict[str, Any] = None)-> None: def cycle(self): """ Perform a cycle of watching for unclaimed jobs, claiming jobs, and processing finished jobs """ self.restore_from_checkpoint() - if not self.should_skip_claim: - unclaimed_jobs = self.runtime_api_handler.get_unclaimed_jobs(self.config.allowed_workflows) + # if not self.should_skip_claim: - is this actually used? + unclaimed_jobs = self.runtime_api_handler.get_unclaimed_jobs(self.config.allowed_workflows) + if unclaimed_jobs: logger.info(f"Found {len(unclaimed_jobs)} unclaimed jobs.") - self.claim_jobs(unclaimed_jobs) + self.claim_jobs(unclaimed_jobs) - logger.info(f"Checking for finished jobs.") + logger.debug(f"Checking for finished jobs.") successful_jobs, failed_jobs = self.job_manager.get_finished_jobs() - logger.debug(f"Found {len(successful_jobs)} successful jobs and {len(failed_jobs)} failed jobs.") + if not successful_jobs and not failed_jobs: + logger.debug("No finished jobs found.") for job in successful_jobs: + logger.info(f"Processing successful job: {job.opid}, {job.workflow_execution_id}") job_database = self.job_manager.process_successful_job(job) # sanity checks if not job_database.data_object_set: @@ -349,9 +379,10 @@ def cycle(self): resp = self.runtime_api_handler.update_operation( job.opid, done=True, meta=job.job.metadata ) - logging.info(f"Updated operation {job.opid} response id: {resp}") + logging.info(f"Updated operation {job.opid} response id: {resp['id']}") for job in failed_jobs: + logger.info(f"Processing failed job: {job.opid}, {job.workflow_execution_id}") self.job_manager.process_failed_job(job) def watch(self): @@ -359,10 +390,11 @@ def watch(self): logger.info("Entering polling loop") while True: try: + print(".") self.cycle() except (IOError, ValueError, TypeError, AttributeError) as e: logger.exception(f"Error occurred during cycle: {e}", exc_info=True) - sleep(self._POLL) + sleep(self._POLL_INTERVAL_SEC) def claim_jobs(self, unclaimed_jobs: List[WorkflowJob] = None) -> None: """ Claim unclaimed jobs, prepare them, and submit them. Write a checkpoint after claiming jobs. 
""" diff --git a/nmdc_automation/workflow_automation/wfutils.py b/nmdc_automation/workflow_automation/wfutils.py index 4f4d5bcc..c63b1847 100755 --- a/nmdc_automation/workflow_automation/wfutils.py +++ b/nmdc_automation/workflow_automation/wfutils.py @@ -70,11 +70,11 @@ class CromwellRunner(JobRunnerABC): """Job runner for Cromwell""" LABEL_SUBMITTER_VALUE = "nmdcda" LABEL_PARAMETERS = ["release", "wdl", "git_repo"] - NO_SUBMIT_STATES = ["Submitted", # job is already submitted but not running + # States that indicate a job is in some active state and does not need to be submitted + NO_SUBMIT_STATES = [ + "Submitted", # job is already submitted but not running "Running", # job is already running - "Failed", # job failed "Succeeded", # job succeeded - "Aborted", # job was aborted and did not finish "Aborting" # job is in the process of being aborted "On Hold", # job is on hold and not running. It can be manually resumed later ] @@ -152,7 +152,7 @@ def submit_job(self, force: bool = False) -> Optional[str]: :param force: if True, submit the job even if it is in a state that does not require submission :return: the job id """ - status = self.get_job_status() + status = self.workflow.last_status if status in self.NO_SUBMIT_STATES and not force: logging.info(f"Job {self.job_id} in state {status}, skipping submission") return @@ -270,6 +270,18 @@ def config(self) -> Dict[str, Any]: def last_status(self) -> Optional[str]: return self.cached_state.get("last_status", None) + @last_status.setter + def last_status(self, status: str): + self.cached_state["last_status"] = status + + @property + def failed_count(self) -> int: + return self.cached_state.get("failed_count", 0) + + @failed_count.setter + def failed_count(self, count: int): + self.cached_state["failed_count"] = count + @property def nmdc_jobid(self) -> Optional[str]: return self.cached_state.get("nmdc_jobid", None) diff --git a/pytest.xml b/pytest.xml index d4d6c35d..aa997e1b 100644 --- a/pytest.xml +++ b/pytest.xml @@ -1 +1 @@ - \ No newline at end of file + \ No newline at end of file diff --git a/tests/conftest.py b/tests/conftest.py index b14164d5..8a0187c4 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -123,10 +123,10 @@ def site_config(site_config_file): return SiteConfig(site_config_file) @fixture -def initial_state_file(fixtures_dir, tmp_path): - state_file = fixtures_dir / "initial_state.json" +def initial_state_file_1_failure(fixtures_dir, tmp_path): + state_file = fixtures_dir / "agent_state_1_failure.json" # make a working copy in tmp_path - copied_state_file = tmp_path / "initial_state.json" + copied_state_file = tmp_path / "agent_state_1_failure.json" shutil.copy(state_file, copied_state_file) return copied_state_file diff --git a/tests/fixtures/initial_state.json b/tests/fixtures/agent_state_1_failure.json similarity index 99% rename from tests/fixtures/initial_state.json rename to tests/fixtures/agent_state_1_failure.json index aa7dfc62..a56e1cba 100644 --- a/tests/fixtures/initial_state.json +++ b/tests/fixtures/agent_state_1_failure.json @@ -1,12 +1,12 @@ { "jobs": [ { - "type": "MAGs: v1.3.10", + "type": "MAGs: v1.3.12", "cromwell_jobid": "9492a397-eb30-472b-9d3b-abc123456789", "nmdc_jobid": "nmdc:66cf64b6-7462-11ef-8b84-abc123456789", "conf": { "git_repo": "https://github.com/microbiomedata/metaMAGs", - "release": "v1.3.10", + "release": "v1.3.12", "wdl": "mbin_nmdc.wdl", "activity_id": "nmdc:wfmag-11-g7msr323.1", "activity_set": "mags_activity_set", diff --git a/tests/fixtures/failed_job_state.json 
b/tests/fixtures/failed_job_state_2.json similarity index 100% rename from tests/fixtures/failed_job_state.json rename to tests/fixtures/failed_job_state_2.json diff --git a/tests/test_watch_nmdc.py b/tests/test_watch_nmdc.py index 902bb2ee..34f36261 100644 --- a/tests/test_watch_nmdc.py +++ b/tests/test_watch_nmdc.py @@ -21,10 +21,10 @@ # FileHandler init tests -def test_file_handler_init_from_state_file(site_config, initial_state_file, tmp_path): +def test_file_handler_init_from_state_file(site_config, initial_state_file_1_failure, tmp_path): copy_state_file = tmp_path / "copy_state.json" - shutil.copy(initial_state_file, copy_state_file) - fh = FileHandler(site_config, initial_state_file) + shutil.copy(initial_state_file_1_failure, copy_state_file) + fh = FileHandler(site_config, initial_state_file_1_failure) assert fh assert fh.state_file assert isinstance(fh.state_file, PosixPath) @@ -35,7 +35,7 @@ def test_file_handler_init_from_state_file(site_config, initial_state_file, tmp_ assert not fh.state_file # test setter - fh.state_file = initial_state_file + fh.state_file = initial_state_file_1_failure assert fh.state_file assert fh.state_file.exists() assert fh.state_file.is_file() @@ -48,9 +48,9 @@ def test_file_handler_init_from_state_file(site_config, initial_state_file, tmp_ assert fh.state_file.is_file() -def test_file_handler_init_from_config_agent_state(site_config, initial_state_file, tmp_path): +def test_file_handler_init_from_config_agent_state(site_config, initial_state_file_1_failure, tmp_path): with patch("nmdc_automation.config.siteconfig.SiteConfig.agent_state", new_callable=PropertyMock) as mock_agent_state: - mock_agent_state.return_value = initial_state_file + mock_agent_state.return_value = initial_state_file_1_failure fh = FileHandler(site_config) assert fh assert fh.state_file @@ -76,8 +76,8 @@ def test_file_handler_init_default_state(site_config): assert fh2.state_file.exists() -def test_file_handler_read_state(site_config, initial_state_file): - fh = FileHandler(site_config, initial_state_file) +def test_file_handler_read_state(site_config, initial_state_file_1_failure): + fh = FileHandler(site_config, initial_state_file_1_failure) state = fh.read_state() assert state assert isinstance(state, dict) @@ -86,8 +86,8 @@ def test_file_handler_read_state(site_config, initial_state_file): assert len(state.get("jobs")) == 1 -def test_file_handler_write_state(site_config, initial_state_file, fixtures_dir): - fh = FileHandler(site_config, initial_state_file) +def test_file_handler_write_state(site_config, initial_state_file_1_failure, fixtures_dir): + fh = FileHandler(site_config, initial_state_file_1_failure) state = fh.read_state() assert state # add new job @@ -106,7 +106,7 @@ def test_file_handler_write_state(site_config, initial_state_file, fixtures_dir) fh.write_state(state) -def test_file_handler_get_output_path(site_config, initial_state_file, fixtures_dir): +def test_file_handler_get_output_path(site_config, initial_state_file_1_failure, fixtures_dir): # Arrange was_informed_by = "nmdc:1234" workflow_execution_id = "nmdc:56789" @@ -116,7 +116,7 @@ def test_file_handler_get_output_path(site_config, initial_state_file, fixtures_ expected_output_path = site_config.data_dir / Path(was_informed_by) / Path(workflow_execution_id) - fh = FileHandler(site_config, initial_state_file) + fh = FileHandler(site_config, initial_state_file_1_failure) # Act output_path = fh.get_output_path(mock_job) @@ -127,7 +127,7 @@ def test_file_handler_get_output_path(site_config, 
initial_state_file, fixtures_ assert output_path == expected_output_path -def test_file_handler_write_metadata_if_not_exists(site_config, initial_state_file, fixtures_dir, tmp_path): +def test_file_handler_write_metadata_if_not_exists(site_config, initial_state_file_1_failure, fixtures_dir, tmp_path): # Arrange was_informed_by = "nmdc:1234" workflow_execution_id = "nmdc:56789" @@ -141,7 +141,7 @@ def test_file_handler_write_metadata_if_not_exists(site_config, initial_state_fi # patch config.data_dir with patch("nmdc_automation.config.siteconfig.SiteConfig.data_dir", new_callable=PropertyMock) as mock_data_dir: mock_data_dir.return_value = tmp_path - fh = FileHandler(site_config, initial_state_file) + fh = FileHandler(site_config, initial_state_file_1_failure) # Act metadata_path = fh.write_metadata_if_not_exists(mock_job) @@ -153,18 +153,18 @@ def test_file_handler_write_metadata_if_not_exists(site_config, initial_state_fi # JobManager tests -def test_job_manager_init(site_config, initial_state_file): +def test_job_manager_init(site_config, initial_state_file_1_failure): # Arrange - fh = FileHandler(site_config, initial_state_file) + fh = FileHandler(site_config, initial_state_file_1_failure) jm = JobManager(site_config, fh) assert jm assert jm.file_handler assert jm.file_handler.state_file -def test_job_manager_restore_from_state(site_config, initial_state_file): +def test_job_manager_restore_from_state(site_config, initial_state_file_1_failure): # Arrange - fh = FileHandler(site_config, initial_state_file) + fh = FileHandler(site_config, initial_state_file_1_failure) jm = JobManager(site_config, fh, init_cache=False) # Act jm.restore_from_state() @@ -174,10 +174,14 @@ def test_job_manager_restore_from_state(site_config, initial_state_file): assert len(jm.job_cache) == 1 assert isinstance(jm.job_cache[0], WorkflowJob) + # job has been cached - get new workflow jobs from state should not return any + new_jobs = jm.get_new_workflow_jobs_from_state() + assert not new_jobs -def test_job_manager_job_checkpoint(site_config, initial_state_file): + +def test_job_manager_job_checkpoint(site_config, initial_state_file_1_failure): # Arrange - fh = FileHandler(site_config, initial_state_file) + fh = FileHandler(site_config, initial_state_file_1_failure) jm = JobManager(site_config, fh) # Act data = jm.job_checkpoint() @@ -189,9 +193,9 @@ def test_job_manager_job_checkpoint(site_config, initial_state_file): assert len(data.get("jobs")) == 1 -def test_job_manager_save_checkpoint(site_config, initial_state_file): +def test_job_manager_save_checkpoint(site_config, initial_state_file_1_failure): # Arrange - fh = FileHandler(site_config, initial_state_file) + fh = FileHandler(site_config, initial_state_file_1_failure) jm = JobManager(site_config, fh) # Act jm.save_checkpoint() @@ -202,9 +206,9 @@ def test_job_manager_save_checkpoint(site_config, initial_state_file): # cleanup fh.state_file.unlink() -def test_job_manager_find_job_by_opid(site_config, initial_state_file): +def test_job_manager_find_job_by_opid(site_config, initial_state_file_1_failure): # Arrange - fh = FileHandler(site_config, initial_state_file) + fh = FileHandler(site_config, initial_state_file_1_failure) jm = JobManager(site_config, fh) # Act job = jm.find_job_by_opid("nmdc:test-opid") @@ -215,9 +219,9 @@ def test_job_manager_find_job_by_opid(site_config, initial_state_file): assert not job.done -def test_job_manager_prepare_and_cache_new_job(site_config, initial_state_file, fixtures_dir): +def 
test_job_manager_prepare_and_cache_new_job(site_config, initial_state_file_1_failure, fixtures_dir): # Arrange - fh = FileHandler(site_config, initial_state_file) + fh = FileHandler(site_config, initial_state_file_1_failure) jm = JobManager(site_config, fh) new_job_state = json.load(open(fixtures_dir / "new_state_job.json")) assert new_job_state @@ -234,9 +238,9 @@ def test_job_manager_prepare_and_cache_new_job(site_config, initial_state_file, jm.job_cache = [] -def test_job_manager_prepare_and_cache_new_job_force(site_config, initial_state_file, fixtures_dir): +def test_job_manager_prepare_and_cache_new_job_force(site_config, initial_state_file_1_failure, fixtures_dir): # Arrange - fh = FileHandler(site_config, initial_state_file) + fh = FileHandler(site_config, initial_state_file_1_failure) jm = JobManager(site_config, fh) #already has an opid new_job_state = json.load(open(fixtures_dir / "mags_workflow_state.json")) @@ -261,12 +265,10 @@ def test_job_manager_prepare_and_cache_new_job_force(site_config, initial_state_ assert job2.opid == opid - - -def test_job_manager_get_finished_jobs(site_config, initial_state_file, fixtures_dir): +def test_job_manager_get_finished_jobs(site_config, initial_state_file_1_failure, fixtures_dir): # Arrange - initial state has 1 failure and is not done - fh = FileHandler(site_config, initial_state_file) + fh = FileHandler(site_config, initial_state_file_1_failure) jm = JobManager(site_config, fh) # Add a finished job: finished job is not done, but has a last_status of Succeeded @@ -278,7 +280,7 @@ def test_job_manager_get_finished_jobs(site_config, initial_state_file, fixtures assert len(jm.job_cache) == 2 # add a failed job - failed_job_state = json.load(open(fixtures_dir / "failed_job_state.json")) + failed_job_state = json.load(open(fixtures_dir / "failed_job_state_2.json")) assert failed_job_state failed_job = WorkflowJob(site_config, failed_job_state) assert failed_job.job_status == "Failed" @@ -308,16 +310,14 @@ def test_job_manager_get_finished_jobs(site_config, initial_state_file, fixtures jm.job_cache = [] -def test_job_manager_process_successful_job(site_config, initial_state_file, fixtures_dir): +def test_job_manager_process_successful_job(site_config, initial_state_file_1_failure, fixtures_dir): # mock job.job.get_job_metadata - use fixture cromwell/succeded_metadata.json job_metadata = json.load(open(fixtures_dir / "mags_job_metadata.json")) with patch("nmdc_automation.workflow_automation.wfutils.CromwellRunner.get_job_metadata") as mock_get_metadata: mock_get_metadata.return_value = job_metadata - - # Arrange - fh = FileHandler(site_config, initial_state_file) + fh = FileHandler(site_config, initial_state_file_1_failure) jm = JobManager(site_config, fh) new_job_state = json.load(open(fixtures_dir / "mags_workflow_state.json")) assert new_job_state @@ -334,11 +334,51 @@ def test_job_manager_process_successful_job(site_config, initial_state_file, fix jm.job_cache = [] -def test_job_manager_process_failed_job(site_config, initial_state_file, fixtures_dir): +def test_job_manager_get_finished_jobs_1_failure(site_config, initial_state_file_1_failure, fixtures_dir): # Arrange - fh = FileHandler(site_config, initial_state_file) + with requests_mock.Mocker() as mocker: + # Mock the GET request for the workflow status + mocker.get( + "http://localhost:8088/api/workflows/v1/9492a397-eb30-472b-9d3b-abc123456789/status", + json={"status": "Failed"} # Mocked response body + ) + fh = FileHandler(site_config, initial_state_file_1_failure) + jm = 
JobManager(site_config, fh) + # job handler should initialize the job_cache from the state file by default + assert jm.job_cache + assert isinstance(jm.job_cache, list) + assert len(jm.job_cache) == 1 + + successful_jobs, failed_jobs = jm.get_finished_jobs() + assert not successful_jobs + assert failed_jobs + failed_job = failed_jobs[0] + assert failed_job.job_status == "Failed" + +@mock.patch("nmdc_automation.workflow_automation.wfutils.CromwellRunner.generate_submission_files") +def test_job_manager_process_failed_job_1_failure( + mock_generate_submission_files, site_config, initial_state_file_1_failure, mock_cromwell_api): + # Arrange + mock_generate_submission_files.return_value = { + "workflowSource": "workflowSource", + "workflowDependencies": "workflowDependencies", + "workflowInputs": "workflowInputs", + "labels": "labels" + } + fh = FileHandler(site_config, initial_state_file_1_failure) jm = JobManager(site_config, fh) - failed_job_state = json.load(open(fixtures_dir / "failed_job_state.json")) + failed_job = jm.job_cache[0] + # Act + jobid = jm.process_failed_job(failed_job) + assert jobid + + + +def test_job_manager_process_failed_job_2_failures(site_config, initial_state_file_1_failure, fixtures_dir): + # Arrange + fh = FileHandler(site_config, initial_state_file_1_failure) + jm = JobManager(site_config, fh) + failed_job_state = json.load(open(fixtures_dir / "failed_job_state_2.json")) assert failed_job_state failed_job = WorkflowJob(site_config, failed_job_state) jm.job_cache.append(failed_job) @@ -346,6 +386,7 @@ def test_job_manager_process_failed_job(site_config, initial_state_file, fixture jm.process_failed_job(failed_job) # Assert assert failed_job.done + assert failed_job.job_status == "Failed" @fixture @@ -376,7 +417,7 @@ def test_claim_jobs(mock_submit, site_config_file, site_config, fixtures_dir): assert unclaimed_wfj.job_status -def test_runtime_manager_get_unclaimed_jobs(site_config, initial_state_file, fixtures_dir): +def test_runtime_manager_get_unclaimed_jobs(site_config, initial_state_file_1_failure, fixtures_dir): # Arrange rt = RuntimeApiHandler(site_config) # Act diff --git a/tests/test_wfutils.py b/tests/test_wfutils.py index 37ed404a..ca9509a3 100644 --- a/tests/test_wfutils.py +++ b/tests/test_wfutils.py @@ -271,7 +271,8 @@ def test_cromwell_job_runner_submit_job_new_job(mock_generate_submission_files, wf_state_manager = WorkflowStateManager(wf_state) job_runner = CromwellRunner(site_config, wf_state_manager) - job_runner.submit_job() + jobid = job_runner.submit_job() + assert jobid def test_workflow_job_data_objects_and_execution_record_mags(site_config, fixtures_dir, tmp_path):
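
For reference, the retry bookkeeping this diff introduces (the last_status / failed_count setters in wfutils.py plus the triage loop in get_finished_jobs) can be exercised in isolation. The following is a minimal illustrative sketch, not part of nmdc_automation: ToyWorkflowState and triage are hypothetical stand-ins that mirror (rather than import) the diffed classes, the Cromwell poll is a plain callable, and the opid check and checkpointing are omitted.

MAX_FAILS = 2  # mirrors Watcher._MAX_FAILS in this diff


class ToyWorkflowState:
    """Dict-backed state, analogous to WorkflowStateManager.cached_state."""

    def __init__(self, state=None):
        self.cached_state = state or {}

    @property
    def last_status(self):
        return self.cached_state.get("last_status")

    @last_status.setter
    def last_status(self, status):
        self.cached_state["last_status"] = status

    @property
    def failed_count(self):
        return self.cached_state.get("failed_count", 0)

    @failed_count.setter
    def failed_count(self, count):
        self.cached_state["failed_count"] = count


def triage(jobs, poll):
    """Split jobs into (successful, failed) the way get_finished_jobs now does."""
    successful, failed = [], []
    for wf in jobs:
        # Already-known terminal states short-circuit the poll.
        if wf.last_status == "Succeeded":
            successful.append(wf)
            continue
        if wf.last_status == "Failed" and wf.failed_count >= MAX_FAILS:
            failed.append(wf)
            continue
        status = poll(wf)  # stand-in for job.job.get_job_status()
        wf.last_status = status
        if status == "Succeeded":
            successful.append(wf)
        elif status == "Failed":
            wf.failed_count += 1
            failed.append(wf)
    return successful, failed


if __name__ == "__main__":
    job = ToyWorkflowState({"last_status": "Failed", "failed_count": 1})
    ok, bad = triage([job], poll=lambda wf: "Failed")
    # Second failure reaches the cap, so the job is reported as failed.
    assert not ok and bad and job.failed_count == 2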
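
Similarly, the NO_SUBMIT_STATES change pairs with submit_job now reading the cached last_status, which is what lets a previously failed job be resubmitted. A hedged sketch of that guard follows; the function name and the fake id are placeholders, not the real CromwellRunner API, which posts the workflow to Cromwell instead.

import logging

NO_SUBMIT_STATES = ["Submitted", "Running", "Succeeded", "Aborting", "On Hold"]


def maybe_submit(last_status, force=False):
    """Return a new job id, or None when the cached status says not to submit."""
    if last_status in NO_SUBMIT_STATES and not force:
        logging.info("state %s: skipping submission", last_status)
        return None
    return "fake-cromwell-id"  # a real runner would POST the workflow here


# "Failed" is no longer a no-submit state, so a failed job can be retried:
assert maybe_submit("Failed") == "fake-cromwell-id"
assert maybe_submit("Running") is None
assert maybe_submit("Running", force=True) == "fake-cromwell-id"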