package ws

import (
	"context"
	"net/http"
	"sync"
	"time"

	"github.com/fox/fox/ksync"
	"github.com/fox/fox/log"
	"github.com/gorilla/websocket"
)

// Client is a WebSocket client connection. After Start it runs three
// goroutines: a reader, a writer draining sendChan, and a heartbeat that
// periodically queues ping frames.
type Client struct {
	conn     *websocket.Conn
	sendChan chan *wsMessage    // outbound queue consumed by writeLoop
	ctx      context.Context    // cancelled once shutdown begins
	cancel   context.CancelFunc // cancels ctx
	wg       sync.WaitGroup     // tracks the goroutines launched by Start
	stopOnce sync.Once          // makes the teardown in shutdown run exactly once
}

// NewClient dials url with a 30 second handshake timeout and returns a Client
// wrapping the established connection. The loops are not running until Start
// is called. The dial error is returned unchanged on failure.
func NewClient(url string) (*Client, error) {
	// Work on a copy: websocket.DefaultDialer is a shared package-level
	// pointer, so assigning to its fields would silently change the handshake
	// timeout for every other user of the default dialer in this process.
	dialer := *websocket.DefaultDialer
	dialer.HandshakeTimeout = 30 * time.Second

	header := http.Header{"User-Agent": {"MyClient/1.0"}}
	conn, _, err := dialer.Dial(url, header)
	if err != nil {
		return nil, err
	}

	ctx, cancel := context.WithCancel(context.Background())
	return &Client{
		conn:     conn,
		sendChan: make(chan *wsMessage, 100),
		ctx:      ctx,
		cancel:   cancel,
	}, nil
}

// Start launches the read, write and heartbeat loops. Call Stop to terminate
// them and wait for them to exit.
func (c *Client) Start() {
	c.wg.Add(3)
	ksync.GoSafe(c.readLoop, nil)
	ksync.GoSafe(c.writeLoop, nil)
	ksync.GoSafe(c.heartbeatLoop, nil)
}

// readLoop reads messages until the connection fails or the client is shut
// down. Whenever it exits it triggers shutdown so the other loops stop too.
func (c *Client) readLoop() {
	defer c.wg.Done()
	// Use the non-waiting shutdown here, never Stop: Stop waits on c.wg, which
	// still counts this goroutine, so calling it from inside the loop would
	// deadlock.
	defer c.shutdown()

	for {
		if c.ctx.Err() != nil {
			return
		}

		// ReadMessage only ever yields text or binary messages; ping, pong and
		// close frames are consumed by the connection's handlers, and a close
		// frame from the peer surfaces here as a *websocket.CloseError.
		messageType, message, err := c.conn.ReadMessage()
		if err != nil {
			_, peerClosed := err.(*websocket.CloseError)
			switch {
			case c.ctx.Err() != nil:
				// Expected: shutdown closed the connection underneath us.
			case peerClosed:
				log.Debug("收到关闭帧")
			default:
				log.DebugF("读取错误:%v", err)
			}
			return
		}

		log.DebugF("收到消息,类型:%v 内容:%v", messageType, string(message))
	}
}

// SendMsg queues data to be written as a binary message. It blocks while the
// send queue is full; once the client has been shut down the message is
// dropped instead of blocking forever.
func (c *Client) SendMsg(data []byte) {
	msg := &wsMessage{messageType: websocket.BinaryMessage, data: data}
	select {
	case c.sendChan <- msg:
	case <-c.ctx.Done():
		// Nobody is draining the queue any more; drop the message.
	}
}

// writeLoop is the goroutine that writes queued messages to the connection.
// It exits when the client is shut down.
func (c *Client) writeLoop() {
	defer c.wg.Done()

	for {
		select {
		case <-c.ctx.Done():
			return
		case msg := <-c.sendChan:
			// Ping and pong frames always carry a fixed payload, regardless of
			// what the queued message holds.
			payload := msg.data
			switch msg.messageType {
			case websocket.PingMessage:
				payload = []byte("ping")
			case websocket.PongMessage:
				payload = []byte("pong")
			}

			err := c.conn.WriteMessage(msg.messageType, payload)
			if err != nil && c.ctx.Err() == nil {
				// Best effort: report the failure and keep going; a broken
				// connection is detected and torn down by readLoop.
				log.DebugF("写入错误:%v", err)
			}
		}
	}
}

// heartbeatLoop queues a ping frame every 25 seconds until the client is shut
// down.
func (c *Client) heartbeatLoop() {
	defer c.wg.Done()

	ticker := time.NewTicker(25 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-c.ctx.Done():
			return
		case <-ticker.C:
			ping := &wsMessage{messageType: websocket.PingMessage, data: []byte("ping")}
			// Also watch ctx while enqueueing: with a full queue and a stopped
			// writer, a bare send would block forever and hang Stop.
			select {
			case c.sendChan <- ping:
			case <-c.ctx.Done():
				return
			}
		}
	}
}

// shutdown cancels the client context, sends a close frame and closes the
// connection. It does not wait for the loops, so it is safe to call from
// inside them. Only the first call has any effect.
func (c *Client) shutdown() {
	c.stopOnce.Do(func() {
		c.cancel()

		// Send the close frame before closing the socket so that it has a
		// chance to reach the peer. Errors are ignored on purpose: the
		// connection may already be broken, and it is closed right after.
		_ = c.conn.WriteControl(
			websocket.CloseMessage,
			websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""),
			time.Now().Add(10*time.Second),
		)

		// Closing the connection unblocks a reader waiting in ReadMessage. The
		// error is ignored because nothing useful can be done with it here.
		_ = c.conn.Close()
	})
}

// Stop shuts the client down and blocks until all loops started by Start have
// exited. It may be called more than once.
func (c *Client) Stop() {
	c.shutdown()
	c.wg.Wait()
}