Skip to content

Commit

Permalink
Merge pull request #326 from actiontech/sqle-ce-2752-5
Browse files Browse the repository at this point in the history
Sqle ce 2752 5
  • Loading branch information
LordofAvernus authored Nov 15, 2024
2 parents 449c098 + b3949ea commit fffa2e4
Show file tree
Hide file tree
Showing 7 changed files with 306 additions and 35 deletions.
36 changes: 34 additions & 2 deletions internal/apiserver/service/dms_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,23 @@ func (d *DMSController) CheckDBServiceIsConnectableById(c echo.Context) error {
// schema:
// "$ref": "#/definitions/GenericResp"
func (d *DMSController) CheckProjectDBServicesConnections(c echo.Context) error {
return nil
var req aV1.CheckDBServicesIsConnectableReq
err := bindAndValidateReq(c, &req)
if nil != err {
return NewErrResp(c, err, apiError.BadRequestErr)
}

currentUserUid, err := jwt.GetUserUidStrFromContext(c)
if err != nil {
return NewErrResp(c, err, apiError.DMSServiceErr)
}

reply, err := d.DMS.CheckDBServiceIsConnectableByIds(c.Request().Context(), req.ProjectUid,currentUserUid,req.DBServices)
if nil != err {
return NewErrResp(c, err, apiError.DMSServiceErr)
}

return NewOkRespWithReply(c, reply)
}


Expand Down Expand Up @@ -2087,7 +2103,23 @@ func (a *DMSController) DBServicesConnection(c echo.Context) error {
// schema:
// "$ref": "#/definitions/GenericResp"
func (a *DMSController) CheckGlobalDBServicesConnections(c echo.Context) error {
return nil
var req aV1.DBServicesConnectionReq
err := bindAndValidateReq(c, &req)
if nil != err {
return NewErrResp(c, err, apiError.BadRequestErr)
}

currentUserUid, err := jwt.GetUserUidStrFromContext(c)
if err != nil {
return NewErrResp(c, err, apiError.DMSServiceErr)
}

reply, err := a.DMS.CheckDBServiceIsConnectableByIds(c.Request().Context(),"", currentUserUid,req.DBServices)
if nil != err {
return NewErrResp(c, err, apiError.DMSServiceErr)
}

return NewOkRespWithReply(c, reply)
}


Expand Down
159 changes: 156 additions & 3 deletions internal/dms/biz/db_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sync"
"time"

dmsV1 "github.com/actiontech/dms/api/dms/service/v1"
"github.com/actiontech/dms/internal/apiserver/conf"
Expand All @@ -18,6 +19,7 @@ import (
pkgParams "github.com/actiontech/dms/pkg/params"
pkgPeriods "github.com/actiontech/dms/pkg/periods"
pkgRand "github.com/actiontech/dms/pkg/rand"
"github.com/go-openapi/strfmt"

dmsCommonV1 "github.com/actiontech/dms/pkg/dms-common/api/dms/v1"
"github.com/actiontech/dms/pkg/dms-common/pkg/aes"
Expand All @@ -32,6 +34,13 @@ type SQLEConfig struct {
SQLQueryConfig *SQLQueryConfig `json:"sql_query_config"`
}

type LastConnectionStatus string

const (
LastConnectionStatusSuccess LastConnectionStatus = "connect_success"
LastConnectionStatusFailed LastConnectionStatus = "connect_failed"
)

// 数据源
type DBService struct {
Base
Expand All @@ -50,6 +59,11 @@ type DBService struct {
MaintenancePeriod pkgPeriods.Periods `json:"maintenance_period"`
Source string `json:"source"`

// db service connection
LastConnectionStatus *LastConnectionStatus `json:"last_connection_status"`
LastConnectionTime *time.Time `json:"last_connection_time"`
LastConnectionErrorMsg *string `json:"last_connection_error_msg"`

// sqle config
SQLEConfig *SQLEConfig `json:"sqle_config"`
IsMaskingSwitch bool `json:"is_masking_switch"`
Expand Down Expand Up @@ -231,10 +245,15 @@ func (d *DBServiceUsecase) ListDBService(ctx context.Context, option *ListDBServ
if err != nil {
return nil, 0, fmt.Errorf("list db services failed: %w", err)
}
// 只允许系统用户查询所有数据源,同步数据到其他服务(provision)
// 只允许系统用户和平台管理/查看权限用户查询所有数据源,同步数据到其他服务(provision)
if projectUid == "" {
if currentUserUid != pkgConst.UIDOfUserSys {
return nil, 0, fmt.Errorf("list db service error: project is empty")
canViewProject, err := d.opPermissionVerifyUsecase.CanViewProject(ctx, currentUserUid, projectUid)
if err != nil {
return nil, 0, err
}

if !(currentUserUid == pkgConst.UIDOfUserSys || canViewProject) {
return nil, 0, fmt.Errorf("user is not sys user or global management or view permission user")
}
} else {
err = d.AddInstanceAuditPlanForDBServiceFromSqle(ctx, projectUid, services)
Expand All @@ -246,6 +265,61 @@ func (d *DBServiceUsecase) ListDBService(ctx context.Context, option *ListDBServ
return services, total, nil
}

func (d *DBServiceUsecase) TestDbServiceConnections(ctx context.Context, DBServiceList []*DBService, currentUserUid string) []dmsV1.DBServiceIsConnectableReply {
connectionResp := make([]dmsV1.DBServiceIsConnectableReply, len(DBServiceList))
concurrencyLimit := make(chan int, 3)
var wg sync.WaitGroup
var respMu sync.Mutex

for i, dbService := range DBServiceList {
wg.Add(1)

go func(dbService *DBService, index int) {
defer func() {
if r := recover(); r != nil {
d.log.Errorf("CheckDBServiceIsConnectableByIds panic: %v", r)
}
}()
defer wg.Done()

if dbService == nil {
return
}

concurrencyLimit <- 1

connectionResult, err := d.TestDbServiceConnection(ctx, dbService)
if err != nil {
d.log.Errorf("db connectionResult uid: %v,TestDBServiceConnection err: %v", connectionResult.DBServiceUid, err)
}

dbService.LastConnectionStatus = &connectionResult.ConnectionStatus
dbService.LastConnectionTime = &connectionResult.TestConnectionTime
dbService.LastConnectionErrorMsg = &connectionResult.ConnectErrorMessage

err = d.UpdateDBServiceByBiz(ctx, dbService, currentUserUid)
if err != nil {
d.log.Errorf("dbService name: %v,UpdateDBServiceByBiz err: %v", dbService.Name, err)
}

respMu.Lock()
connectionResp[index] = dmsV1.DBServiceIsConnectableReply{
DBServiceUid: connectionResult.DBServiceUid,
ConnectionStatus: dmsCommonV1.LastConnectionTestStatus(connectionResult.ConnectionStatus),
TestConnectionTime: strfmt.DateTime(connectionResult.TestConnectionTime),
ConnectErrorMessage: connectionResult.ConnectErrorMessage,
}
respMu.Unlock()

<-concurrencyLimit
}(dbService, i)
}

wg.Wait()

return connectionResp
}

type instanceAuditPlanReply struct {
Code int `json:"code" example:"0"`
Message string `json:"message" example:"ok"`
Expand Down Expand Up @@ -443,6 +517,85 @@ func (d *DBServiceUsecase) GetDBService(ctx context.Context, dbServiceUid string
return d.repo.GetDBService(ctx, dbServiceUid)
}

type TestDbServiceConnectionResult struct {
DBServiceUid string
ConnectionStatus LastConnectionStatus
TestConnectionTime time.Time
ConnectErrorMessage string
}

func (d *DBServiceUsecase) TestDbServiceConnection(ctx context.Context, dbService *DBService) (TestDbServiceConnectionResult, error) {
connectionResult := TestDbServiceConnectionResult{
DBServiceUid: dbService.UID,
TestConnectionTime: time.Now(),
ConnectionStatus: LastConnectionStatusSuccess,
}

var additionParams []*dmsCommonV1.AdditionalParam
for _, item := range dbService.AdditionalParams {
additionParams = append(additionParams, &dmsCommonV1.AdditionalParam{
Name: item.Key,
Value: item.Value,
})
}

checkDbConnectableParams := dmsCommonV1.CheckDbConnectable{
DBType: dbService.DBType,
User: dbService.User,
Host: dbService.Host,
Port: dbService.Port,
Password: dbService.Password,
AdditionalParams: additionParams,
}

connectable, err := d.IsConnectable(ctx, checkDbConnectableParams)
if err != nil {
connectionResult.ConnectionStatus = LastConnectionStatusFailed
connectionResult.ConnectErrorMessage = err.Error()
return connectionResult, err
}

if len(connectable) == 0 {
connectionResult.ConnectionStatus = LastConnectionStatusFailed
connectionResult.ConnectErrorMessage = "check db connectable failed"
} else {
for _, c := range connectable {
if !c.IsConnectable {
connectionResult.ConnectionStatus = LastConnectionStatusFailed
connectionResult.ConnectErrorMessage = c.ConnectErrorMessage
break
}
}
}

return connectionResult, nil
}

func (d *DBServiceUsecase) UpdateDBServiceByBiz(ctx context.Context, ds *DBService, currentUserUid string) (err error) {
// 检查项目是否归档/删除
if err := d.projectUsecase.isProjectActive(ctx, ds.ProjectUID); err != nil {
return fmt.Errorf("update db service error: %v", err)
}

// 检查当前用户有项目管理员权限
if canOpProject, err := d.opPermissionVerifyUsecase.CanOpProject(ctx, currentUserUid, ds.ProjectUID); err != nil {
return fmt.Errorf("check user is project admin or golobal op permission failed: %v", err)
} else if !canOpProject {
return fmt.Errorf("user is not project admin or golobal op permission user")
}

if err := d.repo.UpdateDBService(ctx, ds); nil != err {
return fmt.Errorf("update db service error: %v", err)
}

err = d.pluginUsecase.OperateDataResourceHandle(ctx, ds.UID, dmsCommonV1.DataResourceTypeDBService, dmsCommonV1.OperationTypeUpdate, dmsCommonV1.OperationTimingAfter)
if err != nil {
return fmt.Errorf("plugin handle after update db_service err: %v", err)
}

return nil
}

func (d *DBServiceUsecase) UpdateDBService(ctx context.Context, dbServiceUid string, updateDBService *BizDBServiceArgs, currentUserUid string) (err error) {
ds, err := d.repo.GetDBService(ctx, dbServiceUid)
if err != nil {
Expand Down
31 changes: 16 additions & 15 deletions internal/dms/biz/repo_fields.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

56 changes: 56 additions & 0 deletions internal/dms/service/db_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
pkgAes "github.com/actiontech/dms/pkg/dms-common/pkg/aes"
"github.com/actiontech/dms/pkg/params"
"github.com/actiontech/dms/pkg/periods"
"github.com/go-openapi/strfmt"
)

func (d *DMSService) DelDBService(ctx context.Context, req *dmsV1.DelDBServiceReq, currentUserUid string) (err error) {
Expand Down Expand Up @@ -141,6 +142,44 @@ func (d *DMSService) CheckDBServiceIsConnectableById(ctx context.Context, req *d
return ret, nil
}

func (d *DMSService) CheckDBServiceIsConnectableByIds(ctx context.Context, projectUID, currentUserUid string, dbServiceList []dmsV1.DbServiceConnections) (*dmsV1.DBServicesConnectionReqReply, error) {
if len(dbServiceList) == 0 {
return &dmsV1.DBServicesConnectionReqReply{
Data: []dmsV1.DBServiceIsConnectableReply{},
}, nil
}

filterBy := make([]pkgConst.FilterCondition, 0)
var dbServiceUidList []string
for _, dbService := range dbServiceList {
dbServiceUidList = append(dbServiceUidList, dbService.DBServiceUid)
}

filterBy = append(filterBy, pkgConst.FilterCondition{
Field: string(biz.DBServiceFieldUID),
Operator: pkgConst.FilterOperatorIn,
Value: dbServiceUidList,
})

listOption := &biz.ListDBServicesOption{
PageNumber: 1,
LimitPerPage: uint32(len(dbServiceList)),
OrderBy: biz.DBServiceFieldName,
FilterBy: filterBy,
}

DBServiceList, _, err := d.DBServiceUsecase.ListDBService(ctx, listOption, projectUID, currentUserUid)
if err != nil {
return nil, err
}

resp := d.DBServiceUsecase.TestDbServiceConnections(ctx, DBServiceList, currentUserUid)

return &dmsV1.DBServicesConnectionReqReply{
Data: resp,
}, nil
}

func (d *DMSService) AddDBService(ctx context.Context, req *dmsV1.AddDBServiceReq, currentUserUid string) (reply *dmsV1.AddDBServiceReply, err error) {
d.log.Infof("AddDBServices.req=%v", req)
defer func() {
Expand Down Expand Up @@ -406,6 +445,13 @@ func (d *DMSService) ListDBServices(ctx context.Context, req *dmsCommonV1.ListDB
Value: req.ProjectUid,
})
}
if req.FilterLastConnectionTestStatus != nil {
filterBy = append(filterBy, pkgConst.FilterCondition{
Field: string(biz.DBServiceFieldLastConnectionStatus),
Operator: pkgConst.FilterOperatorEqual,
Value: *req.FilterLastConnectionTestStatus,
})
}
if len(req.FilterByDBServiceIds) > 0 {
filterBy = append(filterBy, pkgConst.FilterCondition{
Field: string(biz.DBServiceFieldUID),
Expand Down Expand Up @@ -464,6 +510,16 @@ func (d *DMSService) ListDBServices(ctx context.Context, req *dmsCommonV1.ListDB
AuditPlanTypes: u.AuditPlanTypes,
}

if u.LastConnectionTime != nil {
ret[i].LastConnectionTestTime = strfmt.DateTime(*u.LastConnectionTime)
}
if u.LastConnectionStatus != nil {
ret[i].LastConnectionTestStatus = dmsCommonV1.LastConnectionTestStatus(*u.LastConnectionStatus)
}
if u.LastConnectionErrorMsg != nil {
ret[i].LastConnectionTestErrorMessage = *u.LastConnectionErrorMsg
}

if u.AdditionalParams != nil {
additionalParams := make([]*dmsCommonV1.AdditionalParam, 0, len(u.AdditionalParams))
for _, item := range u.AdditionalParams {
Expand Down
Loading

0 comments on commit fffa2e4

Please sign in to comment.