From a8bb1d01b4ba6c541aa294298a4941ecd75dd94d Mon Sep 17 00:00:00 2001 From: liuxiaobo <1224730913@qq.com> Date: Mon, 2 Jun 2025 15:29:03 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9websocket?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ipb/helper.go | 38 +++++----------- ws/iconn.go | 2 +- ws/wsClient.go | 30 ++++++++++--- ws/wsConn.go | 118 ++++++++++++++++++++----------------------------- 4 files changed, 84 insertions(+), 104 deletions(-) diff --git a/ipb/helper.go b/ipb/helper.go index 6805ad6..9301bc0 100644 --- a/ipb/helper.go +++ b/ipb/helper.go @@ -1,7 +1,5 @@ package ipb -import "sync/atomic" - func MakeMsg(serviceName string, connId uint32, userId int64, msgId int32, msg []byte) *InternalMsg { return &InternalMsg{ ServiceName: serviceName, @@ -12,28 +10,14 @@ func MakeMsg(serviceName string, connId uint32, userId int64, msgId int32, msg [ } } -func MakeRpcMsg(serviceName string, connId uint32, userId int64, msgId int32, msg []byte) *InternalMsg { - return &InternalMsg{ - ServiceName: serviceName, - ConnId: connId, - UserId: userId, - MsgId: msgId, - Msg: msg, - Type: MsgType_RpcMsg, - RetRpcMsgId: genRpcId(), - } -} - -const ( - rpcBeginId = -500000 - rpcEndId = -100000 -) - -var rpcId int32 - -func genRpcId() int32 { - if atomic.LoadInt32(&rpcId) > rpcEndId { - atomic.StoreInt32(&rpcId, rpcBeginId) - } - return atomic.AddInt32(&rpcId, 1) -} +//func MakeRpcMsg(serviceName string, connId uint32, userId int64, msgId int32, msg []byte) *InternalMsg { +// return &InternalMsg{ +// ServiceName: serviceName, +// ConnId: connId, +// UserId: userId, +// MsgId: msgId, +// Msg: msg, +// Type: MsgType_RpcMsg, +// RetRpcMsgId: genRpcId(), +// } +//} diff --git a/ws/iconn.go b/ws/iconn.go index 6f981ad..97faf9f 100644 --- a/ws/iconn.go +++ b/ws/iconn.go @@ -2,7 +2,7 @@ package ws type IConn interface { Addr() string - NotifyClose() + Close() SendMsg(data []byte) error Name() string Id() uint32 diff --git a/ws/wsClient.go b/ws/wsClient.go index 22d4812..dfaacd9 100644 --- a/ws/wsClient.go +++ b/ws/wsClient.go @@ -2,6 +2,7 @@ package ws import ( "context" + "fmt" "github.com/fox/fox/ksync" "github.com/fox/fox/log" "github.com/gorilla/websocket" @@ -21,6 +22,7 @@ type Client struct { cancel context.CancelFunc wg sync.WaitGroup onFunc IOnFunc + uid int64 } func NewClient(url string, onFunc IOnFunc) (*Client, error) { @@ -49,35 +51,47 @@ func (c *Client) Start() { ksync.GoSafe(c.heartbeatLoop, nil) } +func (c *Client) Log(format string, v ...interface{}) string { + s := fmt.Sprintf("连接:%v, uid:%v ", c.conn.RemoteAddr().String(), c.uid) + return s + fmt.Sprintf(format, v...) +} + /* readLoop暂时没有好的办法及时退出协程,c.conn.ReadMessage()是阻塞式,导致协程无法及时catch到关闭信号 如果在ReadMessage前调用SetReadDeadline设置超时,它会在超时后将底部连接状态标记为已损坏,后续ReadMessage会触发崩溃 */ func (c *Client) readLoop() { //defer c.wg.Done() + _ = c.conn.SetReadDeadline(time.Now().Add(60 * time.Second)) + // 设置 Pong 处理器 + c.conn.SetPongHandler(func(string) error { + _ = c.conn.SetReadDeadline(time.Now().Add(60 * time.Second)) + return nil + }) for { select { case <-c.ctx.Done(): - log.Debug("readLoop 收到关闭信号") + log.Debug(c.Log("readLoop 收到关闭信号")) return default: messageType, message, err := c.conn.ReadMessage() if err != nil { - //log.Error(fmt.Sprintf("读取错误:%v", err)) + log.Error(c.Log("读取错误:%v", err)) c.NotifyStop() return } switch messageType { case websocket.PingMessage: + log.Debug(c.Log("receive ping message")) c.sendChan <- &wsMessage{messageType: websocket.PongMessage, data: []byte("pong")} case websocket.PongMessage: - + log.Debug(c.Log("receive pong message")) case websocket.TextMessage, websocket.BinaryMessage: _ = c.onFunc.OnMessage(message) case websocket.CloseMessage: - log.Debug("收到关闭帧") + log.Debug(c.Log("收到关闭帧")) c.NotifyStop() return } @@ -96,14 +110,16 @@ func (c *Client) writeLoop() { case msg := <-c.sendChan: switch msg.messageType { case websocket.PingMessage: + log.Debug(c.Log("send ping message")) _ = c.conn.WriteMessage(websocket.PingMessage, []byte("ping")) case websocket.PongMessage: + log.Debug(c.Log("send pong message")) _ = c.conn.WriteMessage(websocket.PongMessage, []byte("pong")) default: _ = c.conn.WriteMessage(msg.messageType, msg.data) } case <-c.ctx.Done(): - //log.Debug("writeLoop 收到关闭信号") + log.Debug(c.Log("writeLoop 收到关闭信号")) // 发送关闭帧 _ = c.conn.WriteControl( websocket.CloseMessage, @@ -139,3 +155,7 @@ func (c *Client) NotifyStop() { c.cancel() _ = c.conn.Close() } + +func (c *Client) SetUid(uid int64) { + c.uid = uid +} diff --git a/ws/wsConn.go b/ws/wsConn.go index af6186b..1e5526c 100644 --- a/ws/wsConn.go +++ b/ws/wsConn.go @@ -3,7 +3,10 @@ package ws import ( "encoding/binary" "fmt" + "github.com/fox/fox/ipb" "github.com/fox/fox/log" + "github.com/fox/fox/safeChan" + "github.com/golang/protobuf/proto" "github.com/gorilla/websocket" "sync" "time" @@ -23,73 +26,45 @@ type wsMessage struct { // 客户端连接 type wsConnect struct { - wsConn *websocket.Conn // 底层websocket - inChan chan *wsMessage // 读队列 - outChan chan *wsMessage // 写队列 - mutex sync.Mutex // 避免重复关闭管道,加锁处理 - isClosed bool - closeCh chan struct{} // 关闭通知 + wsConn *websocket.Conn // 底层websocket + inChan *safeChan.SafeChan[*wsMessage] // 读队列 + outChan *safeChan.SafeChan[*wsMessage] // 写队列 id uint32 userId int64 onDisconnect func(IConn) + once sync.Once } func newWsConnect(wsConn *websocket.Conn, onDisconnect func(IConn)) *wsConnect { - return &wsConnect{ + c := &wsConnect{ wsConn: wsConn, - inChan: make(chan *wsMessage, 1000), - outChan: make(chan *wsMessage, 1000), - closeCh: make(chan struct{}), - isClosed: false, + inChan: safeChan.NewSafeChan[*wsMessage](1000), + outChan: safeChan.NewSafeChan[*wsMessage](1000), id: nextConnId, userId: 0, onDisconnect: onDisconnect, } -} - -// 从读队列读取消息 -func (c *wsConnect) readFromChan() (*wsMessage, error) { - select { - case msg := <-c.inChan: - return msg, nil - case <-c.closeCh: - return nil, fmt.Errorf("连接已关闭") - } -} - -// 把消息放进写队列 -func (c *wsConnect) sendMsg(msgType int, data []byte) error { - select { - case c.outChan <- &wsMessage{messageType: msgType, data: data}: - case <-c.closeCh: - return fmt.Errorf("连接已关闭") - } - return nil + return c } // 把消息放进写队列 func (c *wsConnect) SendMsg(data []byte) error { - return c.sendMsg(wsMsgType, data) + return c.outChan.Write(&wsMessage{messageType: wsMsgType, data: data}) } // 关闭链接 -func (c *wsConnect) NotifyClose() { - c.closeCh <- struct{}{} -} - -// 关闭链接 -func (c *wsConnect) close() { - log.Debug(c.Log("关闭链接")) - c.mutex.Lock() - defer c.mutex.Unlock() - if c.isClosed == false { - c.isClosed = true +func (c *wsConnect) Close() { + c.once.Do(func() { + log.Debug(c.Log("关闭链接")) + _ = c.wsConn.WriteMessage(websocket.CloseMessage, []byte{}) + c.inChan.Close() + c.outChan.Close() + _ = c.wsConn.Close() if c.onDisconnect != nil { c.onDisconnect(c) } wsMgr.Remove(c) - //close(c.closeCh) - } + }) } // 循环从websocket中读取消息放入到读队列中 @@ -109,14 +84,16 @@ func (c *wsConnect) readWsLoop() { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure, websocket.CloseNormalClosure) { log.Error(c.Log("消息读取出现错误:%v", err)) } - c.close() + log.Debug(c.Log("关闭连接:%v", err)) + c.Close() return } switch msgType { - case websocket.PingMessage: - _ = c.sendMsg(websocket.PongMessage, []byte("pong")) - case websocket.PongMessage: - log.Debug(c.Log("received pong from client")) + //case websocket.PingMessage: + // log.Debug(c.Log("received ping message")) + // _ = c.sendMsg(websocket.PongMessage, []byte("pong")) + //case websocket.PongMessage: + // log.Debug(c.Log("received pong from client")) case websocket.CloseMessage: code := websocket.CloseNormalClosure reason := "" @@ -128,17 +105,13 @@ func (c *wsConnect) readWsLoop() { // 发送响应关闭帧(必须回传相同状态码) rspMsg := websocket.FormatCloseMessage(code, reason) _ = c.wsConn.WriteControl(websocket.CloseMessage, rspMsg, time.Now().Add(5*time.Second)) - c.close() + c.Close() default: if msgType != wsMsgType { continue } msg := &wsMessage{messageType: msgType, data: data} - select { - case c.inChan <- msg: - case <-c.closeCh: - return - } + _ = c.inChan.Write(msg) } } } @@ -149,20 +122,22 @@ func (c *wsConnect) writeWsLoop() { for { select { // 取一个消息发送给客户端 - case msg := <-c.outChan: + case msg := <-c.outChan.Reader(): if err := c.wsConn.WriteMessage(msg.messageType, msg.data); err != nil { - log.Error(c.Log("发送消息错误:%v", err)) + iMsg := &ipb.InternalMsg{} + _ = proto.Unmarshal(msg.data, iMsg) + log.Error(c.Log("发送消息错误:%v 消息长度:%v 消息内容:%v", err, len(msg.data), iMsg)) // 关闭连接 - c.close() + c.Close() return } - case <-c.closeCh: - // 收到关闭通知 - return case <-ticker.C: - _ = c.wsConn.SetWriteDeadline(time.Now().Add(writeWait)) - if err := c.wsConn.WriteMessage(websocket.PingMessage, []byte{}); err != nil { + if err := c.wsConn.WriteMessage(websocket.PingMessage, []byte("ping")); err != nil { + log.Error(c.Log("发送心跳失败:%v", err)) + c.Close() return + } else { + log.Debug(c.Log("发送心跳")) } } } @@ -170,13 +145,14 @@ func (c *wsConnect) writeWsLoop() { func (c *wsConnect) handle(process func(IConn, []byte)) { for { - msg, err := c.readFromChan() - if err != nil { - // log.Error(c.Log("获取消息错误:%v", err)) - break + select { + case msg, ok := <-c.inChan.Reader(): + if ok { + process(c, msg.data) + } else { + c.Close() + } } - // Log.Debug(c.Log("接收消息:%v", msg.data)) - process(c, msg.data) } } @@ -204,6 +180,6 @@ func (c *wsConnect) Addr() string { } func (c *wsConnect) Log(format string, v ...interface{}) string { - s := fmt.Sprintf("连接:%v, id:%v ", c.wsConn.RemoteAddr().String(), c.id) + s := fmt.Sprintf("连接:%v, id:%v uid:%v", c.wsConn.RemoteAddr().String(), c.id, c.userId) return s + fmt.Sprintf(format, v...) }