// fox/ws/wsClient.go
// 2025-05-29 20:30:18 +08:00
//
// 133 lines
// 2.7 KiB
// Go

package ws
import (
"context"
"github.com/fox/fox/ksync"
"github.com/fox/fox/log"
"net/http"
"sync"
"time"
"github.com/gorilla/websocket"
)
// IOnFunc is the callback interface through which a Client hands received
// messages to its owner. An implementation is supplied to NewClient.
type IOnFunc interface {
// OnMessage is invoked from the client's read loop with the payload of each
// text or binary message read from the connection. The read loop discards
// the returned error.
OnMessage(msg []byte) error
}
// Client wraps a single gorilla/websocket connection together with the
// background loops (read, write, heartbeat) that service it. Instances are
// created with NewClient, started with Start and shut down with Stop.
type Client struct {
// conn is the connection dialed by NewClient.
conn *websocket.Conn
// sendChan queues outbound messages; writeLoop drains it and performs the
// actual writes. NewClient creates it with a buffer of 100 entries.
sendChan chan *wsMessage
// ctx is cancelled (via cancel) to tell every loop to exit.
ctx context.Context
// cancel cancels ctx.
cancel context.CancelFunc
// wg counts the loops launched by Start; Stop waits on it.
wg sync.WaitGroup
// onFunc receives the payload of every data message read by readLoop.
onFunc IOnFunc
}
// NewClient dials the WebSocket endpoint at url and returns a Client bound to
// the resulting connection. The handshake is given 30 seconds and is sent with
// the User-Agent header "MyClient/1.0". Received data messages will be passed
// to onFunc once Start has been called.
//
// It returns a nil Client and the dial error if the connection cannot be
// established.
func NewClient(url string, onFunc IOnFunc) (*Client, error) {
	// websocket.DefaultDialer is a package-level *Dialer shared by the whole
	// process. Work on a copy so that changing the handshake timeout neither
	// alters the default for other users nor races with concurrent callers.
	dialer := *websocket.DefaultDialer
	dialer.HandshakeTimeout = 30 * time.Second

	conn, _, err := dialer.Dial(url, http.Header{"User-Agent": {"MyClient/1.0"}})
	if err != nil {
		return nil, err
	}

	ctx, cancel := context.WithCancel(context.Background())
	return &Client{
		conn:     conn,
		sendChan: make(chan *wsMessage, 100),
		ctx:      ctx,
		cancel:   cancel,
		onFunc:   onFunc,
	}, nil
}
// Start launches the client's three background loops (read, write and
// heartbeat) and registers each of them with c.wg so that Stop can wait for
// them to finish.
//
// The loops are handed to ksync.GoSafe with a nil second argument.
// NOTE(review): GoSafe presumably runs the function on its own goroutine with
// panic recovery; confirm against the ksync package.
func (c *Client) Start() {
	loops := []func(){
		c.readLoop,
		c.writeLoop,
		c.heartbeatLoop,
	}

	// Register every loop before any of them can run its deferred wg.Done.
	c.wg.Add(len(loops))
	for _, loop := range loops {
		ksync.GoSafe(loop, nil)
	}
}
// readLoop reads messages from the connection until the client's context is
// cancelled or a read fails, passing every data payload to
// c.onFunc.OnMessage.
//
// gorilla/websocket's ReadMessage only ever reports TextMessage or
// BinaryMessage. Ping, pong and close frames are consumed by the connection's
// control-frame handlers (the default ping handler already answers with a
// pong), and a close frame from the peer surfaces as a *websocket.CloseError
// returned by ReadMessage. The loop therefore does not dispatch on the message
// type and detects a received close frame from the error instead.
//
// On a read failure the loop cancels the context and closes the connection
// directly instead of calling Stop: Stop waits on c.wg, which this goroutine
// only releases when it returns, so calling Stop from here would block
// forever.
func (c *Client) readLoop() {
	defer c.wg.Done()
	for {
		// Check for shutdown before committing to another blocking read.
		select {
		case <-c.ctx.Done():
			return
		default:
		}

		_, message, err := c.conn.ReadMessage()
		if err != nil {
			if _, ok := err.(*websocket.CloseError); ok {
				log.Debug("收到关闭帧")
			}
			// Signal the write and heartbeat loops to exit and release the
			// socket. Close is best effort: the connection may already have
			// been closed by Stop, in which case the error carries no
			// information.
			c.cancel()
			_ = c.conn.Close()
			return
		}

		// The callback's error is deliberately discarded: a failure to
		// process one message does not terminate the connection.
		_ = c.onFunc.OnMessage(message)
	}
}
// SendMsg queues data to be written to the peer as a binary message. The
// actual write is performed asynchronously by writeLoop.
//
// The slice is queued as-is, not copied, so the caller must not modify it
// after the call. If the send queue is full, SendMsg blocks until there is
// room or until the client's context is cancelled; in the latter case the
// message is dropped, because no loop remains to deliver it.
func (c *Client) SendMsg(data []byte) {
	msg := &wsMessage{messageType: websocket.BinaryMessage, data: data}
	select {
	case c.sendChan <- msg:
	case <-c.ctx.Done():
		// Client stopped: give up instead of blocking the caller forever on
		// a queue that nothing drains any more.
	}
}
// writeLoop drains c.sendChan and writes every queued message to the
// connection. It is the only place in this file that performs queued writes.
//
// Ping and pong messages are always written with the fixed payloads "ping"
// and "pong", whatever data the queued message carries; every other message
// is written with its own type and data. Write errors are discarded.
//
// When the client's context is cancelled the loop makes one attempt to send a
// normal-closure close frame, with a 10 second write deadline, and returns.
func (c *Client) writeLoop() {
	defer c.wg.Done()

	done := c.ctx.Done()
	for {
		select {
		case <-done:
			// Send a close frame before exiting.
			closeFrame := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
			deadline := time.Now().Add(10 * time.Second)
			_ = c.conn.WriteControl(websocket.CloseMessage, closeFrame, deadline)
			return

		case out := <-c.sendChan:
			payload := out.data
			switch out.messageType {
			case websocket.PingMessage:
				payload = []byte("ping")
			case websocket.PongMessage:
				payload = []byte("pong")
			}
			_ = c.conn.WriteMessage(out.messageType, payload)
		}
	}
}
// heartbeatLoop queues a ping message on c.sendChan every 25 seconds until the
// client's context is cancelled.
//
// The enqueue itself also watches the context: if the send queue is full, a
// bare channel send would park this goroutine where it cannot observe
// cancellation, and Stop would then wait on c.wg forever.
func (c *Client) heartbeatLoop() {
	defer c.wg.Done()

	ticker := time.NewTicker(25 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			ping := &wsMessage{messageType: websocket.PingMessage, data: []byte("ping")}
			select {
			case c.sendChan <- ping:
			case <-c.ctx.Done():
				return
			}
		case <-c.ctx.Done():
			return
		}
	}
}
// Stop shuts the client down: it cancels the client's context, closes the
// connection (ignoring the close error) and then blocks until every goroutine
// counted in c.wg has finished.
//
// Because Stop waits on c.wg, a goroutine that is itself counted in c.wg and
// calls Stop before its wg.Done has run ends up waiting on itself.
func (c *Client) Stop() {
	// Deferred so the wait is the final step, after cancellation and close
	// have unblocked the loops.
	defer c.wg.Wait()

	c.cancel()
	_ = c.conn.Close()
}