Skip to content

Commit

Permalink
Merge branch 'feat/workflow-backend' into deploy/dev
Browse files Browse the repository at this point in the history
  • Loading branch information
takatost committed Mar 11, 2024
2 parents ef861e0 + 19c9091 commit 5ea7d4c
Show file tree
Hide file tree
Showing 18 changed files with 403 additions and 135 deletions.
2 changes: 1 addition & 1 deletion api/controllers/console/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from . import admin, apikey, extension, feature, setup, version, ping
# Import app controllers
from .app import (advanced_prompt_template, annotation, app, audio, completion, conversation, generator, message,
model_config, site, statistic, workflow, workflow_app_log)
model_config, site, statistic, workflow, workflow_run, workflow_app_log)
# Import auth controllers
from .auth import activate, data_source_oauth, login, oauth
# Import billing controllers
Expand Down
21 changes: 14 additions & 7 deletions api/controllers/console/app/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from controllers.console.wraps import account_initialization_required
from core.app.entities.app_invoke_entities import InvokeFrom
from fields.workflow_fields import workflow_fields
from fields.workflow_run_fields import workflow_run_node_execution_fields
from libs.helper import TimestampField, uuid_value
from libs.login import current_user, login_required
from models.model import App, AppMode
Expand Down Expand Up @@ -164,18 +165,24 @@ class DraftWorkflowNodeRunApi(Resource):
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
@marshal_with(workflow_run_node_execution_fields)
def post(self, app_model: App, node_id: str):
"""
Run draft workflow node
"""
# TODO
parser = reqparse.RequestParser()
parser.add_argument('inputs', type=dict, required=True, nullable=False, location='json')
args = parser.parse_args()

workflow_service = WorkflowService()
workflow_service.run_draft_workflow_node(app_model=app_model, node_id=node_id, account=current_user)
workflow_node_execution = workflow_service.run_draft_workflow_node(
app_model=app_model,
node_id=node_id,
user_inputs=args.get('inputs'),
account=current_user
)

# TODO
return {
"result": "success"
}
return workflow_node_execution


class PublishedWorkflowApi(Resource):
Expand Down Expand Up @@ -291,7 +298,7 @@ def generate() -> Generator:
api.add_resource(AdvancedChatDraftWorkflowRunApi, '/apps/<uuid:app_id>/advanced-chat/workflows/draft/run')
api.add_resource(DraftWorkflowRunApi, '/apps/<uuid:app_id>/workflows/draft/run')
api.add_resource(WorkflowTaskStopApi, '/apps/<uuid:app_id>/workflows/tasks/<string:task_id>/stop')
api.add_resource(DraftWorkflowNodeRunApi, '/apps/<uuid:app_id>/workflows/draft/nodes/<uuid:node_id>/run')
api.add_resource(DraftWorkflowNodeRunApi, '/apps/<uuid:app_id>/workflows/draft/nodes/<string:node_id>/run')
api.add_resource(PublishedWorkflowApi, '/apps/<uuid:app_id>/workflows/published')
api.add_resource(DefaultBlockConfigsApi, '/apps/<uuid:app_id>/workflows/default-workflow-block-configs')
api.add_resource(DefaultBlockConfigApi, '/apps/<uuid:app_id>/workflows/default-workflow-block-configs'
Expand Down
10 changes: 10 additions & 0 deletions api/core/workflow/errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from core.workflow.entities.node_entities import NodeType


class WorkflowNodeRunFailedError(Exception):
def __init__(self, node_id: str, node_type: NodeType, node_title: str, error: str):
self.node_id = node_id
self.node_type = node_type
self.node_title = node_title
self.error = error
super().__init__(f"Node {node_title} run failed: {error}")
4 changes: 2 additions & 2 deletions api/core/workflow/nodes/base_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def publish_text_chunk(self, text: str) -> None:
)

