Skip to content

Commit

Permalink
Merge branch 'feat/continue-on-error' into deploy/dev
Browse files Browse the repository at this point in the history
  • Loading branch information
Novice Lee authored and Novice Lee committed Dec 5, 2024
2 parents 09f565f + e307f1c commit 8517b62
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 25 deletions.
29 changes: 27 additions & 2 deletions api/core/app/apps/workflow/generate_task_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,13 +392,38 @@ def _process_stream_response(
"conversation_id": None,
"trace_manager": trace_manager,
}
if isinstance(event, QueueWorkflowFailedEvent):
handle_args["exceptions_count"] = event.exceptions_count
workflow_run = self._handle_workflow_run_failed(**handle_args)

# save workflow app log
self._save_workflow_app_log(workflow_run)

yield self._workflow_finish_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run
)
elif isinstance(event, QueueWorkflowPartialSuccessEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")

if not graph_runtime_state:
raise Exception("Graph runtime state not initialized.")
handle_args = {
"workflow_run": workflow_run,
"start_at": graph_runtime_state.start_at,
"total_tokens": graph_runtime_state.total_tokens,
"total_steps": graph_runtime_state.node_run_steps,
"status": WorkflowRunStatus.FAILED
if isinstance(event, QueueWorkflowFailedEvent)
else WorkflowRunStatus.STOPPED,
"error": event.error if isinstance(event, QueueWorkflowFailedEvent) else event.get_stop_reason(),
"conversation_id": None,
"trace_manager": trace_manager,
"exceptions_count": event.exceptions_count,
}
workflow_run = self._handle_workflow_run_partial_success(**handle_args)

# save workflow app log
self._save_workflow_app_log(workflow_run)

yield self._workflow_finish_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run
)
Expand Down
30 changes: 17 additions & 13 deletions api/core/workflow/graph_engine/graph_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ def _run(
previous_node_id=previous_node_id,
thread_pool_id=self.thread_pool_id,
)

node_instance = cast(BaseNode[BaseNodeData], node_instance)
try:
# run node
generator = self._run_node(
Expand Down Expand Up @@ -314,13 +314,13 @@ def _run(
break

if len(edge_mappings) == 1:
edge = edge_mappings[0]
if (
previous_route_node_state.status == RouteNodeState.Status.EXCEPTION
and node_instance.node_data.error_strategy == ErrorStrategy.FAIL_BRANCH
and edge.run_condition is None
):
break
edge = edge_mappings[0]

if edge.run_condition:
result = ConditionManager.get_condition_handler(
init_params=self.init_params,
Expand Down Expand Up @@ -651,7 +651,9 @@ def _run_node(
)

elif run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED:
if node_instance.should_continue_on_error:
if node_instance.should_continue_on_error and self.graph.edge_mapping.get(
node_instance.node_id
):
run_result.edge_source_handle = FailBranchSourceHandle.SUCCESS
if run_result.metadata and run_result.metadata.get(NodeRunMetadataKey.TOTAL_TOKENS):
# plus state total_tokens
Expand Down Expand Up @@ -827,15 +829,17 @@ def _handle_continue_on_error(
"error_type": error_result.error_type,
},
)

return NodeRunResult(
**node_error_args,
outputs={
"error_message": error_result.error,
"error_type": error_result.error_type,
},
edge_source_handle=FailBranchSourceHandle.FAILED,
)
elif node_instance.node_data.error_strategy is ErrorStrategy.FAIL_BRANCH:
if self.graph.edge_mapping.get(node_instance.node_id):
node_error_args["edge_source_handle"] = FailBranchSourceHandle.FAILED
return NodeRunResult(
**node_error_args,
outputs={
"error_message": error_result.error,
"error_type": error_result.error_type,
},
)
return error_result


class GraphRunFailedError(Exception):
Expand Down
23 changes: 13 additions & 10 deletions api/services/workflow_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import time
from collections.abc import Sequence
from datetime import UTC, datetime
from typing import Optional
from typing import Optional, cast

from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfigManager
from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager
Expand All @@ -11,6 +11,8 @@
from core.workflow.entities.node_entities import NodeRunResult
from core.workflow.errors import WorkflowNodeRunFailedError
from core.workflow.nodes import NodeType
from core.workflow.nodes.base.entities import BaseNodeData
from core.workflow.nodes.base.node import BaseNode
from core.workflow.nodes.enums import ErrorStrategy
from core.workflow.nodes.event import RunCompletedEvent
from core.workflow.nodes.node_mapping import LATEST_VERSION, NODE_TYPE_CLASSES_MAPPING
Expand Down Expand Up @@ -226,7 +228,7 @@ def run_draft_workflow_node(
user_inputs=user_inputs,
user_id=account.id,
)

node_instance = cast(BaseNode[BaseNodeData], node_instance)
node_run_result: NodeRunResult | None = None
for event in generator:
if isinstance(event, RunCompletedEvent):
Expand All @@ -238,6 +240,7 @@ def run_draft_workflow_node(

if not node_run_result:
raise ValueError("Node run failed with no run result")
# single step debug mode error handling return
if node_run_result.status == WorkflowNodeExecutionStatus.FAILED and node_instance.should_continue_on_error:
node_error_args = {
"status": WorkflowNodeExecutionStatus.EXCEPTION,
Expand All @@ -254,14 +257,14 @@ def run_draft_workflow_node(
"error_type": node_run_result.error_type,
},
)

node_run_result = NodeRunResult(
**node_error_args,
outputs={
"error_message": node_run_result.error,
"error_type": node_run_result.error_type,
},
)
else:
node_run_result = NodeRunResult(
**node_error_args,
outputs={
"error_message": node_run_result.error,
"error_type": node_run_result.error_type,
},
)
run_succeeded = node_run_result.status in (
WorkflowNodeExecutionStatus.SUCCEEDED,
WorkflowNodeExecutionStatus.EXCEPTION,
Expand Down

0 comments on commit 8517b62

Please sign in to comment.