Skip to content

Commit

Permalink
feat(api): Allow recovering from errors that happen during the prepar…
Browse files Browse the repository at this point in the history
…ation part of an aspirate command (#16896)
  • Loading branch information
SyntaxColoring authored Nov 25, 2024
1 parent 3f3a938 commit 36487d8
Show file tree
Hide file tree
Showing 5 changed files with 460 additions and 181 deletions.
108 changes: 68 additions & 40 deletions api/src/opentrons/protocol_engine/commands/aspirate.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
FlowRateMixin,
BaseLiquidHandlingResult,
aspirate_in_place,
prepare_for_aspirate,
)
from .movement_common import (
LiquidHandlingWellLocationMixin,
Expand Down Expand Up @@ -94,6 +95,17 @@ async def execute(self, params: AspirateParams) -> _ExecuteReturn:
pipette_id = params.pipetteId
labware_id = params.labwareId
well_name = params.wellName
well_location = params.wellLocation

state_update = StateUpdate()

final_location = self._state_view.geometry.get_well_position(
labware_id=labware_id,
well_name=well_name,
well_location=well_location,
operation_volume=-params.volume,
pipette_id=pipette_id,
)

ready_to_aspirate = self._pipetting.get_is_ready_to_aspirate(
pipette_id=pipette_id
Expand All @@ -102,14 +114,32 @@ async def execute(self, params: AspirateParams) -> _ExecuteReturn:
current_well = None

if not ready_to_aspirate:
await self._movement.move_to_well(
move_result = await move_to_well(
movement=self._movement,
model_utils=self._model_utils,
pipette_id=pipette_id,
labware_id=labware_id,
well_name=well_name,
well_location=WellLocation(origin=WellOrigin.TOP),
)
state_update.append(move_result.state_update)
if isinstance(move_result, DefinedErrorData):
return DefinedErrorData(move_result.public, state_update=state_update)

await self._pipetting.prepare_for_aspirate(pipette_id=pipette_id)
prepare_result = await prepare_for_aspirate(
pipette_id=pipette_id,
pipetting=self._pipetting,
model_utils=self._model_utils,
# Note that the retryLocation is the final location, inside the liquid,
# because that's where we'd want the client to try re-aspirating if this
# command fails and the run enters error recovery.
location_if_error={"retryLocation": final_location},
)
state_update.append(prepare_result.state_update)
if isinstance(prepare_result, DefinedErrorData):
return DefinedErrorData(
public=prepare_result.public, state_update=state_update
)

# set our current deck location to the well now that we've made
# an intermediate move for the "prepare for aspirate" step
Expand All @@ -125,12 +155,15 @@ async def execute(self, params: AspirateParams) -> _ExecuteReturn:
pipette_id=pipette_id,
labware_id=labware_id,
well_name=well_name,
well_location=params.wellLocation,
well_location=well_location,
current_well=current_well,
operation_volume=-params.volume,
)
state_update.append(move_result.state_update)
if isinstance(move_result, DefinedErrorData):
return move_result
return DefinedErrorData(
public=move_result.public, state_update=state_update
)

aspirate_result = await aspirate_in_place(
pipette_id=pipette_id,
Expand All @@ -147,47 +180,42 @@ async def execute(self, params: AspirateParams) -> _ExecuteReturn:
pipetting=self._pipetting,
model_utils=self._model_utils,
)
state_update.append(aspirate_result.state_update)
if isinstance(aspirate_result, DefinedErrorData):
return DefinedErrorData(
public=aspirate_result.public,
state_update=StateUpdate.reduce(
move_result.state_update, aspirate_result.state_update
).set_liquid_operated(
labware_id=labware_id,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_with_active_well(
labware_id,
well_name,
params.pipetteId,
),
volume_added=CLEAR,
),
state_update_if_false_positive=StateUpdate.reduce(
move_result.state_update,
aspirate_result.state_update_if_false_positive,
state_update.set_liquid_operated(
labware_id=labware_id,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_with_active_well(
labware_id,
well_name,
params.pipetteId,
),
volume_added=CLEAR,
)
else:
return SuccessData(
public=AspirateResult(
volume=aspirate_result.public.volume,
position=move_result.public.position,
),
state_update=StateUpdate.reduce(
move_result.state_update, aspirate_result.state_update
).set_liquid_operated(
labware_id=labware_id,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_with_active_well(
labware_id, well_name, pipette_id
),
volume_added=-aspirate_result.public.volume
* self._state_view.geometry.get_nozzles_per_well(
labware_id,
well_name,
params.pipetteId,
),
),
return DefinedErrorData(
public=aspirate_result.public, state_update=state_update
)

state_update.set_liquid_operated(
labware_id=labware_id,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_with_active_well(
labware_id, well_name, pipette_id
),
volume_added=-aspirate_result.public.volume
* self._state_view.geometry.get_nozzles_per_well(
labware_id,
well_name,
params.pipetteId,
),
)

return SuccessData(
public=AspirateResult(
volume=aspirate_result.public.volume,
position=move_result.public.position,
),
state_update=state_update,
)


class Aspirate(
BaseCommand[
Expand Down
6 changes: 5 additions & 1 deletion api/src/opentrons/protocol_engine/execution/pipetting.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,11 @@ def get_is_ready_to_aspirate(self, pipette_id: str) -> bool:
)

async def prepare_for_aspirate(self, pipette_id: str) -> None:
"""Prepare for pipette aspiration."""
"""Prepare for pipette aspiration.
Raises:
PipetteOverpressureError, propagated as-is from the hardware controller.
"""
hw_mount = self._state_view.pipettes.get_mount(pipette_id).to_hw_mount()
await self._hardware_api.prepare_for_aspirate(mount=hw_mount)

Expand Down
30 changes: 17 additions & 13 deletions api/src/opentrons/protocol_engine/state/update_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,26 +299,30 @@ class StateUpdate:

liquid_class_loaded: LiquidClassLoadedUpdate | NoChangeType = NO_CHANGE

def append(self, other: Self) -> Self:
"""Apply another `StateUpdate` "on top of" this one.
This object is mutated in-place, taking values from `other`.
If an attribute in `other` is `NO_CHANGE`, the value in this object is kept.
"""
fields = dataclasses.fields(other)
for field in fields:
other_value = other.__dict__[field.name]
if other_value != NO_CHANGE:
self.__dict__[field.name] = other_value
return self

@classmethod
def reduce(cls: typing.Type[Self], *args: Self) -> Self:
"""Fuse multiple state updates into a single one.
State updates that are later in the parameter list are preferred to those that are earlier;
NO_CHANGE is ignored.
"""
fields = dataclasses.fields(cls)
changes_dicts = [
{
field.name: update.__dict__[field.name]
for field in fields
if update.__dict__[field.name] != NO_CHANGE
}
for update in args
]
changes = {}
for changes_dict in changes_dicts:
changes.update(changes_dict)
return cls(**changes)
accumulator = cls()
for arg in args:
accumulator.append(arg)
return accumulator

# These convenience functions let the caller avoid the boilerplate of constructing a
# complicated dataclass tree.
Expand Down
Loading

0 comments on commit 36487d8

Please sign in to comment.