Skip to content

Commit

Permalink
Merge branch 'feat/workflow-backend' into deploy/dev
Browse files Browse the repository at this point in the history
  • Loading branch information
Yeuoly committed Mar 14, 2024
2 parents 0a12ddc + 3c35717 commit 8823091
Show file tree
Hide file tree
Showing 13 changed files with 237 additions and 170 deletions.
31 changes: 1 addition & 30 deletions api/core/app/apps/workflow/workflow_event_trigger_callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
QueueNodeFailedEvent,
QueueNodeStartedEvent,
QueueNodeSucceededEvent,
QueueTextChunkEvent,
QueueWorkflowFailedEvent,
QueueWorkflowStartedEvent,
QueueWorkflowSucceededEvent,
Expand All @@ -20,7 +19,6 @@ class WorkflowEventTriggerCallback(BaseWorkflowCallback):

def __init__(self, queue_manager: AppQueueManager, workflow: Workflow):
self._queue_manager = queue_manager
self._streamable_node_ids = self._fetch_streamable_node_ids(workflow.graph_dict)

def on_workflow_run_started(self) -> None:
"""
Expand Down Expand Up @@ -118,31 +116,4 @@ def on_node_text_chunk(self, node_id: str, text: str) -> None:
"""
Publish text chunk
"""
if node_id in self._streamable_node_ids:
self._queue_manager.publish(
QueueTextChunkEvent(
text=text
), PublishFrom.APPLICATION_MANAGER
)

def _fetch_streamable_node_ids(self, graph: dict) -> list[str]:
"""
Fetch streamable node ids
When the Workflow type is chat, only the nodes before END Node are LLM or Direct Answer can be streamed output
When the Workflow type is workflow, only the nodes before END Node (only Plain Text mode) are LLM can be streamed output
:param graph: workflow graph
:return:
"""
streamable_node_ids = []
end_node_ids = []
for node_config in graph.get('nodes'):
if node_config.get('data', {}).get('type') == NodeType.END.value:
if node_config.get('data', {}).get('outputs', {}).get('type', '') == 'plain-text':
end_node_ids.append(node_config.get('id'))

for edge_config in graph.get('edges'):
if edge_config.get('target') in end_node_ids:
streamable_node_ids.append(edge_config.get('source'))

return streamable_node_ids
pass
50 changes: 44 additions & 6 deletions api/core/workflow/nodes/answer/answer_node.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import time
from typing import cast

from core.prompt.utils.prompt_template_parser import PromptTemplateParser
Expand Down Expand Up @@ -32,14 +31,49 @@ def _run(self, variable_pool: VariablePool) -> NodeRunResult:

variable_values[variable_selector.variable] = value

variable_keys = list(variable_values.keys())

# format answer template
template_parser = PromptTemplateParser(node_data.answer)
answer = template_parser.format(variable_values)
template_variable_keys = template_parser.variable_keys

# Take the intersection of variable_keys and template_variable_keys
variable_keys = list(set(variable_keys) & set(template_variable_keys))

template = node_data.answer
for var in variable_keys:
template = template.replace(f'{{{{{var}}}}}', f'Ω{{{{{var}}}}}Ω')

split_template = [
{
"type": "var" if self._is_variable(part, variable_keys) else "text",
"value": part.replace('Ω', '') if self._is_variable(part, variable_keys) else part
}
for part in template.split('Ω') if part
]

answer = []
for part in split_template:
if part["type"] == "var":
value = variable_values.get(part["value"].replace('{{', '').replace('}}', ''))
answer_part = {
"type": "text",
"text": value
}
# TODO File
else:
answer_part = {
"type": "text",
"text": part["value"]
}

# publish answer as stream
for word in answer:
self.publish_text_chunk(word)
time.sleep(10) # TODO for debug
if len(answer) > 0 and answer[-1]["type"] == "text" and answer_part["type"] == "text":
answer[-1]["text"] += answer_part["text"]
else:
answer.append(answer_part)

if len(answer) == 1 and answer[0]["type"] == "text":
answer = answer[0]["text"]

return NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED,
Expand All @@ -49,6 +83,10 @@ def _run(self, variable_pool: VariablePool) -> NodeRunResult:
}
)

def _is_variable(self, part, variable_keys):
cleaned_part = part.replace('{{', '').replace('}}', '')
return part.startswith('{{') and cleaned_part in variable_keys

@classmethod
def _extract_variable_selector_to_variable_mapping(cls, node_data: BaseNodeData) -> dict[str, list[str]]:
"""
Expand Down
14 changes: 3 additions & 11 deletions api/core/workflow/nodes/base_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from core.workflow.entities.base_node_data_entities import BaseNodeData
from core.workflow.entities.node_entities import NodeRunResult, NodeType
from core.workflow.entities.variable_pool import VariablePool
from models.workflow import WorkflowNodeExecutionStatus


class UserFrom(Enum):
Expand Down Expand Up @@ -80,16 +79,9 @@ def run(self, variable_pool: VariablePool) -> NodeRunResult:
:param variable_pool: variable pool
:return:
"""
try:
result = self._run(
variable_pool=variable_pool
)
except Exception as e:
# process unhandled exception
result = NodeRunResult(
status=WorkflowNodeExecutionStatus.FAILED,
error=str(e)
)
result = self._run(
variable_pool=variable_pool
)

self.node_run_result = result
return result
Expand Down
38 changes: 9 additions & 29 deletions api/core/workflow/nodes/end/end_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

from core.workflow.entities.base_node_data_entities import BaseNodeData
from core.workflow.entities.node_entities import NodeRunResult, NodeType
from core.workflow.entities.variable_pool import ValueType, VariablePool
from core.workflow.entities.variable_pool import VariablePool
from core.workflow.nodes.base_node import BaseNode
from core.workflow.nodes.end.entities import EndNodeData, EndNodeDataOutputs
from core.workflow.nodes.end.entities import EndNodeData
from models.workflow import WorkflowNodeExecutionStatus


Expand All @@ -20,34 +20,14 @@ def _run(self, variable_pool: VariablePool) -> NodeRunResult:
"""
node_data = self.node_data
node_data = cast(self._node_data_cls, node_data)
outputs_config = node_data.outputs
output_variables = node_data.outputs

