fox/service/natsService.go
2025-05-28 20:07:20 +08:00

135 lines
3.5 KiB
Go

package service
import (
"encoding/json"
"github.com/fox/fox/etcd"
"github.com/fox/fox/log"
"github.com/fox/fox/nat"
"github.com/nats-io/nats.go"
"os"
"sync"
"time"
)
const (
updateTopic = "update.service.topic"
)
// 初始化服务所需要的参数
type InitNatsServiceParams struct {
EtcdAddress []string
EtcdUsername string
EtcdPassword string
NatsAddress []string
ServiceType string // 本服务的服务类型
ServiceName string // 本服务的服务名,唯一索引
OnFunc IOnFunc // 包含初始化,停止,服务的消息处理
TypeId int // 与ServiceType一一对应
Version string
}
type NatsService struct {
*BaseService
nats *nat.Nats
registry *etcd.Registry[etcd.ServiceNode]
node etcd.ServiceNode // 本服务节点信息
}
func NewNatsService(param *InitNatsServiceParams) (*NatsService, error) {
var err error
s := new(NatsService)
s.BaseService = NewBaseService(param.ServiceType, param.ServiceName, param.OnFunc, s)
if s.registry, err = etcd.NewRegistry[etcd.ServiceNode](param.EtcdAddress, param.EtcdUsername, param.EtcdPassword); err != nil {
return nil, err
}
s.node = etcd.ServiceNode{
TypeId: param.TypeId,
Name: s.Name(),
Type: s.Type(),
Version: param.Version,
}
if err = s.registry.Register(s.node); err != nil {
return nil, err
}
s.registry.WatchServices()
// 广播服务上线
s.publishUpdateService()
s.nats = nat.NewNats(param.ServiceName, param.NatsAddress...)
if err = s.nats.Connect(); err != nil {
s.registry.UnregisterService()
return nil, err
}
// 订阅本服务名的topic
if err = s.Subscribe(Topic(s)); err != nil {
log.Error(err.Error())
}
return s, nil
}
func (n *NatsService) publishUpdateService() {
jsonData, _ := json.Marshal(n.node)
_ = n.nats.Publish(updateTopic, jsonData)
}
func (n *NatsService) subscribeUpdateService() error {
return n.nats.SubscribeCb(updateTopic, func(m *nats.Msg) {
var node = &etcd.ServiceNode{}
_ = json.Unmarshal(m.Data, node)
// 不是同类服务不处理,是自己发出来的更新,也不处理
if node.Type != n.Type() || node.Name == n.Name() {
return
}
// 有新服务上线,本服务准备退出
if n.node.Version < node.Version {
n.NotifyStop()
n.WaitStop()
log.InfoF("%v auto exit, initiating shutdown...", n.Name())
os.Exit(0)
}
})
}
// 订阅回调,极少用。调用方自行保证并发性问题
func (n *NatsService) SubscribeCb(topic string, cb func(m *nats.Msg)) error {
return n.nats.SubscribeCb(topic, cb)
}
func (n *NatsService) Subscribe(topic string) error {
return n.nats.Subscribe(topic, n.msg)
}
// 队列订阅,队列中只会有一个消费者消费该消息
func (n *NatsService) QueueSubscribe(topic string, queue string) error {
return n.nats.QueueSubscribe(topic, queue, n.msg)
}
func (s *NatsService) OnStop() {
s.nats.Close()
}
func (s *NatsService) Send(topic string, msg []byte) error {
return s.nats.Publish(topic, msg)
}
func (s *NatsService) Call(topic string, timeout time.Duration, msg []byte) ([]byte, error) {
_ = topic
_ = timeout
_ = msg
return nil, nil
}
func (s *NatsService) ServiceEtcd() *etcd.Registry[etcd.ServiceNode] {
return s.registry
}
// 从etcd中获取所有服务节点
func (s *NatsService) GetServiceNodes() *sync.Map {
return s.registry.GetNodes()
}
// 查找指定的服务节点信息
func (s *NatsService) FindServiceNode(serviceName string) (etcd.ServiceNode, error) {
return s.registry.FindNode(serviceName)
}