From 0f27a3aa5b805ae4bd13d21e57a97761ccdc3ce3 Mon Sep 17 00:00:00 2001 From: liuxiaobo <1224730913@qq.com> Date: Wed, 28 May 2025 14:54:19 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E8=BF=9E=E6=8E=A5=E5=85=B3?= =?UTF-8?q?=E9=97=AD=E6=94=B9=E4=B8=BA=E9=80=9A=E7=9F=A5=E5=85=B3=E9=97=AD?= =?UTF-8?q?=EF=BC=8C=E7=84=B6=E5=90=8Econn=E5=86=85=E9=83=A8=E5=8D=8F?= =?UTF-8?q?=E7=A8=8B=E8=87=AA=E8=A1=8C=E5=85=B3=E9=97=AD=20=E5=B0=86servic?= =?UTF-8?q?e.Run=E4=BA=A4=E7=94=B1=E5=A4=96=E9=83=A8=E8=87=AA=E8=A1=8C?= =?UTF-8?q?=E8=B0=83=E7=94=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- service/baseService.go | 2 +- service/natsService_test.go | 8 +++++--- ws/iconn.go | 2 +- ws/wsConn.go | 15 ++++++++++----- ws/wsServer.go | 2 +- 5 files changed, 18 insertions(+), 11 deletions(-) diff --git a/service/baseService.go b/service/baseService.go index b3c3c92..3bdf016 100644 --- a/service/baseService.go +++ b/service/baseService.go @@ -40,7 +40,7 @@ func NewBaseService(type_, name string, onFunc IOnFunc, sender ISender) *BaseSer s.stop, s.stopFunc = context.WithCancel(context.Background()) s.waitStop, s.waitStopFunc = context.WithCancel(context.Background()) - s.Run() + //s.Run() return s } diff --git a/service/natsService_test.go b/service/natsService_test.go index 52bd299..2755804 100644 --- a/service/natsService_test.go +++ b/service/natsService_test.go @@ -65,6 +65,7 @@ func (s *gameService) OnInit() { if err := s.NatsService.QueueSubscribe(GroupTopic(s), GroupQueue(s)); err != nil { log.Error(err.Error()) } + s.Run() log.Debug("onInit") } @@ -87,9 +88,10 @@ func TestGameService(t *testing.T) { if err := s.Send(Topic(s), []byte(msg)); err != nil { log.Error(err.Error()) } - for _, srv := range s.etcdService.GetNodes() { - log.Debug(s.Log("发现有服务:%v", srv)) - } + s.etcdService.GetNodes().Range(func(key, value interface{}) bool { + log.Debug(s.Log("发现有服务:%v", value)) + return true + }) time.Sleep(1 * time.Second) s.NotifyStop() s.WaitStop() diff --git a/ws/iconn.go b/ws/iconn.go index 310edde..130a411 100644 --- a/ws/iconn.go +++ b/ws/iconn.go @@ -1,7 +1,7 @@ package ws type IConn interface { - Close() + NotifyClose() SendMsg(data []byte) error Name() string Id() uint32 diff --git a/ws/wsConn.go b/ws/wsConn.go index ffa2990..2e61726 100644 --- a/ws/wsConn.go +++ b/ws/wsConn.go @@ -73,7 +73,12 @@ func (c *wsConnect) SendMsg(data []byte) error { } // 关闭链接 -func (c *wsConnect) Close() { +func (c *wsConnect) NotifyClose() { + c.closeCh <- struct{}{} +} + +// 关闭链接 +func (c *wsConnect) close() { log.Debug(c.Log("关闭链接")) c.mutex.Lock() defer c.mutex.Unlock() @@ -83,7 +88,7 @@ func (c *wsConnect) Close() { c.onDisconnect(c) } wsMgr.Remove(c) - close(c.closeCh) + //close(c.closeCh) } } @@ -98,7 +103,7 @@ func (c *wsConnect) readWsLoop() { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure, websocket.CloseNormalClosure) { log.Error(c.Log("消息读取出现错误:%v", err)) } - c.Close() + c.close() return } switch msgType { @@ -117,7 +122,7 @@ func (c *wsConnect) readWsLoop() { // 发送响应关闭帧(必须回传相同状态码) rspMsg := websocket.FormatCloseMessage(code, reason) _ = c.wsConn.WriteControl(websocket.CloseMessage, rspMsg, time.Now().Add(5*time.Second)) - c.Close() + c.close() default: if msgType != wsMsgType { continue @@ -142,7 +147,7 @@ func (c *wsConnect) writeWsLoop() { if err := c.wsConn.WriteMessage(msg.messageType, msg.data); err != nil { log.Error(c.Log("发送消息错误:%v", err)) // 关闭连接 - c.Close() + c.close() return } case <-c.closeCh: diff --git a/ws/wsServer.go b/ws/wsServer.go index cf6db00..69bce45 100644 --- a/ws/wsServer.go +++ b/ws/wsServer.go @@ -51,7 +51,7 @@ func (s *WsServer) wsHandle(w http.ResponseWriter, r *http.Request) { log.ErrorF("升级到WebSocket失败:%v", err) return } - // defer func() { _ = conn.Close() }() + // defer func() { _ = conn.close() }() nextConnId++ wsConn := newWsConnect(conn, s.onDisconnect) wsMgr.Add(wsConn)