package userBindService import ( "context" "errors" "fmt" "game/common/proto/pb" "game/common/utils" "github.com/fox/fox/etcd" "github.com/fox/fox/log" "github.com/fox/fox/xrand" "github.com/go-redis/redis/v8" ) const ( prefix = "user_bind_service" ) /* 采用服务器与客户端分担路由到对应服务节点的机制。 比如现有两个麻将房(game1,game2),当客户端有指定路由节点game1,服务器直接将消息路由到game1节点。 客户端没有指定路由节点,则服务器从redis查找曾经的绑定节点,并验证有效然后转发到对应的节点。 如果redis信息已经失效(服务有更新),则从etcd中获取该玩法下所有最新版本的节点(game1,game2),然后随机发送到其中一个节点,并在redis中保存绑定关系。 如果客户端所有消息都不指定具体的节点名,则每次都需要从redis拉取绑定关系,会影响路由速度。 服务分为3种类型: 1.有状态服,如各玩法服,需要保存玩家在哪个状态服里。 --1.1 这一类服务需要使用UserBindService.HashServiceNode来获取节点 2.无状态服,如大厅服,登陆服,聊天服,这些服务不持有任何状态,包括玩家数据。无需保存玩家在哪个服务。直接通过group.topic随机路由即可。 --2.1 这一类服务无需走UserBindService,直接通过group.topic路由即可。 3.有序hash服,如db服,需要根据玩家id hash到固定的db服以保证操作的一致性。 --3.1 这一类服务需要使用UserBindService.HashServiceNode来获取节点 */ type UserBindService struct { rdb *redis.Client etcdRegistry *etcd.Registry[etcd.ServiceNode] } func NewUserBindService(rdb *redis.Client, etcdRegistry *etcd.Registry[etcd.ServiceNode]) *UserBindService { return &UserBindService{ rdb: rdb, etcdRegistry: etcdRegistry, } } func (m *UserBindService) makeRedisKey(userId int64) string { return fmt.Sprintf("%s:%d", prefix, userId) } func (m *UserBindService) makeRedisSubKey(typeId pb.ServiceTypeId) string { return typeId.String() } // 从redis中加载玩家曾经访问过的服务节点名 func (m *UserBindService) LoadFromRedis(userId int64, typeId pb.ServiceTypeId) string { key := m.makeRedisKey(userId) subKey := m.makeRedisSubKey(typeId) if sName, err := m.rdb.HGet(context.Background(), key, subKey).Result(); err != nil { if !errors.Is(err, redis.Nil) { log.Error(err.Error()) } return "" } else { return sName } } // 从redis中解除玩家与节点的绑定关系 func (m *UserBindService) DelUserService(userId int64, typeId pb.ServiceTypeId) { key := m.makeRedisKey(userId) subKey := m.makeRedisSubKey(typeId) _, _ = m.rdb.HDel(context.Background(), key, subKey).Result() } // 保存玩家与节点的绑定关系至从redis func (m *UserBindService) SaveUserService(userId int64, typeId pb.ServiceTypeId, serviceName string) { if userId == 0 { log.Error("玩家不能为0") return } key := m.makeRedisKey(userId) subKey := m.makeRedisSubKey(typeId) m.rdb.HSet(context.Background(), key, subKey, serviceName) } // 从etcd中检查节点是否有效,如果有game1(旧服),game2(新服),都算有效,但是旧服会拒绝新玩家进入, // 此时旧服不止要拒绝新玩家,还要删除redis中的绑定关系。方便客户端重新发消息时路由到新的服务。 func (m *UserBindService) findServiceNodeByServiceName(serviceName string) (*etcd.ServiceNode, bool) { var sNode *etcd.ServiceNode valid := false m.etcdRegistry.RangeNode(func(_ string, node *etcd.ServiceNode) bool { if node.Name == serviceName { valid = true sNode = node return false } return true }) return sNode, valid } func (m *UserBindService) stringAllServiceNode() string { var nodes []*etcd.ServiceNode m.etcdRegistry.RangeNode(func(_ string, node *etcd.ServiceNode) bool { nodes = append(nodes, node) return true }) return utils.JsonMarshal(nodes) } // 获取最新的节点 func (m *UserBindService) getLatestServiceNode(typeId pb.ServiceTypeId) []etcd.ServiceNode { var nodes []etcd.ServiceNode var version string // 查找最新版本号 m.etcdRegistry.RangeNode(func(_ string, node *etcd.ServiceNode) bool { if node.TypeId == int(typeId) { if version < node.Version { version = node.Version } } return true }) m.etcdRegistry.RangeNode(func(_ string, node *etcd.ServiceNode) bool { if node.TypeId == int(typeId) { if version == node.Version { nodes = append(nodes, *node) } } return true }) return nodes } // 从etcd中找可用服务节点随机选择一个 func (m *UserBindService) randServiceNode(typeId pb.ServiceTypeId) (*etcd.ServiceNode, error) { var nodes = m.getLatestServiceNode(typeId) if len(nodes) == 0 { return nil, fmt.Errorf("not found service node.type id:%v. all node:%v", typeId, m.stringAllServiceNode()) } n := xrand.RandN(len(nodes)) return &nodes[n], nil } // 从etcd中hash一个服务节点。不需要保存玩家在哪个服务。 // 该服务是类似于db服这种有序操作的hash服 func (m *UserBindService) HashServiceNode(typeId pb.ServiceTypeId, uid int64) (*etcd.ServiceNode, error) { var nodes = m.getLatestServiceNode(typeId) if len(nodes) == 0 { return nil, fmt.Errorf("not found service node.type id:%v. all node:%v", typeId, m.stringAllServiceNode()) } n := uid % int64(len(nodes)) return &nodes[n], nil } // 要查找的服务必须是状态服,无状态服不需要查找指定服务。 // 如果玩家是首次使用该服务,则随机一个最新服务并保存玩家该服务节点。下次查找时返回该节点。 // 如果是老玩家,则从redis中查找他呆的节点返回 func (m *UserBindService) FindServiceNode(typeId pb.ServiceTypeId, userId int64) (*etcd.ServiceNode, error) { if userId > 0 { // 向redis中查询。redis中保留的服务节点不一定是可用的,还需要向etcd中验证 sName := m.LoadFromRedis(userId, typeId) // log.DebugF("user:%v查找到服务节点:%s", userId, sName) if node, ok := m.findServiceNodeByServiceName(sName); ok { return node, nil } } // log.DebugF("user:%v查找到服务节点:%s为无效节点", userId, sName) // redis也没有玩家的服务节点信息,从etcd中找可用服务节点随机选择一个 node, err := m.randServiceNode(typeId) if err != nil { log.ErrorF("etcd中随机一个服务节点时,错误:%v", err) return nil, err } // log.DebugF("etcd中随机一个服务节点:%s", node.Name) if m.CheckServiceNodeStateType(typeId) == etcd.SNST_Stateful { m.SaveUserService(userId, typeId, node.Name) } return node, nil } // 对玩家所在有所有节点做操作 func (m *UserBindService) RangeUserAllServiceNode(userId int64, proc func(node *etcd.ServiceNode) bool) { maps, err := m.rdb.HGetAll(context.Background(), m.makeRedisKey(userId)).Result() if err != nil && err != redis.Nil { log.ErrorF("user:%v get all service error:%v", userId, err.Error()) return } if len(maps) > 0 { m.etcdRegistry.RangeNode(func(_ string, node *etcd.ServiceNode) bool { if _, ok := maps[node.Name]; ok { if !proc(node) { return false } } return true }) } return } // 获取节点是有状态或无状态或有序节点 func (m *UserBindService) CheckServiceNodeStateType(serviceId pb.ServiceTypeId) etcd.ServiceNodeStateType { return etcd.ServiceNodeStateType(serviceId / 10000) }