Skip to content

Commit

Permalink
Merge branch 'feat/workflow-backend' into deploy/dev
Browse files Browse the repository at this point in the history
  • Loading branch information
takatost committed Mar 4, 2024
2 parents 5d2ff41 + e4df029 commit 7d78646
Show file tree
Hide file tree
Showing 17 changed files with 440 additions and 90 deletions.
86 changes: 77 additions & 9 deletions api/controllers/console/app/workflow.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,26 @@
import json
import logging
from collections.abc import Generator

from flask import Response, stream_with_context
from flask_restful import Resource, marshal_with, reqparse
from werkzeug.exceptions import InternalServerError, NotFound

import services
from controllers.console import api
from controllers.console.app.error import DraftWorkflowNotExist
from controllers.console.app.error import ConversationCompletedError, DraftWorkflowNotExist
from controllers.console.app.wraps import get_app_model
from controllers.console.setup import setup_required
from controllers.console.wraps import account_initialization_required
from core.app.entities.app_invoke_entities import InvokeFrom
from fields.workflow_fields import workflow_fields
from libs.helper import uuid_value
from libs.login import current_user, login_required
from models.model import App, AppMode
from services.workflow_service import WorkflowService

logger = logging.getLogger(__name__)


class DraftWorkflowApi(Resource):
@setup_required
Expand Down Expand Up @@ -59,23 +68,80 @@ def post(self, app_model: App):
}


class AdvancedChatDraftWorkflowRunApi(Resource):
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT])
def post(self, app_model: App):
"""
Run draft workflow
"""
parser = reqparse.RequestParser()
parser.add_argument('inputs', type=dict, required=True, location='json')
parser.add_argument('query', type=str, location='json', default='')
parser.add_argument('files', type=list, required=False, location='json')
parser.add_argument('conversation_id', type=uuid_value, location='json')
args = parser.parse_args()

workflow_service = WorkflowService()
try:
response = workflow_service.run_advanced_chat_draft_workflow(
app_model=app_model,
user=current_user,
args=args,
invoke_from=InvokeFrom.DEBUGGER
)
except services.errors.conversation.ConversationNotExistsError:
raise NotFound("Conversation Not Exists.")
except services.errors.conversation.ConversationCompletedError:
raise ConversationCompletedError()
except ValueError as e:
raise e
except Exception as e:
logging.exception("internal server error.")
raise InternalServerError()

def generate() -> Generator:
yield from response

return Response(stream_with_context(generate()), status=200,
mimetype='text/event-stream')


class DraftWorkflowRunApi(Resource):
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
@get_app_model(mode=[AppMode.WORKFLOW])
def post(self, app_model: App):
"""
Run draft workflow
"""
# TODO
parser = reqparse.RequestParser()
parser.add_argument('inputs', type=dict, required=True, location='json')
args = parser.parse_args()

workflow_service = WorkflowService()
workflow_service.run_draft_workflow(app_model=app_model, account=current_user)

# TODO
return {
"result": "success"
}
try:
response = workflow_service.run_draft_workflow(
app_model=app_model,
user=current_user,
args=args,
invoke_from=InvokeFrom.DEBUGGER
)
except ValueError as e:
raise e
except Exception as e:
logging.exception("internal server error.")
raise InternalServerError()

def generate() -> Generator:
yield from response

return Response(stream_with_context(generate()), status=200,
mimetype='text/event-stream')


class WorkflowTaskStopApi(Resource):
Expand Down Expand Up @@ -214,10 +280,12 @@ def post(self, app_model: App):


