修改websocket

This commit is contained in:
liuxiaobo 2025-06-02 15:29:03 +08:00
parent 61e26c7e94
commit a8bb1d01b4
4 changed files with 84 additions and 104 deletions

View File

@ -1,7 +1,5 @@
package ipb
import "sync/atomic"
func MakeMsg(serviceName string, connId uint32, userId int64, msgId int32, msg []byte) *InternalMsg {
return &InternalMsg{
ServiceName: serviceName,
@ -12,28 +10,14 @@ func MakeMsg(serviceName string, connId uint32, userId int64, msgId int32, msg [
}
}
func MakeRpcMsg(serviceName string, connId uint32, userId int64, msgId int32, msg []byte) *InternalMsg {
return &InternalMsg{
ServiceName: serviceName,
ConnId: connId,
UserId: userId,
MsgId: msgId,
Msg: msg,
Type: MsgType_RpcMsg,
RetRpcMsgId: genRpcId(),
}
}
const (
rpcBeginId = -500000
rpcEndId = -100000
)
var rpcId int32
func genRpcId() int32 {
if atomic.LoadInt32(&rpcId) > rpcEndId {
atomic.StoreInt32(&rpcId, rpcBeginId)
}
return atomic.AddInt32(&rpcId, 1)
}
//func MakeRpcMsg(serviceName string, connId uint32, userId int64, msgId int32, msg []byte) *InternalMsg {
// return &InternalMsg{
// ServiceName: serviceName,
// ConnId: connId,
// UserId: userId,
// MsgId: msgId,
// Msg: msg,
// Type: MsgType_RpcMsg,
// RetRpcMsgId: genRpcId(),
// }
//}

View File

@ -2,7 +2,7 @@ package ws
type IConn interface {
Addr() string
NotifyClose()
Close()
SendMsg(data []byte) error
Name() string
Id() uint32

View File

@ -2,6 +2,7 @@ package ws
import (
"context"
"fmt"
"github.com/fox/fox/ksync"
"github.com/fox/fox/log"
"github.com/gorilla/websocket"
@ -21,6 +22,7 @@ type Client struct {
cancel context.CancelFunc
wg sync.WaitGroup
onFunc IOnFunc
uid int64
}
func NewClient(url string, onFunc IOnFunc) (*Client, error) {
@ -49,35 +51,47 @@ func (c *Client) Start() {
ksync.GoSafe(c.heartbeatLoop, nil)
}
func (c *Client) Log(format string, v ...interface{}) string {
s := fmt.Sprintf("连接:%v, uid:%v ", c.conn.RemoteAddr().String(), c.uid)
return s + fmt.Sprintf(format, v...)
}
/*
readLoop暂时没有好的办法及时退出协程c.conn.ReadMessage()是阻塞式导致协程无法及时catch到关闭信号
如果在ReadMessage前调用SetReadDeadline设置超时它会在超时后将底部连接状态标记为已损坏后续ReadMessage会触发崩溃
*/
func (c *Client) readLoop() {
//defer c.wg.Done()
_ = c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
// 设置 Pong 处理器
c.conn.SetPongHandler(func(string) error {
_ = c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
return nil
})
for {
select {
case <-c.ctx.Done():
log.Debug("readLoop 收到关闭信号")
log.Debug(c.Log("readLoop 收到关闭信号"))
return
default:
messageType, message, err := c.conn.ReadMessage()
if err != nil {
//log.Error(fmt.Sprintf("读取错误:%v", err))
log.Error(c.Log("读取错误:%v", err))
c.NotifyStop()
return
}
switch messageType {
case websocket.PingMessage:
log.Debug(c.Log("receive ping message"))
c.sendChan <- &wsMessage{messageType: websocket.PongMessage, data: []byte("pong")}
case websocket.PongMessage:
log.Debug(c.Log("receive pong message"))
case websocket.TextMessage, websocket.BinaryMessage:
_ = c.onFunc.OnMessage(message)
case websocket.CloseMessage:
log.Debug("收到关闭帧")
log.Debug(c.Log("收到关闭帧"))
c.NotifyStop()
return
}
@ -96,14 +110,16 @@ func (c *Client) writeLoop() {
case msg := <-c.sendChan:
switch msg.messageType {
case websocket.PingMessage:
log.Debug(c.Log("send ping message"))
_ = c.conn.WriteMessage(websocket.PingMessage, []byte("ping"))
case websocket.PongMessage:
log.Debug(c.Log("send pong message"))
_ = c.conn.WriteMessage(websocket.PongMessage, []byte("pong"))
default:
_ = c.conn.WriteMessage(msg.messageType, msg.data)
}
case <-c.ctx.Done():
//log.Debug("writeLoop 收到关闭信号")
log.Debug(c.Log("writeLoop 收到关闭信号"))
// 发送关闭帧
_ = c.conn.WriteControl(
websocket.CloseMessage,
@ -139,3 +155,7 @@ func (c *Client) NotifyStop() {
c.cancel()
_ = c.conn.Close()
}
func (c *Client) SetUid(uid int64) {
c.uid = uid
}

View File

@ -3,7 +3,10 @@ package ws
import (
"encoding/binary"
"fmt"
"github.com/fox/fox/ipb"
"github.com/fox/fox/log"
"github.com/fox/fox/safeChan"
"github.com/golang/protobuf/proto"
"github.com/gorilla/websocket"
"sync"
"time"
@ -24,72 +27,44 @@ type wsMessage struct {
// 客户端连接
type wsConnect struct {
wsConn *websocket.Conn // 底层websocket
inChan chan *wsMessage // 读队列
outChan chan *wsMessage // 写队列
mutex sync.Mutex // 避免重复关闭管道,加锁处理
isClosed bool
closeCh chan struct{} // 关闭通知
inChan *safeChan.SafeChan[*wsMessage] // 读队列
outChan *safeChan.SafeChan[*wsMessage] // 写队列
id uint32
userId int64
onDisconnect func(IConn)
once sync.Once
}
func newWsConnect(wsConn *websocket.Conn, onDisconnect func(IConn)) *wsConnect {
return &wsConnect{
c := &wsConnect{
wsConn: wsConn,
inChan: make(chan *wsMessage, 1000),
outChan: make(chan *wsMessage, 1000),
closeCh: make(chan struct{}),
isClosed: false,
inChan: safeChan.NewSafeChan[*wsMessage](1000),
outChan: safeChan.NewSafeChan[*wsMessage](1000),
id: nextConnId,
userId: 0,
onDisconnect: onDisconnect,
}
}
// 从读队列读取消息
func (c *wsConnect) readFromChan() (*wsMessage, error) {
select {
case msg := <-c.inChan:
return msg, nil
case <-c.closeCh:
return nil, fmt.Errorf("连接已关闭")
}
}
// 把消息放进写队列
func (c *wsConnect) sendMsg(msgType int, data []byte) error {
select {
case c.outChan <- &wsMessage{messageType: msgType, data: data}:
case <-c.closeCh:
return fmt.Errorf("连接已关闭")
}
return nil
return c
}
// 把消息放进写队列
func (c *wsConnect) SendMsg(data []byte) error {
return c.sendMsg(wsMsgType, data)
return c.outChan.Write(&wsMessage{messageType: wsMsgType, data: data})
}
// 关闭链接
func (c *wsConnect) NotifyClose() {
c.closeCh <- struct{}{}
}
// 关闭链接
func (c *wsConnect) close() {
func (c *wsConnect) Close() {
c.once.Do(func() {
log.Debug(c.Log("关闭链接"))
c.mutex.Lock()
defer c.mutex.Unlock()
if c.isClosed == false {
c.isClosed = true
_ = c.wsConn.WriteMessage(websocket.CloseMessage, []byte{})
c.inChan.Close()
c.outChan.Close()
_ = c.wsConn.Close()
if c.onDisconnect != nil {
c.onDisconnect(c)
}
wsMgr.Remove(c)
//close(c.closeCh)
}
})
}
// 循环从websocket中读取消息放入到读队列中
@ -109,14 +84,16 @@ func (c *wsConnect) readWsLoop() {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure, websocket.CloseNormalClosure) {
log.Error(c.Log("消息读取出现错误:%v", err))
}
c.close()
log.Debug(c.Log("关闭连接:%v", err))
c.Close()
return
}
switch msgType {
case websocket.PingMessage:
_ = c.sendMsg(websocket.PongMessage, []byte("pong"))
case websocket.PongMessage:
log.Debug(c.Log("received pong from client"))
//case websocket.PingMessage:
// log.Debug(c.Log("received ping message"))
// _ = c.sendMsg(websocket.PongMessage, []byte("pong"))
//case websocket.PongMessage:
// log.Debug(c.Log("received pong from client"))
case websocket.CloseMessage:
code := websocket.CloseNormalClosure
reason := ""
@ -128,17 +105,13 @@ func (c *wsConnect) readWsLoop() {
// 发送响应关闭帧(必须回传相同状态码)
rspMsg := websocket.FormatCloseMessage(code, reason)
_ = c.wsConn.WriteControl(websocket.CloseMessage, rspMsg, time.Now().Add(5*time.Second))
c.close()
c.Close()
default:
if msgType != wsMsgType {
continue
}
msg := &wsMessage{messageType: msgType, data: data}
select {
case c.inChan <- msg:
case <-c.closeCh:
return
}
_ = c.inChan.Write(msg)
}
}
}
@ -149,20 +122,22 @@ func (c *wsConnect) writeWsLoop() {
for {
select {
// 取一个消息发送给客户端
case msg := <-c.outChan:
case msg := <-c.outChan.Reader():
if err := c.wsConn.WriteMessage(msg.messageType, msg.data); err != nil {
log.Error(c.Log("发送消息错误:%v", err))
iMsg := &ipb.InternalMsg{}
_ = proto.Unmarshal(msg.data, iMsg)
log.Error(c.Log("发送消息错误:%v 消息长度:%v 消息内容:%v", err, len(msg.data), iMsg))
// 关闭连接
c.close()
c.Close()
return
}
case <-c.closeCh:
// 收到关闭通知
return
case <-ticker.C:
_ = c.wsConn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.wsConn.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
if err := c.wsConn.WriteMessage(websocket.PingMessage, []byte("ping")); err != nil {
log.Error(c.Log("发送心跳失败:%v", err))
c.Close()
return
} else {
log.Debug(c.Log("发送心跳"))
}
}
}
@ -170,13 +145,14 @@ func (c *wsConnect) writeWsLoop() {
func (c *wsConnect) handle(process func(IConn, []byte)) {
for {
msg, err := c.readFromChan()
if err != nil {
// log.Error(c.Log("获取消息错误:%v", err))
break
}
// Log.Debug(c.Log("接收消息:%v", msg.data))
select {
case msg, ok := <-c.inChan.Reader():
if ok {
process(c, msg.data)
} else {
c.Close()
}
}
}
}
@ -204,6 +180,6 @@ func (c *wsConnect) Addr() string {
}
func (c *wsConnect) Log(format string, v ...interface{}) string {
s := fmt.Sprintf("连接:%v, id:%v ", c.wsConn.RemoteAddr().String(), c.id)
s := fmt.Sprintf("连接:%v, id:%v uid:%v", c.wsConn.RemoteAddr().String(), c.id, c.userId)
return s + fmt.Sprintf(format, v...)
}