Skip to content

Commit

Permalink
Merge branch 'feat/continue-on-error' into deploy/dev
Browse files Browse the repository at this point in the history
  • Loading branch information
Novice Lee authored and Novice Lee committed Dec 5, 2024
2 parents 86cb047 + e061bac commit 726649e
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 6 deletions.
1 change: 1 addition & 0 deletions api/core/app/apps/advanced_chat/generate_task_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,7 @@ def _process_stream_response(
error=event.error,
conversation_id=self._conversation.id,
trace_manager=trace_manager,
exceptions_count=event.exceptions_count,
)

yield self._workflow_finish_to_stream_response(
Expand Down
1 change: 1 addition & 0 deletions api/core/app/apps/workflow/generate_task_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ def _process_stream_response(
"error": event.error if isinstance(event, QueueWorkflowFailedEvent) else event.get_stop_reason(),
"conversation_id": None,
"trace_manager": trace_manager,
"exceptions_count": event.exceptions_count,
}
workflow_run = self._handle_workflow_run_failed(**handle_args)

Expand Down
3 changes: 2 additions & 1 deletion api/core/app/task_pipeline/workflow_cycle_manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ def _handle_workflow_run_failed(
error: str,
conversation_id: Optional[str] = None,
trace_manager: Optional[TraceQueueManager] = None,
exceptions_count: int = 0,
) -> WorkflowRun:
"""
Workflow run failed
Expand All @@ -243,7 +244,7 @@ def _handle_workflow_run_failed(
workflow_run.total_tokens = total_tokens
workflow_run.total_steps = total_steps
workflow_run.finished_at = datetime.now(UTC).replace(tzinfo=None)

workflow_run.exceptions_count = exceptions_count
db.session.commit()

running_workflow_node_executions = (
Expand Down
15 changes: 12 additions & 3 deletions api/core/workflow/graph_engine/graph_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,10 @@ def run(self) -> Generator[GraphEngineEvent, None, None]:
try:
yield item
if isinstance(item, NodeRunFailedEvent):
yield GraphRunFailedEvent(error=item.route_node_state.failed_reason or "Unknown error.")
yield GraphRunFailedEvent(
error=item.route_node_state.failed_reason or "Unknown error.",
exceptions_count=len(handle_exceptions),
)
return
elif isinstance(item, NodeRunSucceededEvent):
if item.node_type == NodeType.END:
Expand All @@ -178,7 +181,7 @@ def run(self) -> Generator[GraphEngineEvent, None, None]:
].strip()
except Exception as e:
logger.exception("Graph run failed")
yield GraphRunFailedEvent(error=str(e))
yield GraphRunFailedEvent(error=str(e), exceptions_count=len(handle_exceptions))
return
# count exceptions to determine partial success
exceptions_count = len(handle_exceptions)
Expand All @@ -196,7 +199,7 @@ def run(self) -> Generator[GraphEngineEvent, None, None]:
return
except Exception as e:
logger.exception("Unknown Error when graph running")
yield GraphRunFailedEvent(error=str(e), exceptions_count=len(handle_exceptions))
yield GraphRunFailedEvent(error=str(e), exceptions_count=exceptions_count)
self._release_thread()
raise e

Expand Down Expand Up @@ -387,6 +390,12 @@ def _run(
break

next_node_id = final_node_id
elif (
node_instance.node_data.error_strategy == ErrorStrategy.FAIL_BRANCH
and node_instance.should_continue_on_error
and previous_route_node_state.status == RouteNodeState.Status.EXCEPTION
):
break
else:
parallel_generator = self._run_parallel_branches(
edge_mappings=edge_mappings,
Expand Down
2 changes: 0 additions & 2 deletions api/tests/integration_tests/workflow/nodes/test_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@
from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeState
from core.workflow.nodes.code.code_node import CodeNode
from core.workflow.nodes.code.entities import CodeNodeData
from core.workflow.nodes.event.event import RunCompletedEvent
from models.enums import UserFrom
from models.workflow import WorkflowNodeExecutionStatus, WorkflowType
from tests.integration_tests.workflow.nodes.__mock.code_executor import setup_code_executor_mock

CODE_MAX_STRING_LENGTH = int(getenv("CODE_MAX_STRING_LENGTH", "10000"))

Expand Down

0 comments on commit 726649e

Please sign in to comment.