api.add_resource(DraftWorkflowApi, '/apps/<uuid:app_id>/workflows/draft')
api.add_resource(AdvancedChatDraftWorkflowRunApi, '/apps/<uuid:app_id>/advanced-chat/workflows/draft/run')
api.add_resource(DraftWorkflowRunApi, '/apps/<uuid:app_id>/workflows/draft/run')
api.add_resource(WorkflowTaskStopApi, '/apps/<uuid:app_id>/workflows/tasks/<string:task_id>/stop')
api.add_resource(DraftWorkflowNodeRunApi, '/apps/<uuid:app_id>/workflows/draft/nodes/<uuid:node_id>/run')
api.add_resource(PublishedWorkflowApi, '/apps/<uuid:app_id>/workflows/published')
api.add_resource(DefaultBlockConfigsApi, '/apps/<uuid:app_id>/workflows/default-workflow-block-configs')
api.add_resource(DefaultBlockConfigApi, '/apps/<uuid:app_id>/workflows/default-workflow-block-configs/:block_type')
api.add_resource(DefaultBlockConfigApi, '/apps/<uuid:app_id>/workflows/default-workflow-block-configs'
'/<string:block_type>')
api.add_resource(ConvertToWorkflowApi, '/apps/<uuid:app_id>/convert-to-workflow')
4 changes: 2 additions & 2 deletions api/core/agent/cot_agent_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ def increase_usage(final_llm_usage_dict: dict[str, LLMUsage], usage: LLMUsage):
input=query
)

# recale llm max tokens
self.recale_llm_max_tokens(self.model_config, prompt_messages)
# recalc llm max tokens
self.recalc_llm_max_tokens(self.model_config, prompt_messages)
# invoke model
chunks: Generator[LLMResultChunk, None, None] = model_instance.invoke_llm(
prompt_messages=prompt_messages,
Expand Down
4 changes: 2 additions & 2 deletions api/core/agent/fc_agent_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ def increase_usage(final_llm_usage_dict: dict[str, LLMUsage], usage: LLMUsage):
messages_ids=message_file_ids
)

# recale llm max tokens
self.recale_llm_max_tokens(self.model_config, prompt_messages)
# recalc llm max tokens
self.recalc_llm_max_tokens(self.model_config, prompt_messages)
# invoke model
chunks: Union[Generator[LLMResultChunk, None, None], LLMResult] = model_instance.invoke_llm(
prompt_messages=prompt_messages,
Expand Down
18 changes: 5 additions & 13 deletions api/core/app/apps/advanced_chat/app_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import threading
import uuid
from collections.abc import Generator
from typing import Any, Union
from typing import Union

from flask import Flask, current_app
from pydantic import ValidationError
Expand All @@ -16,25 +16,27 @@
from core.app.entities.app_invoke_entities import AdvancedChatAppGenerateEntity, InvokeFrom
from core.file.message_file_parser import MessageFileParser
from core.model_runtime.errors.invoke import InvokeAuthorizationError, InvokeError
from core.workflow.workflow_engine_manager import WorkflowEngineManager
from extensions.ext_database import db
from models.account import Account
from models.model import App, Conversation, EndUser, Message
from models.workflow import Workflow

logger = logging.getLogger(__name__)


class AdvancedChatAppGenerator(MessageBasedAppGenerator):
def generate(self, app_model: App,
workflow: Workflow,
user: Union[Account, EndUser],
args: Any,
args: dict,
invoke_from: InvokeFrom,
stream: bool = True) \
-> Union[dict, Generator]:
"""
Generate App response.
:param app_model: App
:param workflow: Workflow
:param user: account or end user
:param args: request args
:param invoke_from: invoke from source
Expand All @@ -59,16 +61,6 @@ def generate(self, app_model: App,
if args.get('conversation_id'):
conversation = self._get_conversation_by_user(app_model, args.get('conversation_id'), user)

# get workflow
workflow_engine_manager = WorkflowEngineManager()
if invoke_from == InvokeFrom.DEBUGGER:
workflow = workflow_engine_manager.get_draft_workflow(app_model=app_model)
else:
workflow = workflow_engine_manager.get_published_workflow(app_model=app_model)

if not workflow:
raise ValueError('Workflow not initialized')

# parse files
files = args['files'] if 'files' in args and args['files'] else []
message_file_parser = MessageFileParser(tenant_id=app_model.tenant_id, app_id=app_model.id)
Expand Down
Loading

0 comments on commit 7d78646

Please sign in to comment.