outputs = None
if outputs_config:
if outputs_config.type == EndNodeDataOutputs.OutputType.PLAIN_TEXT:
plain_text_selector = outputs_config.plain_text_selector
if plain_text_selector:
outputs = {
'text': variable_pool.get_variable_value(
variable_selector=plain_text_selector,
target_value_type=ValueType.STRING
)
}
else:
outputs = {
'text': ''
}
elif outputs_config.type == EndNodeDataOutputs.OutputType.STRUCTURED:
structured_variables = outputs_config.structured_variables
if structured_variables:
outputs = {}
for variable_selector in structured_variables:
variable_value = variable_pool.get_variable_value(
variable_selector=variable_selector.value_selector
)
outputs[variable_selector.variable] = variable_value
else:
outputs = {}
outputs = {}
for variable_selector in output_variables:
variable_value = variable_pool.get_variable_value(
variable_selector=variable_selector.value_selector
)
outputs[variable_selector.variable] = variable_value

return NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED,
Expand Down
61 changes: 1 addition & 60 deletions api/core/workflow/nodes/end/entities.py
Original file line number Diff line number Diff line change
@@ -1,68 +1,9 @@
from enum import Enum
from typing import Optional

from pydantic import BaseModel

from core.workflow.entities.base_node_data_entities import BaseNodeData
from core.workflow.entities.variable_entities import VariableSelector


class EndNodeOutputType(Enum):
"""
END Node Output Types.
none, plain-text, structured
"""
NONE = 'none'
PLAIN_TEXT = 'plain-text'
STRUCTURED = 'structured'

@classmethod
def value_of(cls, value: str) -> 'OutputType':
"""
Get value of given output type.
:param value: output type value
:return: output type
"""
for output_type in cls:
if output_type.value == value:
return output_type
raise ValueError(f'invalid output type value {value}')


class EndNodeDataOutputs(BaseModel):
"""
END Node Data Outputs.
"""
class OutputType(Enum):
"""
Output Types.
"""
NONE = 'none'
PLAIN_TEXT = 'plain-text'
STRUCTURED = 'structured'

@classmethod
def value_of(cls, value: str) -> 'OutputType':
"""
Get value of given output type.
:param value: output type value
:return: output type
"""
for output_type in cls:
if output_type.value == value:
return output_type
raise ValueError(f'invalid output type value {value}')

type: OutputType = OutputType.NONE
plain_text_selector: Optional[list[str]] = None
structured_variables: Optional[list[VariableSelector]] = None


class EndNodeData(BaseNodeData):
"""
END Node Data.
"""
outputs: Optional[EndNodeDataOutputs] = None
outputs: list[VariableSelector]
2 changes: 1 addition & 1 deletion api/core/workflow/nodes/http_request/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def check_config(cls, v, values):
return v

class Body(BaseModel):
type: Literal[None, 'form-data', 'x-www-form-urlencoded', 'raw', 'json']
type: Literal['none', 'form-data', 'x-www-form-urlencoded', 'raw', 'json']
data: Union[None, str]

variables: list[VariableSelector]
Expand Down
6 changes: 3 additions & 3 deletions api/core/workflow/nodes/http_request/http_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,6 @@ def _init_template(self, node_data: HttpRequestNodeData, variables: dict[str, An
self.headers['Content-Type'] = 'application/json'
elif node_data.body.type == 'x-www-form-urlencoded':
self.headers['Content-Type'] = 'application/x-www-form-urlencoded'
# elif node_data.body.type == 'form-data':
# self.headers['Content-Type'] = 'multipart/form-data'

if node_data.body.type in ['form-data', 'x-www-form-urlencoded']:
body = {}
Expand All @@ -152,8 +150,10 @@ def _init_template(self, node_data: HttpRequestNodeData, variables: dict[str, An
}
else:
self.body = urlencode(body)
else:
elif node_data.body.type in ['json', 'raw']:
self.body = original_body
elif node_data.body.type == 'none':
self.body = ''

def _assembling_headers(self) -> dict[str, Any]:
authorization = deepcopy(self.authorization)
Expand Down
2 changes: 1 addition & 1 deletion api/core/workflow/nodes/http_request/http_request_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def _run(self, variable_pool: VariablePool) -> NodeRunResult:
inputs=variables,
outputs={
'status_code': response.status_code,
'body': response,
'body': response.body,
'headers': response.headers
},
process_data={
Expand Down
4 changes: 4 additions & 0 deletions api/core/workflow/workflow_engine_manager.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import time
from typing import Optional

Expand Down Expand Up @@ -41,6 +42,8 @@
NodeType.VARIABLE_ASSIGNER: VariableAssignerNode,
}

logger = logging.getLogger(__name__)


class WorkflowEngineManager:
def get_default_configs(self) -> list[dict]:
Expand Down Expand Up @@ -407,6 +410,7 @@ def _run_workflow_node(self, workflow_run_state: WorkflowRunState,
variable_pool=workflow_run_state.variable_pool
)
except Exception as e:
logger.exception(f"Node {node.node_data.title} run failed: {str(e)}")
node_run_result = NodeRunResult(
status=WorkflowNodeExecutionStatus.FAILED,
error=str(e)
Expand Down
Loading

0 comments on commit 8823091

Please sign in to comment.