From 440703481db14b27ea7620e26cc115c18d3f94ad Mon Sep 17 00:00:00 2001 From: yihong0618 Date: Thu, 19 Dec 2024 17:03:58 +0800 Subject: [PATCH 1/2] fix: droping self._remove_unreachable_nodes(event) to aviod if else short cut for answer node Signed-off-by: yihong0618 --- api/core/workflow/nodes/answer/answer_stream_processor.py | 6 +++++- .../core/workflow/graph_engine/test_graph_engine.py | 8 +++----- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/api/core/workflow/nodes/answer/answer_stream_processor.py b/api/core/workflow/nodes/answer/answer_stream_processor.py index d94f0590584842..e1bff3de488f4c 100644 --- a/api/core/workflow/nodes/answer/answer_stream_processor.py +++ b/api/core/workflow/nodes/answer/answer_stream_processor.py @@ -61,7 +61,11 @@ def process(self, generator: Generator[GraphEngineEvent, None, None]) -> Generat del self.current_stream_chunk_generating_node_ids[event.route_node_state.node_id] # remove unreachable nodes - self._remove_unreachable_nodes(event) + # FIXME: because of the code branch can combine directly, so for answer node + # we remove the node maybe shortcut the answer node, so comment this code for now + # there is not effect on the answer node and the workflow, when we have a better solution + # we can open this code. Issues: #11542 #9560 #10638 #10564 + # self._remove_unreachable_nodes(event) # generate stream outputs yield from self._generate_stream_outputs_when_node_finished(event) diff --git a/api/tests/unit_tests/core/workflow/graph_engine/test_graph_engine.py b/api/tests/unit_tests/core/workflow/graph_engine/test_graph_engine.py index 9f1ba7b6af9c80..29aa64d80145a5 100644 --- a/api/tests/unit_tests/core/workflow/graph_engine/test_graph_engine.py +++ b/api/tests/unit_tests/core/workflow/graph_engine/test_graph_engine.py @@ -488,16 +488,14 @@ def test_run_branch(mock_close, mock_remove): items = [] generator = graph_engine.run() for item in generator: - # print(type(item), item) items.append(item) assert len(items) == 10 assert items[3].route_node_state.node_id == "if-else-1" assert items[4].route_node_state.node_id == "if-else-1" - assert isinstance(items[5], NodeRunStreamChunkEvent) - assert items[5].chunk_content == "1 " - assert isinstance(items[6], NodeRunStreamChunkEvent) - assert items[6].chunk_content == "takato" + assert isinstance(items[8], NodeRunStreamChunkEvent) + # assert items[6].chunk_content == "takato" + assert items[8].chunk_content == "takato" assert items[7].route_node_state.node_id == "answer-1" assert items[8].route_node_state.node_id == "answer-1" assert items[8].route_node_state.node_run_result.outputs["answer"] == "1 takato" From c1cc63dbbf3fa3b264845ec5371e703a70f61bc6 Mon Sep 17 00:00:00 2001 From: yihong0618 Date: Fri, 20 Dec 2024 13:48:29 +0800 Subject: [PATCH 2/2] fix: last answer not stream Signed-off-by: yihong0618 --- api/core/workflow/nodes/answer/answer_stream_processor.py | 7 +------ api/core/workflow/nodes/answer/base_stream_processor.py | 8 +++++++- .../core/workflow/graph_engine/test_graph_engine.py | 6 +++--- 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/api/core/workflow/nodes/answer/answer_stream_processor.py b/api/core/workflow/nodes/answer/answer_stream_processor.py index e1bff3de488f4c..ed033e7f283961 100644 --- a/api/core/workflow/nodes/answer/answer_stream_processor.py +++ b/api/core/workflow/nodes/answer/answer_stream_processor.py @@ -60,12 +60,7 @@ def process(self, generator: Generator[GraphEngineEvent, None, None]) -> Generat del self.current_stream_chunk_generating_node_ids[event.route_node_state.node_id] - # remove unreachable nodes - # FIXME: because of the code branch can combine directly, so for answer node - # we remove the node maybe shortcut the answer node, so comment this code for now - # there is not effect on the answer node and the workflow, when we have a better solution - # we can open this code. Issues: #11542 #9560 #10638 #10564 - # self._remove_unreachable_nodes(event) + self._remove_unreachable_nodes(event) # generate stream outputs yield from self._generate_stream_outputs_when_node_finished(event) diff --git a/api/core/workflow/nodes/answer/base_stream_processor.py b/api/core/workflow/nodes/answer/base_stream_processor.py index 36c3fe180a9cb2..01138802efed0b 100644 --- a/api/core/workflow/nodes/answer/base_stream_processor.py +++ b/api/core/workflow/nodes/answer/base_stream_processor.py @@ -37,7 +37,13 @@ def _remove_unreachable_nodes(self, event: NodeRunSucceededEvent) -> None: and edge.run_condition.branch_identify and run_result.edge_source_handle == edge.run_condition.branch_identify ): - reachable_node_ids.extend(self._fetch_node_ids_in_reachable_branch(edge.target_node_id)) + # remove unreachable nodes + # FIXME: because of the code branch can combine directly, so for answer node + # we remove the node maybe shortcut the answer node, so comment this code for now + # there is not effect on the answer node and the workflow, when we have a better solution + # we can open this code. Issues: #11542 #9560 #10638 #10564 + + # reachable_node_ids.extend(self._fetch_node_ids_in_reachable_branch(edge.target_node_id)) continue else: unreachable_first_node_ids.append(edge.target_node_id) diff --git a/api/tests/unit_tests/core/workflow/graph_engine/test_graph_engine.py b/api/tests/unit_tests/core/workflow/graph_engine/test_graph_engine.py index 29aa64d80145a5..b7d8f69e8c52ee 100644 --- a/api/tests/unit_tests/core/workflow/graph_engine/test_graph_engine.py +++ b/api/tests/unit_tests/core/workflow/graph_engine/test_graph_engine.py @@ -493,9 +493,9 @@ def test_run_branch(mock_close, mock_remove): assert len(items) == 10 assert items[3].route_node_state.node_id == "if-else-1" assert items[4].route_node_state.node_id == "if-else-1" - assert isinstance(items[8], NodeRunStreamChunkEvent) - # assert items[6].chunk_content == "takato" - assert items[8].chunk_content == "takato" + assert isinstance(items[5], NodeRunStreamChunkEvent) + assert isinstance(items[6], NodeRunStreamChunkEvent) + assert items[6].chunk_content == "takato" assert items[7].route_node_state.node_id == "answer-1" assert items[8].route_node_state.node_id == "answer-1" assert items[8].route_node_state.node_run_result.outputs["answer"] == "1 takato"