Skip to content

Commit

Permalink
Merge branch 'fix/remove-the-retry-index-field' into deploy/dev
Browse files Browse the repository at this point in the history
  • Loading branch information
Novice Lee authored and Novice Lee committed Dec 23, 2024
2 parents 3090215 + 6802329 commit 4c3ca07
Show file tree
Hide file tree
Showing 9 changed files with 165 additions and 83 deletions.
30 changes: 16 additions & 14 deletions api/core/app/apps/advanced_chat/generate_task_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ def _to_blocking_response(self, generator: Generator[StreamResponse, None, None]
else:
continue

raise Exception("Queue listening stopped unexpectedly.")
raise ValueError("queue listening stopped unexpectedly.")

def _to_stream_response(
self, generator: Generator[StreamResponse, None, None]
Expand Down Expand Up @@ -295,6 +295,8 @@ def _process_stream_response(
event,
QueueNodeRetryEvent,
):
if not workflow_run:
raise ValueError("workflow run not initialized.")
workflow_node_execution = self._handle_workflow_node_execution_retried(
workflow_run=workflow_run, event=event
)
Expand All @@ -309,7 +311,7 @@ def _process_stream_response(
yield response
elif isinstance(event, QueueNodeStartedEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

workflow_node_execution = self._handle_node_execution_start(workflow_run=workflow_run, event=event)

Expand Down Expand Up @@ -350,45 +352,45 @@ def _process_stream_response(

elif isinstance(event, QueueParallelBranchRunStartedEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

yield self._workflow_parallel_branch_start_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
)
elif isinstance(event, QueueParallelBranchRunSucceededEvent | QueueParallelBranchRunFailedEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

yield self._workflow_parallel_branch_finished_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
)
elif isinstance(event, QueueIterationStartEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

yield self._workflow_iteration_start_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
)
elif isinstance(event, QueueIterationNextEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

yield self._workflow_iteration_next_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
)
elif isinstance(event, QueueIterationCompletedEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

yield self._workflow_iteration_completed_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
)
elif isinstance(event, QueueWorkflowSucceededEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

if not graph_runtime_state:
raise Exception("Graph runtime state not initialized.")
raise ValueError("workflow run not initialized.")

workflow_run = self._handle_workflow_run_success(
workflow_run=workflow_run,
Expand All @@ -407,10 +409,10 @@ def _process_stream_response(
self._queue_manager.publish(QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE)
elif isinstance(event, QueueWorkflowPartialSuccessEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

if not graph_runtime_state:
raise Exception("Graph runtime state not initialized.")
raise ValueError("graph runtime state not initialized.")

workflow_run = self._handle_workflow_run_partial_success(
workflow_run=workflow_run,
Expand All @@ -430,10 +432,10 @@ def _process_stream_response(
self._queue_manager.publish(QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE)
elif isinstance(event, QueueWorkflowFailedEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

if not graph_runtime_state:
raise Exception("Graph runtime state not initialized.")
raise ValueError("graph runtime state not initialized.")

workflow_run = self._handle_workflow_run_failed(
workflow_run=workflow_run,
Expand Down Expand Up @@ -523,7 +525,7 @@ def _process_stream_response(
yield self._message_replace_to_stream_response(answer=event.text)
elif isinstance(event, QueueAdvancedChatMessageEndEvent):
if not graph_runtime_state:
raise Exception("Graph runtime state not initialized.")
raise ValueError("graph runtime state not initialized.")

output_moderation_answer = self._handle_output_moderation_when_task_finished(self._task_state.answer)
if output_moderation_answer:
Expand Down
30 changes: 16 additions & 14 deletions api/core/app/apps/workflow/generate_task_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ def _to_blocking_response(self, generator: Generator[StreamResponse, None, None]
else:
continue

raise Exception("Queue listening stopped unexpectedly.")
raise ValueError("queue listening stopped unexpectedly.")

def _to_stream_response(
self, generator: Generator[StreamResponse, None, None]
Expand Down Expand Up @@ -218,7 +218,7 @@ def _wrapper_process_stream_response(
break
else:
yield MessageAudioStreamResponse(audio=audio_trunk.audio, task_id=task_id)
except Exception as e:
except Exception:
logger.exception(f"Fails to get audio trunk, task_id: {task_id}")
break
if tts_publisher:
Expand Down Expand Up @@ -258,6 +258,8 @@ def _process_stream_response(
event,
QueueNodeRetryEvent,
):
if not workflow_run:
raise ValueError("workflow run not initialized.")
workflow_node_execution = self._handle_workflow_node_execution_retried(
workflow_run=workflow_run, event=event
)
Expand All @@ -272,7 +274,7 @@ def _process_stream_response(
yield response
elif isinstance(event, QueueNodeStartedEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

workflow_node_execution = self._handle_node_execution_start(workflow_run=workflow_run, event=event)

Expand Down Expand Up @@ -308,45 +310,45 @@ def _process_stream_response(

elif isinstance(event, QueueParallelBranchRunStartedEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

yield self._workflow_parallel_branch_start_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
)
elif isinstance(event, QueueParallelBranchRunSucceededEvent | QueueParallelBranchRunFailedEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

yield self._workflow_parallel_branch_finished_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
)
elif isinstance(event, QueueIterationStartEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

yield self._workflow_iteration_start_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
)
elif isinstance(event, QueueIterationNextEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

yield self._workflow_iteration_next_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
)
elif isinstance(event, QueueIterationCompletedEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

yield self._workflow_iteration_completed_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
)
elif isinstance(event, QueueWorkflowSucceededEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

if not graph_runtime_state:
raise Exception("Graph runtime state not initialized.")
raise ValueError("graph runtime state not initialized.")

workflow_run = self._handle_workflow_run_success(
workflow_run=workflow_run,
Expand All @@ -366,10 +368,10 @@ def _process_stream_response(
)
elif isinstance(event, QueueWorkflowPartialSuccessEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

if not graph_runtime_state:
raise Exception("Graph runtime state not initialized.")
raise ValueError("graph runtime state not initialized.")

workflow_run = self._handle_workflow_run_partial_success(
workflow_run=workflow_run,
Expand All @@ -390,10 +392,10 @@ def _process_stream_response(
)
elif isinstance(event, QueueWorkflowFailedEvent | QueueStopEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

if not graph_runtime_state:
raise Exception("Graph runtime state not initialized.")
raise ValueError("graph runtime state not initialized.")
workflow_run = self._handle_workflow_run_failed(
workflow_run=workflow_run,
start_at=graph_runtime_state.start_at,
Expand Down
56 changes: 31 additions & 25 deletions api/core/app/apps/workflow_app_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,17 @@ def _handle_event(self, workflow_entry: WorkflowEntry, event: GraphEngineEvent)
elif isinstance(event, GraphRunFailedEvent):
self._publish_event(QueueWorkflowFailedEvent(error=event.error, exceptions_count=event.exceptions_count))
elif isinstance(event, NodeRunRetryEvent):
node_run_result = event.route_node_state.node_run_result
if node_run_result:
inputs = node_run_result.inputs
process_data = node_run_result.process_data
outputs = node_run_result.outputs
execution_metadata = node_run_result.metadata
else:
inputs = {}
process_data = {}
outputs = {}
execution_metadata = {}
self._publish_event(
QueueNodeRetryEvent(
node_execution_id=event.id,
Expand All @@ -200,23 +211,15 @@ def _handle_event(self, workflow_entry: WorkflowEntry, event: GraphEngineEvent)
parent_parallel_id=event.parent_parallel_id,
parent_parallel_start_node_id=event.parent_parallel_start_node_id,
start_at=event.start_at,
node_run_index=event.node_run_index,
node_run_index=event.route_node_state.index,
predecessor_node_id=event.predecessor_node_id,
in_iteration_id=event.in_iteration_id,
parallel_mode_run_id=event.parallel_mode_run_id,
inputs=event.route_node_state.node_run_result.inputs
if event.route_node_state.node_run_result
else {},
process_data=event.route_node_state.node_run_result.process_data
if event.route_node_state.node_run_result
else {},
outputs=event.route_node_state.node_run_result.outputs
if event.route_node_state.node_run_result
else {},
inputs=inputs,
process_data=process_data,
outputs=outputs,
error=event.error,
execution_metadata=event.route_node_state.node_run_result.metadata
if event.route_node_state.node_run_result
else {},
execution_metadata=execution_metadata,
retry_index=event.retry_index,
)
)
Expand All @@ -239,6 +242,17 @@ def _handle_event(self, workflow_entry: WorkflowEntry, event: GraphEngineEvent)
)
)
elif isinstance(event, NodeRunSucceededEvent):
node_run_result = event.route_node_state.node_run_result
if node_run_result:
inputs = node_run_result.inputs
process_data = node_run_result.process_data
outputs = node_run_result.outputs
execution_metadata = node_run_result.metadata
else:
inputs = {}
process_data = {}
outputs = {}
execution_metadata = {}
self._publish_event(
QueueNodeSucceededEvent(
node_execution_id=event.id,
Expand All @@ -250,18 +264,10 @@ def _handle_event(self, workflow_entry: WorkflowEntry, event: GraphEngineEvent)
parent_parallel_id=event.parent_parallel_id,
parent_parallel_start_node_id=event.parent_parallel_start_node_id,
start_at=event.route_node_state.start_at,
inputs=event.route_node_state.node_run_result.inputs
if event.route_node_state.node_run_result
else {},
process_data=event.route_node_state.node_run_result.process_data
if event.route_node_state.node_run_result
else {},
outputs=event.route_node_state.node_run_result.outputs
if event.route_node_state.node_run_result
else {},
execution_metadata=event.route_node_state.node_run_result.metadata
if event.route_node_state.node_run_result
else {},
inputs=inputs,
process_data=process_data,
outputs=outputs,
execution_metadata=execution_metadata,
in_iteration_id=event.in_iteration_id,
)
)
Expand Down
Loading

0 comments on commit 4c3ca07

Please sign in to comment.