Skip to content

Commit

Permalink
fix: Replace generic exceptions with specific error classes in task p…
Browse files Browse the repository at this point in the history
…ipeline

Signed-off-by: -LAN- <[email protected]>
  • Loading branch information
laipz8200 committed Dec 24, 2024
1 parent e0c24c0 commit fcfce22
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ def _to_blocking_response(
else:
continue

raise Exception("Queue listening stopped unexpectedly.")
raise RuntimeError("queue listening stopped unexpectedly.")

def _to_stream_response(
self, generator: Generator[StreamResponse, None, None]
Expand Down
17 changes: 17 additions & 0 deletions api/core/app/task_pipeline/exc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
class TaskPipilineError(ValueError):
pass


class RecordNotFoundError(TaskPipilineError):
def __init__(self, record_name: str, record_id: str):
super().__init__(f"{record_name} with id {record_id} not found")


class WorkflowRunNotFoundError(RecordNotFoundError):
def __init__(self, workflow_run_id: str):
super().__init__("WorkflowRun", workflow_run_id)


class WorkflowNodeExecutionNotFoundError(RecordNotFoundError):
def __init__(self, workflow_node_execution_id: str):
super().__init__("WorkflowNodeExecution", workflow_node_execution_id)
6 changes: 4 additions & 2 deletions api/core/app/task_pipeline/workflow_cycle_manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@
WorkflowRunStatus,
)

from .exc import WorkflowNodeExecutionNotFoundError, WorkflowRunNotFoundError


class WorkflowCycleManage:
_application_generate_entity: Union[AdvancedChatAppGenerateEntity, WorkflowAppGenerateEntity]
Expand Down Expand Up @@ -898,7 +900,7 @@ def _refetch_workflow_run(self, workflow_run_id: str) -> WorkflowRun:
workflow_run = db.session.query(WorkflowRun).filter(WorkflowRun.id == workflow_run_id).first()

if not workflow_run:
raise Exception(f"Workflow run not found: {workflow_run_id}")
raise WorkflowRunNotFoundError(workflow_run_id)

return workflow_run

Expand All @@ -911,6 +913,6 @@ def _refetch_workflow_node_execution(self, node_execution_id: str) -> WorkflowNo
workflow_node_execution = self._wip_workflow_node_executions.get(node_execution_id)

if not workflow_node_execution:
raise Exception(f"Workflow node execution not found: {node_execution_id}")
raise WorkflowNodeExecutionNotFoundError(node_execution_id)

return workflow_node_execution

0 comments on commit fcfce22

Please sign in to comment.