fox/nat/nats.go
2025-05-25 20:02:15 +08:00

148 lines
2.8 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package nat
import (
"fmt"
"github.com/fox/fox/log"
"github.com/fox/fox/safeChan"
"github.com/nats-io/nats.go"
"sync"
"time"
)
// RpcHandler handles one incoming RPC request. It receives the raw request
// payload and returns the raw response payload, or an error when the request
// could not be processed.
type RpcHandler func(msg []byte) ([]byte, error)
// Nats wraps a NATS connection together with the subscriptions that were
// created through it.
type Nats struct {
// address lists the server URLs that Connect passes to the client options.
address []string
// nc is the underlying connection; it stays nil until Connect succeeds.
nc *nats.Conn
// sub collects the subscriptions registered through addSub.
sub []*nats.Subscription
// name is the client name that Connect sets on the connection options.
name string
// mt guards sub.
mt sync.Mutex
}
// NewNats returns an unconnected Nats client that will identify itself as
// name. Each address is a NATS server URL, for example:
//
//	"nats://server1:4222",
//	"nats://server2:4222",
//	"nats://server3:4222",
//
// The connection and the subscription list start out empty; call Connect
// before publishing or subscribing.
func NewNats(name string, address ...string) *Nats {
	client := new(Nats)
	client.name = name
	client.address = address
	return client
}
// Connect opens a connection to the configured servers and stores it on the
// receiver. Reconnection is enabled with MaxReconnect set to 10 and
// ReconnectWait set to 5 seconds. Connection close and disconnect events are
// reported through the package logger.
//
// It returns the error from the NATS client unchanged when the connection
// cannot be established.
func (n *Nats) Connect() error {
	onClosed := func(_ *nats.Conn) {
		log.Info("nats 连接已关闭")
	}
	onDisconnected := func(_ *nats.Conn, err error) {
		// A disconnect without an error is not logged.
		if err == nil {
			return
		}
		log.ErrorF("nats 连接断开, err:%v", err)
	}

	opts := nats.GetDefaultOptions()
	opts.Name = n.name
	opts.Servers = n.address
	opts.AllowReconnect = true
	opts.MaxReconnect = 10
	opts.ReconnectWait = 5 * time.Second
	opts.ClosedCB = onClosed
	opts.DisconnectedErrCB = onDisconnected

	conn, err := opts.Connect()
	if err != nil {
		return err
	}

	n.nc = conn
	log.InfoF("连接nats成功当前服务器:%v", conn.ConnectedUrl())
	return nil
}
// Close releases every tracked subscription and then closes the connection,
// if one was established.
//
// The subscriptions are removed first on purpose: nats.go rejects
// Unsubscribe on an already closed connection (ErrConnectionClosed), so
// unsubscribing after closing the connection would fail for every
// subscription and the failure would go unnoticed.
func (n *Nats) Close() {
	n.clearAllSub()
	if n.nc != nil {
		n.nc.Close()
	}
}
// SubscribeRpc registers rpcHandler as the responder for requests published
// on topic. For every request the handler's result is sent back as the reply;
// when the handler fails, the failure is logged and the literal payload
// "error" is sent instead. A failure to send the reply is logged only.
//
// It returns an error when rpcHandler is nil or when the subscription cannot
// be created. On success the subscription is tracked on the receiver.
func (n *Nats) SubscribeRpc(topic string, rpcHandler RpcHandler) error {
	if rpcHandler == nil {
		return fmt.Errorf("rpc handler is nil")
	}

	onRequest := func(req *nats.Msg) {
		reply, handleErr := rpcHandler(req.Data)
		if handleErr != nil {
			log.Error(handleErr.Error())
			// The requester still gets an answer, just a fixed marker payload.
			reply = []byte("error")
		}
		if sendErr := req.Respond(reply); sendErr != nil {
			log.Error(sendErr.Error())
		}
	}

	subscription, err := n.nc.Subscribe(topic, onRequest)
	if err != nil {
		return err
	}

	n.addSub(subscription)
	return nil
}
// Subscribe forwards the payload of every message published on topic to
// msgChan. On success the subscription is tracked on the receiver.
//
// It returns an error when msgChan is nil or when the subscription cannot be
// created. The nil check happens up front, mirroring SubscribeRpc, so that a
// missing channel is reported to the caller instead of being used later from
// the message callback.
func (n *Nats) Subscribe(topic string, msgChan *safeChan.ByteChan) error {
	if msgChan == nil {
		return fmt.Errorf("msg chan is nil")
	}

	sub, err := n.nc.Subscribe(topic, func(m *nats.Msg) {
		// Delivery is best-effort: the value returned by Write is discarded.
		_ = msgChan.Write(m.Data)
	})
	if err != nil {
		return err
	}

	n.addSub(sub)
	return nil
}
// Publish sends msg on topic through the underlying connection and returns
// the client's error unchanged.
func (n *Nats) Publish(topic string, msg []byte) error {
	err := n.nc.Publish(topic, msg)
	return err
}
// Rpc sends msg as a request on topic and returns the payload of the reply.
// The request is issued with a fixed 30 second timeout; any error from the
// client is returned unchanged together with a nil payload.
func (n *Nats) Rpc(topic string, msg []byte) ([]byte, error) {
	const replyTimeout = 30 * time.Second

	reply, err := n.nc.Request(topic, msg, replyTimeout)
	if err != nil {
		return nil, err
	}
	return reply.Data, nil
}
// QueueSubscribe creates a queue subscription on topic within the queue group
// named queue and forwards the payload of every received message to msgChan.
// Within a queue group only one consumer receives a given message. On success
// the subscription is tracked on the receiver.
//
// It returns an error when msgChan is nil or when the subscription cannot be
// created. The nil check happens up front, mirroring SubscribeRpc, so that a
// missing channel is reported to the caller instead of being used later from
// the message callback.
func (n *Nats) QueueSubscribe(topic string, queue string, msgChan *safeChan.ByteChan) error {
	if msgChan == nil {
		return fmt.Errorf("msg chan is nil")
	}

	sub, err := n.nc.QueueSubscribe(topic, queue, func(m *nats.Msg) {
		// Delivery is best-effort: the value returned by Write is discarded.
		_ = msgChan.Write(m.Data)
	})
	if err != nil {
		return err
	}

	n.addSub(sub)
	return nil
}
// addSub records sub in the receiver's subscription list while holding the
// mutex.
func (n *Nats) addSub(sub *nats.Subscription) {
	n.mt.Lock()
	n.sub = append(n.sub, sub)
	n.mt.Unlock()
}
// clearAllSub unsubscribes every tracked subscription and empties the list,
// all while holding the mutex. The list is truncated in place, so its backing
// storage is kept for reuse.
func (n *Nats) clearAllSub() {
	n.mt.Lock()
	defer n.mt.Unlock()

	for i := range n.sub {
		// Teardown is best-effort; an Unsubscribe failure is deliberately ignored.
		_ = n.sub[i].Unsubscribe()
	}
	n.sub = n.sub[:0]
}