From 679728c6bc06b8cb5c6809678c316a441fbf3b6e Mon Sep 17 00:00:00 2001 From: liuxiaobo <1224730913@qq.com> Date: Mon, 2 Jun 2025 16:51:07 +0800 Subject: [PATCH] =?UTF-8?q?=E7=A7=BB=E9=99=A4=E8=B0=83=E8=AF=95=E6=97=A5?= =?UTF-8?q?=E5=BF=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ws/wsConn.go | 46 ++++++++++++++++++++++++++++++---------------- ws/wsServer.go | 4 ++-- 2 files changed, 32 insertions(+), 18 deletions(-) diff --git a/ws/wsConn.go b/ws/wsConn.go index 1e5526c..fec1d7b 100644 --- a/ws/wsConn.go +++ b/ws/wsConn.go @@ -55,7 +55,7 @@ func (c *wsConnect) SendMsg(data []byte) error { // 关闭链接 func (c *wsConnect) Close() { c.once.Do(func() { - log.Debug(c.Log("关闭链接")) + //log.Debug(c.Log("关闭链接")) _ = c.wsConn.WriteMessage(websocket.CloseMessage, []byte{}) c.inChan.Close() c.outChan.Close() @@ -69,10 +69,15 @@ func (c *wsConnect) Close() { // 循环从websocket中读取消息放入到读队列中 func (c *wsConnect) readWsLoop() { + defer func() { + //log.Debug(c.Log("readWsLoop协程退出")) + c.Close() + }() + c.wsConn.SetReadLimit(maxMessageSize) _ = c.wsConn.SetReadDeadline(time.Now().Add(pongWait)) c.wsConn.SetPongHandler(func(string) error { - log.Debug(c.Log("received pong. from client")) + //log.Debug(c.Log("received pong. from client")) _ = c.wsConn.SetReadDeadline(time.Now().Add(pongWait)) return nil }) @@ -84,8 +89,7 @@ func (c *wsConnect) readWsLoop() { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure, websocket.CloseNormalClosure) { log.Error(c.Log("消息读取出现错误:%v", err)) } - log.Debug(c.Log("关闭连接:%v", err)) - c.Close() + //log.Debug(c.Log("关闭连接:%v", err)) return } switch msgType { @@ -105,53 +109,63 @@ func (c *wsConnect) readWsLoop() { // 发送响应关闭帧(必须回传相同状态码) rspMsg := websocket.FormatCloseMessage(code, reason) _ = c.wsConn.WriteControl(websocket.CloseMessage, rspMsg, time.Now().Add(5*time.Second)) - c.Close() + return default: if msgType != wsMsgType { continue } msg := &wsMessage{messageType: msgType, data: data} - _ = c.inChan.Write(msg) + if err = c.inChan.Write(msg); err != nil { + log.Error(c.Log("读队列关闭:%v", err)) + return + } } } } func (c *wsConnect) writeWsLoop() { ticker := time.NewTicker(pingPeriod) - defer ticker.Stop() + defer func() { + //log.Debug(c.Log("writeWsLoop协程退出")) + ticker.Stop() + // 关闭连接 + c.Close() + }() for { select { // 取一个消息发送给客户端 - case msg := <-c.outChan.Reader(): + case msg, ok := <-c.outChan.Reader(): + if !ok { + return + } if err := c.wsConn.WriteMessage(msg.messageType, msg.data); err != nil { iMsg := &ipb.InternalMsg{} _ = proto.Unmarshal(msg.data, iMsg) log.Error(c.Log("发送消息错误:%v 消息长度:%v 消息内容:%v", err, len(msg.data), iMsg)) - // 关闭连接 - c.Close() return } + case <-ticker.C: if err := c.wsConn.WriteMessage(websocket.PingMessage, []byte("ping")); err != nil { log.Error(c.Log("发送心跳失败:%v", err)) - c.Close() return } else { - log.Debug(c.Log("发送心跳")) + //log.Debug(c.Log("发送心跳")) } } } } func (c *wsConnect) handle(process func(IConn, []byte)) { + defer c.Close() for { select { case msg, ok := <-c.inChan.Reader(): - if ok { - process(c, msg.data) - } else { - c.Close() + if !ok { + //log.Debug(c.Log("handle协程退出")) + return } + process(c, msg.data) } } } diff --git a/ws/wsServer.go b/ws/wsServer.go index dc87ce1..34dc36b 100644 --- a/ws/wsServer.go +++ b/ws/wsServer.go @@ -11,7 +11,7 @@ import ( const ( // 允许等待的写入时间 - writeWait = 10 * time.Second + //writeWait = 10 * time.Second // pong间隔时间 pongWait = 60 * time.Second // ping间隔时间 @@ -64,7 +64,7 @@ func (s *WsServer) wsHandle(w http.ResponseWriter, r *http.Request) { } func (s *WsServer) debugGoroutineNum() { - ch := time.Tick(5 * time.Minute) + ch := time.Tick(2 * time.Minute) go func() { for { select {