Skip to content

Commit

Permalink
update get_finished_jobs to use last_status
Browse files Browse the repository at this point in the history
  • Loading branch information
mbthornton-lbl committed Nov 26, 2024
1 parent 13257a9 commit edc3321
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 50 deletions.
8 changes: 4 additions & 4 deletions nmdc_automation/workflow_automation/watch_nmdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,16 +189,16 @@ def get_finished_jobs(self)->Tuple[List[WorkflowJob], List[WorkflowJob]]:
failed_jobs = []
for job in self.job_cache:
if not job.done:
status = job.job_status
if status == "Succeeded" and job.opid:
last_status = job.workflow.last_status
if last_status == "Succeeded" and job.opid:
successful_jobs.append(job)
elif status == "Failed" and job.opid:
elif last_status == "Failed" and job.opid:
failed_jobs.append(job)
if successful_jobs:
logger.info(f"Found {len(successful_jobs)} successful jobs.")
if failed_jobs:
logger.info(f"Found {len(failed_jobs)} failed jobs.")
return (successful_jobs, failed_jobs)
return successful_jobs, failed_jobs

def process_successful_job(self, job: WorkflowJob) -> Database:
""" Process a successful job and return a Database object """
Expand Down
6 changes: 3 additions & 3 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,10 @@ def site_config(site_config_file):
return SiteConfig(site_config_file)

