From a90825bbe41e33e167878242e2801d5e6914e913 Mon Sep 17 00:00:00 2001 From: Joel Klinger Date: Fri, 22 Nov 2024 09:44:04 +0000 Subject: [PATCH] [feature/PI-618-bulk_etl] bulk etl local --- .../extract_bulk}/__init__.py | 0 .../worker/bulk/extract_bulk/extract_bulk.py | 79 ++ .../extract_bulk}/make/make.py | 0 .../tests/extract_bulk_input.ldif | 165 +++ .../tests/extract_bulk_output.json | 209 ++++ .../tests/test_extract_bulk_worker.py | 85 ++ .../worker/{ => bulk}/load_bulk/__init__.py | 0 .../worker/{ => bulk}/load_bulk/load_bulk.py | 2 +- .../worker/{ => bulk}/load_bulk/make/make.py | 0 .../load_bulk/tests/test_load_bulk_worker.py | 195 ++++ .../load_bulk_reduce/load_bulk_reduce.py | 0 .../{ => bulk}/load_bulk_reduce/make/make.py | 0 .../tests/_test_load_bulk_reduce.py | 0 .../transform_bulk}/__init__.py | 0 .../transform_bulk}/make/make.py | 0 .../tests/test_transform_bulk_worker.py | 176 ++++ .../tests/transform_bulk_output.json | 941 ++++++++++++++++++ .../transform_bulk/transform_bulk.py | 73 +- .../worker/{ => bulk}/transform_bulk/utils.py | 0 .../load_bulk/tests/_test_load_bulk_worker.py | 225 ----- .../tests/_test_transform_bulk_worker.py | 300 ------ .../extract_update}/__init__.py | 0 .../extract_update/extract_update.py} | 0 .../extract_update}/make/make.py | 0 .../tests/_test_extract_update_worker.py} | 26 +- .../load_update}/__init__.py | 0 .../{ => update}/load_update/load_update.py | 0 .../load_update}/make/make.py | 0 .../tests/_test_load_update_worker.py | 14 +- .../update/transform_update/__init__.py | 0 .../update/transform_update/make/make.py | 4 + .../tests/_test_transform_update_worker.py | 13 +- .../transform_update/transform_update.py | 3 +- .../{ => update}/transform_update/utils.py | 0 src/layers/domain/core/device/v1.py | 4 - 35 files changed, 1937 insertions(+), 577 deletions(-) rename src/etl/sds/worker/{extract => bulk/extract_bulk}/__init__.py (100%) create mode 100644 src/etl/sds/worker/bulk/extract_bulk/extract_bulk.py rename src/etl/sds/worker/{extract => bulk/extract_bulk}/make/make.py (100%) create mode 100644 src/etl/sds/worker/bulk/extract_bulk/tests/extract_bulk_input.ldif create mode 100644 src/etl/sds/worker/bulk/extract_bulk/tests/extract_bulk_output.json create mode 100644 src/etl/sds/worker/bulk/extract_bulk/tests/test_extract_bulk_worker.py rename src/etl/sds/worker/{ => bulk}/load_bulk/__init__.py (100%) rename src/etl/sds/worker/{ => bulk}/load_bulk/load_bulk.py (97%) rename src/etl/sds/worker/{ => bulk}/load_bulk/make/make.py (100%) create mode 100644 src/etl/sds/worker/bulk/load_bulk/tests/test_load_bulk_worker.py rename src/etl/sds/worker/{ => bulk}/load_bulk_reduce/load_bulk_reduce.py (100%) rename src/etl/sds/worker/{ => bulk}/load_bulk_reduce/make/make.py (100%) rename src/etl/sds/worker/{ => bulk}/load_bulk_reduce/tests/_test_load_bulk_reduce.py (100%) rename src/etl/sds/worker/{load_update => bulk/transform_bulk}/__init__.py (100%) rename src/etl/sds/worker/{load_update => bulk/transform_bulk}/make/make.py (100%) create mode 100644 src/etl/sds/worker/bulk/transform_bulk/tests/test_transform_bulk_worker.py create mode 100644 src/etl/sds/worker/bulk/transform_bulk/tests/transform_bulk_output.json rename src/etl/sds/worker/{ => bulk}/transform_bulk/transform_bulk.py (52%) rename src/etl/sds/worker/{ => bulk}/transform_bulk/utils.py (100%) delete mode 100644 src/etl/sds/worker/load_bulk/tests/_test_load_bulk_worker.py delete mode 100644 src/etl/sds/worker/transform_bulk/tests/_test_transform_bulk_worker.py rename src/etl/sds/worker/{transform_bulk => update/extract_update}/__init__.py (100%) rename src/etl/sds/worker/{extract/extract.py => update/extract_update/extract_update.py} (100%) rename src/etl/sds/worker/{transform_bulk => update/extract_update}/make/make.py (100%) rename src/etl/sds/worker/{extract/tests/test_extract_worker.py => update/extract_update/tests/_test_extract_update_worker.py} (93%) rename src/etl/sds/worker/{transform_update => update/load_update}/__init__.py (100%) rename src/etl/sds/worker/{ => update}/load_update/load_update.py (100%) rename src/etl/sds/worker/{transform_update => update/load_update}/make/make.py (100%) rename src/etl/sds/worker/{ => update}/load_update/tests/_test_load_update_worker.py (98%) create mode 100644 src/etl/sds/worker/update/transform_update/__init__.py create mode 100644 src/etl/sds/worker/update/transform_update/make/make.py rename src/etl/sds/worker/{ => update}/transform_update/tests/_test_transform_update_worker.py (99%) rename src/etl/sds/worker/{ => update}/transform_update/transform_update.py (99%) rename src/etl/sds/worker/{ => update}/transform_update/utils.py (100%) diff --git a/src/etl/sds/worker/extract/__init__.py b/src/etl/sds/worker/bulk/extract_bulk/__init__.py similarity index 100% rename from src/etl/sds/worker/extract/__init__.py rename to src/etl/sds/worker/bulk/extract_bulk/__init__.py diff --git a/src/etl/sds/worker/bulk/extract_bulk/extract_bulk.py b/src/etl/sds/worker/bulk/extract_bulk/extract_bulk.py new file mode 100644 index 000000000..7f0daf5c9 --- /dev/null +++ b/src/etl/sds/worker/bulk/extract_bulk/extract_bulk.py @@ -0,0 +1,79 @@ +import json +from collections import deque +from dataclasses import asdict +from io import BytesIO +from typing import TYPE_CHECKING + +import boto3 +from etl_utils.constants import WorkerKey +from etl_utils.io import EtlEncoder, pkl_dump_lz4 +from etl_utils.ldif.ldif import filter_and_group_ldif_from_s3_by_property, parse_ldif +from etl_utils.worker.action import apply_action +from etl_utils.worker.model import WorkerActionResponse, WorkerEnvironment +from etl_utils.worker.worker_step_chain import execute_step_chain +from event.json import json_loads +from nhs_context_logging import log_action +from sds.domain.constants import FILTER_TERMS +from sds.domain.parse import parse_sds_record + +_log_action_without_inputs = lambda function: log_action(log_args=[], log_result=False)( + function +) + +if TYPE_CHECKING: + from mypy_boto3_s3 import S3Client + + +S3_CLIENT = boto3.client("s3") +ENVIRONMENT = WorkerEnvironment.build() + + +@_log_action_without_inputs +def _read(s3_client: "S3Client", s3_input_path: str) -> deque[tuple[dict]]: + filtered_ldif_by_group = filter_and_group_ldif_from_s3_by_property( + s3_path=s3_input_path, + filter_terms=FILTER_TERMS, + group_field="nhsMhsPartyKey", + s3_client=s3_client, + ) + return deque( + tuple(parse_ldif(file_opener=BytesIO, path_or_data=filtered_ldif)) + for filtered_ldif in filtered_ldif_by_group + ) + + +def extract( + s3_client: "S3Client", s3_input_path: str, s3_output_path: str, max_records: int +) -> WorkerActionResponse: + unprocessed_records = _read(s3_client=s3_client, s3_input_path=s3_input_path) + + processed_records = [] + + exception = apply_action( + unprocessed_records=unprocessed_records, + processed_records=processed_records, + action=lambda record: [[parse_sds_record(*r).dict() for r in record]], + record_serializer=lambda dns_and_records: json_loads( + json.dumps([r[1] for r in dns_and_records], cls=EtlEncoder) + ), + ) + + return WorkerActionResponse( + unprocessed_records=unprocessed_records, + processed_records=processed_records, + exception=exception, + s3_input_path=s3_input_path, + s3_output_path=s3_output_path, + ) + + +def handler(event, context): + response = execute_step_chain( + action=extract, + s3_client=S3_CLIENT, + s3_input_path=ENVIRONMENT.s3_path(WorkerKey.EXTRACT), + s3_output_path=ENVIRONMENT.s3_path(WorkerKey.TRANSFORM), + unprocessed_dumper=lambda **kwargs: None, + processed_dumper=pkl_dump_lz4, + ) + return asdict(response) diff --git a/src/etl/sds/worker/extract/make/make.py b/src/etl/sds/worker/bulk/extract_bulk/make/make.py similarity index 100% rename from src/etl/sds/worker/extract/make/make.py rename to src/etl/sds/worker/bulk/extract_bulk/make/make.py diff --git a/src/etl/sds/worker/bulk/extract_bulk/tests/extract_bulk_input.ldif b/src/etl/sds/worker/bulk/extract_bulk/tests/extract_bulk_input.ldif new file mode 100644 index 000000000..bd04e7ee3 --- /dev/null +++ b/src/etl/sds/worker/bulk/extract_bulk/tests/extract_bulk_input.ldif @@ -0,0 +1,165 @@ +dn: uniqueIdentifier=00000000000a,ou=Services,o=nhs +objectClass: nhsMhs +objectClass: top +nhsApproverURP: myApprover +nhsDateApproved: 20010101010101 +nhsDateDNSApproved: 20010101010101 +nhsDateRequested: 20010101010101 +nhsDNSApprover: myApprover +nhsIDCode: AAA +nhsMHSAckRequested: never +nhsMhsCPAId: 00000000000a +nhsMHSDuplicateElimination: never +nhsMHSEndPoint: https://test.C3O9X.nhs.uk/ +nhsMhsFQDN: test.C3O9X.nhs.uk +nhsMHsIN: READ_PRACTITIONER_ROLE_R4_V001 +nhsMhsIPAddress: 0.0.0.0 +nhsMhsManufacturerOrg: LSP04 +nhsMHSPartyKey: AAA-111111 +nhsMHsSN: urn:nhs:names:services:ers +nhsMhsSvcIA: urn:nhs:names:services:ers:READ_PRACTITIONER_ROLE_R4_V001 +nhsProductKey: 111 +nhsProductName: My AAA Product +nhsProductVersion: 2005.02 +nhsRequestorURP: myRequestor +uniqueIdentifier: 00000000000a +nhsContractPropertyTemplateKey: 14 +nhsEPInteractionType: FHIR +nhsMHSIsAuthenticated: none + +dn: uniqueIdentifier=000000000001,ou=Services,o=nhs +objectClass: nhsAS +objectClass: top +nhsApproverURP: myApprover +nhsAsClient: AAA +nhsAsSvcIA: urn:nhs:names:services:pds:QUPA_IN040000UK01 +nhsAsSvcIA: urn:nhs:names:services:ers:READ_PRACTITIONER_ROLE_R4_V001 +nhsDateApproved: 20010101010101 +nhsDateRequested: 20010101010101 +nhsIDCode: AAA +nhsMhsManufacturerOrg: LSP04 +nhsMHSPartyKey: AAA-111111 +nhsProductKey: 111 +nhsProductName: My AAA Product +nhsProductVersion: 2005.02 +nhsRequestorURP: myRequestor +nhsTempUid: 111 +uniqueIdentifier: 000000000001 + +dn: uniqueIdentifier=000000000002,ou=Services,o=nhs +objectClass: nhsAS +objectClass: top +nhsApproverURP: myApprover +nhsAsClient: BBB +nhsAsSvcIA: urn:nhs:names:services:pds:QUPA_IN040000UK02 +nhsAsSvcIA: urn:nhs:names:services:ers:READ_PRACTITIONER_ROLE_R4_V002 +nhsAsSvcIA: urn:nhs:names:services:ers:READ_PRACTITIONER_ROLE_R4_V003 +nhsDateApproved: 20020202020202 +nhsDateRequested: 20020202020202 +nhsIDCode: BBB +nhsMhsManufacturerOrg: LSP04 +nhsMHSPartyKey: BBB-111111 +nhsProductKey: 222 +nhsProductName: My BBB Product +nhsProductVersion: 2005.02 +nhsRequestorURP: myRequestor +nhsTempUid: 222 +uniqueIdentifier: 000000000002 + +dn: uniqueIdentifier=00000000000b,ou=Services,o=nhs +objectClass: nhsMhs +objectClass: top +nhsApproverURP: myApprover +nhsDateApproved: 20020202020202 +nhsDateDNSApproved: 20020202020202 +nhsDateRequested: 20020202020202 +nhsDNSApprover: myApprover +nhsIDCode: BBB +nhsMHSAckRequested: never +nhsMhsCPAId: 00000000000b +nhsMHSDuplicateElimination: never +nhsMHSEndPoint: https://test.C3O9X.nhs.uk/ +nhsMhsFQDN: test.C3O9X.nhs.uk +nhsMHsIN: READ_PRACTITIONER_ROLE_R4_V002 +nhsMhsIPAddress: 0.0.0.0 +nhsMhsManufacturerOrg: LSP04 +nhsMHSPartyKey: BBB-111111 +nhsMHsSN: urn:nhs:names:services:ers +nhsMhsSvcIA: urn:nhs:names:services:ers:READ_PRACTITIONER_ROLE_R4_V002 +nhsProductKey: 111 +nhsProductName: My BBB Product +nhsProductVersion: 2005.02 +nhsRequestorURP: myRequestor +uniqueIdentifier: 00000000000b +nhsContractPropertyTemplateKey: 14 +nhsEPInteractionType: FHIR +nhsMHSIsAuthenticated: none + +dn: uniqueIdentifier=000000000003,ou=Services,o=nhs +objectClass: nhsAS +objectClass: top +nhsApproverURP: myApprover +nhsAsClient: BBB +nhsAsSvcIA: urn:nhs:names:services:pds:QUPA_IN040000UK03 +nhsAsSvcIA: urn:nhs:names:services:ers:READ_PRACTITIONER_ROLE_R4_V002 +nhsAsSvcIA: urn:nhs:names:services:ers:READ_PRACTITIONER_ROLE_R4_V003 +nhsDateApproved: 20030303030303 +nhsDateRequested: 20030303030303 +nhsIDCode: BBB +nhsMhsManufacturerOrg: LSP04 +nhsMHSPartyKey: BBB-111111 +nhsProductKey: 333 +nhsProductName: My BBB Product +nhsProductVersion: 2005.02 +nhsRequestorURP: myRequestor +nhsTempUid: 333 +uniqueIdentifier: 000000000003 + +dn: uniqueIdentifier=000000000004,ou=Services,o=nhs +objectClass: nhsAS +objectClass: top +nhsApproverURP: myApprover +nhsAsClient: AAA +nhsAsSvcIA: urn:nhs:names:services:pds:QUPA_IN040000UK04 +nhsAsSvcIA: urn:nhs:names:services:ers:READ_PRACTITIONER_ROLE_R4_V001 +nhsDateApproved: 20040404040404 +nhsDateRequested: 20040404040404 +nhsIDCode: AAA +nhsMhsManufacturerOrg: LSP04 +nhsMHSPartyKey: AAA-111111 +nhsProductKey: 444 +nhsProductName: My AAA Product +nhsProductVersion: 2005.02 +nhsRequestorURP: myRequestor +nhsTempUid: 444 +uniqueIdentifier: 000000000004 + + +dn: uniqueIdentifier=00000000000c,ou=Services,o=nhs +objectClass: nhsMhs +objectClass: top +nhsApproverURP: myApprover +nhsDateApproved: 20020202020202 +nhsDateDNSApproved: 20020202020202 +nhsDateRequested: 20020202020202 +nhsDNSApprover: myApprover +nhsIDCode: BBB +nhsMHSAckRequested: never +nhsMhsCPAId: 00000000000c +nhsMHSDuplicateElimination: never +nhsMHSEndPoint: https://test.C3O9X.nhs.uk/ +nhsMhsFQDN: test.C3O9X.nhs.uk +nhsMHsIN: READ_PRACTITIONER_ROLE_R4_V002 +nhsMhsIPAddress: 0.0.0.0 +nhsMhsManufacturerOrg: LSP04 +nhsMHSPartyKey: BBB-111111 +nhsMHsSN: urn:nhs:names:services:ers +nhsMhsSvcIA: urn:nhs:names:services:ers:READ_PRACTITIONER_ROLE_R4_V003 +nhsProductKey: 111 +nhsProductName: My BBB Product +nhsProductVersion: 2005.02 +nhsRequestorURP: myRequestor +uniqueIdentifier: 00000000000c +nhsContractPropertyTemplateKey: 14 +nhsEPInteractionType: FHIR +nhsMHSIsAuthenticated: none diff --git a/src/etl/sds/worker/bulk/extract_bulk/tests/extract_bulk_output.json b/src/etl/sds/worker/bulk/extract_bulk/tests/extract_bulk_output.json new file mode 100644 index 000000000..e97a7ad78 --- /dev/null +++ b/src/etl/sds/worker/bulk/extract_bulk/tests/extract_bulk_output.json @@ -0,0 +1,209 @@ +[ + [ + { + "change_type": "add", + "nhs_approver_urp": "myApprover", + "nhs_contract_property_template_key": "14", + "nhs_date_approved": "20010101010101", + "nhs_date_dns_approved": "20010101010101", + "nhs_date_requested": "20010101010101", + "nhs_dns_approver": "myApprover", + "nhs_ep_interaction_type": "fhir", + "nhs_id_code": "AAA", + "nhs_mhs_ack_requested": "never", + "nhs_mhs_actor": null, + "nhs_mhs_cpa_id": "00000000000a", + "nhs_mhs_duplicate_elimination": "never", + "nhs_mhs_end_point": "https://test.C3O9X.nhs.uk/", + "nhs_mhs_fqdn": "test.C3O9X.nhs.uk", + "nhs_mhs_in": "READ_PRACTITIONER_ROLE_R4_V001", + "nhs_mhs_ip_address": "0.0.0.0", + "nhs_mhs_is_authenticated": "none", + "nhs_mhs_manufacturer_org": "LSP04", + "nhs_mhs_party_key": "AAA-111111", + "nhs_mhs_persist_duration": null, + "nhs_mhs_retries": null, + "nhs_mhs_retry_interval": null, + "nhs_mhs_service_description": null, + "nhs_mhs_sn": "urn:nhs:names:services:ers", + "nhs_mhs_svc_ia": "urn:nhs:names:services:ers:READ_PRACTITIONER_ROLE_R4_V001", + "nhs_mhs_sync_reply_mode": null, + "nhs_product_key": "111", + "nhs_product_name": "My AAA Product", + "nhs_product_version": "2005.02", + "nhs_requestor_urp": "myRequestor", + "object_class": "nhsMhs", + "unique_identifier": "00000000000a" + }, + { + "change_type": "add", + "description": null, + "nhs_approver_urp": "myApprover", + "nhs_as_acf": null, + "nhs_as_category_bag": null, + "nhs_as_client": ["AAA"], + "nhs_as_svc_ia": [ + "urn:nhs:names:services:ers:READ_PRACTITIONER_ROLE_R4_V001", + "urn:nhs:names:services:pds:QUPA_IN040000UK01" + ], + "nhs_date_approved": "20010101010101", + "nhs_date_requested": "20010101010101", + "nhs_id_code": "AAA", + "nhs_mhs_manufacturer_org": "LSP04", + "nhs_mhs_party_key": "AAA-111111", + "nhs_product_key": "111", + "nhs_product_name": "My AAA Product", + "nhs_product_version": "2005.02", + "nhs_requestor_urp": "myRequestor", + "nhs_temp_uid": "111", + "object_class": "nhsAS", + "unique_identifier": "000000000001" + }, + { + "change_type": "add", + "description": null, + "nhs_approver_urp": "myApprover", + "nhs_as_acf": null, + "nhs_as_category_bag": null, + "nhs_as_client": ["AAA"], + "nhs_as_svc_ia": [ + "urn:nhs:names:services:ers:READ_PRACTITIONER_ROLE_R4_V001", + "urn:nhs:names:services:pds:QUPA_IN040000UK04" + ], + "nhs_date_approved": "20040404040404", + "nhs_date_requested": "20040404040404", + "nhs_id_code": "AAA", + "nhs_mhs_manufacturer_org": "LSP04", + "nhs_mhs_party_key": "AAA-111111", + "nhs_product_key": "444", + "nhs_product_name": "My AAA Product", + "nhs_product_version": "2005.02", + "nhs_requestor_urp": "myRequestor", + "nhs_temp_uid": "444", + "object_class": "nhsAS", + "unique_identifier": "000000000004" + } + ], + [ + { + "change_type": "add", + "description": null, + "nhs_approver_urp": "myApprover", + "nhs_as_acf": null, + "nhs_as_category_bag": null, + "nhs_as_client": ["BBB"], + "nhs_as_svc_ia": [ + "urn:nhs:names:services:ers:READ_PRACTITIONER_ROLE_R4_V002", + "urn:nhs:names:services:ers:READ_PRACTITIONER_ROLE_R4_V003", + "urn:nhs:names:services:pds:QUPA_IN040000UK02" + ], + "nhs_date_approved": "20020202020202", + "nhs_date_requested": "20020202020202", + "nhs_id_code": "BBB", + "nhs_mhs_manufacturer_org": "LSP04", + "nhs_mhs_party_key": "BBB-111111", + "nhs_product_key": "222", + "nhs_product_name": "My BBB Product", + "nhs_product_version": "2005.02", + "nhs_requestor_urp": "myRequestor", + "nhs_temp_uid": "222", + "object_class": "nhsAS", + "unique_identifier": "000000000002" + }, + { + "change_type": "add", + "nhs_approver_urp": "myApprover", + "nhs_contract_property_template_key": "14", + "nhs_date_approved": "20020202020202", + "nhs_date_dns_approved": "20020202020202", + "nhs_date_requested": "20020202020202", + "nhs_dns_approver": "myApprover", + "nhs_ep_interaction_type": "fhir", + "nhs_id_code": "BBB", + "nhs_mhs_ack_requested": "never", + "nhs_mhs_actor": null, + "nhs_mhs_cpa_id": "00000000000b", + "nhs_mhs_duplicate_elimination": "never", + "nhs_mhs_end_point": "https://test.C3O9X.nhs.uk/", + "nhs_mhs_fqdn": "test.C3O9X.nhs.uk", + "nhs_mhs_in": "READ_PRACTITIONER_ROLE_R4_V002", + "nhs_mhs_ip_address": "0.0.0.0", + "nhs_mhs_is_authenticated": "none", + "nhs_mhs_manufacturer_org": "LSP04", + "nhs_mhs_party_key": "BBB-111111", + "nhs_mhs_persist_duration": null, + "nhs_mhs_retries": null, + "nhs_mhs_retry_interval": null, + "nhs_mhs_service_description": null, + "nhs_mhs_sn": "urn:nhs:names:services:ers", + "nhs_mhs_svc_ia": "urn:nhs:names:services:ers:READ_PRACTITIONER_ROLE_R4_V002", + "nhs_mhs_sync_reply_mode": null, + "nhs_product_key": "111", + "nhs_product_name": "My BBB Product", + "nhs_product_version": "2005.02", + "nhs_requestor_urp": "myRequestor", + "object_class": "nhsMhs", + "unique_identifier": "00000000000b" + }, + { + "change_type": "add", + "description": null, + "nhs_approver_urp": "myApprover", + "nhs_as_acf": null, + "nhs_as_category_bag": null, + "nhs_as_client": ["BBB"], + "nhs_as_svc_ia": [ + "urn:nhs:names:services:ers:READ_PRACTITIONER_ROLE_R4_V002", + "urn:nhs:names:services:ers:READ_PRACTITIONER_ROLE_R4_V003", + "urn:nhs:names:services:pds:QUPA_IN040000UK03" + ], + "nhs_date_approved": "20030303030303", + "nhs_date_requested": "20030303030303", + "nhs_id_code": "BBB", + "nhs_mhs_manufacturer_org": "LSP04", + "nhs_mhs_party_key": "BBB-111111", + "nhs_product_key": "333", + "nhs_product_name": "My BBB Product", + "nhs_product_version": "2005.02", + "nhs_requestor_urp": "myRequestor", + "nhs_temp_uid": "333", + "object_class": "nhsAS", + "unique_identifier": "000000000003" + }, + { + "change_type": "add", + "nhs_approver_urp": "myApprover", + "nhs_contract_property_template_key": "14", + "nhs_date_approved": "20020202020202", + "nhs_date_dns_approved": "20020202020202", + "nhs_date_requested": "20020202020202", + "nhs_dns_approver": "myApprover", + "nhs_ep_interaction_type": "fhir", + "nhs_id_code": "BBB", + "nhs_mhs_ack_requested": "never", + "nhs_mhs_actor": null, + "nhs_mhs_cpa_id": "00000000000c", + "nhs_mhs_duplicate_elimination": "never", + "nhs_mhs_end_point": "https://test.C3O9X.nhs.uk/", + "nhs_mhs_fqdn": "test.C3O9X.nhs.uk", + "nhs_mhs_in": "READ_PRACTITIONER_ROLE_R4_V002", + "nhs_mhs_ip_address": "0.0.0.0", + "nhs_mhs_is_authenticated": "none", + "nhs_mhs_manufacturer_org": "LSP04", + "nhs_mhs_party_key": "BBB-111111", + "nhs_mhs_persist_duration": null, + "nhs_mhs_retries": null, + "nhs_mhs_retry_interval": null, + "nhs_mhs_service_description": null, + "nhs_mhs_sn": "urn:nhs:names:services:ers", + "nhs_mhs_svc_ia": "urn:nhs:names:services:ers:READ_PRACTITIONER_ROLE_R4_V003", + "nhs_mhs_sync_reply_mode": null, + "nhs_product_key": "111", + "nhs_product_name": "My BBB Product", + "nhs_product_version": "2005.02", + "nhs_requestor_urp": "myRequestor", + "object_class": "nhsMhs", + "unique_identifier": "00000000000c" + } + ] +] diff --git a/src/etl/sds/worker/bulk/extract_bulk/tests/test_extract_bulk_worker.py b/src/etl/sds/worker/bulk/extract_bulk/tests/test_extract_bulk_worker.py new file mode 100644 index 000000000..ad0e119be --- /dev/null +++ b/src/etl/sds/worker/bulk/extract_bulk/tests/test_extract_bulk_worker.py @@ -0,0 +1,85 @@ +import json +import os +from pathlib import Path +from typing import Callable +from unittest import mock + +import pytest +from etl_utils.constants import LDIF_RECORD_DELIMITER, WorkerKey +from etl_utils.io import EtlEncoder, pkl_dumps_lz4 +from etl_utils.io.test.io_utils import pkl_loads_lz4 +from etl_utils.worker.model import WorkerEnvironment +from event.json import json_load, json_loads +from moto import mock_aws +from mypy_boto3_s3 import S3Client + +BUCKET_NAME = "my-bucket" +PATH_TO_HERE = Path(__file__).parent + + +@pytest.fixture +def mock_s3_client(): + with mock_aws(), mock.patch.dict( + os.environ, + {"ETL_BUCKET": BUCKET_NAME, "AWS_DEFAULT_REGION": "us-east-1"}, + clear=True, + ): + from etl.sds.worker.bulk.extract_bulk import extract_bulk + + extract_bulk.ENVIRONMENT = WorkerEnvironment.build() + extract_bulk.S3_CLIENT.create_bucket(Bucket=BUCKET_NAME) + yield extract_bulk.S3_CLIENT + + +@pytest.fixture +def put_object(mock_s3_client: S3Client): + return lambda key, body: ( + mock_s3_client.put_object(Bucket=BUCKET_NAME, Key=key, Body=body) + ) + + +@pytest.fixture +def get_object(mock_s3_client: S3Client) -> bytes: + return lambda key: ( + mock_s3_client.get_object(Bucket=BUCKET_NAME, Key=key)["Body"].read() + ) + + +def _split_ldif(data: str) -> list[str]: + return list(filter(bool, data.split(LDIF_RECORD_DELIMITER))) + + +def test_extract_worker_pass( + put_object: Callable[[str], None], get_object: Callable[[str], bytes] +): + from etl.sds.worker.bulk.extract_bulk import extract_bulk + + # Initial state + with open(PATH_TO_HERE / "extract_bulk_input.ldif") as f: + input_data = f.read() + + put_object(key=WorkerKey.EXTRACT, body=input_data) + put_object(key=WorkerKey.TRANSFORM, body=pkl_dumps_lz4({})) + + # Execute the extract worker + response = extract_bulk.handler(event={}, context=None) + assert response == { + "stage_name": "extract", + "processed_records": 2, + "unprocessed_records": 0, + "error_message": None, + } + + # Final state + final_unprocessed_data: str = get_object(key=WorkerKey.EXTRACT).decode() + output_data = get_object(key=WorkerKey.TRANSFORM) + n_final_unprocessed = len(_split_ldif(final_unprocessed_data)) + + with open(PATH_TO_HERE / "extract_bulk_output.json") as f: + expected_output_data = json_load(f) + + assert ( + json_loads(json.dumps(pkl_loads_lz4(output_data), cls=EtlEncoder)) + == expected_output_data + ) + assert n_final_unprocessed == 0 diff --git a/src/etl/sds/worker/load_bulk/__init__.py b/src/etl/sds/worker/bulk/load_bulk/__init__.py similarity index 100% rename from src/etl/sds/worker/load_bulk/__init__.py rename to src/etl/sds/worker/bulk/load_bulk/__init__.py diff --git a/src/etl/sds/worker/load_bulk/load_bulk.py b/src/etl/sds/worker/bulk/load_bulk/load_bulk.py similarity index 97% rename from src/etl/sds/worker/load_bulk/load_bulk.py rename to src/etl/sds/worker/bulk/load_bulk/load_bulk.py index 6dbb63616..76f8d016d 100644 --- a/src/etl/sds/worker/load_bulk/load_bulk.py +++ b/src/etl/sds/worker/bulk/load_bulk/load_bulk.py @@ -56,7 +56,7 @@ def handler(event: dict, context): s3_input_path=worker_event.s3_input_path, s3_output_path=None, unprocessed_dumper=pkl_dump_lz4, - processed_dumper=CACHE.REPOSITORY.write_bulk, + processed_dumper=CACHE.REPOSITORY.write, max_records=max_records, ) return asdict(response) diff --git a/src/etl/sds/worker/load_bulk/make/make.py b/src/etl/sds/worker/bulk/load_bulk/make/make.py similarity index 100% rename from src/etl/sds/worker/load_bulk/make/make.py rename to src/etl/sds/worker/bulk/load_bulk/make/make.py diff --git a/src/etl/sds/worker/bulk/load_bulk/tests/test_load_bulk_worker.py b/src/etl/sds/worker/bulk/load_bulk/tests/test_load_bulk_worker.py new file mode 100644 index 000000000..534092da5 --- /dev/null +++ b/src/etl/sds/worker/bulk/load_bulk/tests/test_load_bulk_worker.py @@ -0,0 +1,195 @@ +import os +from collections import deque +from itertools import chain +from typing import Callable +from unittest import mock + +import pytest +from domain.core.cpm_product.v1 import CpmProduct +from domain.core.device.v1 import Device +from domain.core.device_reference_data.v1 import DeviceReferenceData +from domain.core.product_team.v1 import ProductTeam +from domain.repository.cpm_product_repository.v1 import CpmProductRepository +from domain.repository.device_reference_data_repository.v1 import ( + DeviceReferenceDataRepository, +) +from domain.repository.device_repository.v1 import DeviceRepository +from domain.repository.product_team_repository.v1 import ProductTeamRepository +from etl_utils.constants import WorkerKey +from etl_utils.io import pkl_dumps_lz4 +from etl_utils.io.test.io_utils import pkl_loads_lz4 +from event.json import json_load +from moto import mock_aws +from mypy_boto3_s3 import S3Client +from sds.epr.constants import AS_DEVICE_SUFFIX, MHS_DEVICE_SUFFIX + +from etl.sds.worker.bulk.transform_bulk.tests.test_transform_bulk_worker import ( + PATH_TO_HERE as PATH_TO_TRANSFORM_OUTPUT, +) +from test_helpers.dynamodb import mock_table + +BUCKET_NAME = "my-bucket" +TABLE_NAME = "my-table" + + +@pytest.fixture +def mock_s3_client(): + with mock_aws(), mock.patch.dict( + os.environ, + { + "ETL_BUCKET": BUCKET_NAME, + "TABLE_NAME": TABLE_NAME, + "AWS_DEFAULT_REGION": "us-east-1", + }, + clear=True, + ): + from etl.sds.worker.bulk.load_bulk import load_bulk + + load_bulk.CACHE.S3_CLIENT.create_bucket(Bucket=BUCKET_NAME) + yield load_bulk.CACHE.S3_CLIENT + + +@pytest.fixture +def put_object(mock_s3_client: S3Client): + return lambda key, body: ( + mock_s3_client.put_object(Bucket=BUCKET_NAME, Key=key, Body=body) + ) + + +@pytest.fixture +def get_object(mock_s3_client: S3Client) -> bytes: + return lambda key: ( + mock_s3_client.get_object(Bucket=BUCKET_NAME, Key=key)["Body"].read() + ) + + +def test_load_worker_pass( + put_object: Callable[[str], None], get_object: Callable[[str], bytes] +): + from etl.sds.worker.bulk.load_bulk import load_bulk + + # Initial state + with open(PATH_TO_TRANSFORM_OUTPUT / "transform_bulk_output.json") as f: + input_data = json_load(f) + + put_object( + key=WorkerKey.LOAD, + body=pkl_dumps_lz4(deque(input_data)), + ) + + # Execute the load worker + with mock_table(TABLE_NAME) as dynamodb_client: + load_bulk.CACHE.REPOSITORY.client = dynamodb_client + response = load_bulk.handler( + event={"s3_input_path": f"s3://{BUCKET_NAME}/{WorkerKey.LOAD}"}, + context=None, + ) + + assert response == { + "stage_name": "load", + "processed_records": len(input_data), + "unprocessed_records": 0, + "error_message": None, + } + + # Final state + final_unprocessed_data = pkl_loads_lz4(get_object(key=WorkerKey.LOAD)) + assert final_unprocessed_data == deque([]) + + product_team_repo = ProductTeamRepository( + table_name=TABLE_NAME, dynamodb_client=dynamodb_client + ) + (product_team,) = product_team_repo.search() + product_team_by_key = product_team_repo.read("EPR-LSP04") + product_team_by_id = product_team_repo.read( + "LSP04.ec12b569-ea31-4c28-9559-17d533c78441" + ) + assert product_team == product_team_by_key + assert product_team == product_team_by_id + + product_repo = CpmProductRepository( + table_name=TABLE_NAME, dynamodb_client=dynamodb_client + ) + products = product_repo.search(product_team_id=product_team.id) + product_team_a = product_repo.read( + product_team_id=product_team.id, id="AAA-111111" + ) + product_team_b = product_repo.read( + product_team_id=product_team.id, id="BBB-111111" + ) + assert products == [product_team_b, product_team_a] + + device_repo = DeviceRepository( + table_name=TABLE_NAME, dynamodb_client=dynamodb_client + ) + devices = list( + chain.from_iterable( + device_repo.search( + product_team_id=product_team.id, product_id=product.id + ) + for product in products + ) + ) + mhs_devices = [d for d in devices if d.name.endswith(MHS_DEVICE_SUFFIX)] + as_devices = [d for d in devices if d.name.endswith(AS_DEVICE_SUFFIX)] + assert len(devices) == len(mhs_devices) + len(as_devices) + assert len(mhs_devices) == 2 + assert len(as_devices) == 4 + + device_ref_data_repo = DeviceReferenceDataRepository( + table_name=TABLE_NAME, dynamodb_client=dynamodb_client + ) + device_ref_datas = list( + chain.from_iterable( + device_ref_data_repo.search( + product_team_id=product_team.id, product_id=product.id + ) + for product in products + ) + ) + assert len(device_ref_datas) == 4 + + (input_product_team,) = ( + ProductTeam(**data) + for item in input_data + for object_type_name, data in item.items() + if object_type_name == "ProductTeam" + ) + + input_products = sorted( + ( + CpmProduct(**data) + for item in input_data + for object_type_name, data in item.items() + if object_type_name == "CpmProduct" + ), + key=lambda product: str(product.id), + ) + + input_devices = sorted( + ( + Device(**data) + for item in input_data + for object_type_name, data in item.items() + if object_type_name == "Device" + ), + key=lambda device: (str(device.product_id), device.id), + ) + + input_device_ref_data = sorted( + ( + DeviceReferenceData(**data) + for item in input_data + for object_type_name, data in item.items() + if object_type_name == "DeviceReferenceData" + ), + key=lambda device_ref_data: ( + str(device_ref_data.product_id), + device_ref_data.id, + ), + ) + + assert product_team == input_product_team + assert products == input_products + assert devices == input_devices + assert device_ref_datas == input_device_ref_data diff --git a/src/etl/sds/worker/load_bulk_reduce/load_bulk_reduce.py b/src/etl/sds/worker/bulk/load_bulk_reduce/load_bulk_reduce.py similarity index 100% rename from src/etl/sds/worker/load_bulk_reduce/load_bulk_reduce.py rename to src/etl/sds/worker/bulk/load_bulk_reduce/load_bulk_reduce.py diff --git a/src/etl/sds/worker/load_bulk_reduce/make/make.py b/src/etl/sds/worker/bulk/load_bulk_reduce/make/make.py similarity index 100% rename from src/etl/sds/worker/load_bulk_reduce/make/make.py rename to src/etl/sds/worker/bulk/load_bulk_reduce/make/make.py diff --git a/src/etl/sds/worker/load_bulk_reduce/tests/_test_load_bulk_reduce.py b/src/etl/sds/worker/bulk/load_bulk_reduce/tests/_test_load_bulk_reduce.py similarity index 100% rename from src/etl/sds/worker/load_bulk_reduce/tests/_test_load_bulk_reduce.py rename to src/etl/sds/worker/bulk/load_bulk_reduce/tests/_test_load_bulk_reduce.py diff --git a/src/etl/sds/worker/load_update/__init__.py b/src/etl/sds/worker/bulk/transform_bulk/__init__.py similarity index 100% rename from src/etl/sds/worker/load_update/__init__.py rename to src/etl/sds/worker/bulk/transform_bulk/__init__.py diff --git a/src/etl/sds/worker/load_update/make/make.py b/src/etl/sds/worker/bulk/transform_bulk/make/make.py similarity index 100% rename from src/etl/sds/worker/load_update/make/make.py rename to src/etl/sds/worker/bulk/transform_bulk/make/make.py diff --git a/src/etl/sds/worker/bulk/transform_bulk/tests/test_transform_bulk_worker.py b/src/etl/sds/worker/bulk/transform_bulk/tests/test_transform_bulk_worker.py new file mode 100644 index 000000000..0cf8dc514 --- /dev/null +++ b/src/etl/sds/worker/bulk/transform_bulk/tests/test_transform_bulk_worker.py @@ -0,0 +1,176 @@ +import os +from collections import deque +from pathlib import Path +from typing import Callable +from unittest import mock + +import pytest +from etl_utils.constants import WorkerKey +from etl_utils.io import pkl_dumps_lz4 +from etl_utils.io.test.io_utils import pkl_loads_lz4 +from event.json import json_load +from moto import mock_aws +from mypy_boto3_s3 import S3Client + +from etl.sds.worker.bulk.extract_bulk.tests.test_extract_bulk_worker import ( + PATH_TO_HERE as PATH_TO_EXTRACT_OUTPUT, +) +from test_helpers.dynamodb import mock_table +from test_helpers.uuid import consistent_uuid + +BUCKET_NAME = "my-bucket" +TABLE_NAME = "my-table" +PATH_TO_HERE = Path(__file__).parent + + +@pytest.fixture +def mock_s3_client(): + with mock_aws(), mock_table(TABLE_NAME) as dynamodb_client, mock.patch.dict( + os.environ, + { + "ETL_BUCKET": BUCKET_NAME, + "TABLE_NAME": TABLE_NAME, + "AWS_DEFAULT_REGION": "us-east-1", + }, + clear=True, + ): + from etl.sds.worker.bulk.transform_bulk import transform_bulk + + transform_bulk.EPR_PRODUCT_TEAM_REPOSITORY.client = dynamodb_client + transform_bulk.S3_CLIENT.create_bucket(Bucket=BUCKET_NAME) + yield transform_bulk.S3_CLIENT + + +@pytest.fixture +def put_object(mock_s3_client: S3Client): + return lambda key, body: ( + mock_s3_client.put_object(Bucket=BUCKET_NAME, Key=key, Body=body) + ) + + +@pytest.fixture +def get_object(mock_s3_client: S3Client) -> bytes: + return lambda key: ( + mock_s3_client.get_object(Bucket=BUCKET_NAME, Key=key)["Body"].read() + ) + + +class ConsistentUUID: + def __init__(self): + self.seed = 1 + + def create(self): + uuid = consistent_uuid(self.seed) + self.seed += 1 + return uuid + + +class MockId: + def __init__(self, id): + self.id = id + + +def _assert_domain_object_equal(a, b): + for k in a.keys() - { + "id", + "product_id", + "product_team_id", + "device_reference_data", + "questionnaire_responses", + "created_on", + "updated_on", + "deleted_on", + "tags", + }: + assert a[k] == b[k], f"{k}: {a[k]} not same as {b[k]}" + + +def test_transform_worker( + put_object: Callable[[str], None], + get_object: Callable[[str], bytes], +): + + from etl.sds.worker.bulk.transform_bulk import transform_bulk + + initial_processed_data = deque([]) + + # Initial state + with open(PATH_TO_EXTRACT_OUTPUT / "extract_bulk_output.json") as f: + input_data = deque(json_load(f)) + + put_object(key=WorkerKey.TRANSFORM, body=pkl_dumps_lz4(input_data)) + put_object(key=WorkerKey.LOAD, body=pkl_dumps_lz4(initial_processed_data)) + + # Execute the transform worker + response = transform_bulk.handler(event={}, context=None) + + assert response == [ + { + "stage_name": "transform", + "processed_records": 13, + "unprocessed_records": 0, + "s3_input_path": "s3://my-bucket/input--load/unprocessed.0", + "error_message": None, + } + ] + + # Final state + final_unprocessed_data: str = get_object(key=WorkerKey.TRANSFORM) + output_data: str = get_object(key=f"{WorkerKey.LOAD}.0") + n_final_unprocessed = len(pkl_loads_lz4(final_unprocessed_data)) + + assert n_final_unprocessed == 0 + + with open(PATH_TO_HERE / "transform_bulk_output.json") as f: + template_output: list[dict[str, dict]] = json_load(f) + + created_objects: list[dict[str, dict]] = pkl_loads_lz4(output_data) + + assert len(created_objects) == len(template_output) + assert all(a.keys() == b.keys() for a, b in zip(created_objects, template_output)) + + current_product_id = None + device_reference_data_ids = None + for nested_created_item, nested_template_item in zip( + created_objects, template_output + ): + (object_type_name,) = nested_created_item.keys() + + created_item = nested_created_item[object_type_name] + template_item = nested_template_item[object_type_name] + + if object_type_name == "CpmProduct": + current_product_id = created_item["id"] + device_reference_data_ids = [] + elif object_type_name == "DeviceReferenceData": + device_reference_data_ids.append(created_item["id"]) + + # Verify that the product id is internally consistent + if created_item.get("product_id"): + assert created_item.get("product_id") == current_product_id + + # Verify that the device reference data ids are internally consistent + if created_item.get("device_reference_data"): + assert all( + id in device_reference_data_ids + for id in created_item.get("device_reference_data") + ) + + # Check that this object is broadly consistent with the expectation + assert created_item.keys() == template_item.keys() + _assert_domain_object_equal(created_item, template_item) + + # Check that any questionnaire responses are broadly consistent with the expectation + if "questionnaire_responses" in created_item: + created_questionnaire_response = created_item["questionnaire_responses"] + template_questionnaire_response = template_item["questionnaire_responses"] + assert ( + template_questionnaire_response.keys() + == created_questionnaire_response.keys() + ) + for questionnaire_name in template_questionnaire_response: + for created_response, template_response in zip( + created_questionnaire_response[questionnaire_name], + template_questionnaire_response[questionnaire_name], + ): + _assert_domain_object_equal(created_response, template_response) diff --git a/src/etl/sds/worker/bulk/transform_bulk/tests/transform_bulk_output.json b/src/etl/sds/worker/bulk/transform_bulk/tests/transform_bulk_output.json new file mode 100644 index 000000000..c95e6f701 --- /dev/null +++ b/src/etl/sds/worker/bulk/transform_bulk/tests/transform_bulk_output.json @@ -0,0 +1,941 @@ +[ + { + "ProductTeam": { + "id": "LSP04.ec12b569-ea31-4c28-9559-17d533c78441", + "name": "LSP04 (EPR)", + "ods_code": "LSP04", + "status": "active", + "created_on": "2024-11-20T10:54:19.001479+00:00", + "updated_on": null, + "deleted_on": null, + "keys": [{ "key_type": "epr_id", "key_value": "EPR-LSP04" }] + } + }, + { + "CpmProduct": { + "id": "P.VNT-MLW", + "product_team_id": "LSP04.ec12b569-ea31-4c28-9559-17d533c78441", + "name": "My AAA Product", + "ods_code": "LSP04", + "status": "active", + "created_on": "2024-11-20T10:54:19.001733+00:00", + "updated_on": "2024-11-20T10:54:19.002057+00:00", + "deleted_on": null, + "keys": [{ "key_type": "party_key", "key_value": "AAA-111111" }] + } + }, + { + "DeviceReferenceData": { + "id": "3dfd9f0b-0cf3-4c3d-92cd-e379adfaecf6", + "name": "AAA-111111 - MHS Message Sets", + "product_id": "P.VNT-MLW", + "product_team_id": "LSP04.ec12b569-ea31-4c28-9559-17d533c78441", + "ods_code": "LSP04", + "questionnaire_responses": { + "spine_mhs_message_sets/1": [ + { + "id": "5e4cab5e-bcbd-47f6-afa2-7a2b86853ca5", + "questionnaire_name": "spine_mhs_message_sets", + "questionnaire_version": "1", + "data": { + "MHS IN": "READ_PRACTITIONER_ROLE_R4_V001", + "MHS SN": "urn:nhs:names:services:ers", + "Interaction ID": "urn:nhs:names:services:ers:READ_PRACTITIONER_ROLE_R4_V001" + }, + "created_on": "2024-11-20T10:54:19.000224+00:00" + } + ] + }, + "created_on": "2024-11-20T10:54:19.002130+00:00", + "updated_on": "2024-11-20T10:54:19.002249+00:00", + "deleted_on": null + } + }, + { + "DeviceReferenceData": { + "id": "d3d0e6b8-8277-4849-8e8d-5cb035af5536", + "name": "AAA-111111 - AS Additional Interactions", + "product_id": "P.VNT-MLW", + "product_team_id": "LSP04.ec12b569-ea31-4c28-9559-17d533c78441", + "ods_code": "LSP04", + "questionnaire_responses": { + "spine_as_additional_interactions/1": [ + { + "id": "170711a5-2e77-453b-a04f-4723cf8e9f87", + "questionnaire_name": "spine_as_additional_interactions", + "questionnaire_version": "1", + "data": { + "Interaction ID": "urn:nhs:names:services:ers:READ_PRACTITIONER_ROLE_R4_V001" + }, + "created_on": "2024-11-20T10:54:19.000557+00:00" + }, + { + "id": "cb431ce5-5de2-42cd-9430-2e0b0ad91639", + "questionnaire_name": "spine_as_additional_interactions", + "questionnaire_version": "1", + "data": { + "Interaction ID": "urn:nhs:names:services:pds:QUPA_IN040000UK01" + }, + "created_on": "2024-11-20T10:54:19.000872+00:00" + }, + { + "id": "36fa5061-f9b7-4980-8cf7-abafdb25f670", + "questionnaire_name": "spine_as_additional_interactions", + "questionnaire_version": "1", + "data": { + "Interaction ID": "urn:nhs:names:services:pds:QUPA_IN040000UK04" + }, + "created_on": "2024-11-20T10:54:19.001186+00:00" + } + ] + }, + "created_on": "2024-11-20T10:54:19.003795+00:00", + "updated_on": "2024-11-20T10:54:19.004000+00:00", + "deleted_on": null + } + }, + { + "Device": { + "id": "192133bc-7077-4163-9387-9856969dd412", + "name": "AAA-111111 - Message Handling System", + "status": "active", + "product_id": "P.VNT-MLW", + "product_team_id": "LSP04.ec12b569-ea31-4c28-9559-17d533c78441", + "ods_code": "LSP04", + "created_on": "2024-11-20T10:54:19.002315+00:00", + "updated_on": "2024-11-20T10:54:19.003736+00:00", + "deleted_on": null, + "keys": [{ "key_type": "cpa_id", "key_value": "00000000000a" }], + "tags": [ + [["nhs_mhs_party_key", "aaa-111111"]], + [ + [ + "nhs_mhs_svc_ia", + "urn:nhs:names:services:ers:read_practitioner_role_r4_v001" + ] + ], + [["nhs_id_code", "aaa"]], + [ + ["nhs_id_code", "aaa"], + ["nhs_mhs_party_key", "aaa-111111"] + ], + [ + ["nhs_mhs_party_key", "aaa-111111"], + [ + "nhs_mhs_svc_ia", + "urn:nhs:names:services:ers:read_practitioner_role_r4_v001" + ] + ], + [ + ["nhs_id_code", "aaa"], + ["nhs_mhs_party_key", "aaa-111111"], + [ + "nhs_mhs_svc_ia", + "urn:nhs:names:services:ers:read_practitioner_role_r4_v001" + ] + ], + [ + ["nhs_id_code", "aaa"], + [ + "nhs_mhs_svc_ia", + "urn:nhs:names:services:ers:read_practitioner_role_r4_v001" + ] + ] + ], + "questionnaire_responses": { + "spine_mhs/1": [ + { + "id": "277d020e-0389-4113-b44f-f659d2b0ee59", + "questionnaire_name": "spine_mhs", + "questionnaire_version": "1", + "data": { + "Approver URP": "myApprover", + "Contract Property Template Key": "14", + "Date Approved": "20010101010101", + "Date DNS Approved": "20010101010101", + "Date Requested": "20010101010101", + "DNS Approver": "myApprover", + "Interaction Type": "fhir", + "Managing Organization": "AAA", + "Reliability Configuration Ack Requested": "never", + "MHS CPA ID": "00000000000a", + "Reliability Configuration Duplication Elimination": "never", + "Address": "https://test.C3O9X.nhs.uk/", + "MHS FQDN": "test.C3O9X.nhs.uk", + "MHS IP Address": "0.0.0.0", + "MHS Is Authenticated": "none", + "MHS Manufacturer Organisation": "LSP04", + "MHS Party key": "AAA-111111", + "Product Key": "111", + "Product Name": "My AAA Product", + "Product Version": "2005.02", + "Requestor URP": "myRequestor", + "Unique Identifier": "00000000000a" + }, + "created_on": "2024-11-20T10:54:18.995983+00:00" + } + ] + }, + "device_reference_data": { "3dfd9f0b-0cf3-4c3d-92cd-e379adfaecf6": ["*"] } + } + }, + { + "Device": { + "id": "44d37e03-1bba-4300-970e-f083604917c0", + "name": "AAA-111111/000000000001 - Accredited System", + "status": "active", + "product_id": "P.VNT-MLW", + "product_team_id": "LSP04.ec12b569-ea31-4c28-9559-17d533c78441", + "ods_code": "LSP04", + "created_on": "2024-11-20T10:54:19.004056+00:00", + "updated_on": "2024-11-20T10:54:19.005653+00:00", + "deleted_on": null, + "keys": [ + { "key_type": "accredited_system_id", "key_value": "000000000001" } + ], + "tags": [ + [ + ["nhs_as_client", "aaa"], + ["nhs_as_svc_ia", "urn:nhs:names:services:pds:qupa_in040000uk04"], + ["nhs_mhs_party_key", "aaa-111111"] + ], + [ + ["nhs_as_client", "aaa"], + [ + "nhs_as_svc_ia", + "urn:nhs:names:services:ers:read_practitioner_role_r4_v001" + ], + ["nhs_mhs_party_key", "aaa-111111"] + ], + [ + ["nhs_as_client", "aaa"], + ["nhs_as_svc_ia", "urn:nhs:names:services:pds:qupa_in040000uk01"], + ["nhs_mhs_manufacturer_org", "lsp04"] + ], + [ + ["nhs_as_client", "aaa"], + ["nhs_as_svc_ia", "urn:nhs:names:services:pds:qupa_in040000uk01"] + ], + [ + ["nhs_as_client", "aaa"], + ["nhs_as_svc_ia", "urn:nhs:names:services:pds:qupa_in040000uk01"], + ["nhs_mhs_party_key", "aaa-111111"] + ], + [ + ["nhs_as_client", "aaa"], + [ + "nhs_as_svc_ia", + "urn:nhs:names:services:ers:read_practitioner_role_r4_v001" + ] + ], + [ + ["nhs_as_client", "aaa"], + [ + "nhs_as_svc_ia", + "urn:nhs:names:services:ers:read_practitioner_role_r4_v001" + ], + ["nhs_mhs_manufacturer_org", "lsp04"], + ["nhs_mhs_party_key", "aaa-111111"] + ], + [ + ["nhs_as_client", "aaa"], + ["nhs_as_svc_ia", "urn:nhs:names:services:pds:qupa_in040000uk04"], + ["nhs_mhs_manufacturer_org", "lsp04"], + ["nhs_mhs_party_key", "aaa-111111"] + ], + [ + ["nhs_as_client", "aaa"], + ["nhs_as_svc_ia", "urn:nhs:names:services:pds:qupa_in040000uk01"], + ["nhs_mhs_manufacturer_org", "lsp04"], + ["nhs_mhs_party_key", "aaa-111111"] + ], + [ + ["nhs_as_client", "aaa"], + ["nhs_as_svc_ia", "urn:nhs:names:services:pds:qupa_in040000uk04"], + ["nhs_mhs_manufacturer_org", "lsp04"] + ], + [ + ["nhs_as_client", "aaa"], + [ + "nhs_as_svc_ia", + "urn:nhs:names:services:ers:read_practitioner_role_r4_v001" + ], + ["nhs_mhs_manufacturer_org", "lsp04"] + ], + [ + ["nhs_as_client", "aaa"], + ["nhs_as_svc_ia", "urn:nhs:names:services:pds:qupa_in040000uk04"] + ] + ], + "questionnaire_responses": { + "spine_as/1": [ + { + "id": "8e3f6c89-8d86-4827-9168-1866b141d9f3", + "questionnaire_name": "spine_as", + "questionnaire_version": "1", + "data": { + "Approver URP": "myApprover", + "Client ODS Codes": ["AAA"], + "Date Approved": "20010101010101", + "Date Requested": "20010101010101", + "ODS Code": "AAA", + "MHS Manufacturer Organisation": "LSP04", + "Party Key": "AAA-111111", + "Product Key": "111", + "Product Name": "My AAA Product", + "Product Version": "2005.02", + "Requestor URP": "myRequestor", + "Temp UID": "111", + "ASID": "000000000001" + }, + "created_on": "2024-11-20T10:54:18.997768+00:00" + } + ] + }, + "device_reference_data": { + "3dfd9f0b-0cf3-4c3d-92cd-e379adfaecf6": ["*.Interaction ID"], + "d3d0e6b8-8277-4849-8e8d-5cb035af5536": ["*.Interaction ID"] + } + } + }, + { + "Device": { + "id": "4ec9c8a8-37ff-4085-bf3f-e93e9e3ac23e", + "name": "AAA-111111/000000000004 - Accredited System", + "status": "active", + "product_id": "P.VNT-MLW", + "product_team_id": "LSP04.ec12b569-ea31-4c28-9559-17d533c78441", + "ods_code": "LSP04", + "created_on": "2024-11-20T10:54:19.005712+00:00", + "updated_on": "2024-11-20T10:54:19.007286+00:00", + "deleted_on": null, + "keys": [ + { "key_type": "accredited_system_id", "key_value": "000000000004" } + ], + "tags": [ + [ + ["nhs_as_client", "aaa"], + ["nhs_as_svc_ia", "urn:nhs:names:services:pds:qupa_in040000uk04"], + ["nhs_mhs_party_key", "aaa-111111"] + ], + [ + ["nhs_as_client", "aaa"], + [ + "nhs_as_svc_ia", + "urn:nhs:names:services:ers:read_practitioner_role_r4_v001" + ], + ["nhs_mhs_party_key", "aaa-111111"] + ], + [ + ["nhs_as_client", "aaa"], + ["nhs_as_svc_ia", "urn:nhs:names:services:pds:qupa_in040000uk01"], + ["nhs_mhs_manufacturer_org", "lsp04"] + ], + [ + ["nhs_as_client", "aaa"], + ["nhs_as_svc_ia", "urn:nhs:names:services:pds:qupa_in040000uk01"] + ], + [ + ["nhs_as_client", "aaa"], + ["nhs_as_svc_ia", "urn:nhs:names:services:pds:qupa_in040000uk01"], + ["nhs_mhs_party_key", "aaa-111111"] + ], + [ + ["nhs_as_client", "aaa"], + [ + "nhs_as_svc_ia", + "urn:nhs:names:services:ers:read_practitioner_role_r4_v001" + ] + ], + [ + ["nhs_as_client", "aaa"], + [ + "nhs_as_svc_ia", + "urn:nhs:names:services:ers:read_practitioner_role_r4_v001" + ], + ["nhs_mhs_manufacturer_org", "lsp04"], + ["nhs_mhs_party_key", "aaa-111111"] + ], + [ + ["nhs_as_client", "aaa"], + ["nhs_as_svc_ia", "urn:nhs:names:services:pds:qupa_in040000uk04"], + ["nhs_mhs_manufacturer_org", "lsp04"], + ["nhs_mhs_party_key", "aaa-111111"] + ], + [ + ["nhs_as_client", "aaa"], + ["nhs_as_svc_ia", "urn:nhs:names:services:pds:qupa_in040000uk01"], + ["nhs_mhs_manufacturer_org", "lsp04"], + ["nhs_mhs_party_key", "aaa-111111"] + ], + [ + ["nhs_as_client", "aaa"], + ["nhs_as_svc_ia", "urn:nhs:names:services:pds:qupa_in040000uk04"], + ["nhs_mhs_manufacturer_org", "lsp04"] + ], + [ + ["nhs_as_client", "aaa"], + [ + "nhs_as_svc_ia", + "urn:nhs:names:services:ers:read_practitioner_role_r4_v001" + ], + ["nhs_mhs_manufacturer_org", "lsp04"] + ], + [ + ["nhs_as_client", "aaa"], + ["nhs_as_svc_ia", "urn:nhs:names:services:pds:qupa_in040000uk04"] + ] + ], + "questionnaire_responses": { + "spine_as/1": [ + { + "id": "b9669830-a25d-41ce-9aa4-0f41a65aaf64", + "questionnaire_name": "spine_as", + "questionnaire_version": "1", + "data": { + "Approver URP": "myApprover", + "Client ODS Codes": ["AAA"], + "Date Approved": "20040404040404", + "Date Requested": "20040404040404", + "ODS Code": "AAA", + "MHS Manufacturer Organisation": "LSP04", + "Party Key": "AAA-111111", + "Product Key": "444", + "Product Name": "My AAA Product", + "Product Version": "2005.02", + "Requestor URP": "myRequestor", + "Temp UID": "444", + "ASID": "000000000004" + }, + "created_on": "2024-11-20T10:54:18.999535+00:00" + } + ] + }, + "device_reference_data": { + "3dfd9f0b-0cf3-4c3d-92cd-e379adfaecf6": ["*.Interaction ID"], + "d3d0e6b8-8277-4849-8e8d-5cb035af5536": ["*.Interaction ID"] + } + } + }, + { + "CpmProduct": { + "id": "P.LU6-Y7D", + "product_team_id": "LSP04.ec12b569-ea31-4c28-9559-17d533c78441", + "name": "My BBB Product", + "ods_code": "LSP04", + "status": "active", + "created_on": "2024-11-20T10:54:19.017417+00:00", + "updated_on": "2024-11-20T10:54:19.017614+00:00", + "deleted_on": null, + "keys": [{ "key_type": "party_key", "key_value": "BBB-111111" }] + } + }, + { + "DeviceReferenceData": { + "id": "b362b7c1-317c-4e1f-86e4-d8cbd0f9524e", + "name": "BBB-111111 - MHS Message Sets", + "product_id": "P.LU6-Y7D", + "product_team_id": "LSP04.ec12b569-ea31-4c28-9559-17d533c78441", + "ods_code": "LSP04", + "questionnaire_responses": { + "spine_mhs_message_sets/1": [ + { + "id": "719577ca-919e-486d-b0d1-e5e06388051c", + "questionnaire_name": "spine_mhs_message_sets", + "questionnaire_version": "1", + "data": { + "MHS IN": "READ_PRACTITIONER_ROLE_R4_V002", + "MHS SN": "urn:nhs:names:services:ers", + "Interaction ID": "urn:nhs:names:services:ers:READ_PRACTITIONER_ROLE_R4_V002" + }, + "created_on": "2024-11-20T10:54:19.015021+00:00" + }, + { + "id": "417fe8ec-2fc1-4a38-80fc-7cb0ab5245e9", + "questionnaire_name": "spine_mhs_message_sets", + "questionnaire_version": "1", + "data": { + "MHS IN": "READ_PRACTITIONER_ROLE_R4_V002", + "MHS SN": "urn:nhs:names:services:ers", + "Interaction ID": "urn:nhs:names:services:ers:READ_PRACTITIONER_ROLE_R4_V003" + }, + "created_on": "2024-11-20T10:54:19.015692+00:00" + } + ] + }, + "created_on": "2024-11-20T10:54:19.017673+00:00", + "updated_on": "2024-11-20T10:54:19.017837+00:00", + "deleted_on": null + } + }, + { + "DeviceReferenceData": { + "id": "b4186163-23ec-41fa-abc2-5fe8b0fe9cab", + "name": "BBB-111111 - AS Additional Interactions", + "product_id": "P.LU6-Y7D", + "product_team_id": "LSP04.ec12b569-ea31-4c28-9559-17d533c78441", + "ods_code": "LSP04", + "questionnaire_responses": { + "spine_as_additional_interactions/1": [ + { + "id": "4fc814dd-6309-4c8f-92ac-ef450292c058", + "questionnaire_name": "spine_as_additional_interactions", + "questionnaire_version": "1", + "data": { + "Interaction ID": "urn:nhs:names:services:ers:READ_PRACTITIONER_ROLE_R4_V002" + }, + "created_on": "2024-11-20T10:54:19.016015+00:00" + }, + { + "id": "048c60e7-f664-4b02-b1ad-78460c47ba17", + "questionnaire_name": "spine_as_additional_interactions", + "questionnaire_version": "1", + "data": { + "Interaction ID": "urn:nhs:names:services:ers:READ_PRACTITIONER_ROLE_R4_V003" + }, + "created_on": "2024-11-20T10:54:19.016328+00:00" + }, + { + "id": "f43844e2-0514-4140-a110-b3cb400e6d9d", + "questionnaire_name": "spine_as_additional_interactions", + "questionnaire_version": "1", + "data": { + "Interaction ID": "urn:nhs:names:services:pds:QUPA_IN040000UK02" + }, + "created_on": "2024-11-20T10:54:19.016638+00:00" + }, + { + "id": "4a379a26-65b0-4d2d-9fef-9a92b92df293", + "questionnaire_name": "spine_as_additional_interactions", + "questionnaire_version": "1", + "data": { + "Interaction ID": "urn:nhs:names:services:pds:QUPA_IN040000UK03" + }, + "created_on": "2024-11-20T10:54:19.016947+00:00" + } + ] + }, + "created_on": "2024-11-20T10:54:19.019795+00:00", + "updated_on": "2024-11-20T10:54:19.020066+00:00", + "deleted_on": null + } + }, + { + "Device": { + "id": "a5c17f6a-560d-4553-af40-9095ab84e807", + "name": "BBB-111111 - Message Handling System", + "status": "active", + "product_id": "P.LU6-Y7D", + "product_team_id": "LSP04.ec12b569-ea31-4c28-9559-17d533c78441", + "ods_code": "LSP04", + "created_on": "2024-11-20T10:54:19.017931+00:00", + "updated_on": "2024-11-20T10:54:19.019737+00:00", + "deleted_on": null, + "keys": [ + { "key_type": "cpa_id", "key_value": "00000000000b" }, + { "key_type": "cpa_id", "key_value": "00000000000c" } + ], + "tags": [ + [ + ["nhs_id_code", "bbb"], + ["nhs_mhs_party_key", "bbb-111111"], + [ + "nhs_mhs_svc_ia", + "urn:nhs:names:services:ers:read_practitioner_role_r4_v003" + ] + ], + [ + ["nhs_mhs_party_key", "bbb-111111"], + [ + "nhs_mhs_svc_ia", + "urn:nhs:names:services:ers:read_practitioner_role_r4_v002" + ] + ], + [ + ["nhs_mhs_party_key", "bbb-111111"], + [ + "nhs_mhs_svc_ia", + "urn:nhs:names:services:ers:read_practitioner_role_r4_v003" + ] + ], + [ + ["nhs_id_code", "bbb"], + [ + "nhs_mhs_svc_ia", + "urn:nhs:names:services:ers:read_practitioner_role_r4_v002" + ] + ], + [ + [ + "nhs_mhs_svc_ia", + "urn:nhs:names:services:ers:read_practitioner_role_r4_v002" + ] + ], + [ + ["nhs_id_code", "bbb"], + [ + "nhs_mhs_svc_ia", + "urn:nhs:names:services:ers:read_practitioner_role_r4_v003" + ] + ], + [ + [ + "nhs_mhs_svc_ia", + "urn:nhs:names:services:ers:read_practitioner_role_r4_v003" + ] + ], + [ + ["nhs_id_code", "bbb"], + ["nhs_mhs_party_key", "bbb-111111"] + ], + [ + ["nhs_id_code", "bbb"], + ["nhs_mhs_party_key", "bbb-111111"], + [ + "nhs_mhs_svc_ia", + "urn:nhs:names:services:ers:read_practitioner_role_r4_v002" + ] + ], + [["nhs_mhs_party_key", "bbb-111111"]], + [["nhs_id_code", "bbb"]] + ], + "questionnaire_responses": { + "spine_mhs/1": [ + { + "id": "2dfdf37f-53e6-43be-9a96-9430e24604ac", + "questionnaire_name": "spine_mhs", + "questionnaire_version": "1", + "data": { + "Approver URP": "myApprover", + "Contract Property Template Key": "14", + "Date Approved": "20020202020202", + "Date DNS Approved": "20020202020202", + "Date Requested": "20020202020202", + "DNS Approver": "myApprover", + "Interaction Type": "fhir", + "Managing Organization": "BBB", + "Reliability Configuration Ack Requested": "never", + "MHS CPA ID": "00000000000b", + "Reliability Configuration Duplication Elimination": "never", + "Address": "https://test.C3O9X.nhs.uk/", + "MHS FQDN": "test.C3O9X.nhs.uk", + "MHS IP Address": "0.0.0.0", + "MHS Is Authenticated": "none", + "MHS Manufacturer Organisation": "LSP04", + "MHS Party key": "BBB-111111", + "Product Key": "111", + "Product Name": "My BBB Product", + "Product Version": "2005.02", + "Requestor URP": "myRequestor", + "Unique Identifier": "00000000000b" + }, + "created_on": "2024-11-20T10:54:19.010898+00:00" + } + ] + }, + "device_reference_data": { "b362b7c1-317c-4e1f-86e4-d8cbd0f9524e": ["*"] } + } + }, + { + "Device": { + "id": "6924122e-86da-45b3-9c2e-b6a3609b1ed9", + "name": "BBB-111111/000000000002 - Accredited System", + "status": "active", + "product_id": "P.LU6-Y7D", + "product_team_id": "LSP04.ec12b569-ea31-4c28-9559-17d533c78441", + "ods_code": "LSP04", + "created_on": "2024-11-20T10:54:19.020121+00:00", + "updated_on": "2024-11-20T10:54:19.021912+00:00", + "deleted_on": null, + "keys": [ + { "key_type": "accredited_system_id", "key_value": "000000000002" } + ], + "tags": [ + [ + ["nhs_as_client", "bbb"], + [ + "nhs_as_svc_ia", + "urn:nhs:names:services:ers:read_practitioner_role_r4_v003" + ], + ["nhs_mhs_party_key", "bbb-111111"] + ], + [ + ["nhs_as_client", "bbb"], + ["nhs_as_svc_ia", "urn:nhs:names:services:pds:qupa_in040000uk02"] + ], + [ + ["nhs_as_client", "bbb"], + ["nhs_as_svc_ia", "urn:nhs:names:services:pds:qupa_in040000uk03"] + ], + [ + ["nhs_as_client", "bbb"], + ["nhs_as_svc_ia", "urn:nhs:names:services:pds:qupa_in040000uk02"], + ["nhs_mhs_manufacturer_org", "lsp04"] + ], + [ + ["nhs_as_client", "bbb"], + [ + "nhs_as_svc_ia", + "urn:nhs:names:services:ers:read_practitioner_role_r4_v002" + ], + ["nhs_mhs_party_key", "bbb-111111"] + ], + [ + ["nhs_as_client", "bbb"], + ["nhs_as_svc_ia", "urn:nhs:names:services:pds:qupa_in040000uk03"], + ["nhs_mhs_manufacturer_org", "lsp04"], + ["nhs_mhs_party_key", "bbb-111111"] + ], + [ + ["nhs_as_client", "bbb"], + [ + "nhs_as_svc_ia", + "urn:nhs:names:services:ers:read_practitioner_role_r4_v003" + ], + ["nhs_mhs_manufacturer_org", "lsp04"] + ], + [ + ["nhs_as_client", "bbb"], + [ + "nhs_as_svc_ia", + "urn:nhs:names:services:ers:read_practitioner_role_r4_v002" + ] + ], + [ + ["nhs_as_client", "bbb"], + ["nhs_as_svc_ia", "urn:nhs:names:services:pds:qupa_in040000uk02"], + ["nhs_mhs_party_key", "bbb-111111"] + ], + [ + ["nhs_as_client", "bbb"], + ["nhs_as_svc_ia", "urn:nhs:names:services:pds:qupa_in040000uk03"], + ["nhs_mhs_manufacturer_org", "lsp04"] + ], + [ + ["nhs_as_client", "bbb"], + ["nhs_as_svc_ia", "urn:nhs:names:services:pds:qupa_in040000uk02"], + ["nhs_mhs_manufacturer_org", "lsp04"], + ["nhs_mhs_party_key", "bbb-111111"] + ], + [ + ["nhs_as_client", "bbb"], + [ + "nhs_as_svc_ia", + "urn:nhs:names:services:ers:read_practitioner_role_r4_v003" + ], + ["nhs_mhs_manufacturer_org", "lsp04"], + ["nhs_mhs_party_key", "bbb-111111"] + ], + [ + ["nhs_as_client", "bbb"], + [ + "nhs_as_svc_ia", + "urn:nhs:names:services:ers:read_practitioner_role_r4_v002" + ], + ["nhs_mhs_manufacturer_org", "lsp04"], + ["nhs_mhs_party_key", "bbb-111111"] + ], + [ + ["nhs_as_client", "bbb"], + [ + "nhs_as_svc_ia", + "urn:nhs:names:services:ers:read_practitioner_role_r4_v003" + ] + ], + [ + ["nhs_as_client", "bbb"], + ["nhs_as_svc_ia", "urn:nhs:names:services:pds:qupa_in040000uk03"], + ["nhs_mhs_party_key", "bbb-111111"] + ], + [ + ["nhs_as_client", "bbb"], + [ + "nhs_as_svc_ia", + "urn:nhs:names:services:ers:read_practitioner_role_r4_v002" + ], + ["nhs_mhs_manufacturer_org", "lsp04"] + ] + ], + "questionnaire_responses": { + "spine_as/1": [ + { + "id": "c793754b-1eb9-49d3-99dc-a57e5d2589b1", + "questionnaire_name": "spine_as", + "questionnaire_version": "1", + "data": { + "Approver URP": "myApprover", + "Client ODS Codes": ["BBB"], + "Date Approved": "20020202020202", + "Date Requested": "20020202020202", + "ODS Code": "BBB", + "MHS Manufacturer Organisation": "LSP04", + "Party Key": "BBB-111111", + "Product Key": "222", + "Product Name": "My BBB Product", + "Product Version": "2005.02", + "Requestor URP": "myRequestor", + "Temp UID": "222", + "ASID": "000000000002" + }, + "created_on": "2024-11-20T10:54:19.012568+00:00" + } + ] + }, + "device_reference_data": { + "b362b7c1-317c-4e1f-86e4-d8cbd0f9524e": ["*.Interaction ID"], + "b4186163-23ec-41fa-abc2-5fe8b0fe9cab": ["*.Interaction ID"] + } + } + }, + { + "Device": { + "id": "22129a26-6179-421b-9822-e375408b0d08", + "name": "BBB-111111/000000000003 - Accredited System", + "status": "active", + "product_id": "P.LU6-Y7D", + "product_team_id": "LSP04.ec12b569-ea31-4c28-9559-17d533c78441", + "ods_code": "LSP04", + "created_on": "2024-11-20T10:54:19.021975+00:00", + "updated_on": "2024-11-20T10:54:19.023792+00:00", + "deleted_on": null, + "keys": [ + { "key_type": "accredited_system_id", "key_value": "000000000003" } + ], + "tags": [ + [ + ["nhs_as_client", "bbb"], + [ + "nhs_as_svc_ia", + "urn:nhs:names:services:ers:read_practitioner_role_r4_v003" + ], + ["nhs_mhs_party_key", "bbb-111111"] + ], + [ + ["nhs_as_client", "bbb"], + ["nhs_as_svc_ia", "urn:nhs:names:services:pds:qupa_in040000uk02"] + ], + [ + ["nhs_as_client", "bbb"], + ["nhs_as_svc_ia", "urn:nhs:names:services:pds:qupa_in040000uk03"] + ], + [ + ["nhs_as_client", "bbb"], + ["nhs_as_svc_ia", "urn:nhs:names:services:pds:qupa_in040000uk02"], + ["nhs_mhs_manufacturer_org", "lsp04"] + ], + [ + ["nhs_as_client", "bbb"], + [ + "nhs_as_svc_ia", + "urn:nhs:names:services:ers:read_practitioner_role_r4_v002" + ], + ["nhs_mhs_party_key", "bbb-111111"] + ], + [ + ["nhs_as_client", "bbb"], + ["nhs_as_svc_ia", "urn:nhs:names:services:pds:qupa_in040000uk03"], + ["nhs_mhs_manufacturer_org", "lsp04"], + ["nhs_mhs_party_key", "bbb-111111"] + ], + [ + ["nhs_as_client", "bbb"], + [ + "nhs_as_svc_ia", + "urn:nhs:names:services:ers:read_practitioner_role_r4_v003" + ], + ["nhs_mhs_manufacturer_org", "lsp04"] + ], + [ + ["nhs_as_client", "bbb"], + [ + "nhs_as_svc_ia", + "urn:nhs:names:services:ers:read_practitioner_role_r4_v002" + ] + ], + [ + ["nhs_as_client", "bbb"], + ["nhs_as_svc_ia", "urn:nhs:names:services:pds:qupa_in040000uk02"], + ["nhs_mhs_party_key", "bbb-111111"] + ], + [ + ["nhs_as_client", "bbb"], + ["nhs_as_svc_ia", "urn:nhs:names:services:pds:qupa_in040000uk03"], + ["nhs_mhs_manufacturer_org", "lsp04"] + ], + [ + ["nhs_as_client", "bbb"], + ["nhs_as_svc_ia", "urn:nhs:names:services:pds:qupa_in040000uk02"], + ["nhs_mhs_manufacturer_org", "lsp04"], + ["nhs_mhs_party_key", "bbb-111111"] + ], + [ + ["nhs_as_client", "bbb"], + [ + "nhs_as_svc_ia", + "urn:nhs:names:services:ers:read_practitioner_role_r4_v003" + ], + ["nhs_mhs_manufacturer_org", "lsp04"], + ["nhs_mhs_party_key", "bbb-111111"] + ], + [ + ["nhs_as_client", "bbb"], + [ + "nhs_as_svc_ia", + "urn:nhs:names:services:ers:read_practitioner_role_r4_v002" + ], + ["nhs_mhs_manufacturer_org", "lsp04"], + ["nhs_mhs_party_key", "bbb-111111"] + ], + [ + ["nhs_as_client", "bbb"], + [ + "nhs_as_svc_ia", + "urn:nhs:names:services:ers:read_practitioner_role_r4_v003" + ] + ], + [ + ["nhs_as_client", "bbb"], + ["nhs_as_svc_ia", "urn:nhs:names:services:pds:qupa_in040000uk03"], + ["nhs_mhs_party_key", "bbb-111111"] + ], + [ + ["nhs_as_client", "bbb"], + [ + "nhs_as_svc_ia", + "urn:nhs:names:services:ers:read_practitioner_role_r4_v002" + ], + ["nhs_mhs_manufacturer_org", "lsp04"] + ] + ], + "questionnaire_responses": { + "spine_as/1": [ + { + "id": "1aa1c3db-a8e2-4509-b69f-9627baa4dcf7", + "questionnaire_name": "spine_as", + "questionnaire_version": "1", + "data": { + "Approver URP": "myApprover", + "Client ODS Codes": ["BBB"], + "Date Approved": "20030303030303", + "Date Requested": "20030303030303", + "ODS Code": "BBB", + "MHS Manufacturer Organisation": "LSP04", + "Party Key": "BBB-111111", + "Product Key": "333", + "Product Name": "My BBB Product", + "Product Version": "2005.02", + "Requestor URP": "myRequestor", + "Temp UID": "333", + "ASID": "000000000003" + }, + "created_on": "2024-11-20T10:54:19.014329+00:00" + } + ] + }, + "device_reference_data": { + "b362b7c1-317c-4e1f-86e4-d8cbd0f9524e": ["*.Interaction ID"], + "b4186163-23ec-41fa-abc2-5fe8b0fe9cab": ["*.Interaction ID"] + } + } + } +] diff --git a/src/etl/sds/worker/transform_bulk/transform_bulk.py b/src/etl/sds/worker/bulk/transform_bulk/transform_bulk.py similarity index 52% rename from src/etl/sds/worker/transform_bulk/transform_bulk.py rename to src/etl/sds/worker/bulk/transform_bulk/transform_bulk.py index 6a8c9d979..6dd5ec5d4 100644 --- a/src/etl/sds/worker/transform_bulk/transform_bulk.py +++ b/src/etl/sds/worker/bulk/transform_bulk/transform_bulk.py @@ -3,18 +3,25 @@ from typing import TYPE_CHECKING import boto3 -from domain.core.device import Device -from domain.core.event import ExportedEventTypeDef +from domain.core.aggregate_root import AggregateRoot +from domain.repository.questionnaire_repository.v1.questionnaire_repository import ( + QuestionnaireRepository, +) +from domain.repository.questionnaire_repository.v1.questionnaires import ( + QuestionnaireInstance, +) from etl_utils.constants import WorkerKey from etl_utils.io import pkl_dump_lz4, pkl_dumps_lz4, pkl_load_lz4 from etl_utils.smart_open import smart_open from etl_utils.worker.action import apply_action from etl_utils.worker.model import WorkerActionResponse, WorkerEvent from etl_utils.worker.worker_step_chain import execute_step_chain +from event.aws.client import dynamodb_client from event.environment import BaseEnvironment -from sds.cpm_translation import translate +from sds.epr.bulk_create.bulk_create import create_complete_epr_product +from sds.epr.bulk_create.epr_product_team_repository import EprProductTeamRepository -from etl.sds.worker.transform_bulk.utils import smart_open_if_exists +from etl.sds.worker.bulk.transform_bulk.utils import smart_open_if_exists if TYPE_CHECKING: from mypy_boto3_s3 import S3Client @@ -22,6 +29,7 @@ class TransformWorkerEnvironment(BaseEnvironment): ETL_BUCKET: str + TABLE_NAME: str def s3_path(self, key) -> str: return f"s3://{self.ETL_BUCKET}/{key}" @@ -33,13 +41,33 @@ def s3_path(self, key) -> str: SERIALISED_EMPTY_DEQUE = pkl_dumps_lz4(deque([])) -def export_devices(devices: list[Device]) -> list[dict]: - """ - Serialises the current state of provided devices as dictionaries. - This is needed for the transform lambda in "bulk" mode. - """ - _devices = [device.state() for device in devices] - return _devices +AS_DEVICE_QUESTIONNAIRE = QuestionnaireRepository().read(QuestionnaireInstance.SPINE_AS) +MHS_DEVICE_QUESTIONNAIRE = QuestionnaireRepository().read( + QuestionnaireInstance.SPINE_MHS +) +MESSAGE_SETS_QUESTIONNAIRE = QuestionnaireRepository().read( + QuestionnaireInstance.SPINE_MHS_MESSAGE_SETS +) +ADDITIONAL_INTERACTIONS_QUESTIONNAIRE = QuestionnaireRepository().read( + QuestionnaireInstance.SPINE_AS_ADDITIONAL_INTERACTIONS +) +ACCREDITED_SYSTEM_FIELD_MAPPING = QuestionnaireRepository().read_field_mapping( + QuestionnaireInstance.SPINE_AS +) +MHS_DEVICE_FIELD_MAPPING = QuestionnaireRepository().read_field_mapping( + QuestionnaireInstance.SPINE_MHS +) +MESSAGE_SETS_FIELD_MAPPING = QuestionnaireRepository().read_field_mapping( + QuestionnaireInstance.SPINE_MHS_MESSAGE_SETS +) +EPR_PRODUCT_TEAM_REPOSITORY = EprProductTeamRepository( + table_name=ENVIRONMENT.TABLE_NAME, dynamodb_client=dynamodb_client() +) + + +def export_domain_objects(objects: list[AggregateRoot]) -> list[dict]: + _obj = [{obj.__class__.__name__: obj.state()} for obj in objects] + return _obj def transform( @@ -49,11 +77,6 @@ def transform( max_records: int, unprocessed_records_cache: dict, ) -> WorkerActionResponse: - """ - Note that 'bulk' flag is passed through `translate` in order to optimise adding - tags to Device. See more details in `set_device_tags_bulk` by clicking through - `translate`. - """ unprocessed_records = unprocessed_records_cache.get("unprocessed_records") if not unprocessed_records: with smart_open(s3_path=s3_input_path, s3_client=s3_client) as f: @@ -64,13 +87,25 @@ def transform( s3_client=s3_client, empty_content=SERIALISED_EMPTY_DEQUE, ) as f: - processed_records: deque[ExportedEventTypeDef] = pkl_load_lz4(f) + processed_records = pkl_load_lz4(f) + + product_team_ids = {p.ods_code: p.id for p in EPR_PRODUCT_TEAM_REPOSITORY.search()} exception = apply_action( unprocessed_records=unprocessed_records, processed_records=processed_records, - action=lambda record: export_devices( - translate(obj=record, repository=None, bulk=True) + action=lambda record: export_domain_objects( + create_complete_epr_product( + party_key_group=record, + mhs_device_questionnaire=MHS_DEVICE_QUESTIONNAIRE, + message_set_questionnaire=MESSAGE_SETS_QUESTIONNAIRE, + additional_interactions_questionnaire=ADDITIONAL_INTERACTIONS_QUESTIONNAIRE, + accredited_system_questionnaire=AS_DEVICE_QUESTIONNAIRE, + mhs_device_field_mapping=MHS_DEVICE_FIELD_MAPPING, + message_set_field_mapping=MESSAGE_SETS_FIELD_MAPPING, + accredited_system_field_mapping=ACCREDITED_SYSTEM_FIELD_MAPPING, + product_team_ids=product_team_ids, + ) ), max_records=max_records, ) diff --git a/src/etl/sds/worker/transform_bulk/utils.py b/src/etl/sds/worker/bulk/transform_bulk/utils.py similarity index 100% rename from src/etl/sds/worker/transform_bulk/utils.py rename to src/etl/sds/worker/bulk/transform_bulk/utils.py diff --git a/src/etl/sds/worker/load_bulk/tests/_test_load_bulk_worker.py b/src/etl/sds/worker/load_bulk/tests/_test_load_bulk_worker.py deleted file mode 100644 index 8ae30e643..000000000 --- a/src/etl/sds/worker/load_bulk/tests/_test_load_bulk_worker.py +++ /dev/null @@ -1,225 +0,0 @@ -import os -from collections import deque -from typing import Callable, Generator -from unittest import mock -from uuid import UUID - -import pytest -from domain.core.device import Device, DeviceCreatedEvent, DeviceType -from domain.core.device_key import DeviceKeyType -from domain.repository.compression import pkl_loads_gzip -from domain.repository.device_repository import DeviceRepository -from domain.repository.keys import TableKey -from domain.repository.marshall import unmarshall -from etl_utils.constants import WorkerKey -from etl_utils.io import pkl_dumps_lz4 -from etl_utils.io.test.io_utils import pkl_loads_lz4 -from moto import mock_aws -from mypy_boto3_s3 import S3Client - -from test_helpers.dynamodb import mock_table - -BUCKET_NAME = "my-bucket" -TABLE_NAME = "my-table" - - -class MockDeviceRepository(DeviceRepository): - def all_devices(self) -> Generator[Device, None, None]: - response = self.client.scan(TableName=self.table_name) - items = map(unmarshall, response["Items"]) - devices = list(TableKey.DEVICE.filter(items, key="sk")) - for device in devices: - if not device.get("root"): - continue - if device.get("tags"): # Only compress if tags not empty - device["tags"] = [ - pkl_loads_gzip(tag) for tag in pkl_loads_gzip(device["tags"]) - ] - yield Device(**device) - - def count(self, by: DeviceType | DeviceKeyType): - return sum( - ( - device.device_type is by - if isinstance(by, DeviceType) - else any(key.key_type is by for key in device.keys) - ) - for device in self.all_devices() - ) - - -@pytest.fixture -def mock_s3_client(): - with mock_aws(), mock.patch.dict( - os.environ, - { - "ETL_BUCKET": BUCKET_NAME, - "TABLE_NAME": TABLE_NAME, - "AWS_DEFAULT_REGION": "us-east-1", - }, - clear=True, - ): - from etl.sds.worker.load_bulk import load_bulk - - load_bulk.CACHE.S3_CLIENT.create_bucket(Bucket=BUCKET_NAME) - yield load_bulk.CACHE.S3_CLIENT - - -@pytest.fixture -def put_object(mock_s3_client: S3Client): - return lambda key, body: ( - mock_s3_client.put_object(Bucket=BUCKET_NAME, Key=key, Body=body) - ) - - -@pytest.fixture -def get_object(mock_s3_client: S3Client) -> bytes: - return lambda key: ( - mock_s3_client.get_object(Bucket=BUCKET_NAME, Key=key)["Body"].read() - ) - - -@pytest.fixture -def repository(): - with mock_table(TABLE_NAME) as dynamodb_client: - yield MockDeviceRepository( - table_name=TABLE_NAME, dynamodb_client=dynamodb_client - ) - - -def device_factory(id: int) -> Device: - ods_code = "ABC" - device = Device( - id=UUID(int=id), - name=f"device-{id}", - device_type=DeviceType.PRODUCT, - product_team_id=UUID(int=1), - ods_code=ods_code, - ) - event = DeviceCreatedEvent(**device.dict()) - device.add_event(event) - device.add_key( - key_type=DeviceKeyType.ACCREDITED_SYSTEM_ID, key_value=f"{ods_code}:{id}" - ) - return device - - -@pytest.mark.parametrize( - ("n_initial_unprocessed", "n_initial_processed"), - [(0, 10), (5, 5), (10, 0)], - ids=["processed-only", "partly-processed", "unprocessed-only"], -) -def test_load_worker_pass( - n_initial_unprocessed: int, - n_initial_processed: int, - put_object: Callable[[str], None], - get_object: Callable[[str], bytes], - repository: MockDeviceRepository, -): - from etl.sds.worker.load_bulk import load_bulk - - # Initial state - initial_unprocessed_data = [ - device_factory(id=i + 1) for i in range(n_initial_unprocessed) - ] - initial_processed_data = [ - device_factory(id=(i + 1) * 1000) for i in range(n_initial_processed) - ] - put_object( - key=WorkerKey.LOAD, - body=pkl_dumps_lz4(deque(map(Device.state, initial_unprocessed_data))), - ) - repository.write_bulk(map(Device.state, initial_processed_data)) - - # Execute the load worker - response = load_bulk.handler( - event={"s3_input_path": f"s3://{BUCKET_NAME}/{WorkerKey.LOAD}"}, context=None - ) - assert response == { - "stage_name": "load", - "processed_records": n_initial_unprocessed, - "unprocessed_records": 0, - "error_message": None, - } - - # Final state - final_unprocessed_data = pkl_loads_lz4(get_object(key=WorkerKey.LOAD)) - final_processed_data: list[Device] = list(repository.all_devices()) - - initial_ids = sorted( - device.id for device in initial_unprocessed_data + initial_processed_data - ) - final_processed_ids = sorted(device.id for device in final_processed_data) - - # Confirm that everything has now been processed, and that there is no - # unprocessed data left in the bucket - assert final_processed_ids == initial_ids - assert final_unprocessed_data == deque([]) - - -@pytest.mark.parametrize( - ("n_initial_unprocessed", "n_initial_processed"), - [(0, 10), (5, 5), (10, 0)], - ids=["processed-only", "partly-processed", "unprocessed-only"], -) -def test_load_worker_pass_max_records( - n_initial_unprocessed: int, - n_initial_processed: int, - put_object: Callable[[str], None], - get_object: Callable[[str], bytes], - repository: MockDeviceRepository, -): - MAX_RECORDS = 7 - - from etl.sds.worker.load_bulk import load_bulk - - # Initial state - initial_unprocessed_data = [ - device_factory(id=i + 1) for i in range(n_initial_unprocessed) - ] - initial_processed_data = [ - device_factory(id=(i + 1) * 1000) for i in range(n_initial_processed) - ] - put_object( - key=WorkerKey.LOAD, - body=pkl_dumps_lz4(deque(map(Device.state, initial_unprocessed_data))), - ) - repository.write_bulk(map(Device.state, initial_processed_data)) - - n_unprocessed_records = len(initial_unprocessed_data) - while n_unprocessed_records > 0: - n_processed_records_expected = min(n_unprocessed_records, MAX_RECORDS) - n_unprocessed_records_expected = ( - n_unprocessed_records - n_processed_records_expected - ) - - # Execute the load worker - response = load_bulk.handler( - event={ - "max_records": MAX_RECORDS, - "s3_input_path": f"s3://{BUCKET_NAME}/{WorkerKey.LOAD}", - }, - context=None, - ) - assert response == { - "stage_name": "load", - "processed_records": n_processed_records_expected, - "unprocessed_records": n_unprocessed_records_expected, - "error_message": None, - } - - n_unprocessed_records = response["unprocessed_records"] - - # Final state - final_unprocessed_data = pkl_loads_lz4(get_object(key=WorkerKey.LOAD)) - final_processed_data: list[Device] = list(repository.all_devices()) - - initial_ids = sorted( - device.id for device in initial_unprocessed_data + initial_processed_data - ) - final_processed_ids = sorted(device.id for device in final_processed_data) - - # Confirm that everything has now been processed, and that there is no - # unprocessed data left in the bucket - assert final_processed_ids == initial_ids - assert final_unprocessed_data == deque([]) diff --git a/src/etl/sds/worker/transform_bulk/tests/_test_transform_bulk_worker.py b/src/etl/sds/worker/transform_bulk/tests/_test_transform_bulk_worker.py deleted file mode 100644 index 49f573699..000000000 --- a/src/etl/sds/worker/transform_bulk/tests/_test_transform_bulk_worker.py +++ /dev/null @@ -1,300 +0,0 @@ -import os -import re -from collections import deque -from itertools import permutations -from math import ceil -from typing import Callable -from unittest import mock - -import pytest -from botocore.exceptions import ClientError -from etl_utils.constants import WorkerKey -from etl_utils.io import pkl_dumps_lz4 -from etl_utils.io.test.io_utils import pkl_loads_lz4 -from etl_utils.smart_open import smart_open -from moto import mock_aws -from mypy_boto3_s3 import S3Client - -from etl.sds.worker.transform_bulk.utils import smart_open_if_exists - -BUCKET_NAME = "my-bucket" -TABLE_NAME = "my-table" - -GOOD_SDS_RECORD_AS_JSON = { - "description": None, - "distinguished_name": { - "organisation": "nhs", - "organisational_unit": "services", - "unique_identifier": None, - }, - "nhs_approver_urp": "uniqueIdentifier=562983788547,uniqueIdentifier=883298590547,uid=503560389549,ou=People,o=nhs", - "nhs_as_acf": None, - "nhs_as_category_bag": None, - "nhs_as_client": ["RVL"], - "nhs_as_svc_ia": ["urn:nhs:names:services:pds:QUPA_IN040000UK01"], - "nhs_date_approved": "20090601140104", - "nhs_date_requested": "20090601135904", - "nhs_id_code": "RVL", - "nhs_mhs_manufacturer_org": "LSP04", - "nhs_mhs_party_key": "RVL-806539", - "nhs_product_key": "634", - "nhs_product_name": "Cerner Millennium", - "nhs_product_version": "2005.02", - "nhs_requestor_urp": "uniqueIdentifier=977624345541,uniqueIdentifier=883298590547,uid=503560389549,ou=People,o=nhs", - "nhs_temp_uid": "9713", - "object_class": "nhsAS", - "unique_identifier": "000428682512", -} - -BAD_SDS_RECORD_AS_JSON = {} - -FATAL_SDS_RECORD_AS_JSON = " bytes: - return lambda key: ( - mock_s3_client.get_object(Bucket=BUCKET_NAME, Key=key)["Body"].read() - ) - - -def test_transform_worker_pass_no_dupes( - put_object: Callable[[str], None], - get_object: Callable[[str], bytes], -): - from etl.sds.worker.transform_bulk import transform_bulk - - initial_unprocessed_data = deque([GOOD_SDS_RECORD_AS_JSON]) - initial_processed_data = deque([]) - - # Initial state - n_initial_unprocessed = len(initial_unprocessed_data) - n_initial_processed = len(initial_processed_data) - put_object(key=WorkerKey.TRANSFORM, body=pkl_dumps_lz4(initial_unprocessed_data)) - put_object(key=WorkerKey.LOAD, body=pkl_dumps_lz4(initial_processed_data)) - - # Execute the transform worker - response = transform_bulk.handler(event={}, context=None) - - assert response == [ - { - "stage_name": "transform", - "processed_records": n_initial_processed + n_initial_unprocessed, - "unprocessed_records": 0, - "s3_input_path": "s3://my-bucket/input--load/unprocessed.0", - "error_message": None, - } - ] - - # Final state - final_unprocessed_data: str = get_object(key=WorkerKey.TRANSFORM) - final_processed_data: str = get_object(key=f"{WorkerKey.LOAD}.0") - n_final_unprocessed = len(pkl_loads_lz4(final_unprocessed_data)) - n_final_processed = len(pkl_loads_lz4(final_processed_data)) - - # Confirm that everything has now been processed, and that there is no - # unprocessed data left in the bucket - assert n_final_processed == n_initial_processed + n_initial_unprocessed - assert n_final_unprocessed == 0 - - -@pytest.mark.parametrize("max_records", range(1, 10)) -def test_transform_worker_pass_no_dupes_max_records( - put_object: Callable[[str], None], - get_object: Callable[[str], bytes], - max_records: int, -): - from etl.sds.worker.transform_bulk import transform_bulk - - initial_unprocessed_data = deque([GOOD_SDS_RECORD_AS_JSON] * 10) - initial_processed_data = deque([]) - - # Initial state - n_initial_unprocessed = len(initial_unprocessed_data) - n_initial_processed = len(initial_processed_data) - put_object(key=WorkerKey.TRANSFORM, body=pkl_dumps_lz4(initial_unprocessed_data)) - put_object(key=WorkerKey.LOAD, body=pkl_dumps_lz4(initial_processed_data)) - - expected_responses = [] - expected_iterations = min( - ceil(n_initial_unprocessed / max_records), transform_bulk.FAN_OUT - ) - - total_processed = 0 - for i in range(expected_iterations): - chunk_size = min(n_initial_unprocessed - total_processed, max_records) - - total_processed += chunk_size - expected_responses.append( - { - "stage_name": "transform", - "processed_records": chunk_size, - "unprocessed_records": (n_initial_unprocessed - total_processed), - "s3_input_path": f"s3://my-bucket/input--load/unprocessed.{i}", - "error_message": None, - } - ) - - # Execute the transform worker - responses = transform_bulk.handler(event={"max_records": max_records}, context=None) - - assert responses == expected_responses - - # Final state - final_unprocessed_data: str = get_object(key=WorkerKey.TRANSFORM) - n_final_unprocessed = len(pkl_loads_lz4(final_unprocessed_data)) - - n_final_processed = 0 - for i in range(expected_iterations): - final_processed_data: str = get_object(key=f"{WorkerKey.LOAD}.{i}") - n_final_processed += len(pkl_loads_lz4(final_processed_data)) - - # Confirm that everything has now been processed, and that there is no - # unprocessed data left in the bucket - assert n_final_processed == n_initial_processed + n_initial_unprocessed - assert n_final_unprocessed == n_initial_unprocessed - total_processed - - -@pytest.mark.parametrize( - "initial_unprocessed_data", - permutations( - [BAD_SDS_RECORD_AS_JSON, GOOD_SDS_RECORD_AS_JSON, GOOD_SDS_RECORD_AS_JSON] - ), -) -def test_transform_worker_bad_record( - initial_unprocessed_data: str, - put_object: Callable[[str], None], - get_object: Callable[[str], bytes], -): - from etl.sds.worker.transform_bulk import transform_bulk - - _initial_unprocessed_data = pkl_dumps_lz4(deque(initial_unprocessed_data)) - bad_record_index = initial_unprocessed_data.index(BAD_SDS_RECORD_AS_JSON) - - # Initial state - n_initial_unprocessed = len(initial_unprocessed_data) - put_object(key=WorkerKey.TRANSFORM, body=_initial_unprocessed_data) - - # Execute the transform worker - responses = transform_bulk.handler(event={}, context=None) - assert len(responses) == 1 - - responses[0]["error_message"] = responses[0]["error_message"].split("\n")[:6] - - assert responses[0] == { - "stage_name": "transform", - "processed_records": bad_record_index, - "unprocessed_records": n_initial_unprocessed - bad_record_index, - "s3_input_path": "s3://my-bucket/input--load/unprocessed.0", - "error_message": [ - "The following errors were encountered", - " -- Error 1 (KeyError) --", - f" Failed to parse record {bad_record_index}", - " {}", - " 'object_class'", - "Traceback (most recent call last):", - ], - } - - # Final state - final_unprocessed_data: str = get_object(key=WorkerKey.TRANSFORM) - final_processed_data: str = get_object(key=f"{WorkerKey.LOAD}.0") - n_final_unprocessed = len(pkl_loads_lz4(final_unprocessed_data)) - n_final_processed = len(pkl_loads_lz4(final_processed_data)) - - # Confirm that there are still unprocessed records, and that there may have been - # some records processed successfully - assert n_final_unprocessed > 0 - assert n_final_processed == bad_record_index - assert n_final_unprocessed == n_initial_unprocessed - bad_record_index - - -def test_transform_worker_fatal_record( - put_object: Callable[[str], None], get_object: Callable[[str], str] -): - from etl.sds.worker.transform_bulk import transform_bulk - - # Initial state - initial_unprocessed_data = FATAL_SDS_RECORD_AS_JSON - put_object(key=WorkerKey.TRANSFORM, body=initial_unprocessed_data) - - # Execute the transform worker - responses = transform_bulk.handler(event={}, context=None) - assert len(responses) == 1 - - # The line number in the error changes for each example, so - # substitute it for the value 'NUMBER' - responses[0]["error_message"] = re.sub( - r"Line \d{1,2}", "Line NUMBER", responses[0]["error_message"] - ) - responses[0]["error_message"] = responses[0]["error_message"].split("\n")[:3] - - assert responses[0] == { - "stage_name": "transform", - "processed_records": None, - "unprocessed_records": None, - "s3_input_path": "s3://my-bucket/input--load/unprocessed.0", - "error_message": [ - "The following errors were encountered", - " -- Error 1 (RuntimeError) --", - " LZ4F_decompress failed with code: ERROR_frameType_unknown", - ], - } - - # Final state - final_unprocessed_data: str = get_object(key=WorkerKey.TRANSFORM).decode() - - with pytest.raises(ClientError): - get_object(key=f"{WorkerKey.LOAD}.0") - - # Confirm that no changes were persisted - assert final_unprocessed_data == initial_unprocessed_data - - -def test_smart_open_if_exists(mock_s3_client: "S3Client"): - s3_path = f"s3://{BUCKET_NAME}/some_file" - initial_content = b"hiya" - final_content = b"bye" - - # Set initial content via a default value - with smart_open_if_exists( - s3_client=mock_s3_client, s3_path=s3_path, empty_content=initial_content - ) as f: - assert f.read() == initial_content - - # Overwrite with standard smart_open - with smart_open(s3_client=mock_s3_client, s3_path=s3_path, mode="wb") as f: - f.write(final_content) - - # Verify not overwritten with default value - with smart_open_if_exists( - s3_client=mock_s3_client, s3_path=s3_path, empty_content=initial_content - ) as f: - assert f.read() == final_content diff --git a/src/etl/sds/worker/transform_bulk/__init__.py b/src/etl/sds/worker/update/extract_update/__init__.py similarity index 100% rename from src/etl/sds/worker/transform_bulk/__init__.py rename to src/etl/sds/worker/update/extract_update/__init__.py diff --git a/src/etl/sds/worker/extract/extract.py b/src/etl/sds/worker/update/extract_update/extract_update.py similarity index 100% rename from src/etl/sds/worker/extract/extract.py rename to src/etl/sds/worker/update/extract_update/extract_update.py diff --git a/src/etl/sds/worker/transform_bulk/make/make.py b/src/etl/sds/worker/update/extract_update/make/make.py similarity index 100% rename from src/etl/sds/worker/transform_bulk/make/make.py rename to src/etl/sds/worker/update/extract_update/make/make.py diff --git a/src/etl/sds/worker/extract/tests/test_extract_worker.py b/src/etl/sds/worker/update/extract_update/tests/_test_extract_update_worker.py similarity index 93% rename from src/etl/sds/worker/extract/tests/test_extract_worker.py rename to src/etl/sds/worker/update/extract_update/tests/_test_extract_update_worker.py index 0dcfda2ec..428f3417a 100644 --- a/src/etl/sds/worker/extract/tests/test_extract_worker.py +++ b/src/etl/sds/worker/update/extract_update/tests/_test_extract_update_worker.py @@ -80,11 +80,11 @@ def mock_s3_client(): {"ETL_BUCKET": BUCKET_NAME, "AWS_DEFAULT_REGION": "us-east-1"}, clear=True, ): - from etl.sds.worker.extract import extract + from etl.sds.worker.extract_update import extract_update - extract.ENVIRONMENT = WorkerEnvironment.build() - extract.S3_CLIENT.create_bucket(Bucket=BUCKET_NAME) - yield extract.S3_CLIENT + extract_update.ENVIRONMENT = WorkerEnvironment.build() + extract_update.S3_CLIENT.create_bucket(Bucket=BUCKET_NAME) + yield extract_update.S3_CLIENT @pytest.fixture @@ -120,7 +120,7 @@ def test_extract_worker_pass( put_object: Callable[[str], None], get_object: Callable[[str], bytes], ): - from etl.sds.worker.extract import extract + from etl.sds.worker.extract_update import extract_update # Initial state n_initial_unprocessed = len(_split_ldif(initial_unprocessed_data)) @@ -129,7 +129,7 @@ def test_extract_worker_pass( put_object(key=WorkerKey.TRANSFORM, body=pkl_dumps_lz4(initial_processed_data)) # Execute the extract worker - response = extract.handler(event={}, context=None) + response = extract_update.handler(event={}, context=None) assert response == { "stage_name": "extract", "processed_records": 10, @@ -166,7 +166,7 @@ def test_extract_worker_pass_max_records( put_object: Callable[[str], None], get_object: Callable[[str], bytes], ): - from etl.sds.worker.extract import extract + from etl.sds.worker.extract_update import extract_update # Initial state n_initial_unprocessed = len(_split_ldif(initial_unprocessed_data)) @@ -184,7 +184,9 @@ def test_extract_worker_pass_max_records( n_total_processed_records_expected += n_newly_processed_records_expected # Execute the extract worker - response = extract.handler(event={"max_records": max_records}, context=None) + response = extract_update.handler( + event={"max_records": max_records}, context=None + ) assert response == { "stage_name": "extract", "processed_records": n_total_processed_records_expected, @@ -215,7 +217,7 @@ def test_extract_worker_bad_record( put_object: Callable[[str], None], get_object: Callable[[str], bytes], ): - from etl.sds.worker.extract import extract + from etl.sds.worker.extract_update import extract_update _initial_unprocessed_data = "\n".join(initial_unprocessed_data) bad_record_index = initial_unprocessed_data.index(BAD_SDS_RECORD) @@ -228,7 +230,7 @@ def test_extract_worker_bad_record( put_object(key=WorkerKey.TRANSFORM, body=initial_processed_data) # Execute the extract worker - response = extract.handler(event={}, context=None) + response = extract_update.handler(event={}, context=None) response["error_message"] = response["error_message"].split("\n")[:24] assert response == { @@ -285,7 +287,7 @@ def test_extract_worker_fatal_record( put_object: Callable[[str], None], get_object: Callable[[str], bytes], ): - from etl.sds.worker.extract import extract + from etl.sds.worker.extract_update import extract_update # Initial state _initial_unprocessed_data = "\n".join(initial_unprocessed_data) @@ -296,7 +298,7 @@ def test_extract_worker_fatal_record( put_object(key=WorkerKey.TRANSFORM, body=initial_processed_data) # Execute the extract worker - response = extract.handler(event={}, context=None) + response = extract_update.handler(event={}, context=None) # The line number in the error changes for each example, so # substitute it for the value 'NUMBER' diff --git a/src/etl/sds/worker/transform_update/__init__.py b/src/etl/sds/worker/update/load_update/__init__.py similarity index 100% rename from src/etl/sds/worker/transform_update/__init__.py rename to src/etl/sds/worker/update/load_update/__init__.py diff --git a/src/etl/sds/worker/load_update/load_update.py b/src/etl/sds/worker/update/load_update/load_update.py similarity index 100% rename from src/etl/sds/worker/load_update/load_update.py rename to src/etl/sds/worker/update/load_update/load_update.py diff --git a/src/etl/sds/worker/transform_update/make/make.py b/src/etl/sds/worker/update/load_update/make/make.py similarity index 100% rename from src/etl/sds/worker/transform_update/make/make.py rename to src/etl/sds/worker/update/load_update/make/make.py diff --git a/src/etl/sds/worker/load_update/tests/_test_load_update_worker.py b/src/etl/sds/worker/update/load_update/tests/_test_load_update_worker.py similarity index 98% rename from src/etl/sds/worker/load_update/tests/_test_load_update_worker.py rename to src/etl/sds/worker/update/load_update/tests/_test_load_update_worker.py index 12e971460..f25daec20 100644 --- a/src/etl/sds/worker/load_update/tests/_test_load_update_worker.py +++ b/src/etl/sds/worker/update/load_update/tests/_test_load_update_worker.py @@ -5,19 +5,19 @@ import pytest from domain.core.device import Device -from etl_utils.constants import WorkerKey -from etl_utils.io import pkl_dumps_lz4 -from etl_utils.io.test.io_utils import pkl_loads_lz4 -from moto import mock_aws -from mypy_boto3_s3 import S3Client - -from etl.sds.worker.load_bulk.tests._test_load_bulk_worker import ( +from etl.sds.worker.load_bulk.tests.test_load_bulk_worker import ( BUCKET_NAME, TABLE_NAME, MockDeviceRepository, device_factory, ) from etl.sds.worker.transform_update.utils import export_events +from etl_utils.constants import WorkerKey +from etl_utils.io import pkl_dumps_lz4 +from etl_utils.io.test.io_utils import pkl_loads_lz4 +from moto import mock_aws +from mypy_boto3_s3 import S3Client + from test_helpers.dynamodb import mock_table diff --git a/src/etl/sds/worker/update/transform_update/__init__.py b/src/etl/sds/worker/update/transform_update/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/etl/sds/worker/update/transform_update/make/make.py b/src/etl/sds/worker/update/transform_update/make/make.py new file mode 100644 index 000000000..f33cd7068 --- /dev/null +++ b/src/etl/sds/worker/update/transform_update/make/make.py @@ -0,0 +1,4 @@ +from builder.lambda_build import build + +if __name__ == "__main__": + build(__file__) diff --git a/src/etl/sds/worker/transform_update/tests/_test_transform_update_worker.py b/src/etl/sds/worker/update/transform_update/tests/_test_transform_update_worker.py similarity index 99% rename from src/etl/sds/worker/transform_update/tests/_test_transform_update_worker.py rename to src/etl/sds/worker/update/transform_update/tests/_test_transform_update_worker.py index bc30891a4..f565465bf 100644 --- a/src/etl/sds/worker/transform_update/tests/_test_transform_update_worker.py +++ b/src/etl/sds/worker/update/transform_update/tests/_test_transform_update_worker.py @@ -9,13 +9,7 @@ import pytest from domain.core.device import DeviceType from domain.core.root import Root -from etl_utils.constants import WorkerKey -from etl_utils.io import pkl_dumps_lz4 -from etl_utils.io.test.io_utils import pkl_loads_lz4 -from moto import mock_aws -from mypy_boto3_s3 import S3Client - -from etl.sds.worker.transform_bulk.tests._test_transform_bulk_worker import ( +from etl.sds.worker.transform_bulk.tests.test_transform_bulk_worker import ( BAD_SDS_RECORD_AS_JSON, BUCKET_NAME, FATAL_SDS_RECORD_AS_JSON, @@ -24,6 +18,11 @@ TABLE_NAME, ) from etl.sds.worker.transform_update.utils import export_events +from etl_utils.constants import WorkerKey +from etl_utils.io import pkl_dumps_lz4 +from etl_utils.io.test.io_utils import pkl_loads_lz4 +from moto import mock_aws +from mypy_boto3_s3 import S3Client @pytest.fixture diff --git a/src/etl/sds/worker/transform_update/transform_update.py b/src/etl/sds/worker/update/transform_update/transform_update.py similarity index 99% rename from src/etl/sds/worker/transform_update/transform_update.py rename to src/etl/sds/worker/update/transform_update/transform_update.py index c78628168..7d63a4e03 100644 --- a/src/etl/sds/worker/transform_update/transform_update.py +++ b/src/etl/sds/worker/update/transform_update/transform_update.py @@ -5,6 +5,7 @@ import boto3 from domain.core.event import ExportedEventTypeDef from domain.repository.device_repository import DeviceRepository +from etl.sds.worker.transform_update.utils import export_events from etl_utils.constants import WorkerKey from etl_utils.io import pkl_dump_lz4, pkl_load_lz4 from etl_utils.smart_open import smart_open @@ -15,8 +16,6 @@ from event.environment import BaseEnvironment from sds.cpm_translation import translate -from etl.sds.worker.transform_update.utils import export_events - if TYPE_CHECKING: from mypy_boto3_s3 import S3Client diff --git a/src/etl/sds/worker/transform_update/utils.py b/src/etl/sds/worker/update/transform_update/utils.py similarity index 100% rename from src/etl/sds/worker/transform_update/utils.py rename to src/etl/sds/worker/update/transform_update/utils.py diff --git a/src/layers/domain/core/device/v1.py b/src/layers/domain/core/device/v1.py index 9036a60d8..d38c5a52c 100644 --- a/src/layers/domain/core/device/v1.py +++ b/src/layers/domain/core/device/v1.py @@ -159,10 +159,6 @@ class DeviceTagsClearedEvent(Event): @dataclass(kw_only=True, slots=True) class QuestionnaireResponseUpdatedEvent(Event): - """ - This is adding the initial questionnaire response from the event body request. - """ - id: str questionnaire_responses: dict[str, list[QuestionnaireResponse]] keys: list[DeviceKey]