2025-05-25 20:02:15 +08:00
|
|
|
package service
|
|
|
|
|
|
|
|
import (
|
2025-05-28 17:52:28 +08:00
|
|
|
"encoding/json"
|
|
|
|
"github.com/fox/fox/etcd"
|
2025-05-29 11:49:24 +08:00
|
|
|
"github.com/fox/fox/ipb"
|
2025-05-28 17:52:28 +08:00
|
|
|
"github.com/fox/fox/log"
|
2025-05-25 20:02:15 +08:00
|
|
|
"github.com/fox/fox/nat"
|
2025-05-29 11:49:24 +08:00
|
|
|
"github.com/fox/fox/processor"
|
|
|
|
"github.com/golang/protobuf/proto"
|
2025-05-28 17:52:28 +08:00
|
|
|
"github.com/nats-io/nats.go"
|
|
|
|
"os"
|
|
|
|
"sync"
|
2025-05-25 20:02:15 +08:00
|
|
|
)
|
|
|
|
|
2025-05-28 17:52:28 +08:00
|
|
|
const (
|
|
|
|
updateTopic = "update.service.topic"
|
|
|
|
)
|
|
|
|
|
|
|
|
// 初始化服务所需要的参数
|
|
|
|
type InitNatsServiceParams struct {
|
|
|
|
EtcdAddress []string
|
|
|
|
EtcdUsername string
|
|
|
|
EtcdPassword string
|
|
|
|
NatsAddress []string
|
|
|
|
ServiceType string // 本服务的服务类型
|
|
|
|
ServiceName string // 本服务的服务名,唯一索引
|
|
|
|
OnFunc IOnFunc // 包含初始化,停止,服务的消息处理
|
|
|
|
TypeId int // 与ServiceType一一对应
|
|
|
|
Version string
|
|
|
|
}
|
|
|
|
|
2025-05-25 20:02:15 +08:00
|
|
|
type NatsService struct {
|
|
|
|
*BaseService
|
2025-05-29 11:49:24 +08:00
|
|
|
nats *nat.Nats
|
|
|
|
registry *etcd.Registry[etcd.ServiceNode]
|
|
|
|
rpcProcessor *processor.Processor
|
|
|
|
node etcd.ServiceNode // 本服务节点信息
|
2025-05-25 20:02:15 +08:00
|
|
|
}
|
|
|
|
|
2025-05-28 17:52:28 +08:00
|
|
|
func NewNatsService(param *InitNatsServiceParams) (*NatsService, error) {
|
|
|
|
var err error
|
2025-05-25 20:02:15 +08:00
|
|
|
s := new(NatsService)
|
2025-05-28 17:52:28 +08:00
|
|
|
s.BaseService = NewBaseService(param.ServiceType, param.ServiceName, param.OnFunc, s)
|
|
|
|
if s.registry, err = etcd.NewRegistry[etcd.ServiceNode](param.EtcdAddress, param.EtcdUsername, param.EtcdPassword); err != nil {
|
2025-05-25 20:02:15 +08:00
|
|
|
return nil, err
|
|
|
|
}
|
2025-05-28 17:52:28 +08:00
|
|
|
s.node = etcd.ServiceNode{
|
|
|
|
TypeId: param.TypeId,
|
|
|
|
Name: s.Name(),
|
|
|
|
Type: s.Type(),
|
|
|
|
Version: param.Version,
|
|
|
|
}
|
|
|
|
if err = s.registry.Register(s.node); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
s.registry.WatchServices()
|
|
|
|
|
|
|
|
s.nats = nat.NewNats(param.ServiceName, param.NatsAddress...)
|
|
|
|
if err = s.nats.Connect(); err != nil {
|
|
|
|
s.registry.UnregisterService()
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
// 订阅本服务名的topic
|
|
|
|
if err = s.Subscribe(Topic(s)); err != nil {
|
|
|
|
log.Error(err.Error())
|
|
|
|
}
|
2025-05-29 11:49:24 +08:00
|
|
|
// 订阅广播服务上线
|
|
|
|
_ = s.subscribeUpdateService()
|
|
|
|
s.publishUpdateService()
|
|
|
|
|
|
|
|
// 订阅rpc回调
|
|
|
|
_ = s.subscribeRpc()
|
2025-05-25 20:02:15 +08:00
|
|
|
return s, nil
|
|
|
|
}
|
|
|
|
|
2025-05-28 20:07:20 +08:00
|
|
|
func (n *NatsService) publishUpdateService() {
|
|
|
|
jsonData, _ := json.Marshal(n.node)
|
|
|
|
_ = n.nats.Publish(updateTopic, jsonData)
|
|
|
|
}
|
|
|
|
|
2025-05-28 17:52:28 +08:00
|
|
|
func (n *NatsService) subscribeUpdateService() error {
|
2025-05-29 11:49:24 +08:00
|
|
|
return n.SubscribeCb(updateTopic, func(m *nats.Msg) {
|
2025-05-28 17:52:28 +08:00
|
|
|
var node = &etcd.ServiceNode{}
|
|
|
|
_ = json.Unmarshal(m.Data, node)
|
|
|
|
// 不是同类服务不处理,是自己发出来的更新,也不处理
|
|
|
|
if node.Type != n.Type() || node.Name == n.Name() {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
// 有新服务上线,本服务准备退出
|
|
|
|
if n.node.Version < node.Version {
|
|
|
|
n.NotifyStop()
|
|
|
|
n.WaitStop()
|
|
|
|
log.InfoF("%v auto exit, initiating shutdown...", n.Name())
|
|
|
|
os.Exit(0)
|
|
|
|
}
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2025-05-29 11:49:24 +08:00
|
|
|
func (n *NatsService) subscribeRpc() error {
|
|
|
|
return n.SubscribeCb(RpcTopic(n), func(m *nats.Msg) {
|
|
|
|
var iMsg = &ipb.InternalMsg{}
|
|
|
|
_ = proto.Unmarshal(m.Data, iMsg)
|
|
|
|
_ = n.rpcProcessor.Dispatch(iMsg.RetRpcMsgId, iMsg)
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
// 订阅回调
|
2025-05-28 20:07:20 +08:00
|
|
|
func (n *NatsService) SubscribeCb(topic string, cb func(m *nats.Msg)) error {
|
2025-05-29 11:49:24 +08:00
|
|
|
return n.nats.SubscribeCb(topic, func(m *nats.Msg) {
|
|
|
|
n.RunOnce(func() {
|
|
|
|
cb(m)
|
|
|
|
})
|
|
|
|
})
|
2025-05-28 20:07:20 +08:00
|
|
|
}
|
|
|
|
|
2025-05-25 20:02:15 +08:00
|
|
|
func (n *NatsService) Subscribe(topic string) error {
|
|
|
|
return n.nats.Subscribe(topic, n.msg)
|
|
|
|
}
|
|
|
|
|
|
|
|
// 队列订阅,队列中只会有一个消费者消费该消息
|
|
|
|
func (n *NatsService) QueueSubscribe(topic string, queue string) error {
|
|
|
|
return n.nats.QueueSubscribe(topic, queue, n.msg)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *NatsService) OnStop() {
|
|
|
|
s.nats.Close()
|
|
|
|
}
|
|
|
|
|
2025-05-29 11:49:24 +08:00
|
|
|
func (s *NatsService) Send(topic string, msg *ipb.InternalMsg) error {
|
|
|
|
data, _ := proto.Marshal(msg)
|
|
|
|
return s.nats.Publish(topic, data)
|
2025-05-25 20:02:15 +08:00
|
|
|
}
|
|
|
|
|
2025-05-29 11:49:24 +08:00
|
|
|
func (s *NatsService) Call(rpcTopic string, msg *ipb.InternalMsg, cb func(msg *ipb.InternalMsg)) error {
|
|
|
|
data, _ := proto.Marshal(ipb.MakeRpcMsg(msg.ServiceName, msg.ConnId, msg.UserId, msg.MsgId, msg.Msg))
|
|
|
|
err := s.nats.Publish(rpcTopic, data)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
s.rpcProcessor.RegisterMessage(msg.RetRpcMsgId, ipb.InternalMsg{}, cb)
|
|
|
|
return nil
|
2025-05-25 20:02:15 +08:00
|
|
|
}
|
2025-05-28 17:52:28 +08:00
|
|
|
|
2025-05-28 19:43:56 +08:00
|
|
|
func (s *NatsService) ServiceEtcd() *etcd.Registry[etcd.ServiceNode] {
|
|
|
|
return s.registry
|
|
|
|
}
|
|
|
|
|
2025-05-28 17:52:28 +08:00
|
|
|
// 从etcd中获取所有服务节点
|
|
|
|
func (s *NatsService) GetServiceNodes() *sync.Map {
|
|
|
|
return s.registry.GetNodes()
|
|
|
|
}
|
|
|
|
|
|
|
|
// 查找指定的服务节点信息
|
|
|
|
func (s *NatsService) FindServiceNode(serviceName string) (etcd.ServiceNode, error) {
|
|
|
|
return s.registry.FindNode(serviceName)
|
|
|
|
}
|