在natsService中集中etcd的服务发现,当新服务上线,旧服务自动退出

This commit is contained in:
liuxiaobo 2025-05-28 17:52:28 +08:00
parent 0f27a3aa5b
commit d061a2bc50
9 changed files with 174 additions and 103 deletions

View File

@ -17,10 +17,10 @@ type Registry[T INode] struct {
nodes sync.Map nodes sync.Map
} }
func NewRegistry[T INode](endpoints []string, rootKey, username, password string) (*Registry[T], error) { func NewRegistry[T INode](endpoints []string, username, password string) (*Registry[T], error) {
var err error var err error
e := &Registry[T]{} e := &Registry[T]{}
e.etcdRegistryImpl, err = newServiceRegistryImpl(endpoints, rootKey, username, password, e.saveNode) e.etcdRegistryImpl, err = newServiceRegistryImpl(endpoints, resultT[T]{}.Value.EtcdRootKey(), username, password, e.saveNode)
return e, err return e, err
} }
@ -46,7 +46,7 @@ func (r *Registry[T]) GetNodes() *sync.Map {
return &r.nodes return &r.nodes
} }
// 获取当前根节点下所有节点信息 // 据inode的mapKey()查找对应的节点
func (r *Registry[T]) FindNode(key string) (T, error) { func (r *Registry[T]) FindNode(key string) (T, error) {
var tmp = resultT[T]{Err: nil} var tmp = resultT[T]{Err: nil}
v, ok := r.nodes.Load(key) v, ok := r.nodes.Load(key)

View File

@ -33,21 +33,21 @@ func TestService(t *testing.T) {
_ = etcdAddress2 _ = etcdAddress2
log.Open("test.log", log.DebugL) log.Open("test.log", log.DebugL)
// 创建注册中心 // 创建注册中心
registry, err := NewRegistry[ServiceNode]([]string{etcdAddress2}, rootKeyServices, "", "") registry, err := NewRegistry[ServiceNode]([]string{etcdAddress2}, "", "")
if err != nil { if err != nil {
log.Fatal(err.Error()) log.Fatal(err.Error())
return
} }
// 注册示例服务 // 注册示例服务
service := &ServiceNode{ service := &ServiceNode{
Name: "instance-1", Name: "instance-1",
Type: "user-service", Type: "user-service",
Address: "localhost",
Port: 8080,
} }
if err := registry.Register(service); err != nil { if err := registry.Register(service); err != nil {
log.Fatal(err.Error()) log.Fatal(err.Error())
return
} }
// 监控服务变化 // 监控服务变化
@ -62,9 +62,10 @@ func TestTopicRegistry(t *testing.T) {
_ = etcdAddress2 _ = etcdAddress2
log.Open("test.log", log.DebugL) log.Open("test.log", log.DebugL)
// 创建注册中心 // 创建注册中心
registry, err := NewRegistry[TopicNode]([]string{etcdAddress2}, rootKeyTopic, "", "") registry, err := NewRegistry[TopicNode]([]string{etcdAddress2}, "", "")
if err != nil { if err != nil {
log.Fatal(err.Error()) log.Fatal(err.Error())
return
} }
// 注册示例服务 // 注册示例服务
@ -79,12 +80,12 @@ func TestTopicRegistry(t *testing.T) {
// 监控服务变化 // 监控服务变化
registry.WatchServices() registry.WatchServices()
for _, n := range registry.GetNodes() { registry.GetNodes().Range(func(k, v interface{}) bool {
log.DebugF("发现topic:%v, 创建者:%v", n.Name, n.Creator) if node, ok := v.(TopicNode); ok {
if v, err := registry.FindNode(n.MapKey()); err == nil { log.DebugF("this is topic:%v", node.Name)
log.DebugF("topic:%v exist", v.Name)
} }
} return true
})
time.Sleep(60 * time.Second) time.Sleep(60 * time.Second)
registry.UnregisterService() registry.UnregisterService()

View File

@ -16,13 +16,10 @@ type INode interface {
// 服务节点信息TypeId及Type都是标记同类型的节点Name是区别该节点与其它节点的字段 // 服务节点信息TypeId及Type都是标记同类型的节点Name是区别该节点与其它节点的字段
type ServiceNode struct { type ServiceNode struct {
TypeId int `json:"type_id"` // 服务类型id与Type字段功能一样 由proto定义gate通过该字段找到这类服务的所有节点信息 TypeId int `json:"type_id"` // 服务类型id与Type字段功能一样 由proto定义gate通过该字段找到这类服务的所有节点信息
Name string `json:"name"` // 服务名 多个同类服务依赖name区分:lobby1,lobby2,lobby3等等 Name string `json:"name"` // 服务名 多个同类服务依赖name区分:lobby1,lobby2,lobby3等等
Type string `json:"type"` // 服务类型:lobby, game, gate等等 Type string `json:"type"` // 服务类型:lobby, game, gate等等
Address string `json:"address"` // 地址 Version string `json:"version"` // 版本号
Port int `json:"port"` // 端口
Version string `json:"version"` // 版本号
ServiceType ServiceType `json:"service_type"` // 服务类型
} }
func (s ServiceNode) EtcdKey() string { func (s ServiceNode) EtcdKey() string {

View File

@ -96,6 +96,17 @@ func (n *Nats) SubscribeRpc(topic string, rpcHandler RpcHandler) error {
return nil return nil
} }
// 订阅回调,极少用。调用方自行保证并发性问题
func (n *Nats) SubscribeCb(topic string, cb func(m *nats.Msg)) error {
sub, err := n.nc.Subscribe(topic, cb)
if err != nil {
return err
}
n.addSub(sub)
return nil
}
// 订阅消息统一放入msgChan交给服务自行处理
func (n *Nats) Subscribe(topic string, msgChan *safeChan.ByteChan) error { func (n *Nats) Subscribe(topic string, msgChan *safeChan.ByteChan) error {
sub, err := n.nc.Subscribe(topic, func(m *nats.Msg) { sub, err := n.nc.Subscribe(topic, func(m *nats.Msg) {
_ = msgChan.Write(m.Data) _ = msgChan.Write(m.Data)

View File

@ -18,12 +18,12 @@ type BaseService struct {
onFunc IOnFunc onFunc IOnFunc
sender ISender sender ISender
msg *safeChan.SafeChan[[]byte] msg *safeChan.SafeChan[[]byte]
job *safeChan.SafeChan[func()] job *safeChan.SafeChan[func()]
stop context.Context readyStop context.Context // 通知关闭的chan
stopFunc context.CancelFunc readyStopFunc context.CancelFunc // NotifyStop会调用该方法触发服务进入关闭流程
waitStop context.Context waitStop context.Context
waitStopFunc context.CancelFunc waitStopFunc context.CancelFunc
} }
func NewBaseService(type_, name string, onFunc IOnFunc, sender ISender) *BaseService { func NewBaseService(type_, name string, onFunc IOnFunc, sender ISender) *BaseService {
@ -37,7 +37,7 @@ func NewBaseService(type_, name string, onFunc IOnFunc, sender ISender) *BaseSer
msg: safeChan.NewSafeChan[[]byte](128), msg: safeChan.NewSafeChan[[]byte](128),
job: safeChan.NewSafeChan[func()](128), job: safeChan.NewSafeChan[func()](128),
} }
s.stop, s.stopFunc = context.WithCancel(context.Background()) s.readyStop, s.readyStopFunc = context.WithCancel(context.Background())
s.waitStop, s.waitStopFunc = context.WithCancel(context.Background()) s.waitStop, s.waitStopFunc = context.WithCancel(context.Background())
//s.Run() //s.Run()
@ -58,8 +58,8 @@ func (s *BaseService) Write(msg []byte) error {
func (s *BaseService) RunOnce(cb func()) { func (s *BaseService) RunOnce(cb func()) {
select { select {
case <-s.stop.Done(): case <-s.readyStop.Done():
log.Error(s.Log("want stop, can not call RunOnce function")) log.Error(s.Log("want readyStop, can not call RunOnce function"))
return return
default: default:
_ = s.job.Write(cb) _ = s.job.Write(cb)
@ -68,8 +68,8 @@ func (s *BaseService) RunOnce(cb func()) {
func (s *BaseService) RunWait(cb func() (retValue any)) (retValue any, err error) { func (s *BaseService) RunWait(cb func() (retValue any)) (retValue any, err error) {
select { select {
case <-s.stop.Done(): case <-s.readyStop.Done():
err = fmt.Errorf(s.Log("want stop, can not call RunOnce function")) err = fmt.Errorf(s.Log("want readyStop, can not call RunOnce function"))
log.Error(err.Error()) log.Error(err.Error())
return nil, err return nil, err
default: default:
@ -111,24 +111,21 @@ func (s *BaseService) WaitStop() {
} }
func (s *BaseService) NotifyStop() { func (s *BaseService) NotifyStop() {
s.stopFunc() s.readyStopFunc()
// log.Debug(fmt.Sprintf("notify %v service stop", s.name)) // log.Debug(fmt.Sprintf("notify %v service readyStop", s.name))
} }
func (s *BaseService) allChanEmpty() bool { //func (s *BaseService) allChanEmpty() bool {
if s.job.Size() == 0 && s.msg.Size() == 0 { // if s.job.Size() == 0 && s.msg.Size() == 0 {
return true // return true
} // }
return false // return false
} //}
func (s *BaseService) canStop() bool { func (s *BaseService) canStop() bool {
select { select {
case <-s.stop.Done(): case <-s.readyStop.Done():
if s.allChanEmpty() { return true
return true
}
return false
default: default:
return false return false
} }
@ -136,16 +133,19 @@ func (s *BaseService) canStop() bool {
func (s *BaseService) run() { func (s *BaseService) run() {
for { for {
if s.canStop() { if s.onFunc.CanStop() && s.canStop() {
if s.onFunc != nil { s.msg.Close()
s.onFunc.OnStop() s.job.Close()
s.waitStopFunc() s.Timer.CancelAllTimer()
} s.Timer.Close()
s.onFunc.OnStop()
s.waitStopFunc()
break break
} }
select { select {
case msg, ok := <-s.msg.Reader(): case msg, ok := <-s.msg.Reader():
if ok && s.onFunc != nil { if ok {
_ = s.onFunc.OnMessage(msg) _ = s.onFunc.OnMessage(msg)
} }
case cb, ok := <-s.job.Reader(): case cb, ok := <-s.job.Reader():
@ -156,13 +156,6 @@ func (s *BaseService) run() {
if ok && t != nil && t.Func != nil { if ok && t != nil && t.Func != nil {
t.Func() t.Func()
} }
case _ = <-s.stop.Done():
if s.onFunc != nil {
s.msg.Close()
s.job.Close()
s.Timer.CancelAllTimer()
s.Timer.Close()
}
} }
} }
} }

View File

@ -29,6 +29,8 @@ type ISender interface {
} }
type IOnFunc interface { type IOnFunc interface {
// 由子服务实现,比如玩法服需要等待所有玩家退出才能关闭服务
CanStop() bool
OnStop() OnStop()
OnInit() OnInit()
OnMessage(msg []byte) error OnMessage(msg []byte) error

View File

@ -1,28 +1,88 @@
package service package service
import ( import (
"fmt" "encoding/json"
"github.com/fox/fox/etcd"
"github.com/fox/fox/log"
"github.com/fox/fox/nat" "github.com/fox/fox/nat"
"github.com/nats-io/nats.go"
"os"
"sync"
"time" "time"
) )
type NatsService struct { const (
*BaseService updateTopic = "update.service.topic"
nats *nat.Nats )
// 初始化服务所需要的参数
type InitNatsServiceParams struct {
EtcdAddress []string
EtcdUsername string
EtcdPassword string
NatsAddress []string
ServiceType string // 本服务的服务类型
ServiceName string // 本服务的服务名,唯一索引
OnFunc IOnFunc // 包含初始化,停止,服务的消息处理
TypeId int // 与ServiceType一一对应
Version string
} }
func NewNatsService(type_, name string, onFunc IOnFunc, natsAddress ...string) (*NatsService, error) { type NatsService struct {
*BaseService
nats *nat.Nats
registry *etcd.Registry[etcd.ServiceNode]
node etcd.ServiceNode // 本服务节点信息
}
func NewNatsService(param *InitNatsServiceParams) (*NatsService, error) {
var err error
s := new(NatsService) s := new(NatsService)
s.BaseService = NewBaseService(type_, name, onFunc, s) s.BaseService = NewBaseService(param.ServiceType, param.ServiceName, param.OnFunc, s)
s.nats = nat.NewNats(fmt.Sprintf("%v-%v", type_, name), natsAddress...) if s.registry, err = etcd.NewRegistry[etcd.ServiceNode](param.EtcdAddress, param.EtcdUsername, param.EtcdPassword); err != nil {
if err := s.nats.Connect(); err != nil {
// log.Error(err.Error())
s.BaseService.NotifyStop()
return nil, err return nil, err
} }
s.node = etcd.ServiceNode{
TypeId: param.TypeId,
Name: s.Name(),
Type: s.Type(),
Version: param.Version,
}
if err = s.registry.Register(s.node); err != nil {
return nil, err
}
s.registry.WatchServices()
s.nats = nat.NewNats(param.ServiceName, param.NatsAddress...)
if err = s.nats.Connect(); err != nil {
s.registry.UnregisterService()
return nil, err
}
// 订阅本服务名的topic
if err = s.Subscribe(Topic(s)); err != nil {
log.Error(err.Error())
}
return s, nil return s, nil
} }
func (n *NatsService) subscribeUpdateService() error {
return n.nats.SubscribeCb(updateTopic, func(m *nats.Msg) {
var node = &etcd.ServiceNode{}
_ = json.Unmarshal(m.Data, node)
// 不是同类服务不处理,是自己发出来的更新,也不处理
if node.Type != n.Type() || node.Name == n.Name() {
return
}
// 有新服务上线,本服务准备退出
if n.node.Version < node.Version {
n.NotifyStop()
n.WaitStop()
log.InfoF("%v auto exit, initiating shutdown...", n.Name())
os.Exit(0)
}
})
}
func (n *NatsService) Subscribe(topic string) error { func (n *NatsService) Subscribe(topic string) error {
return n.nats.Subscribe(topic, n.msg) return n.nats.Subscribe(topic, n.msg)
} }
@ -46,3 +106,13 @@ func (s *NatsService) Call(topic string, timeout time.Duration, msg []byte) ([]b
_ = msg _ = msg
return nil, nil return nil, nil
} }
// 从etcd中获取所有服务节点
func (s *NatsService) GetServiceNodes() *sync.Map {
return s.registry.GetNodes()
}
// 查找指定的服务节点信息
func (s *NatsService) FindServiceNode(serviceName string) (etcd.ServiceNode, error) {
return s.registry.FindNode(serviceName)
}

View File

@ -1,7 +1,7 @@
package service package service
import ( import (
"github.com/fox/fox/etcd" "fmt"
"github.com/fox/fox/log" "github.com/fox/fox/log"
"testing" "testing"
"time" "time"
@ -9,12 +9,19 @@ import (
const ( const (
GameSrv = "game" GameSrv = "game"
GameType = 1001
NatsAddress = "nats://192.168.232.128:4222" NatsAddress = "nats://192.168.232.128:4222"
EtcdAddress = "192.168.232.128:2379" EtcdAddress = "192.168.232.128:2379"
EtcdAddress2 = "114.132.124.145:2379" EtcdAddress2 = "114.132.124.145:2379"
NatsAddress2 = "nats://114.132.124.145:4222" NatsAddress2 = "nats://114.132.124.145:4222"
) )
type gameService struct {
*NatsService
//etcdTopic *etcd.Registry[etcd.TopicNode]
//srvTopic string
}
func newGameService() *gameService { func newGameService() *gameService {
_ = NatsAddress2 _ = NatsAddress2
_ = NatsAddress _ = NatsAddress
@ -23,45 +30,31 @@ func newGameService() *gameService {
var err error var err error
s := new(gameService) s := new(gameService)
if s.NatsService, err = NewNatsService(GameSrv, "1", s, NatsAddress2); err != nil { if s.NatsService, err = NewNatsService(&InitNatsServiceParams{
EtcdAddress: []string{EtcdAddress2},
EtcdUsername: "",
EtcdPassword: "",
NatsAddress: []string{NatsAddress2},
ServiceType: GameSrv,
ServiceName: fmt.Sprintf("%s-%d", GameSrv, 0),
OnFunc: nil,
TypeId: GameType,
Version: time.Now().Format("20060102150405"),
}); err != nil {
log.Fatal(err.Error()) log.Fatal(err.Error())
return nil return nil
} }
if s.etcdService, err = etcd.NewRegistry[etcd.ServiceNode]([]string{EtcdAddress2}, etcd.ServiceNode{}.EtcdRootKey(), "", ""); err != nil { //if s.etcdTopic, err = etcd.NewRegistry[etcd.TopicNode]([]string{EtcdAddress2}, "", ""); err != nil {
log.Error(err.Error()) // log.Error(err.Error())
s.NatsService.OnStop() // s.NatsService.OnStop()
return nil // return nil
} //}
endpoint := &etcd.ServiceNode{
Name: s.Name(),
Type: s.Type(),
Address: "",
Port: 0,
Version: "",
ServiceType: etcd.Unique,
}
if err = s.etcdService.Register(endpoint); err != nil {
log.Error(err.Error())
s.NatsService.OnStop()
return nil
}
s.OnInit() s.OnInit()
return s return s
} }
type gameService struct {
*NatsService
etcdService *etcd.Registry[etcd.ServiceNode]
etcdTopic *etcd.Registry[etcd.TopicNode]
srvTopic string
}
func (s *gameService) OnInit() { func (s *gameService) OnInit() {
s.etcdService.WatchServices() //s.etcdTopic.WatchServices()
s.etcdTopic.WatchServices()
if err := s.NatsService.Subscribe(Topic(s)); err != nil {
log.Error(err.Error())
}
if err := s.NatsService.QueueSubscribe(GroupTopic(s), GroupQueue(s)); err != nil { if err := s.NatsService.QueueSubscribe(GroupTopic(s), GroupQueue(s)); err != nil {
log.Error(err.Error()) log.Error(err.Error())
} }
@ -70,8 +63,7 @@ func (s *gameService) OnInit() {
} }
func (s *gameService) OnStop() { func (s *gameService) OnStop() {
s.etcdService.UnregisterService() //s.etcdTopic.UnregisterService()
s.etcdService.UnregisterService()
s.NatsService.OnStop() s.NatsService.OnStop()
log.Debug("OnStop") log.Debug("OnStop")
} }
@ -88,7 +80,7 @@ func TestGameService(t *testing.T) {
if err := s.Send(Topic(s), []byte(msg)); err != nil { if err := s.Send(Topic(s), []byte(msg)); err != nil {
log.Error(err.Error()) log.Error(err.Error())
} }
s.etcdService.GetNodes().Range(func(key, value interface{}) bool { s.GetServiceNodes().Range(func(key, value interface{}) bool {
log.Debug(s.Log("发现有服务:%v", value)) log.Debug(s.Log("发现有服务:%v", value))
return true return true
}) })

View File

@ -22,6 +22,11 @@ func (s *EchoService) OnStop() {
log.Debug("OnStop") log.Debug("OnStop")
} }
func (s *EchoService) CanStop() bool {
log.Debug("CanStop")
return true
}
func (s *EchoService) Send(topic string, msg []byte) error { func (s *EchoService) Send(topic string, msg []byte) error {
log.Debug(s.Log("send %v to topic:%v", string(msg), topic)) log.Debug(s.Log("send %v to topic:%v", string(msg), topic))
return nil return nil