Skip to content

Commit

Permalink
implemented validation for reachable steps and invalid next ids
Browse files Browse the repository at this point in the history
  • Loading branch information
tmbo committed May 25, 2023
1 parent 0e08e93 commit dc3911d
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 18 deletions.
5 changes: 3 additions & 2 deletions rasa/core/policies/flow_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,10 @@ def find_startable_flow(
The predicted action and the events to run.
"""
for flow in flows.underlying_flows:
if not flow.steps:
first_step = flow.start_step()
if not first_step:
continue
first_step = flow.steps[0]

if isinstance(
first_step, IntentFlowStep
) and first_step.intent == tracker.latest_message.intent.get(
Expand Down
150 changes: 135 additions & 15 deletions rasa/shared/core/flows/flow.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,73 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Protocol, Text
from typing import Any, Dict, List, Optional, Protocol, Set, Text
from rasa.shared.exceptions import RasaException

import rasa.shared.utils.io


class UnreachableFlowStepException(RasaException):
"""Raised when a flow step is unreachable."""

def __init__(self, step: FlowStep, flow: Flow) -> None:
"""Initializes the exception."""
self.step = step
self.flow = flow

def __str__(self) -> Text:
"""Return a string representation of the exception."""
return (
f"Step '{self.step.id}' in flow '{self.flow.id}' can not be reached from the start step. "
f"Please make sure that all steps can be reached from the start step, e.g. by "
f"checking that another step points to this step."
)


class UnresolvedFlowStepIdException(RasaException):
"""Raised when a flow step is referenced but it's id can not be resolved."""

def __init__(
self, step_id: Text, flow: Flow, referenced_from: Optional[FlowStep]
) -> None:
"""Initializes the exception."""
self.step_id = step_id
self.flow = flow
self.referenced_from = referenced_from

def __str__(self) -> Text:
"""Return a string representation of the exception."""
if self.referenced_from:
exception_message = (
f"Step with id '{self.step_id}' could not be resolved. "
f"'Step '{self.referenced_from.id}' in flow '{self.flow.id}' "
f"referenced this step but it does not exist. "
)
else:
exception_message = (
f"Step '{self.step_id}' in flow '{self.flow.id}' can not be resolved. "
)

return exception_message + (
f"Please make sure that the step is defined in the same flow."
)


class UnresolvedFlowException(RasaException):
"""Raised when a flow is referenced but it's id can not be resolved."""

def __init__(self, flow_id: Text) -> None:
"""Initializes the exception."""
self.flow_id = flow_id

def __str__(self) -> Text:
"""Return a string representation of the exception."""
return (
f"Flow '{self.flow_id}' can not be resolved. "
f"Please make sure that the flow is defined."
)


class FlowsList:
"""Represents the configuration of a list of flow.
Expand Down Expand Up @@ -80,25 +142,27 @@ def flow_by_id(self, id: Optional[Text]) -> Optional[Flow]:
else:
return None

def steps_of_flow(self, flow_id: Text) -> List[FlowStep]:
"""Return all the steps of the flow."""
flow = self.flow_by_id(flow_id)
if flow:
return flow.steps
return []

def first_step(self, flow_id: Text) -> Optional[FlowStep]:
"""Return the first step of the flow."""
steps = self.steps_of_flow(flow_id)
return steps[0] if steps else None
flow = self.flow_by_id(flow_id)
return flow.start_step() if flow else None

def step_by_id(self, step_id: Text, flow_id: Text) -> FlowStep:
"""Return the step with the given id."""
for step in self.steps_of_flow(flow_id):
if step.id == step_id:
return step
else:
raise ValueError(f"Step with id '{step_id}' not found.")
flow = self.flow_by_id(flow_id)
if not flow:
raise UnresolvedFlowException(flow_id)

step = flow.step_for_id(step_id)
if not step:
raise UnresolvedFlowStepIdException(step_id, flow)

return step

def validate(self) -> None:
"""Validate the flows."""
for flow in self.underlying_flows:
flow.validate()


@dataclass
Expand All @@ -110,6 +174,7 @@ class Flow:
description: Optional[Text]
"""The description of the flow."""
steps: List[FlowStep]
"""The steps of the flow."""

@staticmethod
def from_json(flow_id: Text, flow_config: Dict[Text, Any]) -> Flow:
Expand Down Expand Up @@ -142,6 +207,61 @@ def as_json(self) -> Dict[Text, Any]:
"steps": [step.as_json() for step in self.steps],
}

def validate(self) -> None:
"""Validates the flow configuration.
This ensures that the flow semantically makes sense. E.g. it
checks:
-"""

self._validate_all_next_ids_are_availble_steps()
self._validate_all_steps_can_be_reached()

def _validate_all_next_ids_are_availble_steps(self) -> None:
"""Validates that all next links point to existing steps."""

available_steps = {step.id for step in self.steps}
for step in self.steps:
for link in step.next.links:
if link.target not in available_steps:
raise UnresolvedFlowStepIdException(link.target, self, step)

def _validate_all_steps_can_be_reached(self) -> None:
"""Validates that all steps can be reached from the start step."""

def _reachable_steps(
step: Optional[FlowStep], reached_steps: Set[Text]
) -> Set[FlowStep]:
"""Validates that the given step can be reached from the start step."""
if step is None or step.id in reached_steps:
return reached_steps

reached_steps.add(step.id)
for link in step.next.links:
reached_steps = _reachable_steps(
self.step_for_id(link.target), reached_steps
)
return reached_steps

reached_steps = _reachable_steps(self.start_step(), set())

for step in self.steps:
if step.id not in reached_steps:
raise UnreachableFlowStepException(step, self)

def step_for_id(self, step_id: Text) -> Optional[FlowStep]:
"""Returns the step with the given id."""
for step in self.steps:
if step.id == step_id:
return step
return None

def start_step(self) -> Optional[FlowStep]:
"""Returns the start step of this flow."""
if len(self.steps) == 0:
return None
return self.steps[0]


def step_from_json(flow_step_config: Dict[Text, Any]) -> FlowStep:
"""Used to read flow steps from parsed YAML.
Expand Down
5 changes: 4 additions & 1 deletion rasa/shared/core/flows/yaml_flows_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ def read_from_string(cls, string: Text, skip_validation: bool = False) -> FlowsL

yaml_content = rasa.shared.utils.io.read_yaml(string)

return FlowsList.from_json(yaml_content.get(KEY_FLOWS, {}))
flows = FlowsList.from_json(yaml_content.get(KEY_FLOWS, {}))
if not skip_validation:
flows.validate()
return flows


class YamlFlowsWriter:
Expand Down

0 comments on commit dc3911d

Please sign in to comment.