From 32e354c8b586a60cf571ee35c919484c3eeefa94 Mon Sep 17 00:00:00 2001 From: SdgJlbl Date: Fri, 16 Feb 2024 17:58:49 +0100 Subject: [PATCH] feat!: Add Function status (#399) Signed-off-by: SdgJlbl --- CHANGELOG.md | 4 +- references/sdk.md | 2 +- references/sdk_models.md | 2 +- substra/sdk/backends/local/backend.py | 3 +- substra/sdk/backends/local/compute/worker.py | 4 +- substra/sdk/backends/local/dal.py | 2 +- substra/sdk/client.py | 47 +++++++++++++++++--- substra/sdk/models.py | 16 ++++++- tests/datastore.py | 9 ++++ tests/sdk/test_wait.py | 14 +++--- tests/test_utils.py | 4 +- 11 files changed, 84 insertions(+), 23 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cc67a31b7..fbb08d4bf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,12 +10,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed - BREAKING: Renamed `function` field of the Function pydantic model to `archive`([#393](https://github.com/Substra/substra/pull/393)) -- BREAKING: Renamed ComputeTask status ([#397](https://github.com/Substra/substra/pull/397)) +- BREAKING: Renamed ComputeTask status values ([#397](https://github.com/Substra/substra/pull/397)) - `download_logs` uses the new endpoint ([#398](https://github.com/Substra/substra/pull/398)) +- BREAKING: Renamed Status to ComputeTaskStatus ([#399](https://github.com/Substra/substra/pull/399)) ### Added - Paths are now resolved on DatasampleSpec objects. Which means that users can pass relative paths ([#392](https://github.com/Substra/substra/pull/392)) +- Added FunctionStatus ([#399](https://github.com/Substra/substra/pull/399)) ## [0.49.0](https://github.com/Substra/substra/releases/tag/0.49.0) - 2023-10-18 diff --git a/references/sdk.md b/references/sdk.md index 7cc92c7f0..6814d2c66 100644 --- a/references/sdk.md +++ b/references/sdk.md @@ -546,7 +546,7 @@ The ``filters`` argument is a dictionary, with those possible keys: rank (List[int]): list tasks which are at given ranks. status (List[str]): list tasks with given status. - The possible values are the values of `substra.models.Status` + The possible values are the values of `substra.models.ComputeTaskStatus` metadata (dict) { "key": str # the key of the metadata to filter on diff --git a/references/sdk_models.md b/references/sdk_models.md index 8d6b757b8..40fecad7d 100644 --- a/references/sdk_models.md +++ b/references/sdk_models.md @@ -48,7 +48,7 @@ Asset creation specification base class. - owner: - compute_plan_key: - metadata: typing.Dict[str, str] -- status: +- status: - worker: - rank: typing.Optional[int] - tag: diff --git a/substra/sdk/backends/local/backend.py b/substra/sdk/backends/local/backend.py index 0ae914d52..4ccd22253 100644 --- a/substra/sdk/backends/local/backend.py +++ b/substra/sdk/backends/local/backend.py @@ -301,6 +301,7 @@ def _add_function(self, key, spec, spec_options=None): metadata=spec.metadata if spec.metadata else dict(), inputs=_schemas_list_to_models_list(spec.inputs, models.FunctionInput), outputs=_schemas_list_to_models_list(spec.outputs, models.FunctionOutput), + status=models.FunctionStatus.ready, ) return self._db.add(function) @@ -429,7 +430,7 @@ def _add_task(self, key, spec, spec_options=None): outputs=_output_from_spec(spec.outputs), tag=spec.tag or "", # TODO: the waiting status should be more granular now - status=models.Status.waiting_for_executor_slot, + status=models.ComputeTaskStatus.waiting_for_executor_slot, metadata=spec.metadata if spec.metadata else dict(), ) diff --git a/substra/sdk/backends/local/compute/worker.py b/substra/sdk/backends/local/compute/worker.py index 7933ed65e..33ff40eb7 100644 --- a/substra/sdk/backends/local/compute/worker.py +++ b/substra/sdk/backends/local/compute/worker.py @@ -255,7 +255,7 @@ def schedule_task(self, task: models.Task): task: Task to execute """ with self._context(task.key) as task_dir: - task.status = models.Status.doing + task.status = models.ComputeTaskStatus.doing task.start_date = datetime.datetime.now() function = self._db.get_with_files(schemas.Type.Function, task.function.key) input_multiplicity = {i.identifier: i.multiple for i in function.inputs} @@ -377,7 +377,7 @@ def schedule_task(self, task: models.Task): ) # Set status - task.status = models.Status.done + task.status = models.ComputeTaskStatus.done task.end_date = datetime.datetime.now() self._update_cp(compute_plan=compute_plan, update_live_performances=update_live_performances) diff --git a/substra/sdk/backends/local/dal.py b/substra/sdk/backends/local/dal.py index e422d10fc..af34161e9 100644 --- a/substra/sdk/backends/local/dal.py +++ b/substra/sdk/backends/local/dal.py @@ -123,7 +123,7 @@ def get_performances(self, key: str) -> models.Performances: performances = models.Performances() for task in list_tasks: - if task.status == models.Status.done: + if task.status == models.ComputeTaskStatus.done: function = self.get(schemas.Type.Function, task.function.key) perf_identifiers = [ output.identifier for output in function.outputs if output.kind == schemas.AssetKind.performance diff --git a/substra/sdk/client.py b/substra/sdk/client.py index b0ea924c5..82bc273b1 100644 --- a/substra/sdk/client.py +++ b/substra/sdk/client.py @@ -763,7 +763,7 @@ def list_task( worker (List[str]): list tasks which ran on listed workers. Remote mode only.\n rank (List[int]): list tasks which are at given ranks.\n status (List[str]): list tasks with given status. - The possible values are the values of `substra.models.Status` + The possible values are the values of `substra.models.ComputeTaskStatus` metadata (dict) { "key": str # the key of the metadata to filter on @@ -1062,9 +1062,46 @@ def wait_task( Not raised when `timeout == None` """ asset_getter = self.get_task - status_canceled = models.Status.canceled.value - status_failed = models.Status.failed.value - statuses_stopped = (models.Status.done.value, models.Status.canceled.value) + status_canceled = models.ComputeTaskStatus.canceled.value + status_failed = models.ComputeTaskStatus.failed.value + statuses_stopped = (models.ComputeTaskStatus.done.value, models.ComputeTaskStatus.canceled.value) + return self._wait( + key=key, + asset_getter=asset_getter, + polling_period=polling_period, + raise_on_failure=raise_on_failure, + status_canceled=status_canceled, + status_failed=status_failed, + statuses_stopped=statuses_stopped, + timeout=timeout, + ) + + @logit + def wait_function( + self, key: str, *, timeout: Optional[float] = None, polling_period: float = 1.0, raise_on_failure: bool = True + ) -> models.Task: + """Wait for the build of the given function to finish. + + It is considered finished when the status is ready, failed or cancelled. + + Args: + key (str): the key of the task to wait for. + timeout (float, optional): maximum time to wait, in seconds. If set to None, will hang until completion. + polling_period (float): time to wait between two checks, in seconds. Defaults to 2.0. + raise_on_failure (bool): whether to raise an exception if the execution fails. Defaults to True. + + Returns: + models.Task: the task after completion + + Raises: + exceptions.FutureFailureError: The task failed or have been cancelled. + exceptions.FutureTimeoutError: The task took more than the duration set in the timeout to complete. + Not raised when `timeout == None` + """ + asset_getter = self.get_function + status_canceled = models.FunctionStatus.canceled.value + status_failed = models.FunctionStatus.failed.value + statuses_stopped = (models.FunctionStatus.ready.value, models.FunctionStatus.canceled.value) return self._wait( key=key, asset_getter=asset_getter, @@ -1095,7 +1132,7 @@ def _wait( if asset.status in statuses_stopped: break - if asset.status == models.Status.failed.value and asset.error_type is not None: + if asset.status == models.ComputeTaskStatus.failed.value and asset.error_type is not None: # when dealing with a failed task, wait for the error_type field of the task to be set # i.e. wait for the registration of the failure report break diff --git a/substra/sdk/models.py b/substra/sdk/models.py index e82d780ae..8b9c00e7c 100644 --- a/substra/sdk/models.py +++ b/substra/sdk/models.py @@ -30,7 +30,7 @@ class MetadataFilterType(str, enum.Enum): exists = "exists" -class Status(str, enum.Enum): +class ComputeTaskStatus(str, enum.Enum): """Status of the task""" unknown = "STATUS_UNKNOWN" @@ -57,6 +57,17 @@ class ComputePlanStatus(str, enum.Enum): empty = "PLAN_STATUS_EMPTY" +class FunctionStatus(str, enum.Enum): + """Status of the function""" + + unknown = "FUNCTION_STATUS_UNKNOWN" + waiting = "FUNCTION_STATUS_WAITING" + building = "FUNCTION_STATUS_BUILDING" + ready = "FUNCTION_STATUS_READY" + failed = "FUNCTION_STATUS_FAILED" + canceled = "FUNCTION_STATUS_CANCELED" + + class TaskErrorType(str, enum.Enum): """Types of errors that can occur in a task""" @@ -178,6 +189,7 @@ class Function(_Model): creation_date: datetime inputs: List[FunctionInput] outputs: List[FunctionOutput] + status: FunctionStatus description: _File archive: _File @@ -268,7 +280,7 @@ class Task(_Model): owner: str compute_plan_key: str metadata: Dict[str, str] - status: Status + status: ComputeTaskStatus worker: str rank: Optional[int] = None tag: str diff --git a/tests/datastore.py b/tests/datastore.py index 2b5a230fa..9847d77e9 100644 --- a/tests/datastore.py +++ b/tests/datastore.py @@ -78,6 +78,7 @@ "opener": {"kind": "ASSET_DATA_MANAGER", "optional": False, "multiple": False}, }, "outputs": {"model": {"kind": "ASSET_MODEL", "multiple": False}}, + "status": "FUNCTION_STATUS_READY", } @@ -106,6 +107,7 @@ "shared": {"kind": "ASSET_MODEL", "optional": True, "multiple": False}, }, "outputs": {"predictions": {"kind": "ASSET_MODEL", "multiple": False}}, + "status": "FUNCTION_STATUS_READY", } @@ -133,6 +135,7 @@ "predictions": {"kind": "ASSET_MODEL", "optional": False, "multiple": False}, }, "outputs": {"performance": {"kind": "ASSET_PERFORMANCE", "multiple": False}}, + "status": "FUNCTION_STATUS_READY", } @@ -162,6 +165,7 @@ "opener": {"kind": "ASSET_DATA_MANAGER", "optional": False, "multiple": False}, }, "outputs": {"model": {"kind": "ASSET_MODEL", "multiple": False}}, + "status": "FUNCTION_STATUS_READY", }, "owner": "MyOrg1MSP", "creation_date": "2021-08-24T13:36:07.393646367Z", @@ -234,6 +238,7 @@ "metadata": {"foo": "bar"}, "inputs": {"model": {"kind": "ASSET_MODEL", "optional": False, "multiple": True}}, "outputs": {"model": {"kind": "ASSET_MODEL", "multiple": False}}, + "status": "FUNCTION_STATUS_READY", }, "owner": "MyOrg1MSP", "creation_date": "2021-08-24T13:36:07.393646367Z", @@ -308,6 +313,7 @@ "local": {"kind": "ASSET_MODEL", "multiple": False}, "shared": {"kind": "ASSET_MODEL", "multiple": False}, }, + "status": "FUNCTION_STATUS_READY", }, "owner": "MyOrg1MSP", "creation_date": "2021-08-24T13:36:07.393646367Z", @@ -420,6 +426,7 @@ "local": {"kind": "ASSET_MODEL", "multiple": False}, "shared": {"kind": "ASSET_MODEL", "multiple": False}, }, + "status": "FUNCTION_STATUS_READY", }, "owner": "MyOrg1MSP", "creation_date": "2021-08-24T13:36:07.393646367Z", @@ -478,6 +485,7 @@ "shared": {"kind": "ASSET_MODEL", "optional": True, "multiple": False}, }, "outputs": {"predictions": {"kind": "ASSET_MODEL", "multiple": False}}, + "status": "FUNCTION_STATUS_READY", }, "owner": "MyOrg1MSP", "creation_date": "2021-08-24T13:36:07.393646367Z", @@ -536,6 +544,7 @@ "predictions": {"kind": "ASSET_MODEL", "optional": False, "multiple": False}, }, "outputs": {"performance": {"kind": "ASSET_PERFORMANCE", "multiple": False}}, + "status": "FUNCTION_STATUS_READY", }, "owner": "MyOrg1MSP", "creation_date": "2021-08-24T13:36:07.393646367Z", diff --git a/tests/sdk/test_wait.py b/tests/sdk/test_wait.py index 58c14ecff..9e7204b7c 100644 --- a/tests/sdk/test_wait.py +++ b/tests/sdk/test_wait.py @@ -4,7 +4,7 @@ from substra.sdk import exceptions from substra.sdk.models import ComputePlanStatus -from substra.sdk.models import Status +from substra.sdk.models import ComputeTaskStatus from substra.sdk.models import TaskErrorType from .. import datastore @@ -21,8 +21,8 @@ def _param_name_maker(arg): @pytest.mark.parametrize( ("asset_dict", "function_name", "status", "expectation"), [ - (datastore.TRAINTASK, "wait_task", Status.done, does_not_raise()), - (datastore.TRAINTASK, "wait_task", Status.canceled, pytest.raises(exceptions.FutureFailureError)), + (datastore.TRAINTASK, "wait_task", ComputeTaskStatus.done, does_not_raise()), + (datastore.TRAINTASK, "wait_task", ComputeTaskStatus.canceled, pytest.raises(exceptions.FutureFailureError)), (datastore.COMPUTE_PLAN, "wait_compute_plan", ComputePlanStatus.done, does_not_raise()), ( datastore.COMPUTE_PLAN, @@ -49,7 +49,7 @@ def test_wait(client, mocker, asset_dict, function_name, status, expectation): def test_wait_task_failed(client, mocker): # We need an error type to stop the iteration - item = {**datastore.TRAINTASK, "status": Status.failed, "error_type": TaskErrorType.internal} + item = {**datastore.TRAINTASK, "status": ComputeTaskStatus.failed, "error_type": TaskErrorType.internal} mock_requests(mocker, "get", item) with pytest.raises(exceptions.FutureFailureError): client.wait_task(key=item["key"]) @@ -58,9 +58,9 @@ def test_wait_task_failed(client, mocker): @pytest.mark.parametrize( ("asset_dict", "function_name", "status"), [ - (datastore.TRAINTASK, "wait_task", Status.waiting_for_parent_tasks), - (datastore.TRAINTASK, "wait_task", Status.waiting_for_builder_slot), - (datastore.TRAINTASK, "wait_task", Status.waiting_for_executor_slot), + (datastore.TRAINTASK, "wait_task", ComputeTaskStatus.waiting_for_parent_tasks), + (datastore.TRAINTASK, "wait_task", ComputeTaskStatus.waiting_for_builder_slot), + (datastore.TRAINTASK, "wait_task", ComputeTaskStatus.waiting_for_executor_slot), (datastore.COMPUTE_PLAN, "wait_compute_plan", ComputePlanStatus.todo), ], ids=_param_name_maker, diff --git a/tests/test_utils.py b/tests/test_utils.py index 46019b4f9..29f302e99 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -81,8 +81,8 @@ def test_check_metadata_search_filter(filters, expected, exception): ), ( schemas.Type.Task, - {"status": [substra.models.Status.done.value]}, - {"status": [substra.models.Status.done.value]}, + {"status": [substra.models.ComputeTaskStatus.done.value]}, + {"status": [substra.models.ComputeTaskStatus.done.value]}, None, ), (schemas.Type.Task, {"rank": [1]}, {"rank": ["1"]}, None),