diff --git a/service/natsService.go b/service/natsService.go index 27a8712..49fc68f 100644 --- a/service/natsService.go +++ b/service/natsService.go @@ -77,21 +77,24 @@ func NewNatsService(param *InitNatsServiceParams) (*NatsService, error) { func (n *NatsService) publishUpdateService() { jsonData, _ := json.Marshal(n.node) _ = n.nats.Publish(updateTopic, jsonData) + //log.Debug(n.Log("publishUpdateService:%v", string(jsonData))) } func (n *NatsService) subscribeUpdateService() error { return n.SubscribeCb(updateTopic, func(m *nats.Msg) { var node = &etcd.ServiceNode{} _ = json.Unmarshal(m.Data, node) + //log.Debug(n.Log("发现新节点:%v", node)) // 不是同类服务不处理,是自己发出来的更新,也不处理 if node.Type != n.Type() || node.Name == n.Name() { + //log.Debug(n.Log("与本节点不匹配.本节点:%v", n.node)) return } // 有新服务上线,本服务准备退出 if n.node.Version < node.Version { n.NotifyStop() + log.InfoF("有新服务%v auto exit, initiating shutdown...", n.Name()) n.WaitStop() - log.InfoF("%v auto exit, initiating shutdown...", n.Name()) os.Exit(0) } }) diff --git a/ws/wsClient.go b/ws/wsClient.go index 95065b8..22d4812 100644 --- a/ws/wsClient.go +++ b/ws/wsClient.go @@ -4,11 +4,10 @@ import ( "context" "github.com/fox/fox/ksync" "github.com/fox/fox/log" + "github.com/gorilla/websocket" "net/http" "sync" "time" - - "github.com/gorilla/websocket" ) type IOnFunc interface { @@ -44,23 +43,28 @@ func NewClient(url string, onFunc IOnFunc) (*Client, error) { } func (c *Client) Start() { - c.wg.Add(3) + c.wg.Add(2) ksync.GoSafe(c.readLoop, nil) ksync.GoSafe(c.writeLoop, nil) ksync.GoSafe(c.heartbeatLoop, nil) } +/* +readLoop暂时没有好的办法及时退出协程,c.conn.ReadMessage()是阻塞式,导致协程无法及时catch到关闭信号 +如果在ReadMessage前调用SetReadDeadline设置超时,它会在超时后将底部连接状态标记为已损坏,后续ReadMessage会触发崩溃 +*/ func (c *Client) readLoop() { - defer c.wg.Done() + //defer c.wg.Done() for { select { case <-c.ctx.Done(): + log.Debug("readLoop 收到关闭信号") return default: messageType, message, err := c.conn.ReadMessage() if err != nil { - // log.Error(fmt.Sprintf("读取错误:%v", err)) - c.Stop() + //log.Error(fmt.Sprintf("读取错误:%v", err)) + c.NotifyStop() return } @@ -74,7 +78,7 @@ func (c *Client) readLoop() { case websocket.CloseMessage: log.Debug("收到关闭帧") - c.Stop() + c.NotifyStop() return } } @@ -99,6 +103,7 @@ func (c *Client) writeLoop() { _ = c.conn.WriteMessage(msg.messageType, msg.data) } case <-c.ctx.Done(): + //log.Debug("writeLoop 收到关闭信号") // 发送关闭帧 _ = c.conn.WriteControl( websocket.CloseMessage, @@ -120,13 +125,17 @@ func (c *Client) heartbeatLoop() { case <-ticker.C: c.sendChan <- &wsMessage{messageType: websocket.PingMessage, data: []byte("ping")} case <-c.ctx.Done(): + //log.Debug("heartbeatLoop 收到关闭信号") return } } } -func (c *Client) Stop() { - c.cancel() - _ = c.conn.Close() +func (c *Client) WaitStop() { c.wg.Wait() } + +func (c *Client) NotifyStop() { + c.cancel() + _ = c.conn.Close() +}