diff --git a/service/baseService.go b/service/baseService.go index b3c3c92..3bdf016 100644 --- a/service/baseService.go +++ b/service/baseService.go @@ -40,7 +40,7 @@ func NewBaseService(type_, name string, onFunc IOnFunc, sender ISender) *BaseSer s.stop, s.stopFunc = context.WithCancel(context.Background()) s.waitStop, s.waitStopFunc = context.WithCancel(context.Background()) - s.Run() + //s.Run() return s } diff --git a/service/natsService_test.go b/service/natsService_test.go index 52bd299..2755804 100644 --- a/service/natsService_test.go +++ b/service/natsService_test.go @@ -65,6 +65,7 @@ func (s *gameService) OnInit() { if err := s.NatsService.QueueSubscribe(GroupTopic(s), GroupQueue(s)); err != nil { log.Error(err.Error()) } + s.Run() log.Debug("onInit") } @@ -87,9 +88,10 @@ func TestGameService(t *testing.T) { if err := s.Send(Topic(s), []byte(msg)); err != nil { log.Error(err.Error()) } - for _, srv := range s.etcdService.GetNodes() { - log.Debug(s.Log("发现有服务:%v", srv)) - } + s.etcdService.GetNodes().Range(func(key, value interface{}) bool { + log.Debug(s.Log("发现有服务:%v", value)) + return true + }) time.Sleep(1 * time.Second) s.NotifyStop() s.WaitStop() diff --git a/ws/iconn.go b/ws/iconn.go index 310edde..130a411 100644 --- a/ws/iconn.go +++ b/ws/iconn.go @@ -1,7 +1,7 @@ package ws type IConn interface { - Close() + NotifyClose() SendMsg(data []byte) error Name() string Id() uint32 diff --git a/ws/wsConn.go b/ws/wsConn.go index ffa2990..2e61726 100644 --- a/ws/wsConn.go +++ b/ws/wsConn.go @@ -73,7 +73,12 @@ func (c *wsConnect) SendMsg(data []byte) error { } // 关闭链接 -func (c *wsConnect) Close() { +func (c *wsConnect) NotifyClose() { + c.closeCh <- struct{}{} +} + +// 关闭链接 +func (c *wsConnect) close() { log.Debug(c.Log("关闭链接")) c.mutex.Lock() defer c.mutex.Unlock() @@ -83,7 +88,7 @@ func (c *wsConnect) Close() { c.onDisconnect(c) } wsMgr.Remove(c) - close(c.closeCh) + //close(c.closeCh) } } @@ -98,7 +103,7 @@ func (c *wsConnect) readWsLoop() { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure, websocket.CloseNormalClosure) { log.Error(c.Log("消息读取出现错误:%v", err)) } - c.Close() + c.close() return } switch msgType { @@ -117,7 +122,7 @@ func (c *wsConnect) readWsLoop() { // 发送响应关闭帧(必须回传相同状态码) rspMsg := websocket.FormatCloseMessage(code, reason) _ = c.wsConn.WriteControl(websocket.CloseMessage, rspMsg, time.Now().Add(5*time.Second)) - c.Close() + c.close() default: if msgType != wsMsgType { continue @@ -142,7 +147,7 @@ func (c *wsConnect) writeWsLoop() { if err := c.wsConn.WriteMessage(msg.messageType, msg.data); err != nil { log.Error(c.Log("发送消息错误:%v", err)) // 关闭连接 - c.Close() + c.close() return } case <-c.closeCh: diff --git a/ws/wsServer.go b/ws/wsServer.go index cf6db00..69bce45 100644 --- a/ws/wsServer.go +++ b/ws/wsServer.go @@ -51,7 +51,7 @@ func (s *WsServer) wsHandle(w http.ResponseWriter, r *http.Request) { log.ErrorF("升级到WebSocket失败:%v", err) return } - // defer func() { _ = conn.Close() }() + // defer func() { _ = conn.close() }() nextConnId++ wsConn := newWsConnect(conn, s.onDisconnect) wsMgr.Add(wsConn)