173 lines
3.9 KiB
Go
173 lines
3.9 KiB
Go
package etcd
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"github.com/fox/fox/ksync"
|
|
"github.com/fox/fox/log"
|
|
"sync"
|
|
"time"
|
|
|
|
clientv3 "go.etcd.io/etcd/client/v3"
|
|
)
|
|
|
|
const (
|
|
DefaultDialTimeout = 3 * time.Second // 默认拨号超时时间
|
|
DefaultLeaseTTL = int64(10) // 默认租约TTL为60秒
|
|
DefaultKeepAliveInterval = 5 * time.Second // 默认续租间隔为30秒
|
|
KeepAliveFailCount = 100
|
|
)
|
|
|
|
type etcdRegistryImpl struct {
|
|
cli *clientv3.Client
|
|
leaseID clientv3.LeaseID
|
|
nodeKey string
|
|
cancelFunc context.CancelFunc
|
|
rootKey string
|
|
saveNodeFunc func(*sync.Map, []byte)
|
|
replaceFunc func(*sync.Map)
|
|
}
|
|
|
|
// 创建服务注册中心
|
|
func newServiceRegistryImpl(endpoints []string, rootKey, username, password string, saveNode func(*sync.Map, []byte), replace func(*sync.Map)) (*etcdRegistryImpl, error) {
|
|
cli, err := clientv3.New(clientv3.Config{
|
|
Endpoints: endpoints,
|
|
DialTimeout: DefaultDialTimeout,
|
|
Username: username,
|
|
Password: password,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &etcdRegistryImpl{
|
|
cli: cli,
|
|
rootKey: rootKey,
|
|
saveNodeFunc: saveNode,
|
|
replaceFunc: replace,
|
|
}, nil
|
|
}
|
|
|
|
// 注册服务 RegisterService
|
|
func (sr *etcdRegistryImpl) Register(key, value string) error {
|
|
// 生成唯一服务key
|
|
// /services/serviceType/serviceName
|
|
sr.nodeKey = key
|
|
log.DebugF("register %s to etcd", key)
|
|
|
|
// 创建租约
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
sr.cancelFunc = cancel
|
|
|
|
// 申请租约
|
|
resp, err := sr.cli.Grant(ctx, DefaultLeaseTTL)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
sr.leaseID = resp.ID
|
|
|
|
// 序列化服务信息
|
|
// data, err := json.Marshal(node)
|
|
// if err != nil {
|
|
// return err
|
|
// }
|
|
|
|
// 写入ETCD
|
|
_, err = sr.cli.Put(ctx, key, value, clientv3.WithLease(sr.leaseID))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// 启动自动续租
|
|
go sr.keepAlive(ctx)
|
|
|
|
if err = sr.discoverServices(); err == nil {
|
|
// ss := sr.GetService()
|
|
// for _, s := range ss {
|
|
// log.Debug(fmt.Sprintf("Discovered services: %+v\n", s))
|
|
// }
|
|
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// 自动续租逻辑
|
|
func (sr *etcdRegistryImpl) keepAlive(ctx context.Context) {
|
|
retryCount := 0
|
|
ticker := time.NewTicker(DefaultKeepAliveInterval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
_, err := sr.cli.KeepAliveOnce(ctx, sr.leaseID)
|
|
if err != nil {
|
|
if retryCount > KeepAliveFailCount {
|
|
log.DebugF("KeepAlive failed after %d retries: %v", KeepAliveFailCount, err)
|
|
sr.UnregisterService()
|
|
return
|
|
}
|
|
retryCount++
|
|
// log.DebugF("KeepAlive error (retry %d/%d): %v", retryCount, KeepAliveFailCount, err)
|
|
} else {
|
|
retryCount = 0
|
|
}
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// 反注册服务
|
|
func (sr *etcdRegistryImpl) UnregisterService() {
|
|
if sr.cancelFunc != nil {
|
|
sr.cancelFunc()
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
|
defer cancel()
|
|
|
|
if _, err := sr.cli.Delete(ctx, sr.nodeKey); err != nil {
|
|
log.ErrorF("unregister:%v failed:%v from etcd", sr.nodeKey, err)
|
|
} else {
|
|
log.DebugF("unregister:%v from etcd", sr.nodeKey)
|
|
}
|
|
}
|
|
|
|
// 服务发现
|
|
func (sr *etcdRegistryImpl) discoverServices() error {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
|
defer cancel()
|
|
|
|
srv := fmt.Sprintf("/%s/", sr.rootKey)
|
|
resp, err := sr.cli.Get(ctx, srv, clientv3.WithPrefix())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// log.Debug(fmt.Sprintf("discoverServices srv:%s", srv))
|
|
|
|
newNodes := &sync.Map{}
|
|
for _, kv := range resp.Kvs {
|
|
sr.saveNodeFunc(newNodes, kv.Value)
|
|
}
|
|
sr.replaceFunc(newNodes)
|
|
|
|
return nil
|
|
}
|
|
|
|
// 监控服务变化
|
|
func (sr *etcdRegistryImpl) WatchServices() {
|
|
watchKey := fmt.Sprintf("/%s/", sr.rootKey)
|
|
ksync.GoSafe(func() {
|
|
rch := sr.cli.Watch(context.Background(), watchKey, clientv3.WithPrefix())
|
|
for resp := range rch {
|
|
for range resp.Events {
|
|
// 当有变化时获取最新服务列表
|
|
if err := sr.discoverServices(); err != nil {
|
|
log.Error(err.Error())
|
|
}
|
|
}
|
|
}
|
|
}, nil)
|
|
}
|