Skip to content

Commit

Permalink
feat!: Add Function status (#399)
Browse files Browse the repository at this point in the history
Signed-off-by: SdgJlbl <[email protected]>
  • Loading branch information
SdgJlbl authored Feb 16, 2024
1 parent 8070c93 commit 32e354c
Show file tree
Hide file tree
Showing 11 changed files with 84 additions and 23 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed

- BREAKING: Renamed `function` field of the Function pydantic model to `archive`([#393](https://github.com/Substra/substra/pull/393))
- BREAKING: Renamed ComputeTask status ([#397](https://github.com/Substra/substra/pull/397))
- BREAKING: Renamed ComputeTask status values ([#397](https://github.com/Substra/substra/pull/397))
- `download_logs` uses the new endpoint ([#398](https://github.com/Substra/substra/pull/398))
- BREAKING: Renamed Status to ComputeTaskStatus ([#399](https://github.com/Substra/substra/pull/399))

### Added

- Paths are now resolved on DatasampleSpec objects. Which means that users can pass relative paths ([#392](https://github.com/Substra/substra/pull/392))
- Added FunctionStatus ([#399](https://github.com/Substra/substra/pull/399))

## [0.49.0](https://github.com/Substra/substra/releases/tag/0.49.0) - 2023-10-18

Expand Down
2 changes: 1 addition & 1 deletion references/sdk.md
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ The ``filters`` argument is a dictionary, with those possible keys:
rank (List[int]): list tasks which are at given ranks.

status (List[str]): list tasks with given status.
The possible values are the values of `substra.models.Status`
The possible values are the values of `substra.models.ComputeTaskStatus`
metadata (dict)
{
"key": str # the key of the metadata to filter on
Expand Down
2 changes: 1 addition & 1 deletion references/sdk_models.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ Asset creation specification base class.
- owner: <class 'str'>
- compute_plan_key: <class 'str'>
- metadata: typing.Dict[str, str]
- status: <enum 'Status'>
- status: <enum 'ComputeTaskStatus'>
- worker: <class 'str'>
- rank: typing.Optional[int]
- tag: <class 'str'>
Expand Down
3 changes: 2 additions & 1 deletion substra/sdk/backends/local/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ def _add_function(self, key, spec, spec_options=None):
metadata=spec.metadata if spec.metadata else dict(),
inputs=_schemas_list_to_models_list(spec.inputs, models.FunctionInput),
outputs=_schemas_list_to_models_list(spec.outputs, models.FunctionOutput),
status=models.FunctionStatus.ready,
)
return self._db.add(function)

Expand Down Expand Up @@ -429,7 +430,7 @@ def _add_task(self, key, spec, spec_options=None):
outputs=_output_from_spec(spec.outputs),
tag=spec.tag or "",
# TODO: the waiting status should be more granular now
status=models.Status.waiting_for_executor_slot,
status=models.ComputeTaskStatus.waiting_for_executor_slot,
metadata=spec.metadata if spec.metadata else dict(),
)

Expand Down
4 changes: 2 additions & 2 deletions substra/sdk/backends/local/compute/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ def schedule_task(self, task: models.Task):
task: Task to execute
"""
with self._context(task.key) as task_dir:
task.status = models.Status.doing
task.status = models.ComputeTaskStatus.doing
task.start_date = datetime.datetime.now()
function = self._db.get_with_files(schemas.Type.Function, task.function.key)
input_multiplicity = {i.identifier: i.multiple for i in function.inputs}
Expand Down Expand Up @@ -377,7 +377,7 @@ def schedule_task(self, task: models.Task):
)

# Set status
task.status = models.Status.done
task.status = models.ComputeTaskStatus.done
task.end_date = datetime.datetime.now()

self._update_cp(compute_plan=compute_plan, update_live_performances=update_live_performances)
2 changes: 1 addition & 1 deletion substra/sdk/backends/local/dal.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def get_performances(self, key: str) -> models.Performances:
performances = models.Performances()

for task in list_tasks:
if task.status == models.Status.done:
if task.status == models.ComputeTaskStatus.done:
function = self.get(schemas.Type.Function, task.function.key)
perf_identifiers = [
output.identifier for output in function.outputs if output.kind == schemas.AssetKind.performance
Expand Down
47 changes: 42 additions & 5 deletions substra/sdk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,7 @@ def list_task(
worker (List[str]): list tasks which ran on listed workers. Remote mode only.\n
rank (List[int]): list tasks which are at given ranks.\n
status (List[str]): list tasks with given status.
The possible values are the values of `substra.models.Status`
The possible values are the values of `substra.models.ComputeTaskStatus`
metadata (dict)
{
"key": str # the key of the metadata to filter on
Expand Down Expand Up @@ -1062,9 +1062,46 @@ def wait_task(
Not raised when `timeout == None`
"""
asset_getter = self.get_task
status_canceled = models.Status.canceled.value
status_failed = models.Status.failed.value
statuses_stopped = (models.Status.done.value, models.Status.canceled.value)
status_canceled = models.ComputeTaskStatus.canceled.value
status_failed = models.ComputeTaskStatus.failed.value
statuses_stopped = (models.ComputeTaskStatus.done.value, models.ComputeTaskStatus.canceled.value)
return self._wait(
key=key,
asset_getter=asset_getter,
polling_period=polling_period,
raise_on_failure=raise_on_failure,
status_canceled=status_canceled,
status_failed=status_failed,
statuses_stopped=statuses_stopped,
timeout=timeout,
)

@logit
def wait_function(
self, key: str, *, timeout: Optional[float] = None, polling_period: float = 1.0, raise_on_failure: bool = True
) -> models.Task:
"""Wait for the build of the given function to finish.
It is considered finished when the status is ready, failed or cancelled.
Args:
key (str): the key of the task to wait for.
timeout (float, optional): maximum time to wait, in seconds. If set to None, will hang until completion.
polling_period (float): time to wait between two checks, in seconds. Defaults to 2.0.
raise_on_failure (bool): whether to raise an exception if the execution fails. Defaults to True.
Returns:
models.Task: the task after completion
Raises:
exceptions.FutureFailureError: The task failed or have been cancelled.
exceptions.FutureTimeoutError: The task took more than the duration set in the timeout to complete.
Not raised when `timeout == None`
"""
asset_getter = self.get_function
status_canceled = models.FunctionStatus.canceled.value
status_failed = models.FunctionStatus.failed.value
statuses_stopped = (models.FunctionStatus.ready.value, models.FunctionStatus.canceled.value)
return self._wait(
key=key,
asset_getter=asset_getter,
Expand Down Expand Up @@ -1095,7 +1132,7 @@ def _wait(
if asset.status in statuses_stopped:
break

if asset.status == models.Status.failed.value and asset.error_type is not None:
if asset.status == models.ComputeTaskStatus.failed.value and asset.error_type is not None:
# when dealing with a failed task, wait for the error_type field of the task to be set
# i.e. wait for the registration of the failure report
break
Expand Down
16 changes: 14 additions & 2 deletions substra/sdk/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class MetadataFilterType(str, enum.Enum):
exists = "exists"


class Status(str, enum.Enum):
class ComputeTaskStatus(str, enum.Enum):
"""Status of the task"""

unknown = "STATUS_UNKNOWN"
Expand All @@ -57,6 +57,17 @@ class ComputePlanStatus(str, enum.Enum):
empty = "PLAN_STATUS_EMPTY"


class FunctionStatus(str, enum.Enum):
"""Status of the function"""

unknown = "FUNCTION_STATUS_UNKNOWN"
waiting = "FUNCTION_STATUS_WAITING"
building = "FUNCTION_STATUS_BUILDING"
ready = "FUNCTION_STATUS_READY"
failed = "FUNCTION_STATUS_FAILED"
canceled = "FUNCTION_STATUS_CANCELED"


class TaskErrorType(str, enum.Enum):
"""Types of errors that can occur in a task"""

Expand Down Expand Up @@ -178,6 +189,7 @@ class Function(_Model):
creation_date: datetime
inputs: List[FunctionInput]
outputs: List[FunctionOutput]
status: FunctionStatus

description: _File
archive: _File
Expand Down Expand Up @@ -268,7 +280,7 @@ class Task(_Model):
owner: str
compute_plan_key: str
metadata: Dict[str, str]
status: Status
status: ComputeTaskStatus
worker: str
rank: Optional[int] = None
tag: str
Expand Down
9 changes: 9 additions & 0 deletions tests/datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
"opener": {"kind": "ASSET_DATA_MANAGER", "optional": False, "multiple": False},
},
"outputs": {"model": {"kind": "ASSET_MODEL", "multiple": False}},
"status": "FUNCTION_STATUS_READY",
}


Expand Down Expand Up @@ -106,6 +107,7 @@
"shared": {"kind": "ASSET_MODEL", "optional": True, "multiple": False},
},
"outputs": {"predictions": {"kind": "ASSET_MODEL", "multiple": False}},
"status": "FUNCTION_STATUS_READY",
}


Expand Down Expand Up @@ -133,6 +135,7 @@
"predictions": {"kind": "ASSET_MODEL", "optional": False, "multiple": False},
},
"outputs": {"performance": {"kind": "ASSET_PERFORMANCE", "multiple": False}},
"status": "FUNCTION_STATUS_READY",
}


Expand Down Expand Up @@ -162,6 +165,7 @@
"opener": {"kind": "ASSET_DATA_MANAGER", "optional": False, "multiple": False},
},
"outputs": {"model": {"kind": "ASSET_MODEL", "multiple": False}},
"status": "FUNCTION_STATUS_READY",
},
"owner": "MyOrg1MSP",
"creation_date": "2021-08-24T13:36:07.393646367Z",
Expand Down Expand Up @@ -234,6 +238,7 @@
"metadata": {"foo": "bar"},
"inputs": {"model": {"kind": "ASSET_MODEL", "optional": False, "multiple": True}},
"outputs": {"model": {"kind": "ASSET_MODEL", "multiple": False}},
"status": "FUNCTION_STATUS_READY",
},
"owner": "MyOrg1MSP",
"creation_date": "2021-08-24T13:36:07.393646367Z",
Expand Down Expand Up @@ -308,6 +313,7 @@
"local": {"kind": "ASSET_MODEL", "multiple": False},
"shared": {"kind": "ASSET_MODEL", "multiple": False},
},
"status": "FUNCTION_STATUS_READY",
},
"owner": "MyOrg1MSP",
"creation_date": "2021-08-24T13:36:07.393646367Z",
Expand Down Expand Up @@ -420,6 +426,7 @@
"local": {"kind": "ASSET_MODEL", "multiple": False},
"shared": {"kind": "ASSET_MODEL", "multiple": False},
},
"status": "FUNCTION_STATUS_READY",
},
"owner": "MyOrg1MSP",
"creation_date": "2021-08-24T13:36:07.393646367Z",
Expand Down Expand Up @@ -478,6 +485,7 @@
"shared": {"kind": "ASSET_MODEL", "optional": True, "multiple": False},
},
"outputs": {"predictions": {"kind": "ASSET_MODEL", "multiple": False}},
"status": "FUNCTION_STATUS_READY",
},
"owner": "MyOrg1MSP",
"creation_date": "2021-08-24T13:36:07.393646367Z",
Expand Down Expand Up @@ -536,6 +544,7 @@
"predictions": {"kind": "ASSET_MODEL", "optional": False, "multiple": False},
},
"outputs": {"performance": {"kind": "ASSET_PERFORMANCE", "multiple": False}},
"status": "FUNCTION_STATUS_READY",
},
"owner": "MyOrg1MSP",
"creation_date": "2021-08-24T13:36:07.393646367Z",
Expand Down
14 changes: 7 additions & 7 deletions tests/sdk/test_wait.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from substra.sdk import exceptions
from substra.sdk.models import ComputePlanStatus
from substra.sdk.models import Status
from substra.sdk.models import ComputeTaskStatus
from substra.sdk.models import TaskErrorType

