// Package nat is a thin wrapper around a nats.go connection. It offers
// publish, request/reply and (queue) subscription helpers and keeps track of
// every subscription it creates so that Close can release them.
package nat

import (
	"fmt"
	"sync"
	"time"

	"github.com/fox/fox/log"
	"github.com/fox/fox/safeChan"
	"github.com/nats-io/nats.go"
)

// errNotConnected is returned by every operation that needs a connection when
// Connect has not been called successfully yet.
var errNotConnected = fmt.Errorf("nats connection is not established")

// RpcHandler handles one request payload and returns the reply payload.
// A non-nil error makes SubscribeRpc reply with the literal bytes "error".
type RpcHandler func(msg []byte) ([]byte, error)

// Nats bundles a NATS connection with the subscriptions created through it.
type Nats struct {
	address []string             // server URLs handed to the client on Connect
	nc      *nats.Conn           // nil until Connect succeeds
	sub     []*nats.Subscription // subscriptions created via this wrapper; guarded by mt
	name    string               // client name used as the connection name
	mt      sync.Mutex           // protects sub
}

// NewNats returns an unconnected client named name for the given servers.
// Call Connect before using it.
//
// Example addresses:
//
//	"nats://server1:4222",
//	"nats://server2:4222",
//	"nats://server3:4222",
func NewNats(name string, address ...string) *Nats {
	return &Nats{
		address: address,
		name:    name,
	}
}

// Connect dials the configured servers. Reconnect is enabled with at most 10
// attempts, 5 seconds apart. Connection close and disconnect events are logged.
func (n *Nats) Connect() error {
	opts := nats.GetDefaultOptions()
	opts.Servers = n.address
	opts.Name = n.name
	opts.AllowReconnect = true
	opts.MaxReconnect = 10
	opts.ReconnectWait = 5 * time.Second
	opts.ClosedCB = func(_ *nats.Conn) {
		log.Info("nats 连接已关闭")
	}
	opts.DisconnectedErrCB = func(_ *nats.Conn, err error) {
		if err != nil {
			log.ErrorF("nats 连接断开, err:%v", err)
		}
	}

	nc, err := opts.Connect()
	if err != nil {
		return err
	}
	n.nc = nc
	log.InfoF("连接nats成功,当前服务器:%v", nc.ConnectedUrl())
	return nil
}

// Close removes every tracked subscription and then closes the connection.
//
// The subscriptions are released first: Unsubscribe on an already closed
// connection can only fail, so the previous close-then-unsubscribe order made
// every Unsubscribe call a guaranteed (and ignored) error.
func (n *Nats) Close() {
	n.clearAllSub()
	if n.nc != nil {
		n.nc.Close()
	}
}

// SubscribeRpc subscribes to topic and answers each message with the result of
// rpcHandler. When the handler fails, the error is logged and the literal
// bytes "error" are sent as the reply. Failures to send a reply are logged.
//
// It returns an error if rpcHandler is nil, if the client is not connected, or
// if the subscription cannot be created.
func (n *Nats) SubscribeRpc(topic string, rpcHandler RpcHandler) error {
	if rpcHandler == nil {
		return fmt.Errorf("rpc handler is nil")
	}
	nc, err := n.conn()
	if err != nil {
		return err
	}

	sub, err := nc.Subscribe(topic, func(m *nats.Msg) {
		rsp, err := rpcHandler(m.Data)
		if err != nil {
			log.Error(err.Error())
			rsp = []byte("error")
		}
		if err := m.Respond(rsp); err != nil {
			log.Error(err.Error())
		}
	})
	if err != nil {
		return err
	}
	n.addSub(sub)
	return nil
}

// SubscribeCb subscribes to topic with a raw message callback. Rarely needed;
// the caller is responsible for any concurrency concerns inside cb.
func (n *Nats) SubscribeCb(topic string, cb func(m *nats.Msg)) error {
	nc, err := n.conn()
	if err != nil {
		return err
	}
	sub, err := nc.Subscribe(topic, cb)
	if err != nil {
		return err
	}
	n.addSub(sub)
	return nil
}

// Subscribe subscribes to topic and writes every message payload into msgChan
// so the service can process the messages on its own.
//
// It returns an error if msgChan is nil, if the client is not connected, or if
// the subscription cannot be created.
func (n *Nats) Subscribe(topic string, msgChan *safeChan.ByteChan) error {
	if msgChan == nil {
		return fmt.Errorf("msg chan is nil")
	}
	nc, err := n.conn()
	if err != nil {
		return err
	}
	sub, err := nc.Subscribe(topic, forwardTo(msgChan))
	if err != nil {
		return err
	}
	n.addSub(sub)
	return nil
}

// Publish sends msg to topic. It returns an error if the client is not
// connected or the publish fails.
func (n *Nats) Publish(topic string, msg []byte) error {
	nc, err := n.conn()
	if err != nil {
		return err
	}
	return nc.Publish(topic, msg)
}

// Rpc sends msg as a request on topic and waits up to timeout for a reply.
// It returns the reply payload, or an error if the client is not connected or
// the request fails.
func (n *Nats) Rpc(topic string, timeout time.Duration, msg []byte) ([]byte, error) {
	nc, err := n.conn()
	if err != nil {
		return nil, err
	}
	rsp, err := nc.Request(topic, msg, timeout)
	if err != nil {
		return nil, err
	}
	return rsp.Data, nil
}

// QueueSubscribe creates a queue subscription on topic within the group queue
// and writes every received payload into msgChan. Within a queue group only
// one consumer receives a given message.
//
// It returns an error if msgChan is nil, if the client is not connected, or if
// the subscription cannot be created.
func (n *Nats) QueueSubscribe(topic string, queue string, msgChan *safeChan.ByteChan) error {
	if msgChan == nil {
		return fmt.Errorf("msg chan is nil")
	}
	nc, err := n.conn()
	if err != nil {
		return err
	}
	sub, err := nc.QueueSubscribe(topic, queue, forwardTo(msgChan))
	if err != nil {
		return err
	}
	n.addSub(sub)
	return nil
}

// conn returns the live connection, or errNotConnected when Connect has not
// succeeded yet, so callers get an error instead of a nil-pointer panic.
func (n *Nats) conn() (*nats.Conn, error) {
	if n.nc == nil {
		return nil, errNotConnected
	}
	return n.nc, nil
}

// forwardTo builds a message handler that writes each payload into msgChan.
func forwardTo(msgChan *safeChan.ByteChan) nats.MsgHandler {
	return func(m *nats.Msg) {
		// The result of Write is deliberately discarded, as before; there is
		// no caller to report it to from inside the subscription callback.
		_ = msgChan.Write(m.Data)
	}
}

// addSub records sub so that Close can unsubscribe it later.
func (n *Nats) addSub(sub *nats.Subscription) {
	n.mt.Lock()
	defer n.mt.Unlock()
	n.sub = append(n.sub, sub)
}

// clearAllSub unsubscribes every tracked subscription and forgets them.
func (n *Nats) clearAllSub() {
	n.mt.Lock()
	defer n.mt.Unlock()
	for _, sub := range n.sub {
		// Best effort: a subscription may already be invalid (for example
		// after the connection was lost for good), which is not actionable.
		_ = sub.Unsubscribe()
	}
	// Drop the slice entirely so the old subscriptions are not kept reachable
	// through the backing array.
	n.sub = nil
}