diff --git a/etcd/etcd.go b/etcd/etcd.go index 8b1df65..24dda6f 100644 --- a/etcd/etcd.go +++ b/etcd/etcd.go @@ -17,10 +17,10 @@ type Registry[T INode] struct { nodes sync.Map } -func NewRegistry[T INode](endpoints []string, rootKey, username, password string) (*Registry[T], error) { +func NewRegistry[T INode](endpoints []string, username, password string) (*Registry[T], error) { var err error e := &Registry[T]{} - e.etcdRegistryImpl, err = newServiceRegistryImpl(endpoints, rootKey, username, password, e.saveNode) + e.etcdRegistryImpl, err = newServiceRegistryImpl(endpoints, resultT[T]{}.Value.EtcdRootKey(), username, password, e.saveNode) return e, err } @@ -46,7 +46,7 @@ func (r *Registry[T]) GetNodes() *sync.Map { return &r.nodes } -// 获取当前根节点下所有节点信息 +// 根据inode的mapKey()查找对应的节点 func (r *Registry[T]) FindNode(key string) (T, error) { var tmp = resultT[T]{Err: nil} v, ok := r.nodes.Load(key) diff --git a/etcd/etcd_test.go b/etcd/etcd_test.go index 045d8ec..794cd1a 100644 --- a/etcd/etcd_test.go +++ b/etcd/etcd_test.go @@ -33,21 +33,21 @@ func TestService(t *testing.T) { _ = etcdAddress2 log.Open("test.log", log.DebugL) // 创建注册中心 - registry, err := NewRegistry[ServiceNode]([]string{etcdAddress2}, rootKeyServices, "", "") + registry, err := NewRegistry[ServiceNode]([]string{etcdAddress2}, "", "") if err != nil { log.Fatal(err.Error()) + return } // 注册示例服务 service := &ServiceNode{ - Name: "instance-1", - Type: "user-service", - Address: "localhost", - Port: 8080, + Name: "instance-1", + Type: "user-service", } if err := registry.Register(service); err != nil { log.Fatal(err.Error()) + return } // 监控服务变化 @@ -62,9 +62,10 @@ func TestTopicRegistry(t *testing.T) { _ = etcdAddress2 log.Open("test.log", log.DebugL) // 创建注册中心 - registry, err := NewRegistry[TopicNode]([]string{etcdAddress2}, rootKeyTopic, "", "") + registry, err := NewRegistry[TopicNode]([]string{etcdAddress2}, "", "") if err != nil { log.Fatal(err.Error()) + return } // 注册示例服务 @@ -79,12 +80,12 @@ func TestTopicRegistry(t *testing.T) { // 监控服务变化 registry.WatchServices() - for _, n := range registry.GetNodes() { - log.DebugF("发现topic:%v, 创建者:%v", n.Name, n.Creator) - if v, err := registry.FindNode(n.MapKey()); err == nil { - log.DebugF("topic:%v exist", v.Name) + registry.GetNodes().Range(func(k, v interface{}) bool { + if node, ok := v.(TopicNode); ok { + log.DebugF("this is topic:%v", node.Name) } - } + return true + }) time.Sleep(60 * time.Second) registry.UnregisterService() diff --git a/etcd/inode.go b/etcd/inode.go index 3ffeff2..52ba877 100644 --- a/etcd/inode.go +++ b/etcd/inode.go @@ -16,13 +16,10 @@ type INode interface { // 服务节点信息,TypeId及Type都是标记同类型的节点,Name是区别该节点与其它节点的字段 type ServiceNode struct { - TypeId int `json:"type_id"` // 服务类型id,与Type字段功能一样, 由proto定义,gate通过该字段找到这类服务的所有节点信息 - Name string `json:"name"` // 服务名 多个同类服务依赖name区分:lobby1,lobby2,lobby3等等 - Type string `json:"type"` // 服务类型:lobby, game, gate等等 - Address string `json:"address"` // 地址 - Port int `json:"port"` // 端口 - Version string `json:"version"` // 版本号 - ServiceType ServiceType `json:"service_type"` // 服务类型 + TypeId int `json:"type_id"` // 服务类型id,与Type字段功能一样, 由proto定义,gate通过该字段找到这类服务的所有节点信息 + Name string `json:"name"` // 服务名 多个同类服务依赖name区分:lobby1,lobby2,lobby3等等 + Type string `json:"type"` // 服务类型:lobby, game, gate等等 + Version string `json:"version"` // 版本号 } func (s ServiceNode) EtcdKey() string { diff --git a/nat/nats.go b/nat/nats.go index 00eb993..a7f18e7 100644 --- a/nat/nats.go +++ b/nat/nats.go @@ -96,6 +96,17 @@ func (n *Nats) SubscribeRpc(topic string, rpcHandler RpcHandler) error { return nil } +// 订阅回调,极少用。调用方自行保证并发性问题 +func (n *Nats) SubscribeCb(topic string, cb func(m *nats.Msg)) error { + sub, err := n.nc.Subscribe(topic, cb) + if err != nil { + return err + } + n.addSub(sub) + return nil +} + +// 订阅,消息统一放入msgChan交给服务自行处理 func (n *Nats) Subscribe(topic string, msgChan *safeChan.ByteChan) error { sub, err := n.nc.Subscribe(topic, func(m *nats.Msg) { _ = msgChan.Write(m.Data) diff --git a/service/baseService.go b/service/baseService.go index 3bdf016..88c8bca 100644 --- a/service/baseService.go +++ b/service/baseService.go @@ -18,12 +18,12 @@ type BaseService struct { onFunc IOnFunc sender ISender - msg *safeChan.SafeChan[[]byte] - job *safeChan.SafeChan[func()] - stop context.Context - stopFunc context.CancelFunc - waitStop context.Context - waitStopFunc context.CancelFunc + msg *safeChan.SafeChan[[]byte] + job *safeChan.SafeChan[func()] + readyStop context.Context // 通知关闭的chan + readyStopFunc context.CancelFunc // NotifyStop会调用该方法,触发服务进入关闭流程 + waitStop context.Context + waitStopFunc context.CancelFunc } func NewBaseService(type_, name string, onFunc IOnFunc, sender ISender) *BaseService { @@ -37,7 +37,7 @@ func NewBaseService(type_, name string, onFunc IOnFunc, sender ISender) *BaseSer msg: safeChan.NewSafeChan[[]byte](128), job: safeChan.NewSafeChan[func()](128), } - s.stop, s.stopFunc = context.WithCancel(context.Background()) + s.readyStop, s.readyStopFunc = context.WithCancel(context.Background()) s.waitStop, s.waitStopFunc = context.WithCancel(context.Background()) //s.Run() @@ -58,8 +58,8 @@ func (s *BaseService) Write(msg []byte) error { func (s *BaseService) RunOnce(cb func()) { select { - case <-s.stop.Done(): - log.Error(s.Log("want stop, can not call RunOnce function")) + case <-s.readyStop.Done(): + log.Error(s.Log("want readyStop, can not call RunOnce function")) return default: _ = s.job.Write(cb) @@ -68,8 +68,8 @@ func (s *BaseService) RunOnce(cb func()) { func (s *BaseService) RunWait(cb func() (retValue any)) (retValue any, err error) { select { - case <-s.stop.Done(): - err = fmt.Errorf(s.Log("want stop, can not call RunOnce function")) + case <-s.readyStop.Done(): + err = fmt.Errorf(s.Log("want readyStop, can not call RunOnce function")) log.Error(err.Error()) return nil, err default: @@ -111,24 +111,21 @@ func (s *BaseService) WaitStop() { } func (s *BaseService) NotifyStop() { - s.stopFunc() - // log.Debug(fmt.Sprintf("notify %v service stop", s.name)) + s.readyStopFunc() + // log.Debug(fmt.Sprintf("notify %v service readyStop", s.name)) } -func (s *BaseService) allChanEmpty() bool { - if s.job.Size() == 0 && s.msg.Size() == 0 { - return true - } - return false -} +//func (s *BaseService) allChanEmpty() bool { +// if s.job.Size() == 0 && s.msg.Size() == 0 { +// return true +// } +// return false +//} func (s *BaseService) canStop() bool { select { - case <-s.stop.Done(): - if s.allChanEmpty() { - return true - } - return false + case <-s.readyStop.Done(): + return true default: return false } @@ -136,16 +133,19 @@ func (s *BaseService) canStop() bool { func (s *BaseService) run() { for { - if s.canStop() { - if s.onFunc != nil { - s.onFunc.OnStop() - s.waitStopFunc() - } + if s.onFunc.CanStop() && s.canStop() { + s.msg.Close() + s.job.Close() + s.Timer.CancelAllTimer() + s.Timer.Close() + + s.onFunc.OnStop() + s.waitStopFunc() break } select { case msg, ok := <-s.msg.Reader(): - if ok && s.onFunc != nil { + if ok { _ = s.onFunc.OnMessage(msg) } case cb, ok := <-s.job.Reader(): @@ -156,13 +156,6 @@ func (s *BaseService) run() { if ok && t != nil && t.Func != nil { t.Func() } - case _ = <-s.stop.Done(): - if s.onFunc != nil { - s.msg.Close() - s.job.Close() - s.Timer.CancelAllTimer() - s.Timer.Close() - } } } } diff --git a/service/iservice.go b/service/iservice.go index 99f6074..f916a6a 100644 --- a/service/iservice.go +++ b/service/iservice.go @@ -29,6 +29,8 @@ type ISender interface { } type IOnFunc interface { + // 由子服务实现,比如玩法服需要等待所有玩家退出才能关闭服务 + CanStop() bool OnStop() OnInit() OnMessage(msg []byte) error diff --git a/service/natsService.go b/service/natsService.go index 27096b7..67e5de0 100644 --- a/service/natsService.go +++ b/service/natsService.go @@ -1,28 +1,88 @@ package service import ( - "fmt" + "encoding/json" + "github.com/fox/fox/etcd" + "github.com/fox/fox/log" "github.com/fox/fox/nat" + "github.com/nats-io/nats.go" + "os" + "sync" "time" ) -type NatsService struct { - *BaseService - nats *nat.Nats +const ( + updateTopic = "update.service.topic" +) + +// 初始化服务所需要的参数 +type InitNatsServiceParams struct { + EtcdAddress []string + EtcdUsername string + EtcdPassword string + NatsAddress []string + ServiceType string // 本服务的服务类型 + ServiceName string // 本服务的服务名,唯一索引 + OnFunc IOnFunc // 包含初始化,停止,服务的消息处理 + TypeId int // 与ServiceType一一对应 + Version string } -func NewNatsService(type_, name string, onFunc IOnFunc, natsAddress ...string) (*NatsService, error) { +type NatsService struct { + *BaseService + nats *nat.Nats + registry *etcd.Registry[etcd.ServiceNode] + node etcd.ServiceNode // 本服务节点信息 +} + +func NewNatsService(param *InitNatsServiceParams) (*NatsService, error) { + var err error s := new(NatsService) - s.BaseService = NewBaseService(type_, name, onFunc, s) - s.nats = nat.NewNats(fmt.Sprintf("%v-%v", type_, name), natsAddress...) - if err := s.nats.Connect(); err != nil { - // log.Error(err.Error()) - s.BaseService.NotifyStop() + s.BaseService = NewBaseService(param.ServiceType, param.ServiceName, param.OnFunc, s) + if s.registry, err = etcd.NewRegistry[etcd.ServiceNode](param.EtcdAddress, param.EtcdUsername, param.EtcdPassword); err != nil { return nil, err } + s.node = etcd.ServiceNode{ + TypeId: param.TypeId, + Name: s.Name(), + Type: s.Type(), + Version: param.Version, + } + if err = s.registry.Register(s.node); err != nil { + return nil, err + } + s.registry.WatchServices() + + s.nats = nat.NewNats(param.ServiceName, param.NatsAddress...) + if err = s.nats.Connect(); err != nil { + s.registry.UnregisterService() + return nil, err + } + // 订阅本服务名的topic + if err = s.Subscribe(Topic(s)); err != nil { + log.Error(err.Error()) + } return s, nil } +func (n *NatsService) subscribeUpdateService() error { + return n.nats.SubscribeCb(updateTopic, func(m *nats.Msg) { + var node = &etcd.ServiceNode{} + _ = json.Unmarshal(m.Data, node) + // 不是同类服务不处理,是自己发出来的更新,也不处理 + if node.Type != n.Type() || node.Name == n.Name() { + return + } + // 有新服务上线,本服务准备退出 + if n.node.Version < node.Version { + n.NotifyStop() + n.WaitStop() + log.InfoF("%v auto exit, initiating shutdown...", n.Name()) + os.Exit(0) + } + }) +} + func (n *NatsService) Subscribe(topic string) error { return n.nats.Subscribe(topic, n.msg) } @@ -46,3 +106,13 @@ func (s *NatsService) Call(topic string, timeout time.Duration, msg []byte) ([]b _ = msg return nil, nil } + +// 从etcd中获取所有服务节点 +func (s *NatsService) GetServiceNodes() *sync.Map { + return s.registry.GetNodes() +} + +// 查找指定的服务节点信息 +func (s *NatsService) FindServiceNode(serviceName string) (etcd.ServiceNode, error) { + return s.registry.FindNode(serviceName) +} diff --git a/service/natsService_test.go b/service/natsService_test.go index 2755804..d199b4f 100644 --- a/service/natsService_test.go +++ b/service/natsService_test.go @@ -1,7 +1,7 @@ package service import ( - "github.com/fox/fox/etcd" + "fmt" "github.com/fox/fox/log" "testing" "time" @@ -9,12 +9,19 @@ import ( const ( GameSrv = "game" + GameType = 1001 NatsAddress = "nats://192.168.232.128:4222" EtcdAddress = "192.168.232.128:2379" EtcdAddress2 = "114.132.124.145:2379" NatsAddress2 = "nats://114.132.124.145:4222" ) +type gameService struct { + *NatsService + //etcdTopic *etcd.Registry[etcd.TopicNode] + //srvTopic string +} + func newGameService() *gameService { _ = NatsAddress2 _ = NatsAddress @@ -23,45 +30,31 @@ func newGameService() *gameService { var err error s := new(gameService) - if s.NatsService, err = NewNatsService(GameSrv, "1", s, NatsAddress2); err != nil { + if s.NatsService, err = NewNatsService(&InitNatsServiceParams{ + EtcdAddress: []string{EtcdAddress2}, + EtcdUsername: "", + EtcdPassword: "", + NatsAddress: []string{NatsAddress2}, + ServiceType: GameSrv, + ServiceName: fmt.Sprintf("%s-%d", GameSrv, 0), + OnFunc: nil, + TypeId: GameType, + Version: time.Now().Format("20060102150405"), + }); err != nil { log.Fatal(err.Error()) return nil } - if s.etcdService, err = etcd.NewRegistry[etcd.ServiceNode]([]string{EtcdAddress2}, etcd.ServiceNode{}.EtcdRootKey(), "", ""); err != nil { - log.Error(err.Error()) - s.NatsService.OnStop() - return nil - } - endpoint := &etcd.ServiceNode{ - Name: s.Name(), - Type: s.Type(), - Address: "", - Port: 0, - Version: "", - ServiceType: etcd.Unique, - } - if err = s.etcdService.Register(endpoint); err != nil { - log.Error(err.Error()) - s.NatsService.OnStop() - return nil - } + //if s.etcdTopic, err = etcd.NewRegistry[etcd.TopicNode]([]string{EtcdAddress2}, "", ""); err != nil { + // log.Error(err.Error()) + // s.NatsService.OnStop() + // return nil + //} s.OnInit() return s } -type gameService struct { - *NatsService - etcdService *etcd.Registry[etcd.ServiceNode] - etcdTopic *etcd.Registry[etcd.TopicNode] - srvTopic string -} - func (s *gameService) OnInit() { - s.etcdService.WatchServices() - s.etcdTopic.WatchServices() - if err := s.NatsService.Subscribe(Topic(s)); err != nil { - log.Error(err.Error()) - } + //s.etcdTopic.WatchServices() if err := s.NatsService.QueueSubscribe(GroupTopic(s), GroupQueue(s)); err != nil { log.Error(err.Error()) } @@ -70,8 +63,7 @@ func (s *gameService) OnInit() { } func (s *gameService) OnStop() { - s.etcdService.UnregisterService() - s.etcdService.UnregisterService() + //s.etcdTopic.UnregisterService() s.NatsService.OnStop() log.Debug("OnStop") } @@ -88,7 +80,7 @@ func TestGameService(t *testing.T) { if err := s.Send(Topic(s), []byte(msg)); err != nil { log.Error(err.Error()) } - s.etcdService.GetNodes().Range(func(key, value interface{}) bool { + s.GetServiceNodes().Range(func(key, value interface{}) bool { log.Debug(s.Log("发现有服务:%v", value)) return true }) diff --git a/service/service_test.go b/service/service_test.go index 70b8651..eb0222a 100644 --- a/service/service_test.go +++ b/service/service_test.go @@ -22,6 +22,11 @@ func (s *EchoService) OnStop() { log.Debug("OnStop") } +func (s *EchoService) CanStop() bool { + log.Debug("CanStop") + return true +} + func (s *EchoService) Send(topic string, msg []byte) error { log.Debug(s.Log("send %v to topic:%v", string(msg), topic)) return nil