From ca6017eac6d4b5589e034594b79fbe1c5d9090e0 Mon Sep 17 00:00:00 2001 From: Morya Date: Sat, 23 Nov 2024 15:16:00 +0800 Subject: [PATCH 1/3] fix: minor log typo --- pkg/common/storage/cache/redis/conversation.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/common/storage/cache/redis/conversation.go b/pkg/common/storage/cache/redis/conversation.go index 326f60b96a..91d8ed69de 100644 --- a/pkg/common/storage/cache/redis/conversation.go +++ b/pkg/common/storage/cache/redis/conversation.go @@ -38,7 +38,7 @@ const ( func NewConversationRedis(rdb redis.UniversalClient, localCache *config.LocalCache, opts *rockscache.Options, db database.Conversation) cache.ConversationCache { batchHandler := NewBatchDeleterRedis(rdb, opts, []string{localCache.Conversation.Topic}) c := localCache.Conversation - log.ZDebug(context.Background(), "black local cache init", "Topic", c.Topic, "SlotNum", c.SlotNum, "SlotSize", c.SlotSize, "enable", c.Enable()) + log.ZDebug(context.Background(), "conversation local cache init", "Topic", c.Topic, "SlotNum", c.SlotNum, "SlotSize", c.SlotSize, "enable", c.Enable()) return &ConversationRedisCache{ BatchDeleter: batchHandler, rcClient: rockscache.NewClient(rdb, *opts), From 7f8b2925ef965599e30f6068d75be4093d696c69 Mon Sep 17 00:00:00 2001 From: fenghao Date: Tue, 26 Nov 2024 17:05:16 +0800 Subject: [PATCH 2/3] fix: #2895 no need to specify listen port in config file, just use system random --- config/openim-msggateway.yml | 4 +- config/openim-push.yml | 2 - config/openim-rpc-auth.yml | 3 +- config/openim-rpc-conversation.yml | 2 - config/openim-rpc-friend.yml | 2 - config/openim-rpc-group.yml | 2 - config/openim-rpc-msg.yml | 2 - config/openim-rpc-third.yml | 2 - config/openim-rpc-user.yml | 2 - internal/msggateway/hub_server.go | 3 +- internal/tools/addr/addr.go | 165 +++++++++++++++++++++++++++++ pkg/common/cmd/auth.go | 1 - pkg/common/cmd/conversation.go | 1 - pkg/common/cmd/friend.go | 1 - pkg/common/cmd/group.go | 1 - pkg/common/cmd/msg.go | 1 - pkg/common/cmd/push.go | 1 - pkg/common/cmd/third.go | 1 - pkg/common/cmd/user.go | 1 - pkg/common/config/config.go | 1 - pkg/common/startrpc/start.go | 53 +++++---- 21 files changed, 193 insertions(+), 58 deletions(-) create mode 100644 internal/tools/addr/addr.go diff --git a/config/openim-msggateway.yml b/config/openim-msggateway.yml index 6c46b52a81..84e232ccb4 100644 --- a/config/openim-msggateway.yml +++ b/config/openim-msggateway.yml @@ -1,8 +1,6 @@ rpc: # The IP address where this RPC service registers itself; if left blank, it defaults to the internal network IP - registerIP: - # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports - ports: [ 10140, 10141, 10142, 10143, 10144, 10145, 10146, 10147, 10148, 10149, 10150, 10151, 10152, 10153, 10154, 10155 ] + registerIP: prometheus: # Enable or disable Prometheus monitoring diff --git a/config/openim-push.yml b/config/openim-push.yml index 92f716ba2d..2db09b37e0 100644 --- a/config/openim-push.yml +++ b/config/openim-push.yml @@ -3,8 +3,6 @@ rpc: registerIP: # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP listenIP: 0.0.0.0 - # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports - ports: [ 10170, 10171, 10172, 10173, 10174, 10175, 10176, 10177, 10178, 10179, 10180, 10181, 10182, 10183, 10184, 10185 ] prometheus: # Enable or disable Prometheus monitoring diff --git a/config/openim-rpc-auth.yml b/config/openim-rpc-auth.yml index 496803e43b..9ebd78070b 100644 --- a/config/openim-rpc-auth.yml +++ b/config/openim-rpc-auth.yml @@ -3,8 +3,7 @@ rpc: registerIP: # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP listenIP: 0.0.0.0 - # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports - ports: [ 10200 ] + prometheus: # Enable or disable Prometheus monitoring diff --git a/config/openim-rpc-conversation.yml b/config/openim-rpc-conversation.yml index 3581d7e19e..1cb56ac606 100644 --- a/config/openim-rpc-conversation.yml +++ b/config/openim-rpc-conversation.yml @@ -3,8 +3,6 @@ rpc: registerIP: # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP listenIP: 0.0.0.0 - # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports - ports: [ 10220 ] prometheus: # Enable or disable Prometheus monitoring diff --git a/config/openim-rpc-friend.yml b/config/openim-rpc-friend.yml index 3022c09f32..11b805299c 100644 --- a/config/openim-rpc-friend.yml +++ b/config/openim-rpc-friend.yml @@ -3,8 +3,6 @@ rpc: registerIP: # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP listenIP: 0.0.0.0 - # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports - ports: [ 10240 ] prometheus: # Enable or disable Prometheus monitoring diff --git a/config/openim-rpc-group.yml b/config/openim-rpc-group.yml index 9a634d12ff..d16ae7b02a 100644 --- a/config/openim-rpc-group.yml +++ b/config/openim-rpc-group.yml @@ -3,8 +3,6 @@ rpc: registerIP: # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP listenIP: 0.0.0.0 - # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports - ports: [ 10260 ] prometheus: # Enable or disable Prometheus monitoring diff --git a/config/openim-rpc-msg.yml b/config/openim-rpc-msg.yml index 82d6e2f539..5917ff5ae9 100644 --- a/config/openim-rpc-msg.yml +++ b/config/openim-rpc-msg.yml @@ -3,8 +3,6 @@ rpc: registerIP: # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP listenIP: 0.0.0.0 - # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports - ports: [ 10280 ] prometheus: # Enable or disable Prometheus monitoring diff --git a/config/openim-rpc-third.yml b/config/openim-rpc-third.yml index d8f2d427f2..54123d8ea3 100644 --- a/config/openim-rpc-third.yml +++ b/config/openim-rpc-third.yml @@ -3,8 +3,6 @@ rpc: registerIP: # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP listenIP: 0.0.0.0 - # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports - ports: [ 10300 ] prometheus: # Enable or disable Prometheus monitoring diff --git a/config/openim-rpc-user.yml b/config/openim-rpc-user.yml index 798105472c..4bd6444a74 100644 --- a/config/openim-rpc-user.yml +++ b/config/openim-rpc-user.yml @@ -3,8 +3,6 @@ rpc: registerIP: # Listening IP; 0.0.0.0 means both internal and external IPs are listened to, if blank, the internal network IP is automatically obtained by default listenIP: 0.0.0.0 - # Listening ports; if multiple are configured, multiple instances will be launched, and must be consistent with the number of prometheus.ports - ports: [ 10320 ] prometheus: # Whether to enable prometheus diff --git a/internal/msggateway/hub_server.go b/internal/msggateway/hub_server.go index 23d9150133..fd73f81bdc 100644 --- a/internal/msggateway/hub_server.go +++ b/internal/msggateway/hub_server.go @@ -46,8 +46,7 @@ func (s *Server) InitServer(ctx context.Context, config *Config, disCov discover func (s *Server) Start(ctx context.Context, index int, conf *Config) error { return startrpc.Start(ctx, &conf.Discovery, &conf.MsgGateway.Prometheus, conf.MsgGateway.ListenIP, - conf.MsgGateway.RPC.RegisterIP, - conf.MsgGateway.RPC.Ports, index, + index, conf.Share.RpcRegisterName.MessageGateway, &conf.Share, conf, diff --git a/internal/tools/addr/addr.go b/internal/tools/addr/addr.go new file mode 100644 index 0000000000..b7cf8a2dbf --- /dev/null +++ b/internal/tools/addr/addr.go @@ -0,0 +1,165 @@ +// addr provides functions to retrieve local IP addresses from device interfaces. +package addr + +import ( + "net" + + "github.com/pkg/errors" +) + +var ( + // ErrIPNotFound no IP address found, and explicit IP not provided. + ErrIPNotFound = errors.New("no IP address found, and explicit IP not provided") +) + +// IsLocal checks whether an IP belongs to one of the device's interfaces. +func IsLocal(addr string) bool { + // Extract the host + host, _, err := net.SplitHostPort(addr) + if err == nil { + addr = host + } + + if addr == "localhost" { + return true + } + + // Check against all local ips + for _, ip := range IPs() { + if addr == ip { + return true + } + } + + return false +} + +// Extract returns a valid IP address. If the address provided is a valid +// address, it will be returned directly. Otherwise, the available interfaces +// will be iterated over to find an IP address, preferably private. +func Extract(addr string) (string, error) { + // if addr is already specified then it's directly returned + if len(addr) > 0 && (addr != "0.0.0.0" && addr != "[::]" && addr != "::") { + return addr, nil + } + + var ( + addrs []net.Addr + loAddrs []net.Addr + ) + + ifaces, err := net.Interfaces() + if err != nil { + return "", errors.Wrap(err, "failed to get interfaces") + } + + for _, iface := range ifaces { + ifaceAddrs, err := iface.Addrs() + if err != nil { + // ignore error, interface can disappear from system + continue + } + + if iface.Flags&net.FlagLoopback != 0 { + loAddrs = append(loAddrs, ifaceAddrs...) + continue + } + + addrs = append(addrs, ifaceAddrs...) + } + + // Add loopback addresses to the end of the list + addrs = append(addrs, loAddrs...) + + // Try to find private IP in list, public IP otherwise + ip, err := findIP(addrs) + if err != nil { + return "", err + } + + return ip.String(), nil +} + +// IPs returns all available interface IP addresses. +func IPs() []string { + ifaces, err := net.Interfaces() + if err != nil { + return nil + } + + var ipAddrs []string + + for _, i := range ifaces { + addrs, err := i.Addrs() + if err != nil { + continue + } + + for _, addr := range addrs { + var ip net.IP + switch v := addr.(type) { + case *net.IPNet: + ip = v.IP + case *net.IPAddr: + ip = v.IP + } + + if ip == nil { + continue + } + + ipAddrs = append(ipAddrs, ip.String()) + } + } + + return ipAddrs +} + +// findIP will return the first private IP available in the list. +// If no private IP is available it will return the first public IP, if present. +// If no public IP is available, it will return the first loopback IP, if present. +func findIP(addresses []net.Addr) (net.IP, error) { + var publicIP net.IP + var localIP net.IP + + for _, rawAddr := range addresses { + var ip net.IP + switch addr := rawAddr.(type) { + case *net.IPAddr: + ip = addr.IP + case *net.IPNet: + ip = addr.IP + default: + continue + } + + if ip.IsLoopback() { + if localIP == nil { + localIP = ip + } + continue + } + + if !ip.IsPrivate() { + if publicIP == nil { + publicIP = ip + } + continue + } + + // Return private IP if available + return ip, nil + } + + // Return public or virtual IP + if len(publicIP) > 0 { + return publicIP, nil + } + + // Return local IP + if len(localIP) > 0 { + return localIP, nil + } + + return nil, ErrIPNotFound +} diff --git a/pkg/common/cmd/auth.go b/pkg/common/cmd/auth.go index b35a95f395..f8232bc260 100644 --- a/pkg/common/cmd/auth.go +++ b/pkg/common/cmd/auth.go @@ -55,6 +55,5 @@ func (a *AuthRpcCmd) Exec() error { func (a *AuthRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.authConfig.Discovery, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP, - a.authConfig.RpcConfig.RPC.RegisterIP, a.authConfig.RpcConfig.RPC.Ports, a.Index(), a.authConfig.Share.RpcRegisterName.Auth, &a.authConfig.Share, a.authConfig, auth.Start) } diff --git a/pkg/common/cmd/conversation.go b/pkg/common/cmd/conversation.go index bdb4447f48..6a0da11e5d 100644 --- a/pkg/common/cmd/conversation.go +++ b/pkg/common/cmd/conversation.go @@ -57,6 +57,5 @@ func (a *ConversationRpcCmd) Exec() error { func (a *ConversationRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.conversationConfig.Discovery, &a.conversationConfig.RpcConfig.Prometheus, a.conversationConfig.RpcConfig.RPC.ListenIP, - a.conversationConfig.RpcConfig.RPC.RegisterIP, a.conversationConfig.RpcConfig.RPC.Ports, a.Index(), a.conversationConfig.Share.RpcRegisterName.Conversation, &a.conversationConfig.Share, a.conversationConfig, conversation.Start) } diff --git a/pkg/common/cmd/friend.go b/pkg/common/cmd/friend.go index a564facd06..2ff25090e9 100644 --- a/pkg/common/cmd/friend.go +++ b/pkg/common/cmd/friend.go @@ -58,6 +58,5 @@ func (a *FriendRpcCmd) Exec() error { func (a *FriendRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.relationConfig.Discovery, &a.relationConfig.RpcConfig.Prometheus, a.relationConfig.RpcConfig.RPC.ListenIP, - a.relationConfig.RpcConfig.RPC.RegisterIP, a.relationConfig.RpcConfig.RPC.Ports, a.Index(), a.relationConfig.Share.RpcRegisterName.Friend, &a.relationConfig.Share, a.relationConfig, relation.Start) } diff --git a/pkg/common/cmd/group.go b/pkg/common/cmd/group.go index 9b0fbf8de3..f1757c7fa2 100644 --- a/pkg/common/cmd/group.go +++ b/pkg/common/cmd/group.go @@ -59,6 +59,5 @@ func (a *GroupRpcCmd) Exec() error { func (a *GroupRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.groupConfig.Discovery, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP, - a.groupConfig.RpcConfig.RPC.RegisterIP, a.groupConfig.RpcConfig.RPC.Ports, a.Index(), a.groupConfig.Share.RpcRegisterName.Group, &a.groupConfig.Share, a.groupConfig, group.Start, versionctx.EnableVersionCtx()) } diff --git a/pkg/common/cmd/msg.go b/pkg/common/cmd/msg.go index bfd29398ef..ae718a11d1 100644 --- a/pkg/common/cmd/msg.go +++ b/pkg/common/cmd/msg.go @@ -59,6 +59,5 @@ func (a *MsgRpcCmd) Exec() error { func (a *MsgRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.msgConfig.Discovery, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP, - a.msgConfig.RpcConfig.RPC.RegisterIP, a.msgConfig.RpcConfig.RPC.Ports, a.Index(), a.msgConfig.Share.RpcRegisterName.Msg, &a.msgConfig.Share, a.msgConfig, msg.Start) } diff --git a/pkg/common/cmd/push.go b/pkg/common/cmd/push.go index ca22a697d2..2f9e248f14 100644 --- a/pkg/common/cmd/push.go +++ b/pkg/common/cmd/push.go @@ -59,6 +59,5 @@ func (a *PushRpcCmd) Exec() error { func (a *PushRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.pushConfig.Discovery, &a.pushConfig.RpcConfig.Prometheus, a.pushConfig.RpcConfig.RPC.ListenIP, - a.pushConfig.RpcConfig.RPC.RegisterIP, a.pushConfig.RpcConfig.RPC.Ports, a.Index(), a.pushConfig.Share.RpcRegisterName.Push, &a.pushConfig.Share, a.pushConfig, push.Start) } diff --git a/pkg/common/cmd/third.go b/pkg/common/cmd/third.go index a301b738fa..fa1d5d42dc 100644 --- a/pkg/common/cmd/third.go +++ b/pkg/common/cmd/third.go @@ -58,6 +58,5 @@ func (a *ThirdRpcCmd) Exec() error { func (a *ThirdRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.thirdConfig.Discovery, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP, - a.thirdConfig.RpcConfig.RPC.RegisterIP, a.thirdConfig.RpcConfig.RPC.Ports, a.Index(), a.thirdConfig.Share.RpcRegisterName.Third, &a.thirdConfig.Share, a.thirdConfig, third.Start) } diff --git a/pkg/common/cmd/user.go b/pkg/common/cmd/user.go index 9a614afcab..8a22bfce91 100644 --- a/pkg/common/cmd/user.go +++ b/pkg/common/cmd/user.go @@ -59,6 +59,5 @@ func (a *UserRpcCmd) Exec() error { func (a *UserRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.userConfig.Discovery, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP, - a.userConfig.RpcConfig.RPC.RegisterIP, a.userConfig.RpcConfig.RPC.Ports, a.Index(), a.userConfig.Share.RpcRegisterName.User, &a.userConfig.Share, a.userConfig, user.Start) } diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 468a150e83..dd2ab2784d 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -196,7 +196,6 @@ type Push struct { RPC struct { RegisterIP string `mapstructure:"registerIP"` ListenIP string `mapstructure:"listenIP"` - Ports []int `mapstructure:"ports"` } `mapstructure:"rpc"` Prometheus Prometheus `mapstructure:"prometheus"` MaxConcurrentWorkers int `mapstructure:"maxConcurrentWorkers"` diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index fb8782d304..5c42586bf8 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -24,10 +24,10 @@ import ( "net/http" "os" "os/signal" - "strconv" "syscall" "time" + "github.com/openimsdk/open-im-server/v3/internal/tools/addr" kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/tools/discovery" @@ -37,22 +37,15 @@ import ( "github.com/openimsdk/tools/utils/network" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "strconv" ) // Start rpc server. -func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusConfig *config.Prometheus, listenIP, - registerIP string, rpcPorts []int, index int, rpcRegisterName string, share *config.Share, config T, rpcFn func(ctx context.Context, - config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error, options ...grpc.ServerOption) error { - - rpcPort, err := datautil.GetElemByIndex(rpcPorts, index) - if err != nil { - return err - } - - log.CInfo(ctx, "RPC server is initializing", "rpcRegisterName", rpcRegisterName, "rpcPort", rpcPort, - "prometheusPorts", prometheusConfig.Ports) - rpcTcpAddr := net.JoinHostPort(network.GetListenIP(listenIP), strconv.Itoa(rpcPort)) +func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusConfig *config.Prometheus, listenIP string, + index int, rpcRegisterName string, share *config.Share, config T, rpcFn func(ctx context.Context, + config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error, options ...grpc.ServerOption) error { + rpcTcpAddr := net.JoinHostPort(network.GetListenIP(listenIP), "0") listener, err := net.Listen( "tcp", rpcTcpAddr, @@ -60,6 +53,14 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo if err != nil { return errs.WrapMsg(err, "listen err", "rpcTcpAddr", rpcTcpAddr) } + + h, portStr, _ := net.SplitHostPort(listener.Addr().String()) + host, _ := addr.Extract(h) + port, _ := strconv.Atoi(portStr) + + log.CInfo(ctx, "RPC server is initializing", "rpcRegisterName", rpcRegisterName, "rpcPort", portStr, + "prometheusPorts", prometheusConfig.Ports) + defer listener.Close() client, err := kdisc.NewDiscoveryRegister(discovery, share) if err != nil { @@ -68,17 +69,13 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo defer client.Close() client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) - registerIP, err = network.GetRpcRegisterIP(registerIP) - if err != nil { - return err - } - //var reg *prometheus.Registry - //var metric *grpcprometheus.ServerMetrics + // var reg *prometheus.Registry + // var metric *grpcprometheus.ServerMetrics if prometheusConfig.Enable { - //cusMetrics := prommetrics.GetGrpcCusMetrics(rpcRegisterName, share) - //reg, metric, _ = prommetrics.NewGrpcPromObj(cusMetrics) - //options = append(options, mw.GrpcServer(), grpc.StreamInterceptor(metric.StreamServerInterceptor()), + // cusMetrics := prommetrics.GetGrpcCusMetrics(rpcRegisterName, share) + // reg, metric, _ = prommetrics.NewGrpcPromObj(cusMetrics) + // options = append(options, mw.GrpcServer(), grpc.StreamInterceptor(metric.StreamServerInterceptor()), // grpc.UnaryInterceptor(metric.UnaryServerInterceptor())) options = append( options, mw.GrpcServer(), @@ -98,8 +95,8 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo err = client.Register( rpcRegisterName, - registerIP, - rpcPort, + host, + port, grpc.WithTransportCredentials(insecure.NewCredentials()), ) if err != nil { @@ -123,13 +120,13 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo netErr = errs.WrapMsg(err, fmt.Sprintf("rpc %s prometheus start err: %d", rpcRegisterName, prometheusPort)) netDone <- struct{}{} } - //metric.InitializeMetrics(srv) + // metric.InitializeMetrics(srv) // Create a HTTP server for prometheus. - //httpServer = &http.Server{Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), Addr: fmt.Sprintf("0.0.0.0:%d", prometheusPort)} - //if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { + // httpServer = &http.Server{Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), Addr: fmt.Sprintf("0.0.0.0:%d", prometheusPort)} + // if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { // netErr = errs.WrapMsg(err, "prometheus start err", httpServer.Addr) // netDone <- struct{}{} - //} + // } }() } From 4afc47101ece678b49e4d5143f9e54883b724f1e Mon Sep 17 00:00:00 2001 From: fenghao Date: Wed, 27 Nov 2024 19:51:51 +0800 Subject: [PATCH 3/3] drop useless code --- internal/msggateway/hub_server.go | 4 +--- internal/msggateway/init.go | 9 +++------ 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/internal/msggateway/hub_server.go b/internal/msggateway/hub_server.go index fd73f81bdc..1c5852f965 100644 --- a/internal/msggateway/hub_server.go +++ b/internal/msggateway/hub_server.go @@ -55,7 +55,6 @@ func (s *Server) Start(ctx context.Context, index int, conf *Config) error { } type Server struct { - rpcPort int LongConnServer LongConnServer config *Config pushTerminal map[int]struct{} @@ -68,9 +67,8 @@ func (s *Server) SetLongConnServer(LongConnServer LongConnServer) { s.LongConnServer = LongConnServer } -func NewServer(rpcPort int, longConnServer LongConnServer, conf *Config, ready func(srv *Server) error) *Server { +func NewServer(longConnServer LongConnServer, conf *Config, ready func(srv *Server) error) *Server { s := &Server{ - rpcPort: rpcPort, LongConnServer: longConnServer, pushTerminal: make(map[int]struct{}), config: conf, diff --git a/internal/msggateway/init.go b/internal/msggateway/init.go index 50da060976..77bccc88cf 100644 --- a/internal/msggateway/init.go +++ b/internal/msggateway/init.go @@ -35,16 +35,13 @@ type Config struct { // Start run ws server. func Start(ctx context.Context, index int, conf *Config) error { - log.CInfo(ctx, "MSG-GATEWAY server is initializing", "rpcPorts", conf.MsgGateway.RPC.Ports, + log.CInfo(ctx, "MSG-GATEWAY server is initializing", "wsPort", conf.MsgGateway.LongConnSvr.Ports, "prometheusPorts", conf.MsgGateway.Prometheus.Ports) wsPort, err := datautil.GetElemByIndex(conf.MsgGateway.LongConnSvr.Ports, index) if err != nil { return err } - rpcPort, err := datautil.GetElemByIndex(conf.MsgGateway.RPC.Ports, index) - if err != nil { - return err - } + rdb, err := redisutil.NewRedisClient(ctx, conf.RedisConfig.Build()) if err != nil { return err @@ -57,7 +54,7 @@ func Start(ctx context.Context, index int, conf *Config) error { WithMessageMaxMsgLength(conf.MsgGateway.LongConnSvr.WebsocketMaxMsgLen), ) - hubServer := NewServer(rpcPort, longServer, conf, func(srv *Server) error { + hubServer := NewServer(longServer, conf, func(srv *Server) error { longServer.online, _ = rpccache.NewOnlineCache(srv.userRcp, nil, rdb, false, longServer.subscriberUserOnlineStatusChanges) return nil })