@fixture
def initial_state_file(fixtures_dir, tmp_path):
state_file = fixtures_dir / "initial_state.json"
def initial_state_file_1_failure(fixtures_dir, tmp_path):
state_file = fixtures_dir / "agent_state_1_failure.json"
# make a working copy in tmp_path
copied_state_file = tmp_path / "initial_state.json"
copied_state_file = tmp_path / "agent_state_1_failure.json"
shutil.copy(state_file, copied_state_file)
return copied_state_file

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
{
"jobs": [
{
"type": "MAGs: v1.3.10",
"type": "MAGs: v1.3.12",
"cromwell_jobid": "9492a397-eb30-472b-9d3b-abc123456789",
"nmdc_jobid": "nmdc:66cf64b6-7462-11ef-8b84-abc123456789",
"conf": {
"git_repo": "https://github.com/microbiomedata/metaMAGs",
"release": "v1.3.10",
"release": "v1.3.12",
"wdl": "mbin_nmdc.wdl",
"activity_id": "nmdc:wfmag-11-g7msr323.1",
"activity_set": "mags_activity_set",
Expand Down
File renamed without changes.
97 changes: 56 additions & 41 deletions tests/test_watch_nmdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@


# FileHandler init tests
def test_file_handler_init_from_state_file(site_config, initial_state_file, tmp_path):
def test_file_handler_init_from_state_file(site_config, initial_state_file_1_failure, tmp_path):
copy_state_file = tmp_path / "copy_state.json"
shutil.copy(initial_state_file, copy_state_file)
fh = FileHandler(site_config, initial_state_file)
shutil.copy(initial_state_file_1_failure, copy_state_file)
fh = FileHandler(site_config, initial_state_file_1_failure)
assert fh
assert fh.state_file
assert isinstance(fh.state_file, PosixPath)
Expand All @@ -35,7 +35,7 @@ def test_file_handler_init_from_state_file(site_config, initial_state_file, tmp_
assert not fh.state_file

# test setter
fh.state_file = initial_state_file
fh.state_file = initial_state_file_1_failure
assert fh.state_file
assert fh.state_file.exists()
assert fh.state_file.is_file()
Expand All @@ -48,9 +48,9 @@ def test_file_handler_init_from_state_file(site_config, initial_state_file, tmp_
assert fh.state_file.is_file()


def test_file_handler_init_from_config_agent_state(site_config, initial_state_file, tmp_path):
def test_file_handler_init_from_config_agent_state(site_config, initial_state_file_1_failure, tmp_path):
with patch("nmdc_automation.config.siteconfig.SiteConfig.agent_state", new_callable=PropertyMock) as mock_agent_state:
mock_agent_state.return_value = initial_state_file
mock_agent_state.return_value = initial_state_file_1_failure
fh = FileHandler(site_config)
assert fh
assert fh.state_file
Expand All @@ -76,8 +76,8 @@ def test_file_handler_init_default_state(site_config):
assert fh2.state_file.exists()


def test_file_handler_read_state(site_config, initial_state_file):
fh = FileHandler(site_config, initial_state_file)
def test_file_handler_read_state(site_config, initial_state_file_1_failure):
fh = FileHandler(site_config, initial_state_file_1_failure)
state = fh.read_state()
assert state
assert isinstance(state, dict)
Expand All @@ -86,8 +86,8 @@ def test_file_handler_read_state(site_config, initial_state_file):
assert len(state.get("jobs")) == 1


def test_file_handler_write_state(site_config, initial_state_file, fixtures_dir):
fh = FileHandler(site_config, initial_state_file)
def test_file_handler_write_state(site_config, initial_state_file_1_failure, fixtures_dir):
fh = FileHandler(site_config, initial_state_file_1_failure)
state = fh.read_state()
assert state
# add new job
Expand All @@ -106,7 +106,7 @@ def test_file_handler_write_state(site_config, initial_state_file, fixtures_dir)
fh.write_state(state)


def test_file_handler_get_output_path(site_config, initial_state_file, fixtures_dir):
def test_file_handler_get_output_path(site_config, initial_state_file_1_failure, fixtures_dir):
# Arrange
was_informed_by = "nmdc:1234"
workflow_execution_id = "nmdc:56789"
Expand All @@ -116,7 +116,7 @@ def test_file_handler_get_output_path(site_config, initial_state_file, fixtures_

expected_output_path = site_config.data_dir / Path(was_informed_by) / Path(workflow_execution_id)

fh = FileHandler(site_config, initial_state_file)
fh = FileHandler(site_config, initial_state_file_1_failure)

# Act
output_path = fh.get_output_path(mock_job)
Expand All @@ -127,7 +127,7 @@ def test_file_handler_get_output_path(site_config, initial_state_file, fixtures_
assert output_path == expected_output_path


def test_file_handler_write_metadata_if_not_exists(site_config, initial_state_file, fixtures_dir, tmp_path):
def test_file_handler_write_metadata_if_not_exists(site_config, initial_state_file_1_failure, fixtures_dir, tmp_path):
# Arrange
was_informed_by = "nmdc:1234"
workflow_execution_id = "nmdc:56789"
Expand All @@ -141,7 +141,7 @@ def test_file_handler_write_metadata_if_not_exists(site_config, initial_state_fi
# patch config.data_dir
with patch("nmdc_automation.config.siteconfig.SiteConfig.data_dir", new_callable=PropertyMock) as mock_data_dir:
mock_data_dir.return_value = tmp_path
fh = FileHandler(site_config, initial_state_file)
fh = FileHandler(site_config, initial_state_file_1_failure)

# Act
metadata_path = fh.write_metadata_if_not_exists(mock_job)
Expand All @@ -153,18 +153,18 @@ def test_file_handler_write_metadata_if_not_exists(site_config, initial_state_fi


# JobManager tests
def test_job_manager_init(site_config, initial_state_file):
def test_job_manager_init(site_config, initial_state_file_1_failure):
# Arrange
fh = FileHandler(site_config, initial_state_file)
fh = FileHandler(site_config, initial_state_file_1_failure)
jm = JobManager(site_config, fh)
assert jm
assert jm.file_handler
assert jm.file_handler.state_file


def test_job_manager_restore_from_state(site_config, initial_state_file):
def test_job_manager_restore_from_state(site_config, initial_state_file_1_failure):
# Arrange
fh = FileHandler(site_config, initial_state_file)
fh = FileHandler(site_config, initial_state_file_1_failure)
jm = JobManager(site_config, fh, init_cache=False)
# Act
jm.restore_from_state()
Expand All @@ -174,10 +174,14 @@ def test_job_manager_restore_from_state(site_config, initial_state_file):
assert len(jm.job_cache) == 1
assert isinstance(jm.job_cache[0], WorkflowJob)

# job has been cached - get new workflow jobs from state should not return any
new_jobs = jm.get_new_workflow_jobs_from_state()
assert not new_jobs

def test_job_manager_job_checkpoint(site_config, initial_state_file):

def test_job_manager_job_checkpoint(site_config, initial_state_file_1_failure):
# Arrange
fh = FileHandler(site_config, initial_state_file)
fh = FileHandler(site_config, initial_state_file_1_failure)
jm = JobManager(site_config, fh)
# Act
data = jm.job_checkpoint()
Expand All @@ -189,9 +193,9 @@ def test_job_manager_job_checkpoint(site_config, initial_state_file):
assert len(data.get("jobs")) == 1


def test_job_manager_save_checkpoint(site_config, initial_state_file):
def test_job_manager_save_checkpoint(site_config, initial_state_file_1_failure):
# Arrange
fh = FileHandler(site_config, initial_state_file)
fh = FileHandler(site_config, initial_state_file_1_failure)
jm = JobManager(site_config, fh)
# Act
jm.save_checkpoint()
Expand All @@ -202,9 +206,9 @@ def test_job_manager_save_checkpoint(site_config, initial_state_file):
# cleanup
fh.state_file.unlink()

def test_job_manager_find_job_by_opid(site_config, initial_state_file):
def test_job_manager_find_job_by_opid(site_config, initial_state_file_1_failure):
# Arrange
fh = FileHandler(site_config, initial_state_file)
fh = FileHandler(site_config, initial_state_file_1_failure)
jm = JobManager(site_config, fh)
# Act
job = jm.find_job_by_opid("nmdc:test-opid")
Expand All @@ -215,9 +219,9 @@ def test_job_manager_find_job_by_opid(site_config, initial_state_file):
assert not job.done


def test_job_manager_prepare_and_cache_new_job(site_config, initial_state_file, fixtures_dir):
def test_job_manager_prepare_and_cache_new_job(site_config, initial_state_file_1_failure, fixtures_dir):
# Arrange
fh = FileHandler(site_config, initial_state_file)
fh = FileHandler(site_config, initial_state_file_1_failure)
jm = JobManager(site_config, fh)
new_job_state = json.load(open(fixtures_dir / "new_state_job.json"))
assert new_job_state
Expand All @@ -234,9 +238,9 @@ def test_job_manager_prepare_and_cache_new_job(site_config, initial_state_file,
jm.job_cache = []


def test_job_manager_prepare_and_cache_new_job_force(site_config, initial_state_file, fixtures_dir):
def test_job_manager_prepare_and_cache_new_job_force(site_config, initial_state_file_1_failure, fixtures_dir):
# Arrange
fh = FileHandler(site_config, initial_state_file)
fh = FileHandler(site_config, initial_state_file_1_failure)
jm = JobManager(site_config, fh)
# already has an opid
new_job_state = json.load(open(fixtures_dir / "mags_workflow_state.json"))
Expand All @@ -261,12 +265,10 @@ def test_job_manager_prepare_and_cache_new_job_force(site_config, initial_state_
assert job2.opid == opid




def test_job_manager_get_finished_jobs(site_config, initial_state_file, fixtures_dir):
def test_job_manager_get_finished_jobs(site_config, initial_state_file_1_failure, fixtures_dir):

# Arrange - initial state has 1 failure and is not done
fh = FileHandler(site_config, initial_state_file)
fh = FileHandler(site_config, initial_state_file_1_failure)
jm = JobManager(site_config, fh)

# Add a finished job: finished job is not done, but has a last_status of Succeeded
Expand All @@ -278,7 +280,7 @@ def test_job_manager_get_finished_jobs(site_config, initial_state_file, fixtures
assert len(jm.job_cache) == 2

# add a failed job
failed_job_state = json.load(open(fixtures_dir / "failed_job_state.json"))
failed_job_state = json.load(open(fixtures_dir / "failed_job_state_2.json"))
assert failed_job_state
failed_job = WorkflowJob(site_config, failed_job_state)
assert failed_job.job_status == "Failed"
Expand Down Expand Up @@ -308,16 +310,14 @@ def test_job_manager_get_finished_jobs(site_config, initial_state_file, fixtures
jm.job_cache = []


def test_job_manager_process_successful_job(site_config, initial_state_file, fixtures_dir):
def test_job_manager_process_successful_job(site_config, initial_state_file_1_failure, fixtures_dir):
# mock job.job.get_job_metadata - use fixture cromwell/succeeded_metadata.json
job_metadata = json.load(open(fixtures_dir / "mags_job_metadata.json"))
with patch("nmdc_automation.workflow_automation.wfutils.CromwellRunner.get_job_metadata") as mock_get_metadata:
mock_get_metadata.return_value = job_metadata



# Arrange
fh = FileHandler(site_config, initial_state_file)
fh = FileHandler(site_config, initial_state_file_1_failure)
jm = JobManager(site_config, fh)
new_job_state = json.load(open(fixtures_dir / "mags_workflow_state.json"))
assert new_job_state
Expand All @@ -334,18 +334,33 @@ def test_job_manager_process_successful_job(site_config, initial_state_file, fix
jm.job_cache = []


def test_job_manager_process_failed_job(site_config, initial_state_file, fixtures_dir):
def test_job_manager_process_failed_job_1_failure(site_config, initial_state_file_1_failure, fixtures_dir):
# Arrange
fh = FileHandler(site_config, initial_state_file_1_failure)
jm = JobManager(site_config, fh)
# job handler should initialize the job_cache from the state file by default
assert jm.job_cache
assert isinstance(jm.job_cache, list)
assert len(jm.job_cache) == 1

successful_jobs, failed_jobs = jm.get_finished_jobs()
assert not successful_jobs
assert failed_jobs


def test_job_manager_process_failed_job_2_failures(site_config, initial_state_file_1_failure, fixtures_dir):
# Arrange
fh = FileHandler(site_config, initial_state_file)
fh = FileHandler(site_config, initial_state_file_1_failure)
jm = JobManager(site_config, fh)
failed_job_state = json.load(open(fixtures_dir / "failed_job_state.json"))
failed_job_state = json.load(open(fixtures_dir / "failed_job_state_2.json"))
assert failed_job_state
failed_job = WorkflowJob(site_config, failed_job_state)
jm.job_cache.append(failed_job)
# Act
jm.process_failed_job(failed_job)
# Assert
assert failed_job.done
assert failed_job.job_status == "Failed"


@fixture
Expand Down Expand Up @@ -376,7 +391,7 @@ def test_claim_jobs(mock_submit, site_config_file, site_config, fixtures_dir):
assert unclaimed_wfj.job_status


def test_runtime_manager_get_unclaimed_jobs(site_config, initial_state_file, fixtures_dir):
def test_runtime_manager_get_unclaimed_jobs(site_config, initial_state_file_1_failure, fixtures_dir):
# Arrange
rt = RuntimeApiHandler(site_config)
# Act
Expand Down

0 comments on commit edc3321

Please sign in to comment.