Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Fix WorkflowRepairService and AsyncSystemTaskExecutor race condition (#…
Browse files Browse the repository at this point in the history
…3836)

Issue Summary:
There's a race condition in the system involving async system tasks and the WorkflowRepairService. For example, when a SUB_WORKFLOW task starts, the WorkflowRepairService sometimes erroneously reinserts the task into the processing queue because it perceives the task as out-of-sync between the ExecutorDAO and the queueDAO. This issue stems from the AsyncSystemTaskExecutor updating a task's status only after it removes it from the queue, creating a window where the WorkflowRepairService can wrongly assess the task state. This leads to duplicate subworkflows/http/… tasks being executed concurrently, which complicates maintaining idempotency of Tasks.

Proposed Solution:
To resolve the issue, it's suggested that the AsyncSystemTaskExecutor should update the status of tasks before removing them from the queue. This should close the window where the WorkflowRepairService can misidentify the task state and prevent unnecessary re-queuing of tasks. An edge case we’ve considered is if the process crashes after the task is updated but before it's removed from the queue. If that happens, the executor will simply remove the task from the queue the next time it runs, thereby not affecting system correctness.

Co-authored-by: Jaim Silva <[email protected]>
  • Loading branch information
VerstraeteBert and silvavelosa authored Nov 12, 2023
1 parent be790a4 commit de2ca10
Showing 1 changed file with 8 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ public void execute(WorkflowSystemTask systemTask, String taskId) {
}

boolean hasTaskExecutionCompleted = false;
boolean shouldRemoveTaskFromQueue = false;
String workflowId = task.getWorkflowInstanceId();
// if we are here the Task object is updated and needs to be persisted regardless of an
// exception
Expand All @@ -130,7 +131,7 @@ public void execute(WorkflowSystemTask systemTask, String taskId) {
String.format(
"Workflow is in %s state", workflow.getStatus().toString()));
}
queueDAO.remove(queueName, task.getTaskId());
shouldRemoveTaskFromQueue = true;
return;
}

Expand All @@ -156,13 +157,12 @@ public void execute(WorkflowSystemTask systemTask, String taskId) {
// Update message in Task queue based on Task status
// Remove asyncComplete system tasks from the queue that are not in SCHEDULED state
if (isTaskAsyncComplete && task.getStatus() != TaskModel.Status.SCHEDULED) {
queueDAO.remove(queueName, task.getTaskId());
shouldRemoveTaskFromQueue = true;
hasTaskExecutionCompleted = true;
} else if (task.getStatus().isTerminal()) {
task.setEndTime(System.currentTimeMillis());
queueDAO.remove(queueName, task.getTaskId());
shouldRemoveTaskFromQueue = true;
hasTaskExecutionCompleted = true;
LOGGER.debug("{} removed from queue: {}", task, queueName);
} else {
task.setCallbackAfterSeconds(systemTaskCallbackTime);
systemTask
Expand All @@ -188,6 +188,10 @@ public void execute(WorkflowSystemTask systemTask, String taskId) {
LOGGER.error("Error executing system task - {}, with id: {}", systemTask, taskId, e);
} finally {
executionDAOFacade.updateTask(task);
if (shouldRemoveTaskFromQueue) {
queueDAO.remove(queueName, task.getTaskId());
LOGGER.debug("{} removed from queue: {}", task, queueName);
}
// if the current task execution has completed, then the workflow needs to be evaluated
if (hasTaskExecutionCompleted) {
workflowExecutor.decide(workflowId);
Expand Down

0 comments on commit de2ca10

Please sign in to comment.