fox/nat/nats.go

159 lines
3.1 KiB
Go
Raw Permalink Normal View History

2025-05-25 20:02:15 +08:00
package nat
import (
"fmt"
"github.com/fox/fox/log"
"github.com/fox/fox/safeChan"
"github.com/nats-io/nats.go"
"sync"
"time"
)
type RpcHandler func(msg []byte) ([]byte, error)
type Nats struct {
address []string
nc *nats.Conn
sub []*nats.Subscription
name string
mt sync.Mutex
}
// []string{
// "nats://server1:4222",
// "nats://server2:4222",
// "nats://server3:4222",
// }
func NewNats(name string, address ...string) *Nats {
n := &Nats{
address: address,
nc: nil,
sub: nil,
name: name,
mt: sync.Mutex{},
}
return n
}
func (n *Nats) Connect() error {
opts := nats.GetDefaultOptions()
opts.Servers = n.address
opts.AllowReconnect = true
opts.MaxReconnect = 10
opts.ReconnectWait = 5 * time.Second
opts.Name = n.name
opts.ClosedCB = func(conn *nats.Conn) {
_ = conn
log.Info("nats 连接已关闭")
}
opts.DisconnectedErrCB = func(conn *nats.Conn, err error) {
if err != nil {
log.ErrorF("nats 连接断开, err:%v", err)
}
}
nc, err := opts.Connect()
if err != nil {
return err
}
n.nc = nc
log.InfoF("连接nats成功当前服务器:%v", nc.ConnectedUrl())
return nil
}
func (n *Nats) Close() {
if n.nc != nil {
n.nc.Close()
}
n.clearAllSub()
}
func (n *Nats) SubscribeRpc(topic string, rpcHandler RpcHandler) error {
if rpcHandler == nil {
return fmt.Errorf("rpc handler is nil")
}
rspErrF := func(m *nats.Msg) {
if err := m.Respond([]byte("error")); err != nil {
log.Error(err.Error())
}
}
sub, err := n.nc.Subscribe(topic, func(m *nats.Msg) {
rsp, err := rpcHandler(m.Data)
if err != nil {
log.Error(err.Error())
rspErrF(m)
return
}
if err = m.Respond(rsp); err != nil {
log.Error(err.Error())
}
})
if err != nil {
return err
}
n.addSub(sub)
return nil
}
// 订阅回调,极少用。调用方自行保证并发性问题
func (n *Nats) SubscribeCb(topic string, cb func(m *nats.Msg)) error {
sub, err := n.nc.Subscribe(topic, cb)
if err != nil {
return err
}
n.addSub(sub)
return nil
}
// 订阅消息统一放入msgChan交给服务自行处理
2025-05-25 20:02:15 +08:00
func (n *Nats) Subscribe(topic string, msgChan *safeChan.ByteChan) error {
sub, err := n.nc.Subscribe(topic, func(m *nats.Msg) {
_ = msgChan.Write(m.Data)
})
if err != nil {
return err
}
n.addSub(sub)
return nil
}
func (n *Nats) Publish(topic string, msg []byte) error {
return n.nc.Publish(topic, msg)
}
2025-06-01 09:57:01 +08:00
func (n *Nats) Rpc(topic string, timeout time.Duration, msg []byte) ([]byte, error) {
rsp, err := n.nc.Request(topic, msg, timeout)
2025-05-25 20:02:15 +08:00
if err != nil {
return nil, err
}
return rsp.Data, nil
}
// 队列订阅,队列中只会有一个消费者消费该消息
func (n *Nats) QueueSubscribe(topic string, queue string, msgChan *safeChan.ByteChan) error {
sub, err := n.nc.QueueSubscribe(topic, queue, func(m *nats.Msg) {
_ = msgChan.Write(m.Data)
})
if err != nil {
return err
}
n.addSub(sub)
return nil
}
func (n *Nats) addSub(sub *nats.Subscription) {
n.mt.Lock()
defer n.mt.Unlock()
n.sub = append(n.sub, sub)
}
func (n *Nats) clearAllSub() {
n.mt.Lock()
defer n.mt.Unlock()
for _, sub := range n.sub {
_ = sub.Unsubscribe()
}
n.sub = n.sub[:0]
}