修复client收到exit信号无法及时退出程序

This commit is contained in:
liuxiaobo 2025-05-31 01:12:45 +08:00
parent 15ad096dfa
commit 1de3a11c56
2 changed files with 23 additions and 11 deletions

View File

@ -77,21 +77,24 @@ func NewNatsService(param *InitNatsServiceParams) (*NatsService, error) {
func (n *NatsService) publishUpdateService() {
jsonData, _ := json.Marshal(n.node)
_ = n.nats.Publish(updateTopic, jsonData)
//log.Debug(n.Log("publishUpdateService:%v", string(jsonData)))
}
func (n *NatsService) subscribeUpdateService() error {
return n.SubscribeCb(updateTopic, func(m *nats.Msg) {
var node = &etcd.ServiceNode{}
_ = json.Unmarshal(m.Data, node)
//log.Debug(n.Log("发现新节点:%v", node))
// 不是同类服务不处理,是自己发出来的更新,也不处理
if node.Type != n.Type() || node.Name == n.Name() {
//log.Debug(n.Log("与本节点不匹配.本节点:%v", n.node))
return
}
// 有新服务上线,本服务准备退出
if n.node.Version < node.Version {
n.NotifyStop()
log.InfoF("有新服务%v auto exit, initiating shutdown...", n.Name())
n.WaitStop()
log.InfoF("%v auto exit, initiating shutdown...", n.Name())
os.Exit(0)
}
})

View File

@ -4,11 +4,10 @@ import (
"context"
"github.com/fox/fox/ksync"
"github.com/fox/fox/log"
"github.com/gorilla/websocket"
"net/http"
"sync"
"time"
"github.com/gorilla/websocket"
)
type IOnFunc interface {
@ -44,23 +43,28 @@ func NewClient(url string, onFunc IOnFunc) (*Client, error) {
}
func (c *Client) Start() {
c.wg.Add(3)
c.wg.Add(2)
ksync.GoSafe(c.readLoop, nil)
ksync.GoSafe(c.writeLoop, nil)
ksync.GoSafe(c.heartbeatLoop, nil)
}
/*
readLoop暂时没有好的办法及时退出协程c.conn.ReadMessage()是阻塞式导致协程无法及时catch到关闭信号
如果在ReadMessage前调用SetReadDeadline设置超时它会在超时后将底部连接状态标记为已损坏后续ReadMessage会触发崩溃
*/
func (c *Client) readLoop() {
defer c.wg.Done()
//defer c.wg.Done()
for {
select {
case <-c.ctx.Done():
log.Debug("readLoop 收到关闭信号")
return
default:
messageType, message, err := c.conn.ReadMessage()
if err != nil {
// log.Error(fmt.Sprintf("读取错误:%v", err))
c.Stop()
//log.Error(fmt.Sprintf("读取错误:%v", err))
c.NotifyStop()
return
}
@ -74,7 +78,7 @@ func (c *Client) readLoop() {
case websocket.CloseMessage:
log.Debug("收到关闭帧")
c.Stop()
c.NotifyStop()
return
}
}
@ -99,6 +103,7 @@ func (c *Client) writeLoop() {
_ = c.conn.WriteMessage(msg.messageType, msg.data)
}
case <-c.ctx.Done():
//log.Debug("writeLoop 收到关闭信号")
// 发送关闭帧
_ = c.conn.WriteControl(
websocket.CloseMessage,
@ -120,13 +125,17 @@ func (c *Client) heartbeatLoop() {
case <-ticker.C:
c.sendChan <- &wsMessage{messageType: websocket.PingMessage, data: []byte("ping")}
case <-c.ctx.Done():
//log.Debug("heartbeatLoop 收到关闭信号")
return
}
}
}
func (c *Client) Stop() {
c.cancel()
_ = c.conn.Close()
func (c *Client) WaitStop() {
c.wg.Wait()
}
func (c *Client) NotifyStop() {
c.cancel()
_ = c.conn.Close()
}