diff --git a/api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py b/api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py index 4216cd46cfbc52..e26b60c4d3043e 100644 --- a/api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py +++ b/api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py @@ -177,7 +177,7 @@ def _to_blocking_response( else: continue - raise Exception("Queue listening stopped unexpectedly.") + raise RuntimeError("queue listening stopped unexpectedly.") def _to_stream_response( self, generator: Generator[StreamResponse, None, None] diff --git a/api/core/app/task_pipeline/exc.py b/api/core/app/task_pipeline/exc.py new file mode 100644 index 00000000000000..e4b4168d0881e0 --- /dev/null +++ b/api/core/app/task_pipeline/exc.py @@ -0,0 +1,17 @@ +class TaskPipilineError(ValueError): + pass + + +class RecordNotFoundError(TaskPipilineError): + def __init__(self, record_name: str, record_id: str): + super().__init__(f"{record_name} with id {record_id} not found") + + +class WorkflowRunNotFoundError(RecordNotFoundError): + def __init__(self, workflow_run_id: str): + super().__init__("WorkflowRun", workflow_run_id) + + +class WorkflowNodeExecutionNotFoundError(RecordNotFoundError): + def __init__(self, workflow_node_execution_id: str): + super().__init__("WorkflowNodeExecution", workflow_node_execution_id) diff --git a/api/core/app/task_pipeline/workflow_cycle_manage.py b/api/core/app/task_pipeline/workflow_cycle_manage.py index 72e4c796c34546..df7dbace0ef818 100644 --- a/api/core/app/task_pipeline/workflow_cycle_manage.py +++ b/api/core/app/task_pipeline/workflow_cycle_manage.py @@ -58,6 +58,8 @@ WorkflowRunStatus, ) +from .exc import WorkflowNodeExecutionNotFoundError, WorkflowRunNotFoundError + class WorkflowCycleManage: _application_generate_entity: Union[AdvancedChatAppGenerateEntity, WorkflowAppGenerateEntity] @@ -898,7 +900,7 @@ def _refetch_workflow_run(self, workflow_run_id: str) -> WorkflowRun: workflow_run = db.session.query(WorkflowRun).filter(WorkflowRun.id == workflow_run_id).first() if not workflow_run: - raise Exception(f"Workflow run not found: {workflow_run_id}") + raise WorkflowRunNotFoundError(workflow_run_id) return workflow_run @@ -911,6 +913,6 @@ def _refetch_workflow_node_execution(self, node_execution_id: str) -> WorkflowNo workflow_node_execution = self._wip_workflow_node_executions.get(node_execution_id) if not workflow_node_execution: - raise Exception(f"Workflow node execution not found: {node_execution_id}") + raise WorkflowNodeExecutionNotFoundError(node_execution_id) return workflow_node_execution