Skip to content

Commit

Permalink
Merge branch 'feat/workflow-backend' into deploy/dev
Browse files Browse the repository at this point in the history
  • Loading branch information
takatost committed Mar 15, 2024
2 parents b18fca4 + 62846be commit fa010ca
Show file tree
Hide file tree
Showing 36 changed files with 2,671 additions and 2,107 deletions.
26 changes: 6 additions & 20 deletions api/controllers/console/app/completion.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
import json
import logging
from collections.abc import Generator
from typing import Union

import flask_login
from flask import Response, stream_with_context
from flask_restful import Resource, reqparse
from werkzeug.exceptions import InternalServerError, NotFound

Expand All @@ -25,10 +21,11 @@
from core.app.entities.app_invoke_entities import InvokeFrom
from core.errors.error import ModelCurrentlyNotSupportError, ProviderTokenNotInitError, QuotaExceededError
from core.model_runtime.errors.invoke import InvokeError
from libs import helper
from libs.helper import uuid_value
from libs.login import login_required
from models.model import AppMode
from services.completion_service import CompletionService
from services.app_generate_service import AppGenerateService


# define completion message api for user
Expand All @@ -54,15 +51,15 @@ def post(self, app_model):
account = flask_login.current_user

try:
response = CompletionService.completion(
response = AppGenerateService.generate(
app_model=app_model,
user=account,
args=args,
invoke_from=InvokeFrom.DEBUGGER,
streaming=streaming
)

return compact_response(response)
return helper.compact_generate_response(response)
except services.errors.conversation.ConversationNotExistsError:
raise NotFound("Conversation Not Exists.")
except services.errors.conversation.ConversationCompletedError:
Expand Down Expand Up @@ -120,15 +117,15 @@ def post(self, app_model):
account = flask_login.current_user

try:
response = CompletionService.completion(
response = AppGenerateService.generate(
app_model=app_model,
user=account,
args=args,
invoke_from=InvokeFrom.DEBUGGER,
streaming=streaming
)

return compact_response(response)
return helper.compact_generate_response(response)
except services.errors.conversation.ConversationNotExistsError:
raise NotFound("Conversation Not Exists.")
except services.errors.conversation.ConversationCompletedError:
Expand All @@ -151,17 +148,6 @@ def post(self, app_model):
raise InternalServerError()


def compact_response(response: Union[dict, Generator]) -> Response:
if isinstance(response, dict):
return Response(response=json.dumps(response), status=200, mimetype='application/json')
else:
def generate() -> Generator:
yield from response

return Response(stream_with_context(generate()), status=200,
mimetype='text/event-stream')


class ChatMessageStopApi(Resource):
@setup_required
@login_required
Expand Down
15 changes: 0 additions & 15 deletions api/controllers/console/app/message.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
import json
import logging
from collections.abc import Generator
from typing import Union

from flask import Response, stream_with_context
from flask_login import current_user
from flask_restful import Resource, fields, marshal_with, reqparse
from flask_restful.inputs import int_range
Expand Down Expand Up @@ -179,17 +175,6 @@ def get(self, app_model):
return {'count': count}


def compact_response(response: Union[dict, Generator]) -> Response:
if isinstance(response, dict):
return Response(response=json.dumps(response), status=200, mimetype='application/json')
else:
def generate() -> Generator:
yield from response

return Response(stream_with_context(generate()), status=200,
mimetype='text/event-stream')


class MessageSuggestedQuestionApi(Resource):
@setup_required
@login_required
Expand Down
40 changes: 12 additions & 28 deletions api/controllers/console/app/workflow.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
import json
import logging
from collections.abc import Generator
from typing import Union

from flask import Response, stream_with_context
from flask_restful import Resource, marshal_with, reqparse
from werkzeug.exceptions import InternalServerError, NotFound

Expand All @@ -13,12 +10,15 @@
from controllers.console.app.wraps import get_app_model
from controllers.console.setup import setup_required
from controllers.console.wraps import account_initialization_required
from core.app.apps.base_app_queue_manager import AppQueueManager
from core.app.entities.app_invoke_entities import InvokeFrom
from fields.workflow_fields import workflow_fields
from fields.workflow_run_fields import workflow_run_node_execution_fields
from libs import helper
from libs.helper import TimestampField, uuid_value
from libs.login import current_user, login_required
from models.model import App, AppMode
from services.app_generate_service import AppGenerateService
from services.workflow_service import WorkflowService

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -87,16 +87,16 @@ def post(self, app_model: App):
parser.add_argument('conversation_id', type=uuid_value, location='json')
args = parser.parse_args()

workflow_service = WorkflowService()
try:
response = workflow_service.run_advanced_chat_draft_workflow(
response = AppGenerateService.generate(
app_model=app_model,
user=current_user,
args=args,
invoke_from=InvokeFrom.DEBUGGER
invoke_from=InvokeFrom.DEBUGGER,
streaming=True
)

return compact_response(response)
return helper.compact_generate_response(response)
except services.errors.conversation.ConversationNotExistsError:
raise NotFound("Conversation Not Exists.")
except services.errors.conversation.ConversationCompletedError:
Expand All @@ -121,17 +121,16 @@ def post(self, app_model: App):
parser.add_argument('inputs', type=dict, required=True, nullable=False, location='json')
args = parser.parse_args()

workflow_service = WorkflowService()

try:
response = workflow_service.run_draft_workflow(
response = AppGenerateService.generate(
app_model=app_model,
user=current_user,
args=args,
invoke_from=InvokeFrom.DEBUGGER
invoke_from=InvokeFrom.DEBUGGER,
streaming=True
)

return compact_response(response)
return helper.compact_generate_response(response)
except ValueError as e:
raise e
except Exception as e:
Expand All @@ -148,12 +147,7 @@ def post(self, app_model: App, task_id: str):
"""
Stop workflow task
"""
workflow_service = WorkflowService()
workflow_service.stop_workflow_task(
task_id=task_id,
user=current_user,
invoke_from=InvokeFrom.DEBUGGER
)
AppQueueManager.set_stop_flag(task_id, InvokeFrom.DEBUGGER, current_user.id)

return {
"result": "success"
Expand Down Expand Up @@ -283,16 +277,6 @@ def post(self, app_model: App):
return workflow


def compact_response(response: Union[dict, Generator]) -> Response:
if isinstance(response, dict):
return Response(response=json.dumps(response), status=200, mimetype='application/json')
else:
def generate() -> Generator:
yield from response

return Response(stream_with_context(generate()), status=200,
mimetype='text/event-stream')


api.add_resource(DraftWorkflowApi, '/apps/<uuid:app_id>/workflows/draft')
api.add_resource(AdvancedChatDraftWorkflowRunApi, '/apps/<uuid:app_id>/advanced-chat/workflows/draft/run')
Expand Down
26 changes: 6 additions & 20 deletions api/controllers/console/explore/completion.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
import json
import logging
from collections.abc import Generator
from datetime import datetime
from typing import Union

from flask import Response, stream_with_context
from flask_login import current_user
from flask_restful import reqparse
from werkzeug.exceptions import InternalServerError, NotFound
Expand All @@ -26,8 +22,9 @@
from core.errors.error import ModelCurrentlyNotSupportError, ProviderTokenNotInitError, QuotaExceededError
from core.model_runtime.errors.invoke import InvokeError
from extensions.ext_database import db
from libs import helper
from libs.helper import uuid_value
from services.completion_service import CompletionService
from services.app_generate_service import AppGenerateService


# define completion api for user
Expand All @@ -53,15 +50,15 @@ def post(self, installed_app):
db.session.commit()

try:
response = CompletionService.completion(
response = AppGenerateService.generate(
app_model=app_model,
user=current_user,
args=args,
invoke_from=InvokeFrom.EXPLORE,
streaming=streaming
)

return compact_response(response)
return helper.compact_generate_response(response)
except services.errors.conversation.ConversationNotExistsError:
raise NotFound("Conversation Not Exists.")
except services.errors.conversation.ConversationCompletedError:
Expand Down Expand Up @@ -117,15 +114,15 @@ def post(self, installed_app):
db.session.commit()

try:
response = CompletionService.completion(
response = AppGenerateService.generate(
app_model=app_model,
user=current_user,
args=args,
invoke_from=InvokeFrom.EXPLORE,
streaming=streaming
)

return compact_response(response)
return helper.compact_generate_response(response)
except services.errors.conversation.ConversationNotExistsError:
raise NotFound("Conversation Not Exists.")
except services.errors.conversation.ConversationCompletedError:
Expand Down Expand Up @@ -159,17 +156,6 @@ def post(self, installed_app, task_id):
return {'result': 'success'}, 200


def compact_response(response: Union[dict, Generator]) -> Response:
if isinstance(response, dict):
return Response(response=json.dumps(response), status=200, mimetype='application/json')
else:
def generate() -> Generator:
yield from response

return Response(stream_with_context(generate()), status=200,
mimetype='text/event-stream')


api.add_resource(CompletionApi, '/installed-apps/<uuid:installed_app_id>/completion-messages', endpoint='installed_app_completion')
api.add_resource(CompletionStopApi, '/installed-apps/<uuid:installed_app_id>/completion-messages/<string:task_id>/stop', endpoint='installed_app_stop_completion')
api.add_resource(ChatApi, '/installed-apps/<uuid:installed_app_id>/chat-messages', endpoint='installed_app_chat_completion')
Expand Down
22 changes: 4 additions & 18 deletions api/controllers/console/explore/message.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
import json
import logging
from collections.abc import Generator
from typing import Union

from flask import Response, stream_with_context
from flask_login import current_user
from flask_restful import marshal_with, reqparse
from flask_restful.inputs import int_range
Expand All @@ -28,8 +24,9 @@
from core.errors.error import ModelCurrentlyNotSupportError, ProviderTokenNotInitError, QuotaExceededError
from core.model_runtime.errors.invoke import InvokeError
from fields.message_fields import message_infinite_scroll_pagination_fields
from libs import helper
from libs.helper import uuid_value
from services.completion_service import CompletionService
from services.app_generate_service import AppGenerateService
from services.errors.app import MoreLikeThisDisabledError
from services.errors.conversation import ConversationNotExistsError
from services.errors.message import MessageNotExistsError, SuggestedQuestionsAfterAnswerDisabledError
Expand Down Expand Up @@ -91,14 +88,14 @@ def get(self, installed_app, message_id):
streaming = args['response_mode'] == 'streaming'

try:
response = CompletionService.generate_more_like_this(
response = AppGenerateService.generate_more_like_this(
app_model=app_model,
user=current_user,
message_id=message_id,
invoke_from=InvokeFrom.EXPLORE,
streaming=streaming
)
return compact_response(response)
return helper.compact_generate_response(response)
except MessageNotExistsError:
raise NotFound("Message Not Exists.")
except MoreLikeThisDisabledError:
Expand All @@ -118,17 +115,6 @@ def get(self, installed_app, message_id):
raise InternalServerError()


def compact_response(response: Union[dict, Generator]) -> Response:
if isinstance(response, dict):
return Response(response=json.dumps(response), status=200, mimetype='application/json')
else:
def generate() -> Generator:
yield from response

return Response(stream_with_context(generate()), status=200,
mimetype='text/event-stream')


class MessageSuggestedQuestionApi(InstalledAppResource):
def get(self, installed_app, message_id):
app_model = installed_app.app
Expand Down
Loading

0 comments on commit fa010ca

Please sign in to comment.