
Commit

refactoring for clarity and added test.
mbthornton-lbl committed Nov 21, 2024
1 parent 11fe8b9 commit 33977ca
Showing 3 changed files with 82 additions and 53 deletions.
6 changes: 1 addition & 5 deletions nmdc_automation/workflow_automation/sched.py
@@ -282,11 +282,7 @@ def cycle(self, dryrun: bool = False, skiplist: set = set(),
"""
This function does a single cycle of looking for new jobs
"""
filt = {}
if allowlist:
filt = {"was_informed_by": {"$in": list(allowlist)}}
# TODO: Quite a lot happens under the hood here. This function should be broken down into smaller
# functions to improve readability and maintainability.

wfp_nodes = load_workflow_process_nodes(self.db, self.workflows, allowlist)

self.get_existing_jobs.cache_clear()
41 changes: 20 additions & 21 deletions nmdc_automation/workflow_automation/workflow_process.py
@@ -25,13 +25,13 @@ def get_required_data_objects_map(db, workflows: List[WorkflowConfig]) -> Dict[str, DataObject]:
for wf in workflows:
required_types.update(set(wf.data_object_types))

required_data_objs_by_id = dict()
required_data_object_map = dict()
for rec in db.data_object_set.find({"data_object_type": {"$ne": None}}):
do = DataObject(**rec)
if do.data_object_type.code.text not in required_types:
data_object = DataObject(**rec)
if data_object.data_object_type.code.text not in required_types:
continue
required_data_objs_by_id[do.id] = do
return required_data_objs_by_id
required_data_object_map[data_object.id] = data_object
return required_data_object_map


@lru_cache
@@ -214,29 +214,28 @@ def _resolve_relationships(wfp_nodes: List[WorkflowProcessNode], wfp_nodes_by_data_object_id):
return wfp_nodes


def _associate_workflow_process_nodes_to_data_objects(wfp_nodes: List[WorkflowProcessNode], data_objs_by_id):
def _map_nodes_to_data_objects(current_nodes: List[WorkflowProcessNode], required_data_object_map):
"""
Associate the data objects with workflow process nodes
"""
wfp_nodes_by_data_object_id = dict()
for wfp_node in wfp_nodes:
for do_id in wfp_node.has_output:
if do_id in data_objs_by_id:
do = data_objs_by_id[do_id]
wfp_node.add_data_object(do)
node_data_object_map = dict()
for node in current_nodes:
for data_object_id in node.has_output:
if data_object_id in required_data_object_map:
do = required_data_object_map[data_object_id]
node.add_data_object(do)
# If it's a dupe, set it to None
# so we can ignore it later.
# Once we re-id the data objects this
# Post re-id we would not expect this
if do_id in wfp_nodes_by_data_object_id:
if do_id not in warned_objects:
logging.warning(f"Duplicate output object {do_id}")
warned_objects.add(do_id)
wfp_nodes_by_data_object_id[do_id] = None
if data_object_id in node_data_object_map:
if data_object_id not in warned_objects:
logging.warning(f"Duplicate output object {data_object_id}")
warned_objects.add(data_object_id)
node_data_object_map[data_object_id] = None
else:
wfp_nodes_by_data_object_id[do_id] = wfp_node
return wfp_nodes_by_data_object_id, wfp_nodes

node_data_object_map[data_object_id] = node
return node_data_object_map, current_nodes


def load_workflow_process_nodes(db, workflows: list[WorkflowConfig], allowlist: list[str] = None) -> List[WorkflowProcessNode]:
@@ -262,7 +261,7 @@ def load_workflow_process_nodes(db, workflows: list[WorkflowConfig], allowlist: list[str] = None) -> List[WorkflowProcessNode]:
# the output objects to the activity that generated them.
wfp_nodes = get_current_workflow_process_nodes(db, workflows, data_objs_by_id, allowlist)

wfp_nodes_by_data_object_id, wfp_nodes = _associate_workflow_process_nodes_to_data_objects(wfp_nodes, data_objs_by_id)
wfp_nodes_by_data_object_id, wfp_nodes = _map_nodes_to_data_objects(wfp_nodes, data_objs_by_id)

# Now populate the parent and children values for the
wfp_nodes = _resolve_relationships(wfp_nodes, wfp_nodes_by_data_object_id)
88 changes: 61 additions & 27 deletions tests/test_workflow_process.py
@@ -1,7 +1,12 @@
from pytest import mark

from nmdc_automation.workflow_automation.workflow_process import (
get_required_data_objects_map, get_current_workflow_process_nodes, load_workflow_process_nodes)
get_required_data_objects_map,
get_current_workflow_process_nodes,
load_workflow_process_nodes,
_resolve_relationships,
_map_nodes_to_data_objects
)
from nmdc_automation.workflow_automation.workflows import load_workflow_configs
from tests.fixtures.db_utils import load_fixture, reset_db

@@ -11,7 +16,7 @@
)
def test_load_workflow_process_nodes(test_db, workflow_file, workflows_config_dir):
"""
Test
Test loading workflow process nodes from the database.
"""
metatranscriptome = False
if workflow_file == "workflows-mt.yaml":
@@ -22,27 +27,56 @@ def test_load_workflow_process_nodes(test_db, workflow_file, workflows_config_dir):
load_fixture(test_db, "data_generation_set.json")
load_fixture(test_db, "read_qc_analysis.json", "workflow_execution_set")

wfs = load_workflow_configs(workflows_config_dir / workflow_file)
workflow_configs = load_workflow_configs(workflows_config_dir / workflow_file)

# these are called by load_activities
data_objs_by_id = get_required_data_objects_map(test_db, wfs)
wf_execs = get_current_workflow_process_nodes(test_db, wfs, data_objs_by_id)
assert wf_execs
assert len(wf_execs) == 2
# sanity checking these - they are used in the next step
data_objs_by_id = get_required_data_objects_map(test_db, workflow_configs)
current_nodes = get_current_workflow_process_nodes(test_db, workflow_configs, data_objs_by_id)
assert current_nodes
assert len(current_nodes) == 2

acts = load_workflow_process_nodes(test_db, wfs)
workflow_process_nodes = load_workflow_process_nodes(test_db, workflow_configs)
# sanity check
assert acts
assert len(acts) == 2
assert workflow_process_nodes
assert len(workflow_process_nodes) == 2

# Omics and RQC share data_object_type for metagenome and metatranscriptome
# they can be distinguished by analyte category so we expect 1 of each
# for metagenome and metatranscriptome
data_gen = [act for act in acts if act.type == "nmdc:NucleotideSequencing"][0]
assert data_gen
assert data_gen.children
assert len(data_gen.children) == 1
assert data_gen.children[0].type == "nmdc:ReadQcAnalysis"
data_generation_nodes = [node for node in workflow_process_nodes if node.type == "nmdc:NucleotideSequencing"][0]
assert data_generation_nodes
assert data_generation_nodes.children
assert len(data_generation_nodes.children) == 1
assert data_generation_nodes.children[0].type == "nmdc:ReadQcAnalysis"


def test_resolve_relationships(test_db, workflows_config_dir):
"""
Test that the relationships between workflow process nodes are resolved
"""
allow_list = ["nmdc:omprc-11-metag1",]
reset_db(test_db)
load_fixture(test_db, "data_object_set.json")
load_fixture(test_db, "data_generation_set.json")
load_fixture(test_db, "read_qc_analysis.json", "workflow_execution_set")
load_fixture(test_db, "metagenome_assembly.json", "workflow_execution_set")
load_fixture(test_db, "metagenome_annotation.json", "workflow_execution_set")

workflow_config = load_workflow_configs(workflows_config_dir / "workflows.yaml")
data_objs_by_id = get_required_data_objects_map(test_db, workflow_config)
current_nodes = get_current_workflow_process_nodes(test_db, workflow_config, data_objs_by_id)
current_nodes_by_data_object_id, current_nodes = _map_nodes_to_data_objects(
current_nodes, data_objs_by_id)
assert current_nodes

workflow_process_graph = _resolve_relationships(current_nodes, current_nodes_by_data_object_id)
assert workflow_process_graph

for node in workflow_process_graph:
if node.type == 'nmdc:NucleotideSequencing':
assert node.children
else:
assert node.parent


def test_load_workflow_process_nodes_does_not_load_metagenome_sequencing(test_db, workflows_config_dir):
@@ -55,9 +89,9 @@ def test_load_workflow_process_nodes_does_not_load_metagenome_sequencing(test_db, workflows_config_dir):
load_fixture(test_db, "legacy_data_generation.json", "data_generation_set")
load_fixture(test_db, "metagenome_sequencing.json", "workflow_execution_set")

wfs = load_workflow_configs(workflows_config_dir / "workflows.yaml")
data_objs_by_id = get_required_data_objects_map(test_db, wfs)
wf_execs = get_current_workflow_process_nodes(test_db, wfs, data_objs_by_id, allowlist=[exp_omprc,])
workflow_config = load_workflow_configs(workflows_config_dir / "workflows.yaml")
data_objs_by_id = get_required_data_objects_map(test_db, workflow_config)
wf_execs = get_current_workflow_process_nodes(test_db, workflow_config, data_objs_by_id, allowlist=[exp_omprc,])
# We only expect the data generation to be loaded
assert wf_execs
assert len(wf_execs) == 1
@@ -80,20 +114,20 @@ def test_load_workflows(workflows_config_dir, workflow_file):

shared_wf_names = ["Sequencing Noninterleaved", "Sequencing Interleaved"]
if metatranscriptome:
exp_num_wfs = 9
exp_num_workflow_config = 9
exp_wf_names = ["Metatranscriptome Reads QC", "Metatranscriptome Reads QC Interleave",
"Metatranscriptome Assembly", "Metatranscriptome Annotation", "Expression Analysis Antisense",
"Expression Analysis Sense", "Expression Analysis Nonstranded", ]
else:
exp_num_wfs = 8
exp_num_workflow_config = 8
exp_wf_names = ["Reads QC", "Reads QC Interleave", "Metagenome Assembly", "Metagenome Annotation", "MAGs",
"Readbased Analysis", ]

wfs = load_workflow_configs(workflows_config_dir / workflow_file)
assert wfs
workflow_config = load_workflow_configs(workflows_config_dir / workflow_file)
assert workflow_config
wfm = {}
assert len(wfs) == len(exp_wf_names) + len(shared_wf_names)
for wf in wfs:
assert len(workflow_config) == len(exp_wf_names) + len(shared_wf_names)
for wf in workflow_config:
wfm[wf.name] = wf
for wf_name in exp_wf_names:
assert wf_name in wfm
@@ -120,9 +154,9 @@ def test_get_required_data_objects_by_id(test_db, workflows_config_dir, workflow_file):
reset_db(test_db)
load_fixture(test_db, "data_object_set.json")

wfs = load_workflow_configs(workflows_config_dir / workflow_file)
workflow_config = load_workflow_configs(workflows_config_dir / workflow_file)

required_data_object_map = get_required_data_objects_map(test_db, wfs)
required_data_object_map = get_required_data_objects_map(test_db, workflow_config)
assert required_data_object_map
# get a unique list of the data object types
do_types = set()
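For readers following the refactor end to end, the sketch below shows how the renamed helpers compose. It is not part of the commit: it simply chains the functions that appear in this diff, and it assumes a pymongo-style database handle and a local workflows.yaml path as hypothetical stand-ins for the test fixtures.

from pymongo import MongoClient

from nmdc_automation.workflow_automation.workflows import load_workflow_configs
from nmdc_automation.workflow_automation.workflow_process import (
    get_required_data_objects_map,
    get_current_workflow_process_nodes,
    _map_nodes_to_data_objects,
    _resolve_relationships,
)

# Hypothetical connection details; the tests use a fixture database instead.
db = MongoClient("mongodb://localhost:27017")["nmdc"]

workflow_configs = load_workflow_configs("workflows.yaml")

# Data objects of the types the workflows require, keyed by id.
required_data_object_map = get_required_data_objects_map(db, workflow_configs)

# Candidate data-generation / workflow-execution records.
current_nodes = get_current_workflow_process_nodes(db, workflow_configs, required_data_object_map)

# Attach outputs to the nodes that produced them, then link parents and children.
node_data_object_map, current_nodes = _map_nodes_to_data_objects(current_nodes, required_data_object_map)
workflow_process_nodes = _resolve_relationships(current_nodes, node_data_object_map)

This is the same sequence that load_workflow_process_nodes runs internally, which is why sched.cycle() now only needs the single call load_workflow_process_nodes(self.db, self.workflows, allowlist).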
