diff --git a/etcd/etcd.go b/etcd/etcd.go index 3c21d71..1f42ae3 100644 --- a/etcd/etcd.go +++ b/etcd/etcd.go @@ -31,7 +31,7 @@ func (r *Registry[T]) saveNode(newNodes *sync.Map, jsonBytes []byte) { if err := json.Unmarshal(jsonBytes, tmp); err != nil { log.ErrorF(err.Error()) } - newNodes.Store((*tmp).MapKey(), tmp) + newNodes.Store((*tmp).MapKey(), *tmp) } // 保存当前服务 @@ -45,14 +45,14 @@ func (r *Registry[T]) GetNodes() *sync.Map { } // 根据inode的mapKey()查找对应的节点 -func (r *Registry[T]) FindNode(key string) (*T, error) { +func (r *Registry[T]) FindNode(key string) (T, error) { var tmp = new(T) v, ok := r.nodes.Load(key) if !ok { - return tmp, fmt.Errorf("%v not exist", key) + return *tmp, fmt.Errorf("%v not exist", key) } - if tmp, ok = v.(*T); ok { - return tmp, nil + if node, ok := v.(T); ok { + return node, nil } - return nil, fmt.Errorf("%v 类型转换失败", key) + return *tmp, fmt.Errorf("%v 类型转换失败", key) } diff --git a/etcd/etcdImpl.go b/etcd/etcdImpl.go index 68aaf5d..615dc7e 100644 --- a/etcd/etcdImpl.go +++ b/etcd/etcdImpl.go @@ -47,7 +47,11 @@ func newServiceRegistryImpl(endpoints []string, rootKey, username, password stri meNodeKey: meKey, meNodeValue: meValue, } - return impl, impl.register() + if err = impl.register(); err != nil { + return nil, err + } + impl.watchServices() + return impl, nil } // 注册服务 RegisterService @@ -163,7 +167,7 @@ func (sr *etcdRegistryImpl) discoverServices() error { } // 监控服务变化 -func (sr *etcdRegistryImpl) WatchServices() { +func (sr *etcdRegistryImpl) watchServices() { watchKey := fmt.Sprintf("/%s/", sr.rootKey) ksync.GoSafe(func() { rch := sr.cli.Watch(context.Background(), watchKey, clientv3.WithPrefix()) diff --git a/etcd/etcd_test.go b/etcd/etcd_test.go index 7127665..e82fd04 100644 --- a/etcd/etcd_test.go +++ b/etcd/etcd_test.go @@ -43,7 +43,7 @@ func TestService(t *testing.T) { } // 监控服务变化 - registry.WatchServices() + registry.watchServices() time.Sleep(60 * time.Second) registry.UnregisterService() @@ -64,7 +64,7 @@ func TestTopicRegistry(t *testing.T) { } // 监控服务变化 - registry.WatchServices() + registry.watchServices() registry.GetNodes().Range(func(k, v interface{}) bool { if node, ok := v.(TopicNode); ok { log.DebugF("this is topic:%v", node.Name) diff --git a/service/natsService.go b/service/natsService.go index c337e13..4c9c11f 100644 --- a/service/natsService.go +++ b/service/natsService.go @@ -43,20 +43,16 @@ func NewNatsService(param *InitNatsServiceParams) (*NatsService, error) { var err error s := new(NatsService) s.BaseService = NewBaseService(param.ServiceType, param.ServiceName, param.OnFunc, s) - if s.registry, err = etcd.NewRegistry[etcd.ServiceNode](param.EtcdAddress, param.EtcdUsername, param.EtcdPassword); err != nil { - return nil, err - } - s.RpcProcessor = processor.NewRpcProcessor() s.node = etcd.ServiceNode{ TypeId: param.TypeId, Name: s.Name(), Type: s.Type(), Version: param.Version, } - if err = s.registry.Register(s.node); err != nil { + if s.registry, err = etcd.NewRegistry[etcd.ServiceNode](param.EtcdAddress, param.EtcdUsername, param.EtcdPassword, s.node); err != nil { return nil, err } - s.registry.WatchServices() + s.RpcProcessor = processor.NewRpcProcessor() s.nats = nat.NewNats(param.ServiceName, param.NatsAddress...) if err = s.nats.Connect(); err != nil { diff --git a/service/natsService_test.go b/service/natsService_test.go index 571d4eb..5c4c48b 100644 --- a/service/natsService_test.go +++ b/service/natsService_test.go @@ -55,7 +55,7 @@ func newGameService() *gameService { } func (s *gameService) OnInit() { - //s.etcdTopic.WatchServices() + //s.etcdTopic.watchServices() if err := s.NatsService.QueueSubscribe(GroupTopic(s), GroupQueue(s)); err != nil { log.Error(err.Error()) }