@classmethod
def extract_variable_selector_to_variable_mapping(cls, config: dict) -> dict:
def extract_variable_selector_to_variable_mapping(cls, config: dict) -> dict[str, list[str]]:
"""
Extract variable selector to variable mapping
:param config: node config
Expand All @@ -119,7 +119,7 @@ def extract_variable_selector_to_variable_mapping(cls, config: dict) -> dict:

@classmethod
@abstractmethod
def _extract_variable_selector_to_variable_mapping(cls, node_data: BaseNodeData) -> dict[list[str], str]:
def _extract_variable_selector_to_variable_mapping(cls, node_data: BaseNodeData) -> dict[str, list[str]]:
"""
Extract variable selector to variable mapping
:param node_data: node data
Expand Down
59 changes: 54 additions & 5 deletions api/core/workflow/nodes/code/code_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,13 @@ def _check_number(self, value: Union[int, float], variable: str) -> Union[int, f
raise ValueError(f'{variable} in input form is out of range.')

if isinstance(value, float):
value = round(value, MAX_PRECISION)
# raise error if precision is too high
if len(str(value).split('.')[1]) > MAX_PRECISION:
raise ValueError(f'{variable} in output form has too high precision.')

return value

def _transform_result(self, result: dict, output_schema: dict[str, CodeNodeData.Output],
def _transform_result(self, result: dict, output_schema: Optional[dict[str, CodeNodeData.Output]],
prefix: str = '',
depth: int = 1) -> dict:
"""
Expand All @@ -170,6 +172,47 @@ def _transform_result(self, result: dict, output_schema: dict[str, CodeNodeData.
raise ValueError("Depth limit reached, object too deep.")

transformed_result = {}
if output_schema is None:
# validate output thought instance type
for output_name, output_value in result.items():
if isinstance(output_value, dict):
self._transform_result(
result=output_value,
output_schema=None,
prefix=f'{prefix}.{output_name}' if prefix else output_name,
depth=depth + 1
)
elif isinstance(output_value, int | float):
self._check_number(
value=output_value,
variable=f'{prefix}.{output_name}' if prefix else output_name
)
elif isinstance(output_value, str):
self._check_string(
value=output_value,
variable=f'{prefix}.{output_name}' if prefix else output_name
)
elif isinstance(output_value, list):
if all(isinstance(value, int | float) for value in output_value):
for value in output_value:
self._check_number(
value=value,
variable=f'{prefix}.{output_name}' if prefix else output_name
)
elif all(isinstance(value, str) for value in output_value):
for value in output_value:
self._check_string(
value=value,
variable=f'{prefix}.{output_name}' if prefix else output_name
)
else:
raise ValueError(f'Output {prefix}.{output_name} is not a valid array. make sure all elements are of the same type.')
else:
raise ValueError(f'Output {prefix}.{output_name} is not a valid type.')

return result

parameters_validated = {}
for output_name, output_config in output_schema.items():
if output_config.type == 'object':
# check if output is object
Expand Down Expand Up @@ -236,17 +279,23 @@ def _transform_result(self, result: dict, output_schema: dict[str, CodeNodeData.
]
else:
raise ValueError(f'Output type {output_config.type} is not supported.')

parameters_validated[output_name] = True

# check if all output parameters are validated
if len(parameters_validated) != len(result):
raise ValueError('Not all output parameters are validated.')

return transformed_result

@classmethod
def _extract_variable_selector_to_variable_mapping(cls, node_data: CodeNodeData) -> dict[list[str], str]:
def _extract_variable_selector_to_variable_mapping(cls, node_data: CodeNodeData) -> dict[str, list[str]]:
"""
Extract variable selector to variable mapping
:param node_data: node data
:return:
"""

return {
variable_selector.value_selector: variable_selector.variable for variable_selector in node_data.variables
}
variable_selector.variable: variable_selector.value_selector for variable_selector in node_data.variables
}
4 changes: 2 additions & 2 deletions api/core/workflow/nodes/code/entities.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Literal, Union
from typing import Literal, Optional

from pydantic import BaseModel

Expand All @@ -12,7 +12,7 @@ class CodeNodeData(BaseNodeData):
"""
class Output(BaseModel):
type: Literal['string', 'number', 'object', 'array[string]', 'array[number]']
children: Union[None, dict[str, 'Output']]
children: Optional[dict[str, 'CodeNodeData.Output']]

variables: list[VariableSelector]
answer: str
Expand Down
10 changes: 8 additions & 2 deletions api/core/workflow/nodes/direct_answer/direct_answer_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,16 @@ def _run(self, variable_pool: VariablePool) -> NodeRunResult:
)

@classmethod
def _extract_variable_selector_to_variable_mapping(cls, node_data: BaseNodeData) -> dict[list[str], str]:
def _extract_variable_selector_to_variable_mapping(cls, node_data: BaseNodeData) -> dict[str, list[str]]:
"""
Extract variable selector to variable mapping
:param node_data: node data
:return:
"""
return {}
node_data = cast(cls._node_data_cls, node_data)

variable_mapping = {}
for variable_selector in node_data.variables:
variable_mapping[variable_selector.variable] = variable_selector.value_selector

return variable_mapping
2 changes: 1 addition & 1 deletion api/core/workflow/nodes/end/end_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def _run(self, variable_pool: VariablePool) -> NodeRunResult:
)

