fox/service/natsService.go

174 lines
4.8 KiB
Go
Raw Normal View History

2025-05-25 20:02:15 +08:00
package service
import (
"encoding/json"
"github.com/fox/fox/etcd"
2025-05-29 11:49:24 +08:00
"github.com/fox/fox/ipb"
"github.com/fox/fox/log"
2025-05-25 20:02:15 +08:00
"github.com/fox/fox/nat"
2025-05-29 11:49:24 +08:00
"github.com/fox/fox/processor"
"github.com/golang/protobuf/proto"
"github.com/nats-io/nats.go"
"os"
"sync"
2025-06-01 09:57:01 +08:00
"time"
2025-05-25 20:02:15 +08:00
)
const (
updateTopic = "update.service.topic"
)
// 初始化服务所需要的参数
type InitNatsServiceParams struct {
EtcdAddress []string
EtcdUsername string
EtcdPassword string
NatsAddress []string
ServiceType string // 本服务的服务类型
ServiceName string // 本服务的服务名,唯一索引
OnFunc IOnFunc // 包含初始化,停止,服务的消息处理
TypeId int // 与ServiceType一一对应
Version string
}
2025-05-25 20:02:15 +08:00
type NatsService struct {
*BaseService
2025-05-29 11:49:24 +08:00
nats *nat.Nats
registry *etcd.Registry[etcd.ServiceNode]
2025-06-02 01:08:12 +08:00
RpcProcessor *processor.RpcProcessor
2025-05-29 11:49:24 +08:00
node etcd.ServiceNode // 本服务节点信息
2025-05-25 20:02:15 +08:00
}
func NewNatsService(param *InitNatsServiceParams) (*NatsService, error) {
var err error
2025-05-25 20:02:15 +08:00
s := new(NatsService)
s.BaseService = NewBaseService(param.ServiceType, param.ServiceName, param.OnFunc, s)
if s.registry, err = etcd.NewRegistry[etcd.ServiceNode](param.EtcdAddress, param.EtcdUsername, param.EtcdPassword); err != nil {
2025-05-25 20:02:15 +08:00
return nil, err
}
2025-06-02 01:08:12 +08:00
s.RpcProcessor = processor.NewRpcProcessor()
s.node = etcd.ServiceNode{
TypeId: param.TypeId,
Name: s.Name(),
Type: s.Type(),
Version: param.Version,
}
if err = s.registry.Register(s.node); err != nil {
return nil, err
}
s.registry.WatchServices()
s.nats = nat.NewNats(param.ServiceName, param.NatsAddress...)
if err = s.nats.Connect(); err != nil {
s.registry.UnregisterService()
return nil, err
}
// 订阅本服务名的topic
if err = s.Subscribe(Topic(s)); err != nil {
log.Error(err.Error())
}
2025-05-29 11:49:24 +08:00
// 订阅广播服务上线
_ = s.subscribeUpdateService()
s.publishUpdateService()
// 订阅rpc回调
_ = s.subscribeRpc()
2025-05-25 20:02:15 +08:00
return s, nil
}
2025-05-28 20:07:20 +08:00
func (n *NatsService) publishUpdateService() {
jsonData, _ := json.Marshal(n.node)
_ = n.nats.Publish(updateTopic, jsonData)
//log.Debug(n.Log("publishUpdateService:%v", string(jsonData)))
2025-05-28 20:07:20 +08:00
}
func (n *NatsService) subscribeUpdateService() error {
2025-05-29 11:49:24 +08:00
return n.SubscribeCb(updateTopic, func(m *nats.Msg) {
var node = &etcd.ServiceNode{}
_ = json.Unmarshal(m.Data, node)
2025-05-31 10:36:46 +08:00
//log.Debug(n.Log("发现新节点:%v", string(m.Data)))
// 不是同类服务不处理,是自己发出来的更新,也不处理
if node.Type != n.Type() || node.Name == n.Name() {
//log.Debug(n.Log("与本节点不匹配.本节点:%v", n.node))
return
}
// 有新服务上线,本服务准备退出
if n.node.Version < node.Version {
2025-05-31 10:36:46 +08:00
log.InfoF(n.Log("有新服务:%v,版本:%v.本服务即将关闭", node.Name, node.Version))
n.NotifyStop()
2025-05-31 10:36:46 +08:00
//n.WaitStop()
os.Exit(0)
}
})
}
2025-05-29 11:49:24 +08:00
func (n *NatsService) subscribeRpc() error {
return n.SubscribeCb(RpcTopic(n), func(m *nats.Msg) {
var iMsg = &ipb.InternalMsg{}
_ = proto.Unmarshal(m.Data, iMsg)
2025-06-02 01:08:12 +08:00
rsp, err := n.RpcProcessor.Dispatch(iMsg.RpcMsgId, iMsg)
if err != nil {
log.Error(err.Error())
return
2025-05-29 17:06:54 +08:00
}
2025-06-02 01:08:12 +08:00
rspData, _ := proto.Marshal(rsp)
_ = m.Respond(rspData)
2025-05-29 11:49:24 +08:00
})
}
2025-05-31 10:36:46 +08:00
// 订阅回调 cb会切回service协程执行避免在cb中有阻塞式请求
2025-05-28 20:07:20 +08:00
func (n *NatsService) SubscribeCb(topic string, cb func(m *nats.Msg)) error {
2025-05-29 11:49:24 +08:00
return n.nats.SubscribeCb(topic, func(m *nats.Msg) {
n.RunOnce(func() {
cb(m)
})
})
2025-05-28 20:07:20 +08:00
}
2025-05-25 20:02:15 +08:00
func (n *NatsService) Subscribe(topic string) error {
return n.nats.Subscribe(topic, n.msg)
}
// 队列订阅,队列中只会有一个消费者消费该消息
func (n *NatsService) QueueSubscribe(topic string, queue string) error {
return n.nats.QueueSubscribe(topic, queue, n.msg)
}
func (s *NatsService) OnStop() {
s.nats.Close()
}
2025-05-29 11:49:24 +08:00
func (s *NatsService) Send(topic string, msg *ipb.InternalMsg) error {
data, _ := proto.Marshal(msg)
return s.nats.Publish(topic, data)
2025-05-25 20:02:15 +08:00
}
2025-06-02 21:37:23 +08:00
// call会阻塞主协程需要起新协程去等待返回值业务逻辑则转回主协程处理。
// 不要将处理返回值的业务逻辑放到新协程,避免业务逻辑中有数据并发问题
// 比如并发call拉用户数据然后操作本地的user map
2025-06-01 09:57:01 +08:00
func (s *NatsService) Call(rpcTopic string, timeout time.Duration, msg *ipb.InternalMsg) (*ipb.InternalMsg, error) {
data, _ := proto.Marshal(msg)
response, err := s.nats.Rpc(rpcTopic, timeout, data)
2025-05-29 11:49:24 +08:00
if err != nil {
2025-06-01 09:57:01 +08:00
return nil, err
2025-05-29 11:49:24 +08:00
}
2025-06-01 09:57:01 +08:00
rspMsg := &ipb.InternalMsg{}
_ = proto.Unmarshal(response, rspMsg)
return rspMsg, nil
2025-05-25 20:02:15 +08:00
}
2025-05-28 19:43:56 +08:00
func (s *NatsService) ServiceEtcd() *etcd.Registry[etcd.ServiceNode] {
return s.registry
}
// 从etcd中获取所有服务节点
func (s *NatsService) GetServiceNodes() *sync.Map {
return s.registry.GetNodes()
}
// 查找指定的服务节点信息
func (s *NatsService) FindServiceNode(serviceName string) (etcd.ServiceNode, error) {
return s.registry.FindNode(serviceName)
}