diff --git a/config/openim-msggateway.yml b/config/openim-msggateway.yml index 5659c6f9b3..6c46b52a81 100644 --- a/config/openim-msggateway.yml +++ b/config/openim-msggateway.yml @@ -22,5 +22,3 @@ longConnSvr: websocketMaxMsgLen: 4096 # WebSocket connection handshake timeout in seconds websocketTimeout: 10 - - diff --git a/config/openim-push.yml b/config/openim-push.yml index 4d2aaca6b0..2d54bb9e13 100644 --- a/config/openim-push.yml +++ b/config/openim-push.yml @@ -17,25 +17,25 @@ maxConcurrentWorkers: 3 enable: geTui geTui: pushUrl: https://restapi.getui.com/v2/$appId - masterSecret: - appKey: - intent: - channelID: - channelName: + masterSecret: + appKey: + intent: + channelID: + channelName: fcm: # Prioritize using file paths. If the file path is empty, use URL - filePath: # File path is concatenated with the parameters passed in through - c(`mage` default pass in `config/`) and filePath. + filePath: # File path is concatenated with the parameters passed in through - c(`mage` default pass in `config/`) and filePath. authURL: # Must start with https or http. jpns: - appKey: - masterSecret: - pushURL: - pushIntent: + appKey: + masterSecret: + pushURL: + pushIntent: # iOS system push sound and badge count iosPush: - pushSound: xxx - badgeCount: true - production: false + pushSound: xxx + badgeCount: true + production: false fullUserCache: true diff --git a/internal/msggateway/client.go b/internal/msggateway/client.go index bc06fa9507..af96e7d460 100644 --- a/internal/msggateway/client.go +++ b/internal/msggateway/client.go @@ -16,6 +16,7 @@ package msggateway import ( "context" + "encoding/json" "fmt" "runtime/debug" "sync" @@ -69,6 +70,8 @@ type Client struct { IsCompress bool `json:"isCompress"` UserID string `json:"userID"` IsBackground bool `json:"isBackground"` + SDKType string `json:"sdkType"` + Encoder Encoder ctx *UserConnContext longConnServer LongConnServer closed atomic.Bool @@ -94,11 +97,17 @@ func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, longConnServer c.closed.Store(false) c.closedErr = nil c.token = ctx.GetToken() + c.SDKType = ctx.GetSDKType() c.hbCtx, c.hbCancel = context.WithCancel(c.ctx) c.subLock = new(sync.Mutex) if c.subUserIDs != nil { clear(c.subUserIDs) } + if c.SDKType == GoSDK { + c.Encoder = NewGobEncoder() + } else { + c.Encoder = NewJsonEncoder() + } c.subUserIDs = make(map[string]struct{}) } @@ -159,9 +168,12 @@ func (c *Client) readMessage() { return } case MessageText: - c.closedErr = ErrNotSupportMessageProtocol - return - + _ = c.conn.SetReadDeadline(pongWait) + parseDataErr := c.handlerTextMessage(message) + if parseDataErr != nil { + c.closedErr = parseDataErr + return + } case PingMessage: err := c.writePongMsg("") log.ZError(c.ctx, "writePongMsg", err) @@ -188,7 +200,7 @@ func (c *Client) handleMessage(message []byte) error { var binaryReq = getReq() defer freeReq(binaryReq) - err := c.longConnServer.Decode(message, binaryReq) + err := c.Encoder.Decode(message, binaryReq) if err != nil { return err } @@ -335,7 +347,7 @@ func (c *Client) writeBinaryMsg(resp Resp) error { return nil } - encodedBuf, err := c.longConnServer.Encode(resp) + encodedBuf, err := c.Encoder.Encode(resp) if err != nil { return err } @@ -419,3 +431,26 @@ func (c *Client) writePongMsg(appData string) error { return errs.Wrap(err) } + +func (c *Client) handlerTextMessage(b []byte) error { + var msg TextMessage + if err := json.Unmarshal(b, &msg); err != nil { + return err + } + switch msg.Type { + case TextPong: + return nil + case TextPing: + msg.Type = TextPong + msgData, err := json.Marshal(msg) + if err != nil { + return err + } + if err := c.conn.SetWriteDeadline(writeWait); err != nil { + return err + } + return c.conn.WriteMessage(MessageText, msgData) + default: + return fmt.Errorf("not support message type %s", msg.Type) + } +} diff --git a/internal/msggateway/constant.go b/internal/msggateway/constant.go index 584cebe1e1..a825c05196 100644 --- a/internal/msggateway/constant.go +++ b/internal/msggateway/constant.go @@ -27,6 +27,12 @@ const ( GzipCompressionProtocol = "gzip" BackgroundStatus = "isBackground" SendResponse = "isMsgResp" + SDKType = "sdkType" +) + +const ( + GoSDK = "go" + JsSDK = "js" ) const ( diff --git a/internal/msggateway/context.go b/internal/msggateway/context.go index 3909766b1b..d73a96df4c 100644 --- a/internal/msggateway/context.go +++ b/internal/msggateway/context.go @@ -153,6 +153,14 @@ func (c *UserConnContext) GetCompression() bool { return false } +func (c *UserConnContext) GetSDKType() string { + sdkType := c.Req.URL.Query().Get(SDKType) + if sdkType == "" { + sdkType = GoSDK + } + return sdkType +} + func (c *UserConnContext) ShouldSendResp() bool { errResp, exists := c.Query(SendResponse) if exists { @@ -193,7 +201,11 @@ func (c *UserConnContext) ParseEssentialArgs() error { _, err := strconv.Atoi(platformIDStr) if err != nil { return servererrs.ErrConnArgsErr.WrapMsg("platformID is not int") - + } + switch sdkType, _ := c.Query(SDKType); sdkType { + case "", GoSDK, JsSDK: + default: + return servererrs.ErrConnArgsErr.WrapMsg("sdkType is not go or js") } return nil } diff --git a/internal/msggateway/encoder.go b/internal/msggateway/encoder.go index 3af2663748..6a5936d6d2 100644 --- a/internal/msggateway/encoder.go +++ b/internal/msggateway/encoder.go @@ -17,6 +17,7 @@ package msggateway import ( "bytes" "encoding/gob" + "encoding/json" "github.com/openimsdk/tools/errs" ) @@ -28,12 +29,12 @@ type Encoder interface { type GobEncoder struct{} -func NewGobEncoder() *GobEncoder { - return &GobEncoder{} +func NewGobEncoder() Encoder { + return GobEncoder{} } -func (g *GobEncoder) Encode(data any) ([]byte, error) { - buff := bytes.Buffer{} +func (g GobEncoder) Encode(data any) ([]byte, error) { + var buff bytes.Buffer enc := gob.NewEncoder(&buff) if err := enc.Encode(data); err != nil { return nil, errs.WrapMsg(err, "GobEncoder.Encode failed", "action", "encode") @@ -41,7 +42,7 @@ func (g *GobEncoder) Encode(data any) ([]byte, error) { return buff.Bytes(), nil } -func (g *GobEncoder) Decode(encodeData []byte, decodeData any) error { +func (g GobEncoder) Decode(encodeData []byte, decodeData any) error { buff := bytes.NewBuffer(encodeData) dec := gob.NewDecoder(buff) if err := dec.Decode(decodeData); err != nil { @@ -49,3 +50,25 @@ func (g *GobEncoder) Decode(encodeData []byte, decodeData any) error { } return nil } + +type JsonEncoder struct{} + +func NewJsonEncoder() Encoder { + return JsonEncoder{} +} + +func (g JsonEncoder) Encode(data any) ([]byte, error) { + b, err := json.Marshal(data) + if err != nil { + return nil, errs.New("JsonEncoder.Encode failed", "action", "encode") + } + return b, nil +} + +func (g JsonEncoder) Decode(encodeData []byte, decodeData any) error { + err := json.Unmarshal(encodeData, decodeData) + if err != nil { + return errs.New("JsonEncoder.Decode failed", "action", "decode") + } + return nil +} diff --git a/internal/msggateway/hub_server.go b/internal/msggateway/hub_server.go index e96ab4b0dc..23d9150133 100644 --- a/internal/msggateway/hub_server.go +++ b/internal/msggateway/hub_server.go @@ -83,17 +83,11 @@ func NewServer(rpcPort int, longConnServer LongConnServer, conf *Config, ready f return s } -func (s *Server) OnlinePushMsg( - context context.Context, - req *msggateway.OnlinePushMsgReq, -) (*msggateway.OnlinePushMsgResp, error) { +func (s *Server) OnlinePushMsg(context context.Context, req *msggateway.OnlinePushMsgReq) (*msggateway.OnlinePushMsgResp, error) { panic("implement me") } -func (s *Server) GetUsersOnlineStatus( - ctx context.Context, - req *msggateway.GetUsersOnlineStatusReq, -) (*msggateway.GetUsersOnlineStatusResp, error) { +func (s *Server) GetUsersOnlineStatus(ctx context.Context, req *msggateway.GetUsersOnlineStatusReq) (*msggateway.GetUsersOnlineStatusResp, error) { if !authverify.IsAppManagerUid(ctx, s.config.Share.IMAdminUserID) { return nil, errs.ErrNoPermission.WrapMsg("only app manager") } @@ -155,6 +149,7 @@ func (s *Server) pushToUser(ctx context.Context, userID string, msgData *sdkws.M (client.IsBackground && client.PlatformID != constant.IOSPlatformID) { err := client.PushMessage(ctx, msgData) if err != nil { + log.ZWarn(ctx, "online push msg failed", err, "userID", userID, "platformID", client.PlatformID) userPlatform.ResultCode = int64(servererrs.ErrPushMsgErr.Code()) } else { if _, ok := s.pushTerminal[client.PlatformID]; ok { @@ -220,10 +215,7 @@ func (s *Server) SuperGroupOnlineBatchPushOneMsg(ctx context.Context, req *msgga } } -func (s *Server) KickUserOffline( - ctx context.Context, - req *msggateway.KickUserOfflineReq, -) (*msggateway.KickUserOfflineResp, error) { +func (s *Server) KickUserOffline(ctx context.Context, req *msggateway.KickUserOfflineReq) (*msggateway.KickUserOfflineResp, error) { for _, v := range req.KickUserIDList { clients, _, ok := s.LongConnServer.GetUserPlatformCons(v, int(req.PlatformID)) if !ok { diff --git a/internal/msggateway/message_handler.go b/internal/msggateway/message_handler.go index 4b78c10048..5407ba90cb 100644 --- a/internal/msggateway/message_handler.go +++ b/internal/msggateway/message_handler.go @@ -16,6 +16,7 @@ package msggateway import ( "context" + "encoding/json" "sync" "github.com/go-playground/validator/v10" @@ -31,6 +32,16 @@ import ( "github.com/openimsdk/tools/utils/jsonutil" ) +const ( + TextPing = "ping" + TextPong = "pong" +) + +type TextMessage struct { + Type string `json:"type"` + Body json.RawMessage `json:"body"` +} + type Req struct { ReqIdentifier int32 `json:"reqIdentifier" validate:"required"` Token string `json:"token"` diff --git a/internal/msggateway/ws_server.go b/internal/msggateway/ws_server.go index 48e4b5cee9..e6b4f3fa47 100644 --- a/internal/msggateway/ws_server.go +++ b/internal/msggateway/ws_server.go @@ -37,7 +37,6 @@ type LongConnServer interface { SetKickHandlerInfo(i *kickHandler) SubUserOnlineStatus(ctx context.Context, client *Client, data *Req) ([]byte, error) Compressor - Encoder MessageHandler } @@ -61,7 +60,7 @@ type WsServer struct { authClient *rpcclient.Auth disCov discovery.SvcDiscoveryRegistry Compressor - Encoder + //Encoder MessageHandler webhookClient *webhook.Client } @@ -135,7 +134,6 @@ func NewWsServer(msgGatewayConfig *Config, opts ...Option) *WsServer { clients: newUserMap(), subscription: newSubscription(), Compressor: NewGzipCompressor(), - Encoder: NewGobEncoder(), webhookClient: webhook.NewWebhookClient(msgGatewayConfig.WebhooksConfig.URL), } } @@ -278,14 +276,7 @@ func (ws *WsServer) registerClient(client *Client) { wg.Wait() - log.ZDebug( - client.ctx, - "user online", - "online user Num", - ws.onlineUserNum.Load(), - "online user conn Num", - ws.onlineUserConnNum.Load(), - ) + log.ZDebug(client.ctx, "user online", "online user Num", ws.onlineUserNum.Load(), "online user conn Num", ws.onlineUserConnNum.Load()) } func getRemoteAdders(client []*Client) string { diff --git a/pkg/common/storage/controller/auth.go b/pkg/common/storage/controller/auth.go index 82d6f9476a..df12749677 100644 --- a/pkg/common/storage/controller/auth.go +++ b/pkg/common/storage/controller/auth.go @@ -2,15 +2,14 @@ package controller import ( "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" - "github.com/openimsdk/tools/log" - "github.com/golang-jwt/jwt/v4" "github.com/openimsdk/open-im-server/v3/pkg/authverify" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/tools/errs" + "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/tokenverify" )