@classmethod
def _extract_variable_selector_to_variable_mapping(cls, node_data: BaseNodeData) -> dict[list[str], str]:
def _extract_variable_selector_to_variable_mapping(cls, node_data: BaseNodeData) -> dict[str, list[str]]:
"""
Extract variable selector to variable mapping
:param node_data: node data
Expand Down
6 changes: 3 additions & 3 deletions api/core/workflow/nodes/http_request/http_request_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ def _run(self, variable_pool: VariablePool) -> NodeRunResult:


@classmethod
def _extract_variable_selector_to_variable_mapping(cls, node_data: HttpRequestNodeData) -> dict[list[str], str]:
def _extract_variable_selector_to_variable_mapping(cls, node_data: HttpRequestNodeData) -> dict[str, list[str]]:
"""
Extract variable selector to variable mapping
:param node_data: node data
:return:
"""
return {
variable_selector.value_selector: variable_selector.variable for variable_selector in node_data.variables
}
variable_selector.variable: variable_selector.value_selector for variable_selector in node_data.variables
}
2 changes: 1 addition & 1 deletion api/core/workflow/nodes/llm/llm_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def _run(self, variable_pool: VariablePool) -> NodeRunResult:
pass

@classmethod
def _extract_variable_selector_to_variable_mapping(cls, node_data: BaseNodeData) -> dict[list[str], str]:
def _extract_variable_selector_to_variable_mapping(cls, node_data: BaseNodeData) -> dict[str, list[str]]:
"""
Extract variable selector to variable mapping
:param node_data: node data
Expand Down
2 changes: 1 addition & 1 deletion api/core/workflow/nodes/start/start_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def _get_cleaned_inputs(self, variables: list[VariableEntity], user_inputs: dict
return filtered_inputs

@classmethod
def _extract_variable_selector_to_variable_mapping(cls, node_data: BaseNodeData) -> dict[list[str], str]:
def _extract_variable_selector_to_variable_mapping(cls, node_data: BaseNodeData) -> dict[str, list[str]]:
"""
Extract variable selector to variable mapping
:param node_data: node data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,12 @@ def _run(self, variable_pool: VariablePool) -> NodeRunResult:
)

@classmethod
def _extract_variable_selector_to_variable_mapping(cls, node_data: TemplateTransformNodeData) -> dict[list[str], str]:
def _extract_variable_selector_to_variable_mapping(cls, node_data: TemplateTransformNodeData) -> dict[str, list[str]]:
"""
Extract variable selector to variable mapping
:param node_data: node data
:return:
"""
return {
variable_selector.value_selector: variable_selector.variable for variable_selector in node_data.variables
variable_selector.variable: variable_selector.value_selector for variable_selector in node_data.variables
}
10 changes: 7 additions & 3 deletions api/core/workflow/nodes/tool/tool_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def _run(self, variable_pool: VariablePool) -> NodeRunResult:

try:
# TODO: user_id
messages = tool_runtime.invoke(None, parameters)
messages = tool_runtime.invoke(self.user_id, parameters)
except Exception as e:
return NodeRunResult(
status=WorkflowNodeExecutionStatus.FAILED,
Expand Down Expand Up @@ -133,8 +133,12 @@ def _extract_tool_response_text(self, tool_response: list[ToolInvokeMessage]) ->


@classmethod
def _extract_variable_selector_to_variable_mapping(cls, node_data: BaseNodeData) -> dict[list[str], str]:
def _extract_variable_selector_to_variable_mapping(cls, node_data: BaseNodeData) -> dict[str, list[str]]:
"""
Extract variable selector to variable mapping
"""
pass
return {
k.variable: k.value_selector
for k in cast(ToolNodeData, node_data).tool_parameters
if k.variable_type == 'selector'
}
88 changes: 88 additions & 0 deletions api/core/workflow/workflow_engine_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from core.workflow.entities.node_entities import NodeRunMetadataKey, NodeRunResult, NodeType
from core.workflow.entities.variable_pool import VariablePool, VariableValue
from core.workflow.entities.workflow_entities import WorkflowNodeAndResult, WorkflowRunState
from core.workflow.errors import WorkflowNodeRunFailedError
from core.workflow.nodes.base_node import BaseNode, UserFrom
from core.workflow.nodes.code.code_node import CodeNode
from core.workflow.nodes.direct_answer.direct_answer_node import DirectAnswerNode
Expand Down Expand Up @@ -180,6 +181,93 @@ def run_workflow(self, workflow: Workflow,
callbacks=callbacks
)

def single_step_run_workflow_node(self, workflow: Workflow,
node_id: str,
user_id: str,
user_inputs: dict) -> tuple[BaseNode, NodeRunResult]:
"""
Single step run workflow node
:param workflow: Workflow instance
:param node_id: node id
:param user_id: user id
:param user_inputs: user inputs
:return:
"""
# fetch node info from workflow graph
graph = workflow.graph_dict
if not graph:
raise ValueError('workflow graph not found')

nodes = graph.get('nodes')
if not nodes:
raise ValueError('nodes not found in workflow graph')

# fetch node config from node id
node_config = None
for node in nodes:
if node.get('id') == node_id:
node_config = node
break

if not node_config:
raise ValueError('node id not found in workflow graph')

# Get node class
node_cls = node_classes.get(NodeType.value_of(node_config.get('data', {}).get('type')))

# init workflow run state
node_instance = node_cls(
tenant_id=workflow.tenant_id,
app_id=workflow.app_id,
workflow_id=workflow.id,
user_id=user_id,
user_from=UserFrom.ACCOUNT,
config=node_config
)

try:
# init variable pool
variable_pool = VariablePool(
system_variables={},
user_inputs={}
)

# variable selector to variable mapping
try:
variable_mapping = node_cls.extract_variable_selector_to_variable_mapping(node_config)
except NotImplementedError:
variable_mapping = {}

for variable_key, variable_selector in variable_mapping.items():
if variable_key not in user_inputs:
raise ValueError(f'Variable key {variable_key} not found in user inputs.')

# fetch variable node id from variable selector
variable_node_id = variable_selector[0]
variable_key_list = variable_selector[1:]

# append variable and value to variable pool
variable_pool.append_variable(
node_id=variable_node_id,
variable_key_list=variable_key_list,
value=user_inputs.get(variable_key)
)

# run node
node_run_result = node_instance.run(
variable_pool=variable_pool
)
except Exception as e:
raise WorkflowNodeRunFailedError(
node_id=node_instance.node_id,
node_type=node_instance.node_type,
node_title=node_instance.node_data.title,
error=str(e)
)

return node_instance, node_run_result


def _workflow_run_success(self, callbacks: list[BaseWorkflowCallback] = None) -> None:
"""
Workflow run success
Expand Down
Loading

0 comments on commit 5ea7d4c

Please sign in to comment.