diff --git a/etcd/etcd.go b/etcd/etcd.go index b5bf9cf..3c21d71 100644 --- a/etcd/etcd.go +++ b/etcd/etcd.go @@ -7,38 +7,31 @@ import ( "sync" ) -type resultT[T any] struct { - Value T -} - type Registry[T INode] struct { *etcdRegistryImpl nodes *sync.Map + me T } -func NewRegistry[T INode](endpoints []string, username, password string) (*Registry[T], error) { - var err error +func NewRegistry[T INode](endpoints []string, username, password string, me T) (*Registry[T], error) { + bs, err := json.Marshal(me) + if err != nil { + return nil, err + } + e := &Registry[T]{} - e.etcdRegistryImpl, err = newServiceRegistryImpl(endpoints, resultT[T]{}.Value.EtcdRootKey(), username, password, e.saveNode, e.replace) + e.etcdRegistryImpl, err = newServiceRegistryImpl(endpoints, me.EtcdRootKey(), username, password, e, me.EtcdKey(), string(bs)) e.nodes = &sync.Map{} return e, err } -func (r *Registry[T]) Register(node INode) error { - bs, err := json.Marshal(node) - if err != nil { - return err - } - return r.etcdRegistryImpl.Register(node.EtcdKey(), string(bs)) -} - // 保存当前服务 func (r *Registry[T]) saveNode(newNodes *sync.Map, jsonBytes []byte) { - var tmp = resultT[T]{} - if err := json.Unmarshal(jsonBytes, &tmp.Value); err != nil { + var tmp = new(T) + if err := json.Unmarshal(jsonBytes, tmp); err != nil { log.ErrorF(err.Error()) } - newNodes.Store(tmp.Value.MapKey(), tmp.Value) + newNodes.Store((*tmp).MapKey(), tmp) } // 保存当前服务 @@ -52,14 +45,14 @@ func (r *Registry[T]) GetNodes() *sync.Map { } // 根据inode的mapKey()查找对应的节点 -func (r *Registry[T]) FindNode(key string) (T, error) { - var tmp = resultT[T]{} +func (r *Registry[T]) FindNode(key string) (*T, error) { + var tmp = new(T) v, ok := r.nodes.Load(key) if !ok { - return tmp.Value, fmt.Errorf("%v not exist", key) + return tmp, fmt.Errorf("%v not exist", key) } - if tmp.Value, ok = v.(T); ok { - return tmp.Value, nil + if tmp, ok = v.(*T); ok { + return tmp, nil } - return tmp.Value, fmt.Errorf("%v 类型转换失败", key) + return nil, fmt.Errorf("%v 类型转换失败", key) } diff --git a/etcd/etcdImpl.go b/etcd/etcdImpl.go index 3e65022..68aaf5d 100644 --- a/etcd/etcdImpl.go +++ b/etcd/etcdImpl.go @@ -15,21 +15,21 @@ const ( DefaultDialTimeout = 3 * time.Second // 默认拨号超时时间 DefaultLeaseTTL = int64(10) // 默认租约TTL为60秒 DefaultKeepAliveInterval = 5 * time.Second // 默认续租间隔为30秒 - KeepAliveFailCount = 100 + KeepAliveFailCount = 1 ) type etcdRegistryImpl struct { cli *clientv3.Client leaseID clientv3.LeaseID - nodeKey string + meNodeKey string // 自身节点key + meNodeValue string // 自身节点json cancelFunc context.CancelFunc rootKey string - saveNodeFunc func(*sync.Map, []byte) - replaceFunc func(*sync.Map) + nodeOperator iNodeOperator } // 创建服务注册中心 -func newServiceRegistryImpl(endpoints []string, rootKey, username, password string, saveNode func(*sync.Map, []byte), replace func(*sync.Map)) (*etcdRegistryImpl, error) { +func newServiceRegistryImpl(endpoints []string, rootKey, username, password string, operator iNodeOperator, meKey, meValue string) (*etcdRegistryImpl, error) { cli, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: DefaultDialTimeout, @@ -40,20 +40,21 @@ func newServiceRegistryImpl(endpoints []string, rootKey, username, password stri return nil, err } - return &etcdRegistryImpl{ + impl := &etcdRegistryImpl{ cli: cli, rootKey: rootKey, - saveNodeFunc: saveNode, - replaceFunc: replace, - }, nil + nodeOperator: operator, + meNodeKey: meKey, + meNodeValue: meValue, + } + return impl, impl.register() } // 注册服务 RegisterService -func (sr *etcdRegistryImpl) Register(key, value string) error { +func (sr *etcdRegistryImpl) register() error { // 生成唯一服务key // /services/serviceType/serviceName - sr.nodeKey = key - log.DebugF("register %s to etcd", key) + log.DebugF("register key:%s value:%v to etcd", sr.meNodeKey, sr.meNodeValue) // 创建租约 ctx, cancel := context.WithCancel(context.Background()) @@ -73,13 +74,13 @@ func (sr *etcdRegistryImpl) Register(key, value string) error { // } // 写入ETCD - _, err = sr.cli.Put(ctx, key, value, clientv3.WithLease(sr.leaseID)) + _, err = sr.cli.Put(ctx, sr.meNodeKey, sr.meNodeValue, clientv3.WithLease(sr.leaseID)) if err != nil { return err } // 启动自动续租 - go sr.keepAlive(ctx) + ksync.GoSafe(func() { sr.keepAlive(ctx) }, nil) if err = sr.discoverServices(); err == nil { // ss := sr.GetService() @@ -105,6 +106,11 @@ func (sr *etcdRegistryImpl) keepAlive(ctx context.Context) { if retryCount > KeepAliveFailCount { log.DebugF("KeepAlive failed after %d retries: %v", KeepAliveFailCount, err) sr.UnregisterService() + if err = sr.register(); err != nil { + log.ErrorF("Reconnect failed: %v", err) + } else { + retryCount = 0 + } return } retryCount++ @@ -127,10 +133,10 @@ func (sr *etcdRegistryImpl) UnregisterService() { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - if _, err := sr.cli.Delete(ctx, sr.nodeKey); err != nil { - log.ErrorF("unregister:%v failed:%v from etcd", sr.nodeKey, err) + if _, err := sr.cli.Delete(ctx, sr.meNodeKey); err != nil { + log.ErrorF("unregister:%v failed:%v from etcd", sr.meNodeKey, err) } else { - log.DebugF("unregister:%v from etcd", sr.nodeKey) + log.DebugF("unregister:%v from etcd", sr.meNodeKey) } } @@ -144,13 +150,14 @@ func (sr *etcdRegistryImpl) discoverServices() error { if err != nil { return err } - // log.Debug(fmt.Sprintf("discoverServices srv:%s", srv)) + //log.Debug(fmt.Sprintf("discoverServices srv:%s", srv)) newNodes := &sync.Map{} for _, kv := range resp.Kvs { - sr.saveNodeFunc(newNodes, kv.Value) + sr.nodeOperator.saveNode(newNodes, kv.Value) + //log.Debug(fmt.Sprintf("save node key:%s value:%s", string(kv.Key), string(kv.Value))) } - sr.replaceFunc(newNodes) + sr.nodeOperator.replace(newNodes) return nil } diff --git a/etcd/etcd_test.go b/etcd/etcd_test.go index 794cd1a..7127665 100644 --- a/etcd/etcd_test.go +++ b/etcd/etcd_test.go @@ -33,19 +33,11 @@ func TestService(t *testing.T) { _ = etcdAddress2 log.Open("test.log", log.DebugL) // 创建注册中心 - registry, err := NewRegistry[ServiceNode]([]string{etcdAddress2}, "", "") - if err != nil { - log.Fatal(err.Error()) - return - } - - // 注册示例服务 - service := &ServiceNode{ + registry, err := NewRegistry[ServiceNode]([]string{etcdAddress2}, "", "", ServiceNode{ Name: "instance-1", Type: "user-service", - } - - if err := registry.Register(service); err != nil { + }) + if err != nil { log.Fatal(err.Error()) return } @@ -53,7 +45,7 @@ func TestService(t *testing.T) { // 监控服务变化 registry.WatchServices() - time.Sleep(10 * time.Second) + time.Sleep(60 * time.Second) registry.UnregisterService() } @@ -62,22 +54,15 @@ func TestTopicRegistry(t *testing.T) { _ = etcdAddress2 log.Open("test.log", log.DebugL) // 创建注册中心 - registry, err := NewRegistry[TopicNode]([]string{etcdAddress2}, "", "") + registry, err := NewRegistry[TopicNode]([]string{etcdAddress2}, "", "", TopicNode{ + Name: "instance-1", + Creator: "instance-1", + }) if err != nil { log.Fatal(err.Error()) return } - // 注册示例服务 - node := &TopicNode{ - Name: "instance-1", - Creator: "instance-1", - } - - if err := registry.Register(node); err != nil { - log.Fatal(err.Error()) - } - // 监控服务变化 registry.WatchServices() registry.GetNodes().Range(func(k, v interface{}) bool { diff --git a/etcd/interface.go b/etcd/interface.go new file mode 100644 index 0000000..8d13e0b --- /dev/null +++ b/etcd/interface.go @@ -0,0 +1,10 @@ +package etcd + +import "sync" + +type iNodeOperator interface { + // 保存节点 + saveNode(*sync.Map, []byte) + // 替换所有节点数据 + replace(*sync.Map) +}