持续维护的 Golang 版 WebSocket 开源项目(zinx-ws)
【v0.0.2】新增责任链及自定义责任链,实现消息拦截解耦(案例见 examples);新增文件 znet/chain.go、ziface/ichain.go
参数说明
"Name": "zin-ws -------gitxuzan",
"Host": "127.0.0.1",
"端口": "端口",
"TcpPort": 8999,
"最大连接数": "最大连接数",
"MaxConn": 1000,
"最大的包大小": "最大包大小",
"MaxPackageSize": 4096,
"worker池子": "worker池子10个并发处理读的数据",
"WorkerPoolSize": 10
MsgId | len | body |
---|---|---|
协议号ID | body长度 | 二进制body内容 |
uint32 | uint32 | []byte |
wsconfig.SetWSConfig("127.0.0.1", 8999, wsconfig.WithName("gitxuzan ----- websocket"))
还有其他设置例如:
wsconfig.WithWorkerSize(10) // 设置10个worker处理业务逻辑
wsconfig.WithMaxPackSize(4096) // 每个发送的包大小 4k
wsconfig.WithMaxConn(1000) // 同时在线1000个连接
wsconfig.WithVersion() // 自定义本地版本
// LoginInfo is an example router type; it embeds znet.BaseRouter.
type LoginInfo struct {
	znet.BaseRouter
}
例如上面的 LoginInfo 内嵌了 znet.BaseRouter(Go 通过结构体嵌入实现复用,而不是继承),
可按需重写以下三个方法,它们会依次执行:
PreHandle
Handle
PostHandle
同时要设置router
// 登录
s.AddRouter(1001, &LoginInfo{})
1001 代表协议号,相当于协议头里面的 msgId,映射到具体某个业务,发送端需要发送对应的协议号
func (l *LoginInfo) PreHandle(request ziface.IRequest) {
// request 目前提供的能力:发送消息、断开连接、获取当前属性、获取当前连接
}
package main
import (
wsconfig "github.com/Xuzan9396/zinx-ws/config"
"github.com/Xuzan9396/zinx-ws/ziface"
"github.com/Xuzan9396/zinx-ws/znet"
"log"
"time"
)
func init() {
log.SetFlags(log.Lshortfile | log.LstdFlags)
}
// LoginInfo is the router registered for the login message (msg ID 1001 in
// main). It embeds znet.BaseRouter and overrides only PreHandle.
type LoginInfo struct {
	znet.BaseRouter
}
// PreHandle simulates a login check. It blocks for five seconds to mimic
// business work and then, because the simulated authentication always fails,
// stops the connection the request arrived on.
func (l *LoginInfo) PreHandle(request ziface.IRequest) {
	authenticated := false

	time.Sleep(5 * time.Second) // stand-in for real business logic

	if !authenticated {
		// Simulated authentication failure: drop the connection.
		request.GetConnetion().Stop()
	}
}
// PingInfo is the router registered for msg ID 2 in main. It embeds
// znet.BaseRouter and overrides PreHandle, Handle and PostHandle.
type PingInfo struct {
	znet.BaseRouter
}
// HelloInfo is the router registered for msg ID 1 in main. It embeds
// znet.BaseRouter and overrides PreHandle, Handle and PostHandle.
type HelloInfo struct {
	znet.BaseRouter
}
// PreHandle logs the payload, connection ID and message ID of the request
// with a "pre:" prefix.
func (p *PingInfo) PreHandle(request ziface.IRequest) {
	payload := request.GetData()
	connID := request.GetConnetion().GetConnID()
	msgID := request.GetMsgID()

	log.Printf("pre:%s,conntId:%d,msgId:%d", payload, connID, msgID)
}
// Handle logs the payload, connection ID and message ID of the request with
// a "Handle:" prefix.
func (p *PingInfo) Handle(request ziface.IRequest) {
	payload := request.GetData()
	connID := request.GetConnetion().GetConnID()
	msgID := request.GetMsgID()

	log.Printf("Handle:%s,conntId:%d,msgId:%d", payload, connID, msgID)
}
// PostHandle logs the request with a "post:" prefix and then replies on the
// same connection, reusing the request's message ID for the response.
func (p *PingInfo) PostHandle(request ziface.IRequest) {
	payload := request.GetData()
	connID := request.GetConnetion().GetConnID()
	msgID := request.GetMsgID()
	log.Printf("post:%s,conntId:%d,,msgId:%d", payload, connID, msgID)

	reply := []byte("回复ping!")
	request.GetConnetion().SendMsg(request.GetMsgID(), reply)
}
// PreHandle logs the payload, connection ID and message ID of the request
// with a "pre:" prefix.
func (p *HelloInfo) PreHandle(request ziface.IRequest) {
	payload := request.GetData()
	connID := request.GetConnetion().GetConnID()
	msgID := request.GetMsgID()

	log.Printf("pre:%s,conntId:%d,msgId:%d", payload, connID, msgID)
}
// Handle logs the payload, connection ID and message ID of the request with
// a "Handle:" prefix.
func (p *HelloInfo) Handle(request ziface.IRequest) {
	payload := request.GetData()
	connID := request.GetConnetion().GetConnID()
	msgID := request.GetMsgID()

	log.Printf("Handle:%s,conntId:%d,msgId:%d", payload, connID, msgID)
}
// PostHandle logs the request with a "post:" prefix and then replies on the
// same connection, reusing the request's message ID for the response.
func (p *HelloInfo) PostHandle(request ziface.IRequest) {
	payload := request.GetData()
	connID := request.GetConnetion().GetConnID()
	msgID := request.GetMsgID()
	log.Printf("post:%s,conntId:%d,,msgId:%d", payload, connID, msgID)

	reply := []byte("回复hello!")
	request.GetConnetion().SendMsg(request.GetMsgID(), reply)
}
// SetOnConnetStart is the initialization hook run after a connection is
// created. As a demonstration of the property API it sets the "name"
// property, reads it back and logs it, and finally removes it again.
func SetOnConnetStart(conn ziface.IConnection) {
	const propertyKey = "name"

	conn.SetProperty(propertyKey, "xuzan")

	if value, found := conn.GetProperty(propertyKey); found {
		log.Println("name", value.(string))
	}

	conn.RemoveProperty(propertyKey)
}
// GetConnectNum starts a background goroutine that logs the number of
// connections held by s's connection manager every five seconds. The
// goroutine loops forever; there is no way to stop it.
func GetConnectNum(s ziface.IServer) {
	go func() {
		tick := time.NewTicker(5 * time.Second)
		defer tick.Stop()

		for range tick.C {
			total := s.GetConnMgr().Len()
			log.Println("连接数量:", total)
		}
	}()
}
// main wires up the example WebSocket server for 127.0.0.1:8999: it applies
// the configuration, registers the connection-start hook and the three
// routers, starts the connection-count monitor and then starts the server.
func main() {
	// Apply the host, port and server name.
	wsconfig.SetWSConfig("127.0.0.1", 8999, wsconfig.WithName("gitxuzan ----- websocket"))

	// Create the server handle.
	server := znet.NewServer()

	// Register the initialization hook for newly created connections.
	server.SetOnConnStart(SetOnConnetStart)

	// Test business logic (msg ID 1).
	server.AddRouter(1, &HelloInfo{})
	// Other business logic (msg ID 2).
	server.AddRouter(2, &PingInfo{})
	// Login (msg ID 1001).
	server.AddRouter(1001, &LoginInfo{})

	// Monitor the number of long-lived connections.
	GetConnectNum(server)

	// Start the server.
	server.Server()
}
package main
import (
"flag"
"github.com/Xuzan9396/zinx-ws/znet"
"github.com/gorilla/websocket"
"log"
"net/http"
"net/url"
"os"
"os/signal"
"time"
)
// addr is the host:port of the WebSocket server to dial, settable with the
// -addr command-line flag (default 127.0.0.1:8999).
var addr = flag.String("addr", "127.0.0.1:8999", "http service address")
// main runs the example WebSocket client.
//
// It dials the server at -addr and starts a reader goroutine. Every three
// seconds it sends a "hello" packet (msg ID 1) followed by a "ping" packet
// (msg ID 2). Thirty seconds after start-up it sends a single "login" packet
// (msg ID 1001).
//
// The client exits when a pack or write operation fails, when the reader
// goroutine stops (for example because the server closed the connection), or
// after an interrupt signal has triggered a WebSocket close handshake.
func main() {
	flag.Parse()
	log.SetFlags(0)

	interrupt := make(chan os.Signal, 1)
	signal.Notify(interrupt, os.Interrupt)

	u := url.URL{Scheme: "ws", Host: *addr, Path: "/"}
	log.Printf("connecting to %s", u.String())

	c, _, err := websocket.DefaultDialer.Dial(u.String(), http.Header{"User-Agent": {""}})
	if err != nil {
		log.Fatal("dial:", err)
	}
	defer c.Close()
	log.Println("ws 连接成功")

	ticker := time.NewTicker(3 * time.Second)
	defer ticker.Stop()

	p := znet.NewDataPack()

	// Pack the two periodic messages once up front; they never change.
	// Errors are reported with return rather than log.Fatal so the deferred
	// c.Close() still runs.
	by := []byte{'h', 'e', 'l', 'l', 'o'}
	resBytes, err := p.Pack(&znet.Message{
		Id:      1,
		DataLen: uint32(len(by)),
		Data:    by,
	})
	if err != nil {
		log.Println("pack:", err)
		return
	}

	byPing := []byte("ping")
	resPingBytes, err := p.Pack(&znet.Message{
		Id:      2,
		DataLen: uint32(len(byPing)),
		Data:    byPing,
	})
	if err != nil {
		log.Println("pack:", err)
		return
	}

	// One-shot timer for the simulated login.
	timer := time.NewTimer(30 * time.Second)
	defer timer.Stop()

	// done is closed when the reader goroutine returns, so the write loop
	// notices a dead connection immediately instead of on the next write.
	done := make(chan struct{})
	go func() {
		defer close(done)
		read(c)
	}()

	for {
		select {
		case <-done:
			return

		case <-timer.C:
			// Simulate the login/authentication request.
			sendMsg := []byte("login")
			sendMsgPack, err := p.Pack(&znet.Message{
				Id:      1001,
				DataLen: uint32(len(sendMsg)),
				Data:    sendMsg,
			})
			if err != nil {
				log.Println("pack:", err)
				return
			}
			if err := c.WriteMessage(websocket.BinaryMessage, sendMsgPack); err != nil {
				log.Println("write:", err)
				return
			}
			log.Println("login写入成功:", string(sendMsg))

		case <-ticker.C:
			if err := c.WriteMessage(websocket.BinaryMessage, resBytes); err != nil {
				log.Println("write:", err)
				return
			}
			log.Println("写入成功:", string(by))

			if err := c.WriteMessage(websocket.BinaryMessage, resPingBytes); err != nil {
				log.Println("write:", err)
				return
			}
			// Log the payload, not the packed frame, which begins with the
			// binary header.
			log.Println("写入成功:", string(byPing))

		case <-interrupt:
			log.Println("interrupt")

			// Start a clean shutdown by sending a close frame, then give the
			// reader up to one second to finish before exiting.
			err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
			if err != nil {
				log.Println("write close:", err)
				return
			}
			select {
			case <-done:
			case <-time.After(time.Second):
			}
			return
		}
	}
}
// read receives messages from c in a loop, unpacks each one and logs its
// message ID and payload. It returns on the first read or unpack error; it
// does not close c.
func read(c *websocket.Conn) {
	// Create the packer once instead of once per message.
	// NOTE(review): assumes a DataPack carries no per-message state, as
	// main's reuse of one instance for several Pack calls suggests. Confirm
	// this holds for Unpack too.
	p := znet.NewDataPack()

	for {
		_, message, err := c.ReadMessage()
		if err != nil {
			log.Println("read:", err)
			return
		}

		msg, err := p.Unpack(message)
		if err != nil {
			// Distinct prefix so a decode failure is not mistaken for a
			// socket read error.
			log.Println("unpack:", err)
			return
		}

		log.Printf("msgId:%d,recv: %s", msg.GetMsgId(), msg.GetData())
	}
}