Skip to content

Commit

Permalink
feat: workflow continue on error (#11474)
Browse files Browse the repository at this point in the history
  • Loading branch information
zxhlyh authored Dec 11, 2024
1 parent 86dfdcb commit bec5451
Show file tree
Hide file tree
Showing 60 changed files with 1,481 additions and 282 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import { VarBlockIcon } from '@/app/components/workflow/block-icon'
import { Line3 } from '@/app/components/base/icons/src/public/common'
import { isConversationVar, isENV, isSystemVar } from '@/app/components/workflow/nodes/_base/components/variable/utils'
import Tooltip from '@/app/components/base/tooltip'
import { isExceptionVariable } from '@/app/components/workflow/utils'

type WorkflowVariableBlockComponentProps = {
nodeKey: string
Expand Down Expand Up @@ -53,6 +54,7 @@ const WorkflowVariableBlockComponent = ({
const node = localWorkflowNodesMap![variables[0]]
const isEnv = isENV(variables)
const isChatVar = isConversationVar(variables)
const isException = isExceptionVariable(varName, node?.type)

useEffect(() => {
if (!editor.hasNodes([WorkflowVariableBlockNode]))
Expand Down Expand Up @@ -98,10 +100,10 @@ const WorkflowVariableBlockComponent = ({
</div>
)}
<div className='flex items-center text-primary-600'>
{!isEnv && !isChatVar && <Variable02 className='shrink-0 w-3.5 h-3.5' />}
{!isEnv && !isChatVar && <Variable02 className={cn('shrink-0 w-3.5 h-3.5', isException && 'text-text-warning')} />}
{isEnv && <Env className='shrink-0 w-3.5 h-3.5 text-util-colors-violet-violet-600' />}
{isChatVar && <BubbleX className='w-3.5 h-3.5 text-util-colors-teal-teal-700' />}
<div className={cn('shrink-0 ml-0.5 text-xs font-medium truncate', (isEnv || isChatVar) && 'text-gray-900')} title={varName}>{varName}</div>
<div className={cn('shrink-0 ml-0.5 text-xs font-medium truncate', (isEnv || isChatVar) && 'text-gray-900', isException && 'text-text-warning')} title={varName}>{varName}</div>
{
!node && !isEnv && !isChatVar && (
<RiErrorWarningFill className='ml-0.5 w-3 h-3 text-[#D92D20]' />
Expand Down
53 changes: 53 additions & 0 deletions web/app/components/workflow/custom-edge-linear-gradient-render.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
type CustomEdgeLinearGradientRenderProps = {
id: string
startColor: string
stopColor: string
position: {
x1: number
x2: number
y1: number
y2: number
}
}
const CustomEdgeLinearGradientRender = ({
id,
startColor,
stopColor,
position,
}: CustomEdgeLinearGradientRenderProps) => {
const {
x1,
x2,
y1,
y2,
} = position
return (
<defs>
<linearGradient
id={id}
gradientUnits='userSpaceOnUse'
x1={x1}
y1={y1}
x2={x2}
y2={y2}
>
<stop
offset='0%'
style={{
stopColor: startColor,
stopOpacity: 1,
}}
/>
<stop
offset='100%'
style={{
stopColor,
stopOpacity: 1,
}}
/>
</linearGradient>
</defs>
)
}

export default CustomEdgeLinearGradientRender
57 changes: 56 additions & 1 deletion web/app/components/workflow/custom-edge.tsx
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {
memo,
useCallback,
useMemo,
useState,
} from 'react'
import { intersection } from 'lodash-es'
Expand All @@ -20,8 +21,12 @@ import type {
Edge,
OnSelectBlock,
} from './types'
import { NodeRunningStatus } from './types'
import { getEdgeColor } from './utils'
import { ITERATION_CHILDREN_Z_INDEX } from './constants'
import CustomEdgeLinearGradientRender from './custom-edge-linear-gradient-render'
import cn from '@/utils/classnames'
import { ErrorHandleTypeEnum } from '@/app/components/workflow/nodes/_base/components/error-handle/types'

const CustomEdge = ({
id,
Expand Down Expand Up @@ -53,6 +58,26 @@ const CustomEdge = ({
const { handleNodeAdd } = useNodesInteractions()
const { availablePrevBlocks } = useAvailableBlocks((data as Edge['data'])!.targetType, (data as Edge['data'])?.isInIteration)
const { availableNextBlocks } = useAvailableBlocks((data as Edge['data'])!.sourceType, (data as Edge['data'])?.isInIteration)
const {
_sourceRunningStatus,
_targetRunningStatus,
} = data

const linearGradientId = useMemo(() => {
if (
(
_sourceRunningStatus === NodeRunningStatus.Succeeded
|| _sourceRunningStatus === NodeRunningStatus.Failed
|| _sourceRunningStatus === NodeRunningStatus.Exception
) && (
_targetRunningStatus === NodeRunningStatus.Succeeded
|| _targetRunningStatus === NodeRunningStatus.Failed
|| _targetRunningStatus === NodeRunningStatus.Exception
|| _targetRunningStatus === NodeRunningStatus.Running
)
)
return id
}, [_sourceRunningStatus, _targetRunningStatus, id])

const handleOpenChange = useCallback((v: boolean) => {
setOpen(v)
Expand All @@ -73,14 +98,43 @@ const CustomEdge = ({
)
}, [handleNodeAdd, source, sourceHandleId, target, targetHandleId])

const stroke = useMemo(() => {
if (selected)
return getEdgeColor(NodeRunningStatus.Running)

if (linearGradientId)
return `url(#${linearGradientId})`

if (data?._connectedNodeIsHovering)
return getEdgeColor(NodeRunningStatus.Running, sourceHandleId === ErrorHandleTypeEnum.failBranch)

return getEdgeColor()
}, [data._connectedNodeIsHovering, linearGradientId, selected, sourceHandleId])

return (
<>
{
linearGradientId && (
<CustomEdgeLinearGradientRender
id={linearGradientId}
startColor={getEdgeColor(_sourceRunningStatus)}
stopColor={getEdgeColor(_targetRunningStatus)}
position={{
x1: sourceX,
y1: sourceY,
x2: targetX,
y2: targetY,
}}
/>
)
}
<BaseEdge
id={id}
path={edgePath}
style={{
stroke: (selected || data?._connectedNodeIsHovering || data?._run) ? '#2970FF' : '#D0D5DD',
stroke,
strokeWidth: 2,
opacity: data._waitingRun ? 0.7 : 1,
}}
/>
<EdgeLabelRenderer>
Expand All @@ -95,6 +149,7 @@ const CustomEdge = ({
position: 'absolute',
transform: `translate(-50%, -50%) translate(${labelX}px, ${labelY}px)`,
pointerEvents: 'all',
opacity: data._waitingRun ? 0.7 : 1,
}}
>
<BlockSelector
Expand Down
34 changes: 20 additions & 14 deletions web/app/components/workflow/hooks/use-edges-interactions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,25 +63,29 @@ export const useEdgesInteractions = () => {
edges,
setEdges,
} = store.getState()
const currentEdgeIndex = edges.findIndex(edge => edge.source === nodeId && edge.sourceHandle === branchId)
const edgeWillBeDeleted = edges.filter(edge => edge.source === nodeId && edge.sourceHandle === branchId)

if (currentEdgeIndex < 0)
if (!edgeWillBeDeleted.length)
return

const currentEdge = edges[currentEdgeIndex]
const newNodes = produce(getNodes(), (draft: Node[]) => {
const sourceNode = draft.find(node => node.id === currentEdge.source)
const targetNode = draft.find(node => node.id === currentEdge.target)

if (sourceNode)
sourceNode.data._connectedSourceHandleIds = sourceNode.data._connectedSourceHandleIds?.filter(handleId => handleId !== currentEdge.sourceHandle)

if (targetNode)
targetNode.data._connectedTargetHandleIds = targetNode.data._connectedTargetHandleIds?.filter(handleId => handleId !== currentEdge.targetHandle)
const nodes = getNodes()
const nodesConnectedSourceOrTargetHandleIdsMap = getNodesConnectedSourceOrTargetHandleIdsMap(
edgeWillBeDeleted.map(edge => ({ type: 'remove', edge })),
nodes,
)
const newNodes = produce(nodes, (draft: Node[]) => {
draft.forEach((node) => {
if (nodesConnectedSourceOrTargetHandleIdsMap[node.id]) {
node.data = {
...node.data,
...nodesConnectedSourceOrTargetHandleIdsMap[node.id],
}
}
})
})
setNodes(newNodes)
const newEdges = produce(edges, (draft) => {
draft.splice(currentEdgeIndex, 1)
return draft.filter(edge => !edgeWillBeDeleted.find(e => e.id === edge.id))
})
setEdges(newEdges)
handleSyncWorkflowDraft()
Expand Down Expand Up @@ -155,7 +159,9 @@ export const useEdgesInteractions = () => {

const newEdges = produce(edges, (draft) => {
draft.forEach((edge) => {
edge.data._run = false
edge.data._sourceRunningStatus = undefined
edge.data._targetRunningStatus = undefined
edge.data._waitingRun = false
})
})
setEdges(newEdges)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1033,6 +1033,7 @@ export const useNodesInteractions = () => {
const newNodes = produce(nodes, (draft) => {
draft.forEach((node) => {
node.data._runningStatus = undefined
node.data._waitingRun = false
})
})
setNodes(newNodes)
Expand Down
78 changes: 67 additions & 11 deletions web/app/components/workflow/hooks/use-workflow-run.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { useCallback } from 'react'
import {
getIncomers,
useReactFlow,
useStoreApi,
} from 'reactflow'
Expand All @@ -9,8 +8,8 @@ import { v4 as uuidV4 } from 'uuid'
import { usePathname } from 'next/navigation'
import { useWorkflowStore } from '../store'
import { useNodesSyncDraft } from '../hooks'
import type { Node } from '../types'
import {
BlockEnum,
NodeRunningStatus,
WorkflowRunningStatus,
} from '../types'
Expand All @@ -28,6 +27,7 @@ import { AudioPlayerManager } from '@/app/components/base/audio-btn/audio.player
import {
getFilesInLogs,
} from '@/app/components/base/file-uploader/utils'
import { ErrorHandleTypeEnum } from '@/app/components/workflow/nodes/_base/components/error-handle/types'

export const useWorkflowRun = () => {
const store = useStoreApi()
Expand Down Expand Up @@ -174,6 +174,8 @@ export const useWorkflowRun = () => {
setIterParallelLogMap,
} = workflowStore.getState()
const {
getNodes,
setNodes,
edges,
setEdges,
} = store.getState()
Expand All @@ -186,12 +188,20 @@ export const useWorkflowRun = () => {
status: WorkflowRunningStatus.Running,
}
}))

const nodes = getNodes()
const newNodes = produce(nodes, (draft) => {
draft.forEach((node) => {
node.data._waitingRun = true
})
})
setNodes(newNodes)
const newEdges = produce(edges, (draft) => {
draft.forEach((edge) => {
edge.data = {
...edge.data,
_run: false,
_sourceRunningStatus: undefined,
_targetRunningStatus: undefined,
_waitingRun: true,
}
})
})
Expand Down Expand Up @@ -311,13 +321,27 @@ export const useWorkflowRun = () => {
}
const newNodes = produce(nodes, (draft) => {
draft[currentNodeIndex].data._runningStatus = NodeRunningStatus.Running
draft[currentNodeIndex].data._waitingRun = false
})
setNodes(newNodes)
const incomeNodesId = getIncomers({ id: data.node_id } as Node, newNodes, edges).filter(node => node.data._runningStatus === NodeRunningStatus.Succeeded).map(node => node.id)
const newEdges = produce(edges, (draft) => {
draft.forEach((edge) => {
if (edge.target === data.node_id && incomeNodesId.includes(edge.source))
edge.data = { ...edge.data, _run: true } as any
const incomeEdges = draft.filter((edge) => {
return edge.target === data.node_id
})

incomeEdges.forEach((edge) => {
const incomeNode = nodes.find(node => node.id === edge.source)!
if (
(!incomeNode.data._runningBranchId && edge.sourceHandle === 'source')
|| (incomeNode.data._runningBranchId && edge.sourceHandle === incomeNode.data._runningBranchId)
) {
edge.data = {
...edge.data,
_sourceRunningStatus: incomeNode.data._runningStatus,
_targetRunningStatus: NodeRunningStatus.Running,
_waitingRun: false,
}
}
})
})
setEdges(newEdges)
Expand All @@ -336,6 +360,8 @@ export const useWorkflowRun = () => {
const {
getNodes,
setNodes,
edges,
setEdges,
} = store.getState()
const nodes = getNodes()
const nodeParentId = nodes.find(node => node.id === data.node_id)!.parentId
Expand Down Expand Up @@ -423,8 +449,31 @@ export const useWorkflowRun = () => {
const newNodes = produce(nodes, (draft) => {
const currentNode = draft.find(node => node.id === data.node_id)!
currentNode.data._runningStatus = data.status as any
if (data.status === NodeRunningStatus.Exception) {
if (data.execution_metadata.error_strategy === ErrorHandleTypeEnum.failBranch)
currentNode.data._runningBranchId = ErrorHandleTypeEnum.failBranch
}
else {
if (data.node_type === BlockEnum.IfElse)
currentNode.data._runningBranchId = data?.outputs?.selected_case_id

if (data.node_type === BlockEnum.QuestionClassifier)
currentNode.data._runningBranchId = data?.outputs?.class_id
}
})
setNodes(newNodes)
const newEdges = produce(edges, (draft) => {
const incomeEdges = draft.filter((edge) => {
return edge.target === data.node_id
})
incomeEdges.forEach((edge) => {
edge.data = {
...edge.data,
_targetRunningStatus: data.status as any,
}
})
})
setEdges(newEdges)
prevNodeId = data.node_id
}

Expand Down Expand Up @@ -474,13 +523,20 @@ export const useWorkflowRun = () => {
const newNodes = produce(nodes, (draft) => {
draft[currentNodeIndex].data._runningStatus = NodeRunningStatus.Running
draft[currentNodeIndex].data._iterationLength = data.metadata.iterator_length
draft[currentNodeIndex].data._waitingRun = false
})
setNodes(newNodes)
const newEdges = produce(edges, (draft) => {
const edge = draft.find(edge => edge.target === data.node_id && edge.source === prevNodeId)
const incomeEdges = draft.filter(edge => edge.target === data.node_id)

if (edge)
edge.data = { ...edge.data, _run: true } as any
incomeEdges.forEach((edge) => {
edge.data = {
...edge.data,
_sourceRunningStatus: nodes.find(node => node.id === edge.source)!.data._runningStatus,
_targetRunningStatus: NodeRunningStatus.Running,
_waitingRun: false,
}
})
})
setEdges(newEdges)

Expand Down
Loading

0 comments on commit bec5451

Please sign in to comment.