etcd重连机制

This commit is contained in:
liuxiaobo 2025-06-16 22:06:59 +08:00
parent 253c69783f
commit 5dd6f84197
5 changed files with 17 additions and 17 deletions

View File

@ -31,7 +31,7 @@ func (r *Registry[T]) saveNode(newNodes *sync.Map, jsonBytes []byte) {
if err := json.Unmarshal(jsonBytes, tmp); err != nil { if err := json.Unmarshal(jsonBytes, tmp); err != nil {
log.ErrorF(err.Error()) log.ErrorF(err.Error())
} }
newNodes.Store((*tmp).MapKey(), tmp) newNodes.Store((*tmp).MapKey(), *tmp)
} }
// 保存当前服务 // 保存当前服务
@ -45,14 +45,14 @@ func (r *Registry[T]) GetNodes() *sync.Map {
} }
// 根据inode的mapKey()查找对应的节点 // 根据inode的mapKey()查找对应的节点
func (r *Registry[T]) FindNode(key string) (*T, error) { func (r *Registry[T]) FindNode(key string) (T, error) {
var tmp = new(T) var tmp = new(T)
v, ok := r.nodes.Load(key) v, ok := r.nodes.Load(key)
if !ok { if !ok {
return tmp, fmt.Errorf("%v not exist", key) return *tmp, fmt.Errorf("%v not exist", key)
} }
if tmp, ok = v.(*T); ok { if node, ok := v.(T); ok {
return tmp, nil return node, nil
} }
return nil, fmt.Errorf("%v 类型转换失败", key) return *tmp, fmt.Errorf("%v 类型转换失败", key)
} }

View File

@ -47,7 +47,11 @@ func newServiceRegistryImpl(endpoints []string, rootKey, username, password stri
meNodeKey: meKey, meNodeKey: meKey,
meNodeValue: meValue, meNodeValue: meValue,
} }
return impl, impl.register() if err = impl.register(); err != nil {
return nil, err
}
impl.watchServices()
return impl, nil
} }
// 注册服务 RegisterService // 注册服务 RegisterService
@ -163,7 +167,7 @@ func (sr *etcdRegistryImpl) discoverServices() error {
} }
// 监控服务变化 // 监控服务变化
func (sr *etcdRegistryImpl) WatchServices() { func (sr *etcdRegistryImpl) watchServices() {
watchKey := fmt.Sprintf("/%s/", sr.rootKey) watchKey := fmt.Sprintf("/%s/", sr.rootKey)
ksync.GoSafe(func() { ksync.GoSafe(func() {
rch := sr.cli.Watch(context.Background(), watchKey, clientv3.WithPrefix()) rch := sr.cli.Watch(context.Background(), watchKey, clientv3.WithPrefix())

View File

@ -43,7 +43,7 @@ func TestService(t *testing.T) {
} }
// 监控服务变化 // 监控服务变化
registry.WatchServices() registry.watchServices()
time.Sleep(60 * time.Second) time.Sleep(60 * time.Second)
registry.UnregisterService() registry.UnregisterService()
@ -64,7 +64,7 @@ func TestTopicRegistry(t *testing.T) {
} }
// 监控服务变化 // 监控服务变化
registry.WatchServices() registry.watchServices()
registry.GetNodes().Range(func(k, v interface{}) bool { registry.GetNodes().Range(func(k, v interface{}) bool {
if node, ok := v.(TopicNode); ok { if node, ok := v.(TopicNode); ok {
log.DebugF("this is topic:%v", node.Name) log.DebugF("this is topic:%v", node.Name)

View File

@ -43,20 +43,16 @@ func NewNatsService(param *InitNatsServiceParams) (*NatsService, error) {
var err error var err error
s := new(NatsService) s := new(NatsService)
s.BaseService = NewBaseService(param.ServiceType, param.ServiceName, param.OnFunc, s) s.BaseService = NewBaseService(param.ServiceType, param.ServiceName, param.OnFunc, s)
if s.registry, err = etcd.NewRegistry[etcd.ServiceNode](param.EtcdAddress, param.EtcdUsername, param.EtcdPassword); err != nil {
return nil, err
}
s.RpcProcessor = processor.NewRpcProcessor()
s.node = etcd.ServiceNode{ s.node = etcd.ServiceNode{
TypeId: param.TypeId, TypeId: param.TypeId,
Name: s.Name(), Name: s.Name(),
Type: s.Type(), Type: s.Type(),
Version: param.Version, Version: param.Version,
} }
if err = s.registry.Register(s.node); err != nil { if s.registry, err = etcd.NewRegistry[etcd.ServiceNode](param.EtcdAddress, param.EtcdUsername, param.EtcdPassword, s.node); err != nil {
return nil, err return nil, err
} }
s.registry.WatchServices() s.RpcProcessor = processor.NewRpcProcessor()
s.nats = nat.NewNats(param.ServiceName, param.NatsAddress...) s.nats = nat.NewNats(param.ServiceName, param.NatsAddress...)
if err = s.nats.Connect(); err != nil { if err = s.nats.Connect(); err != nil {

View File

@ -55,7 +55,7 @@ func newGameService() *gameService {
} }
func (s *gameService) OnInit() { func (s *gameService) OnInit() {
//s.etcdTopic.WatchServices() //s.etcdTopic.watchServices()
if err := s.NatsService.QueueSubscribe(GroupTopic(s), GroupQueue(s)); err != nil { if err := s.NatsService.QueueSubscribe(GroupTopic(s), GroupQueue(s)); err != nil {
log.Error(err.Error()) log.Error(err.Error())
} }