修改连接关闭改为通知关闭,然后conn内部协程自行关闭

将service.Run交由外部自行调用
This commit is contained in:
liuxiaobo 2025-05-28 14:54:19 +08:00
parent 92d0dcef62
commit 0f27a3aa5b
5 changed files with 18 additions and 11 deletions

View File

@ -40,7 +40,7 @@ func NewBaseService(type_, name string, onFunc IOnFunc, sender ISender) *BaseSer
s.stop, s.stopFunc = context.WithCancel(context.Background()) s.stop, s.stopFunc = context.WithCancel(context.Background())
s.waitStop, s.waitStopFunc = context.WithCancel(context.Background()) s.waitStop, s.waitStopFunc = context.WithCancel(context.Background())
s.Run() //s.Run()
return s return s
} }

View File

@ -65,6 +65,7 @@ func (s *gameService) OnInit() {
if err := s.NatsService.QueueSubscribe(GroupTopic(s), GroupQueue(s)); err != nil { if err := s.NatsService.QueueSubscribe(GroupTopic(s), GroupQueue(s)); err != nil {
log.Error(err.Error()) log.Error(err.Error())
} }
s.Run()
log.Debug("onInit") log.Debug("onInit")
} }
@ -87,9 +88,10 @@ func TestGameService(t *testing.T) {
if err := s.Send(Topic(s), []byte(msg)); err != nil { if err := s.Send(Topic(s), []byte(msg)); err != nil {
log.Error(err.Error()) log.Error(err.Error())
} }
for _, srv := range s.etcdService.GetNodes() { s.etcdService.GetNodes().Range(func(key, value interface{}) bool {
log.Debug(s.Log("发现有服务:%v", srv)) log.Debug(s.Log("发现有服务:%v", value))
} return true
})
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
s.NotifyStop() s.NotifyStop()
s.WaitStop() s.WaitStop()

View File

@ -1,7 +1,7 @@
package ws package ws
type IConn interface { type IConn interface {
Close() NotifyClose()
SendMsg(data []byte) error SendMsg(data []byte) error
Name() string Name() string
Id() uint32 Id() uint32

View File

@ -73,7 +73,12 @@ func (c *wsConnect) SendMsg(data []byte) error {
} }
// 关闭链接 // 关闭链接
func (c *wsConnect) Close() { func (c *wsConnect) NotifyClose() {
c.closeCh <- struct{}{}
}
// 关闭链接
func (c *wsConnect) close() {
log.Debug(c.Log("关闭链接")) log.Debug(c.Log("关闭链接"))
c.mutex.Lock() c.mutex.Lock()
defer c.mutex.Unlock() defer c.mutex.Unlock()
@ -83,7 +88,7 @@ func (c *wsConnect) Close() {
c.onDisconnect(c) c.onDisconnect(c)
} }
wsMgr.Remove(c) wsMgr.Remove(c)
close(c.closeCh) //close(c.closeCh)
} }
} }
@ -98,7 +103,7 @@ func (c *wsConnect) readWsLoop() {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure, websocket.CloseNormalClosure) { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure, websocket.CloseNormalClosure) {
log.Error(c.Log("消息读取出现错误:%v", err)) log.Error(c.Log("消息读取出现错误:%v", err))
} }
c.Close() c.close()
return return
} }
switch msgType { switch msgType {
@ -117,7 +122,7 @@ func (c *wsConnect) readWsLoop() {
// 发送响应关闭帧(必须回传相同状态码) // 发送响应关闭帧(必须回传相同状态码)
rspMsg := websocket.FormatCloseMessage(code, reason) rspMsg := websocket.FormatCloseMessage(code, reason)
_ = c.wsConn.WriteControl(websocket.CloseMessage, rspMsg, time.Now().Add(5*time.Second)) _ = c.wsConn.WriteControl(websocket.CloseMessage, rspMsg, time.Now().Add(5*time.Second))
c.Close() c.close()
default: default:
if msgType != wsMsgType { if msgType != wsMsgType {
continue continue
@ -142,7 +147,7 @@ func (c *wsConnect) writeWsLoop() {
if err := c.wsConn.WriteMessage(msg.messageType, msg.data); err != nil { if err := c.wsConn.WriteMessage(msg.messageType, msg.data); err != nil {
log.Error(c.Log("发送消息错误:%v", err)) log.Error(c.Log("发送消息错误:%v", err))
// 关闭连接 // 关闭连接
c.Close() c.close()
return return
} }
case <-c.closeCh: case <-c.closeCh:

View File

@ -51,7 +51,7 @@ func (s *WsServer) wsHandle(w http.ResponseWriter, r *http.Request) {
log.ErrorF("升级到WebSocket失败:%v", err) log.ErrorF("升级到WebSocket失败:%v", err)
return return
} }
// defer func() { _ = conn.Close() }() // defer func() { _ = conn.close() }()
nextConnId++ nextConnId++
wsConn := newWsConnect(conn, s.onDisconnect) wsConn := newWsConnect(conn, s.onDisconnect)
wsMgr.Add(wsConn) wsMgr.Add(wsConn)