fox/service/natsService.go

158 lines
4.1 KiB
Go
Raw Normal View History

2025-05-25 20:02:15 +08:00
package service
import (
"encoding/json"
"github.com/fox/fox/etcd"
2025-05-29 11:49:24 +08:00
"github.com/fox/fox/ipb"
"github.com/fox/fox/log"
2025-05-25 20:02:15 +08:00
"github.com/fox/fox/nat"
2025-05-29 11:49:24 +08:00
"github.com/fox/fox/processor"
"github.com/golang/protobuf/proto"
"github.com/nats-io/nats.go"
"os"
"sync"
2025-05-25 20:02:15 +08:00
)
const (
updateTopic = "update.service.topic"
)
// 初始化服务所需要的参数
type InitNatsServiceParams struct {
EtcdAddress []string
EtcdUsername string
EtcdPassword string
NatsAddress []string
ServiceType string // 本服务的服务类型
ServiceName string // 本服务的服务名,唯一索引
OnFunc IOnFunc // 包含初始化,停止,服务的消息处理
TypeId int // 与ServiceType一一对应
Version string
}
2025-05-25 20:02:15 +08:00
type NatsService struct {
*BaseService
2025-05-29 11:49:24 +08:00
nats *nat.Nats
registry *etcd.Registry[etcd.ServiceNode]
rpcProcessor *processor.Processor
node etcd.ServiceNode // 本服务节点信息
2025-05-25 20:02:15 +08:00
}
func NewNatsService(param *InitNatsServiceParams) (*NatsService, error) {
var err error
2025-05-25 20:02:15 +08:00
s := new(NatsService)
s.BaseService = NewBaseService(param.ServiceType, param.ServiceName, param.OnFunc, s)
if s.registry, err = etcd.NewRegistry[etcd.ServiceNode](param.EtcdAddress, param.EtcdUsername, param.EtcdPassword); err != nil {
2025-05-25 20:02:15 +08:00
return nil, err
}
s.node = etcd.ServiceNode{
TypeId: param.TypeId,
Name: s.Name(),
Type: s.Type(),
Version: param.Version,
}
if err = s.registry.Register(s.node); err != nil {
return nil, err
}
s.registry.WatchServices()
s.nats = nat.NewNats(param.ServiceName, param.NatsAddress...)
if err = s.nats.Connect(); err != nil {
s.registry.UnregisterService()
return nil, err
}
// 订阅本服务名的topic
if err = s.Subscribe(Topic(s)); err != nil {
log.Error(err.Error())
}
2025-05-29 11:49:24 +08:00
// 订阅广播服务上线
_ = s.subscribeUpdateService()
s.publishUpdateService()
// 订阅rpc回调
_ = s.subscribeRpc()
2025-05-25 20:02:15 +08:00
return s, nil
}
2025-05-28 20:07:20 +08:00
func (n *NatsService) publishUpdateService() {
jsonData, _ := json.Marshal(n.node)
_ = n.nats.Publish(updateTopic, jsonData)
}
func (n *NatsService) subscribeUpdateService() error {
2025-05-29 11:49:24 +08:00
return n.SubscribeCb(updateTopic, func(m *nats.Msg) {
var node = &etcd.ServiceNode{}
_ = json.Unmarshal(m.Data, node)
// 不是同类服务不处理,是自己发出来的更新,也不处理
if node.Type != n.Type() || node.Name == n.Name() {
return
}
// 有新服务上线,本服务准备退出
if n.node.Version < node.Version {
n.NotifyStop()
n.WaitStop()
log.InfoF("%v auto exit, initiating shutdown...", n.Name())
os.Exit(0)
}
})
}
2025-05-29 11:49:24 +08:00
func (n *NatsService) subscribeRpc() error {
return n.SubscribeCb(RpcTopic(n), func(m *nats.Msg) {
var iMsg = &ipb.InternalMsg{}
_ = proto.Unmarshal(m.Data, iMsg)
_ = n.rpcProcessor.Dispatch(iMsg.RetRpcMsgId, iMsg)
})
}
// 订阅回调
2025-05-28 20:07:20 +08:00
func (n *NatsService) SubscribeCb(topic string, cb func(m *nats.Msg)) error {
2025-05-29 11:49:24 +08:00
return n.nats.SubscribeCb(topic, func(m *nats.Msg) {
n.RunOnce(func() {
cb(m)
})
})
2025-05-28 20:07:20 +08:00
}
2025-05-25 20:02:15 +08:00
func (n *NatsService) Subscribe(topic string) error {
return n.nats.Subscribe(topic, n.msg)
}
// 队列订阅,队列中只会有一个消费者消费该消息
func (n *NatsService) QueueSubscribe(topic string, queue string) error {
return n.nats.QueueSubscribe(topic, queue, n.msg)
}
func (s *NatsService) OnStop() {
s.nats.Close()
}
2025-05-29 11:49:24 +08:00
func (s *NatsService) Send(topic string, msg *ipb.InternalMsg) error {
data, _ := proto.Marshal(msg)
return s.nats.Publish(topic, data)
2025-05-25 20:02:15 +08:00
}
2025-05-29 11:49:24 +08:00
func (s *NatsService) Call(rpcTopic string, msg *ipb.InternalMsg, cb func(msg *ipb.InternalMsg)) error {
data, _ := proto.Marshal(ipb.MakeRpcMsg(msg.ServiceName, msg.ConnId, msg.UserId, msg.MsgId, msg.Msg))
err := s.nats.Publish(rpcTopic, data)
if err != nil {
return err
}
s.rpcProcessor.RegisterMessage(msg.RetRpcMsgId, ipb.InternalMsg{}, cb)
return nil
2025-05-25 20:02:15 +08:00
}
2025-05-28 19:43:56 +08:00
func (s *NatsService) ServiceEtcd() *etcd.Registry[etcd.ServiceNode] {
return s.registry
}
// 从etcd中获取所有服务节点
func (s *NatsService) GetServiceNodes() *sync.Map {
return s.registry.GetNodes()
}
// 查找指定的服务节点信息
func (s *NatsService) FindServiceNode(serviceName string) (etcd.ServiceNode, error) {
return s.registry.FindNode(serviceName)
}