package etcd import ( "context" "fmt" "github.com/fox/fox/ksync" "github.com/fox/fox/log" "sync" "time" clientv3 "go.etcd.io/etcd/client/v3" ) const ( DefaultDialTimeout = 3 * time.Second // 默认拨号超时时间 DefaultLeaseTTL = int64(10) // 默认租约TTL为60秒 DefaultKeepAliveInterval = 5 * time.Second // 默认续租间隔为30秒 KeepAliveFailCount = 1 ) type etcdRegistryImpl struct { cli *clientv3.Client leaseID clientv3.LeaseID meNodeKey string // 自身节点key meNodeValue string // 自身节点json cancelFunc context.CancelFunc rootKey string nodeOperator iNodeOperator } // 创建服务注册中心 func newServiceRegistryImpl(endpoints []string, rootKey, username, password string, operator iNodeOperator, meKey, meValue string) (*etcdRegistryImpl, error) { cli, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: DefaultDialTimeout, Username: username, Password: password, }) if err != nil { return nil, err } impl := &etcdRegistryImpl{ cli: cli, rootKey: rootKey, nodeOperator: operator, meNodeKey: meKey, meNodeValue: meValue, } if err = impl.register(); err != nil { return nil, err } impl.watchServices() return impl, nil } // 注册服务 RegisterService func (sr *etcdRegistryImpl) register() error { // 生成唯一服务key // /services/serviceType/serviceName log.DebugF("register key:%s value:%v to etcd", sr.meNodeKey, sr.meNodeValue) // 创建租约 ctx, cancel := context.WithCancel(context.Background()) sr.cancelFunc = cancel // 申请租约 resp, err := sr.cli.Grant(ctx, DefaultLeaseTTL) if err != nil { return err } sr.leaseID = resp.ID // 序列化服务信息 // data, err := json.Marshal(node) // if err != nil { // return err // } // 写入ETCD _, err = sr.cli.Put(ctx, sr.meNodeKey, sr.meNodeValue, clientv3.WithLease(sr.leaseID)) if err != nil { return err } // 启动自动续租 ksync.GoSafe(func() { sr.keepAlive(ctx) }, nil) if err = sr.discoverServices(); err == nil { // ss := sr.GetService() // for _, s := range ss { // log.Debug(fmt.Sprintf("Discovered services: %+v\n", s)) // } } return nil } // 自动续租逻辑 func (sr *etcdRegistryImpl) keepAlive(ctx context.Context) { retryCount := 0 ticker := time.NewTicker(DefaultKeepAliveInterval) defer ticker.Stop() for { select { case <-ticker.C: _, err := sr.cli.KeepAliveOnce(ctx, sr.leaseID) if err != nil { if retryCount > KeepAliveFailCount { log.DebugF("KeepAlive failed after %d retries: %v", KeepAliveFailCount, err) sr.UnregisterService() if err = sr.register(); err != nil { log.ErrorF("Reconnect failed: %v", err) } else { retryCount = 0 } return } retryCount++ // log.DebugF("KeepAlive error (retry %d/%d): %v", retryCount, KeepAliveFailCount, err) } else { retryCount = 0 } case <-ctx.Done(): return } } } // 反注册服务 func (sr *etcdRegistryImpl) UnregisterService() { if sr.cancelFunc != nil { sr.cancelFunc() } ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() if _, err := sr.cli.Delete(ctx, sr.meNodeKey); err != nil { log.ErrorF("unregister:%v failed:%v from etcd", sr.meNodeKey, err) } else { log.DebugF("unregister:%v from etcd", sr.meNodeKey) } } // 服务发现 func (sr *etcdRegistryImpl) discoverServices() error { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() srv := fmt.Sprintf("/%s/", sr.rootKey) resp, err := sr.cli.Get(ctx, srv, clientv3.WithPrefix()) if err != nil { return err } //log.Debug(fmt.Sprintf("discoverServices srv:%s", srv)) newNodes := &sync.Map{} for _, kv := range resp.Kvs { sr.nodeOperator.saveNode(newNodes, kv.Value) //log.Debug(fmt.Sprintf("save node key:%s value:%s", string(kv.Key), string(kv.Value))) } sr.nodeOperator.replace(newNodes) return nil } // 监控服务变化 func (sr *etcdRegistryImpl) watchServices() { watchKey := fmt.Sprintf("/%s/", sr.rootKey) ksync.GoSafe(func() { rch := sr.cli.Watch(context.Background(), watchKey, clientv3.WithPrefix()) for resp := range rch { for range resp.Events { // 当有变化时获取最新服务列表 if err := sr.discoverServices(); err != nil { log.Error(err.Error()) } } } }, nil) }