from .. import datastore
Expand All @@ -21,8 +21,8 @@ def _param_name_maker(arg):
@pytest.mark.parametrize(
("asset_dict", "function_name", "status", "expectation"),
[
(datastore.TRAINTASK, "wait_task", Status.done, does_not_raise()),
(datastore.TRAINTASK, "wait_task", Status.canceled, pytest.raises(exceptions.FutureFailureError)),
(datastore.TRAINTASK, "wait_task", ComputeTaskStatus.done, does_not_raise()),
(datastore.TRAINTASK, "wait_task", ComputeTaskStatus.canceled, pytest.raises(exceptions.FutureFailureError)),
(datastore.COMPUTE_PLAN, "wait_compute_plan", ComputePlanStatus.done, does_not_raise()),
(
datastore.COMPUTE_PLAN,
Expand All @@ -49,7 +49,7 @@ def test_wait(client, mocker, asset_dict, function_name, status, expectation):

def test_wait_task_failed(client, mocker):
# We need an error type to stop the iteration
item = {**datastore.TRAINTASK, "status": Status.failed, "error_type": TaskErrorType.internal}
item = {**datastore.TRAINTASK, "status": ComputeTaskStatus.failed, "error_type": TaskErrorType.internal}
mock_requests(mocker, "get", item)
with pytest.raises(exceptions.FutureFailureError):
client.wait_task(key=item["key"])
Expand All @@ -58,9 +58,9 @@ def test_wait_task_failed(client, mocker):
@pytest.mark.parametrize(
("asset_dict", "function_name", "status"),
[
(datastore.TRAINTASK, "wait_task", Status.waiting_for_parent_tasks),
(datastore.TRAINTASK, "wait_task", Status.waiting_for_builder_slot),
(datastore.TRAINTASK, "wait_task", Status.waiting_for_executor_slot),
(datastore.TRAINTASK, "wait_task", ComputeTaskStatus.waiting_for_parent_tasks),
(datastore.TRAINTASK, "wait_task", ComputeTaskStatus.waiting_for_builder_slot),
(datastore.TRAINTASK, "wait_task", ComputeTaskStatus.waiting_for_executor_slot),
(datastore.COMPUTE_PLAN, "wait_compute_plan", ComputePlanStatus.todo),
],
ids=_param_name_maker,
Expand Down
4 changes: 2 additions & 2 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ def test_check_metadata_search_filter(filters, expected, exception):
),
(
schemas.Type.Task,
{"status": [substra.models.Status.done.value]},
{"status": [substra.models.Status.done.value]},
{"status": [substra.models.ComputeTaskStatus.done.value]},
{"status": [substra.models.ComputeTaskStatus.done.value]},
None,
),
(schemas.Type.Task, {"rank": [1]}, {"rank": ["1"]}, None),
Expand Down

0 comments on commit 32e354c

Please sign in to comment.