fox/ws/wsClient.go

162 lines
3.8 KiB
Go
Raw Normal View History

2025-05-25 20:02:15 +08:00
package ws
import (
"context"
2025-06-02 15:29:03 +08:00
"fmt"
2025-05-25 20:02:15 +08:00
"github.com/fox/fox/ksync"
"github.com/fox/fox/log"
"github.com/gorilla/websocket"
2025-05-25 20:02:15 +08:00
"net/http"
"sync"
"time"
)
2025-05-29 20:30:18 +08:00
type IOnFunc interface {
OnMessage(msg []byte) error
}
2025-05-25 20:02:15 +08:00
type Client struct {
conn *websocket.Conn
sendChan chan *wsMessage
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
2025-05-29 20:30:18 +08:00
onFunc IOnFunc
2025-06-02 15:29:03 +08:00
uid int64
2025-05-25 20:02:15 +08:00
}
2025-05-29 20:30:18 +08:00
func NewClient(url string, onFunc IOnFunc) (*Client, error) {
2025-05-25 20:02:15 +08:00
dialer := websocket.DefaultDialer
dialer.HandshakeTimeout = 30 * time.Second
conn, _, err := dialer.Dial(url, http.Header{"User-Agent": {"MyClient/1.0"}})
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(context.Background())
return &Client{
conn: conn,
sendChan: make(chan *wsMessage, 100),
ctx: ctx,
cancel: cancel,
2025-05-29 20:30:18 +08:00
onFunc: onFunc,
2025-05-25 20:02:15 +08:00
}, nil
}
func (c *Client) Start() {
c.wg.Add(2)
2025-05-25 20:02:15 +08:00
ksync.GoSafe(c.readLoop, nil)
ksync.GoSafe(c.writeLoop, nil)
ksync.GoSafe(c.heartbeatLoop, nil)
}
2025-06-02 15:29:03 +08:00
func (c *Client) Log(format string, v ...interface{}) string {
s := fmt.Sprintf("连接:%v, uid:%v ", c.conn.RemoteAddr().String(), c.uid)
return s + fmt.Sprintf(format, v...)
}
/*
readLoop暂时没有好的办法及时退出协程c.conn.ReadMessage()是阻塞式导致协程无法及时catch到关闭信号
如果在ReadMessage前调用SetReadDeadline设置超时它会在超时后将底部连接状态标记为已损坏后续ReadMessage会触发崩溃
*/
2025-05-25 20:02:15 +08:00
func (c *Client) readLoop() {
//defer c.wg.Done()
2025-06-02 15:29:03 +08:00
_ = c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
// 设置 Pong 处理器
c.conn.SetPongHandler(func(string) error {
_ = c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
return nil
})
2025-05-25 20:02:15 +08:00
for {
select {
case <-c.ctx.Done():
2025-06-02 15:29:03 +08:00
log.Debug(c.Log("readLoop 收到关闭信号"))
2025-05-25 20:02:15 +08:00
return
default:
messageType, message, err := c.conn.ReadMessage()
if err != nil {
2025-06-02 15:29:03 +08:00
log.Error(c.Log("读取错误:%v", err))
c.NotifyStop()
2025-05-25 20:02:15 +08:00
return
}
switch messageType {
case websocket.PingMessage:
2025-06-02 15:29:03 +08:00
log.Debug(c.Log("receive ping message"))
2025-05-25 20:02:15 +08:00
c.sendChan <- &wsMessage{messageType: websocket.PongMessage, data: []byte("pong")}
case websocket.PongMessage:
2025-06-02 15:29:03 +08:00
log.Debug(c.Log("receive pong message"))
2025-05-25 20:02:15 +08:00
case websocket.TextMessage, websocket.BinaryMessage:
2025-05-29 20:30:18 +08:00
_ = c.onFunc.OnMessage(message)
2025-05-25 20:02:15 +08:00
case websocket.CloseMessage:
2025-06-02 15:29:03 +08:00
log.Debug(c.Log("收到关闭帧"))
c.NotifyStop()
2025-05-25 20:02:15 +08:00
return
}
}
}
}
func (c *Client) SendMsg(data []byte) {
c.sendChan <- &wsMessage{messageType: websocket.BinaryMessage, data: data}
}
func (c *Client) writeLoop() {
defer c.wg.Done()
for {
select {
case msg := <-c.sendChan:
switch msg.messageType {
case websocket.PingMessage:
2025-06-02 15:29:03 +08:00
log.Debug(c.Log("send ping message"))
2025-05-25 20:02:15 +08:00
_ = c.conn.WriteMessage(websocket.PingMessage, []byte("ping"))
case websocket.PongMessage:
2025-06-02 15:29:03 +08:00
log.Debug(c.Log("send pong message"))
2025-05-25 20:02:15 +08:00
_ = c.conn.WriteMessage(websocket.PongMessage, []byte("pong"))
default:
_ = c.conn.WriteMessage(msg.messageType, msg.data)
}
case <-c.ctx.Done():
2025-06-02 15:29:03 +08:00
log.Debug(c.Log("writeLoop 收到关闭信号"))
2025-05-25 20:02:15 +08:00
// 发送关闭帧
_ = c.conn.WriteControl(
websocket.CloseMessage,
websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""),
time.Now().Add(10*time.Second),
)
return
}
}
}
func (c *Client) heartbeatLoop() {
defer c.wg.Done()
ticker := time.NewTicker(25 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
c.sendChan <- &wsMessage{messageType: websocket.PingMessage, data: []byte("ping")}
case <-c.ctx.Done():
//log.Debug("heartbeatLoop 收到关闭信号")
2025-05-25 20:02:15 +08:00
return
}
}
}
func (c *Client) WaitStop() {
c.wg.Wait()
}
func (c *Client) NotifyStop() {
2025-05-25 20:02:15 +08:00
c.cancel()
_ = c.conn.Close()
}
2025-06-02 15:29:03 +08:00
func (c *Client) SetUid(uid int64) {
c.uid = uid
}