etcd重连机制

This commit is contained in:
liuxiaobo 2025-06-16 22:00:26 +08:00
parent d6e7c512ad
commit 253c69783f
4 changed files with 62 additions and 67 deletions

View File

@ -7,38 +7,31 @@ import (
"sync"
)
type resultT[T any] struct {
Value T
}
type Registry[T INode] struct {
*etcdRegistryImpl
nodes *sync.Map
me T
}
func NewRegistry[T INode](endpoints []string, username, password string, me T) (*Registry[T], error) {
bs, err := json.Marshal(me)
if err != nil {
return nil, err
}
func NewRegistry[T INode](endpoints []string, username, password string) (*Registry[T], error) {
var err error
e := &Registry[T]{}
e.etcdRegistryImpl, err = newServiceRegistryImpl(endpoints, resultT[T]{}.Value.EtcdRootKey(), username, password, e.saveNode, e.replace)
e.etcdRegistryImpl, err = newServiceRegistryImpl(endpoints, me.EtcdRootKey(), username, password, e, me.EtcdKey(), string(bs))
e.nodes = &sync.Map{}
return e, err
}
func (r *Registry[T]) Register(node INode) error {
bs, err := json.Marshal(node)
if err != nil {
return err
}
return r.etcdRegistryImpl.Register(node.EtcdKey(), string(bs))
}
// 保存当前服务
func (r *Registry[T]) saveNode(newNodes *sync.Map, jsonBytes []byte) {
var tmp = resultT[T]{}
if err := json.Unmarshal(jsonBytes, &tmp.Value); err != nil {
var tmp = new(T)
if err := json.Unmarshal(jsonBytes, tmp); err != nil {
log.ErrorF(err.Error())
}
newNodes.Store(tmp.Value.MapKey(), tmp.Value)
newNodes.Store((*tmp).MapKey(), tmp)
}
// 保存当前服务
@ -52,14 +45,14 @@ func (r *Registry[T]) GetNodes() *sync.Map {
}
// 根据inode的mapKey()查找对应的节点
func (r *Registry[T]) FindNode(key string) (T, error) {
var tmp = resultT[T]{}
func (r *Registry[T]) FindNode(key string) (*T, error) {
var tmp = new(T)
v, ok := r.nodes.Load(key)
if !ok {
return tmp.Value, fmt.Errorf("%v not exist", key)
return tmp, fmt.Errorf("%v not exist", key)
}
if tmp.Value, ok = v.(T); ok {
return tmp.Value, nil
if tmp, ok = v.(*T); ok {
return tmp, nil
}
return tmp.Value, fmt.Errorf("%v 类型转换失败", key)
return nil, fmt.Errorf("%v 类型转换失败", key)
}

View File

@ -15,21 +15,21 @@ const (
DefaultDialTimeout = 3 * time.Second // 默认拨号超时时间
DefaultLeaseTTL = int64(10) // 默认租约TTL为60秒
DefaultKeepAliveInterval = 5 * time.Second // 默认续租间隔为30秒
KeepAliveFailCount = 100
KeepAliveFailCount = 1
)
type etcdRegistryImpl struct {
cli *clientv3.Client
leaseID clientv3.LeaseID
nodeKey string
meNodeKey string // 自身节点key
meNodeValue string // 自身节点json
cancelFunc context.CancelFunc
rootKey string
saveNodeFunc func(*sync.Map, []byte)
replaceFunc func(*sync.Map)
nodeOperator iNodeOperator
}
// 创建服务注册中心
func newServiceRegistryImpl(endpoints []string, rootKey, username, password string, saveNode func(*sync.Map, []byte), replace func(*sync.Map)) (*etcdRegistryImpl, error) {
func newServiceRegistryImpl(endpoints []string, rootKey, username, password string, operator iNodeOperator, meKey, meValue string) (*etcdRegistryImpl, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: DefaultDialTimeout,
@ -40,20 +40,21 @@ func newServiceRegistryImpl(endpoints []string, rootKey, username, password stri
return nil, err
}
return &etcdRegistryImpl{
impl := &etcdRegistryImpl{
cli: cli,
rootKey: rootKey,
saveNodeFunc: saveNode,
replaceFunc: replace,
}, nil
nodeOperator: operator,
meNodeKey: meKey,
meNodeValue: meValue,
}
return impl, impl.register()
}
// 注册服务 RegisterService
func (sr *etcdRegistryImpl) Register(key, value string) error {
func (sr *etcdRegistryImpl) register() error {
// 生成唯一服务key
// /services/serviceType/serviceName
sr.nodeKey = key
log.DebugF("register %s to etcd", key)
log.DebugF("register key:%s value:%v to etcd", sr.meNodeKey, sr.meNodeValue)
// 创建租约
ctx, cancel := context.WithCancel(context.Background())
@ -73,13 +74,13 @@ func (sr *etcdRegistryImpl) Register(key, value string) error {
// }
// 写入ETCD
_, err = sr.cli.Put(ctx, key, value, clientv3.WithLease(sr.leaseID))
_, err = sr.cli.Put(ctx, sr.meNodeKey, sr.meNodeValue, clientv3.WithLease(sr.leaseID))
if err != nil {
return err
}
// 启动自动续租
go sr.keepAlive(ctx)
ksync.GoSafe(func() { sr.keepAlive(ctx) }, nil)
if err = sr.discoverServices(); err == nil {
// ss := sr.GetService()
@ -105,6 +106,11 @@ func (sr *etcdRegistryImpl) keepAlive(ctx context.Context) {
if retryCount > KeepAliveFailCount {
log.DebugF("KeepAlive failed after %d retries: %v", KeepAliveFailCount, err)
sr.UnregisterService()
if err = sr.register(); err != nil {
log.ErrorF("Reconnect failed: %v", err)
} else {
retryCount = 0
}
return
}
retryCount++
@ -127,10 +133,10 @@ func (sr *etcdRegistryImpl) UnregisterService() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
if _, err := sr.cli.Delete(ctx, sr.nodeKey); err != nil {
log.ErrorF("unregister:%v failed:%v from etcd", sr.nodeKey, err)
if _, err := sr.cli.Delete(ctx, sr.meNodeKey); err != nil {
log.ErrorF("unregister:%v failed:%v from etcd", sr.meNodeKey, err)
} else {
log.DebugF("unregister:%v from etcd", sr.nodeKey)
log.DebugF("unregister:%v from etcd", sr.meNodeKey)
}
}
@ -148,9 +154,10 @@ func (sr *etcdRegistryImpl) discoverServices() error {
newNodes := &sync.Map{}
for _, kv := range resp.Kvs {
sr.saveNodeFunc(newNodes, kv.Value)
sr.nodeOperator.saveNode(newNodes, kv.Value)
//log.Debug(fmt.Sprintf("save node key:%s value:%s", string(kv.Key), string(kv.Value)))
}
sr.replaceFunc(newNodes)
sr.nodeOperator.replace(newNodes)
return nil
}

View File

@ -33,19 +33,11 @@ func TestService(t *testing.T) {
_ = etcdAddress2
log.Open("test.log", log.DebugL)
// 创建注册中心
registry, err := NewRegistry[ServiceNode]([]string{etcdAddress2}, "", "")
if err != nil {
log.Fatal(err.Error())
return
}
// 注册示例服务
service := &ServiceNode{
registry, err := NewRegistry[ServiceNode]([]string{etcdAddress2}, "", "", ServiceNode{
Name: "instance-1",
Type: "user-service",
}
if err := registry.Register(service); err != nil {
})
if err != nil {
log.Fatal(err.Error())
return
}
@ -53,7 +45,7 @@ func TestService(t *testing.T) {
// 监控服务变化
registry.WatchServices()
time.Sleep(10 * time.Second)
time.Sleep(60 * time.Second)
registry.UnregisterService()
}
@ -62,22 +54,15 @@ func TestTopicRegistry(t *testing.T) {
_ = etcdAddress2
log.Open("test.log", log.DebugL)
// 创建注册中心
registry, err := NewRegistry[TopicNode]([]string{etcdAddress2}, "", "")
registry, err := NewRegistry[TopicNode]([]string{etcdAddress2}, "", "", TopicNode{
Name: "instance-1",
Creator: "instance-1",
})
if err != nil {
log.Fatal(err.Error())
return
}
// 注册示例服务
node := &TopicNode{
Name: "instance-1",
Creator: "instance-1",
}
if err := registry.Register(node); err != nil {
log.Fatal(err.Error())
}
// 监控服务变化
registry.WatchServices()
registry.GetNodes().Range(func(k, v interface{}) bool {

10
etcd/interface.go Normal file
View File

@ -0,0 +1,10 @@
package etcd
import "sync"
type iNodeOperator interface {
// 保存节点
saveNode(*sync.Map, []byte)
// 替换所有节点数据
replace(*sync.Map)
}