重写路由规则

This commit is contained in:
liuxiaobo 2025-05-27 19:01:21 +08:00
parent e8e1758d72
commit fb7d528825
5 changed files with 160 additions and 106 deletions

View File

@ -7,9 +7,10 @@ import "service.proto";
message ClientMsg
{
ServiceTypeId service_id = 1; // id
int32 msg_id = 2; // id
bytes data = 3; //
ServiceTypeId service_tid = 1; // id
string sub_service_name = 2; // ()
int32 msg_id = 3; // id
bytes data = 4; //
}

View File

@ -22,12 +22,13 @@ const (
)
type ClientMsg struct {
state protoimpl.MessageState `protogen:"open.v1"`
ServiceId ServiceTypeId `protobuf:"varint,1,opt,name=service_id,json=serviceId,proto3,enum=pb.ServiceTypeId" json:"service_id,omitempty"` // 服务id
MsgId int32 `protobuf:"varint,2,opt,name=msg_id,json=msgId,proto3" json:"msg_id,omitempty"` // 消息id
Data []byte `protobuf:"bytes,3,opt,name=data,proto3" json:"data,omitempty"` // 消息体
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
state protoimpl.MessageState `protogen:"open.v1"`
ServiceTid ServiceTypeId `protobuf:"varint,1,opt,name=service_tid,json=serviceTid,proto3,enum=pb.ServiceTypeId" json:"service_tid,omitempty"` // 服务id
SubServiceName string `protobuf:"bytes,2,opt,name=sub_service_name,json=subServiceName,proto3" json:"sub_service_name,omitempty"` // 具体的服务节点名(客户端进入新的场景,保存该节点名,提高路由速度)
MsgId int32 `protobuf:"varint,3,opt,name=msg_id,json=msgId,proto3" json:"msg_id,omitempty"` // 消息id
Data []byte `protobuf:"bytes,4,opt,name=data,proto3" json:"data,omitempty"` // 消息体
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *ClientMsg) Reset() {
@ -60,13 +61,20 @@ func (*ClientMsg) Descriptor() ([]byte, []int) {
return file_client_proto_rawDescGZIP(), []int{0}
}
func (x *ClientMsg) GetServiceId() ServiceTypeId {
func (x *ClientMsg) GetServiceTid() ServiceTypeId {
if x != nil {
return x.ServiceId
return x.ServiceTid
}
return ServiceTypeId_STI_Unknown
}
func (x *ClientMsg) GetSubServiceName() string {
if x != nil {
return x.SubServiceName
}
return ""
}
func (x *ClientMsg) GetMsgId() int32 {
if x != nil {
return x.MsgId
@ -85,12 +93,13 @@ var File_client_proto protoreflect.FileDescriptor
const file_client_proto_rawDesc = "" +
"\n" +
"\fclient.proto\x12\x02pb\x1a\rservice.proto\"h\n" +
"\tClientMsg\x120\n" +
"\n" +
"service_id\x18\x01 \x01(\x0e2\x11.pb.ServiceTypeIdR\tserviceId\x12\x15\n" +
"\x06msg_id\x18\x02 \x01(\x05R\x05msgId\x12\x12\n" +
"\x04data\x18\x03 \x01(\fR\x04dataB\x11Z\x0fcommon/proto/pbb\x06proto3"
"\fclient.proto\x12\x02pb\x1a\rservice.proto\"\x94\x01\n" +
"\tClientMsg\x122\n" +
"\vservice_tid\x18\x01 \x01(\x0e2\x11.pb.ServiceTypeIdR\n" +
"serviceTid\x12(\n" +
"\x10sub_service_name\x18\x02 \x01(\tR\x0esubServiceName\x12\x15\n" +
"\x06msg_id\x18\x03 \x01(\x05R\x05msgId\x12\x12\n" +
"\x04data\x18\x04 \x01(\fR\x04dataB\x11Z\x0fcommon/proto/pbb\x06proto3"
var (
file_client_proto_rawDescOnce sync.Once
@ -110,7 +119,7 @@ var file_client_proto_goTypes = []any{
(ServiceTypeId)(0), // 1: pb.ServiceTypeId
}
var file_client_proto_depIdxs = []int32{
1, // 0: pb.ClientMsg.service_id:type_name -> pb.ServiceTypeId
1, // 0: pb.ClientMsg.service_tid:type_name -> pb.ServiceTypeId
1, // [1:1] is the sub-list for method output_type
1, // [1:1] is the sub-list for method input_type
1, // [1:1] is the sub-list for extension type_name

View File

@ -0,0 +1,114 @@
package userBindService
import (
"context"
"fmt"
"game/common/proto/pb"
"github.com/fox/fox/etcd"
"github.com/fox/fox/log"
"github.com/fox/fox/xrand"
"github.com/go-redis/redis/v8"
"time"
)
const (
prefix = "user_bind_service"
)
/*
采用服务器与客户端分担路由到对应服务节点的机制
比如现有两个麻将房(game1,game2)当客户端有指定路由节点game1服务器直接将消息路由到game1节点
客户端没有指定路由节点则服务器从redis查找曾经的绑定节点并验证有效然后转发到对应的节点
如果redis信息已经失效(服务有更新)则从etcd中获取该玩法下所有最新版本的节点(game1,game2)然后随机发送到其中一个节点并在redis中保存绑定关系
如果客户端所有消息都不指定具体的节点名则每次都需要从redis拉取绑定关系会影响路由速度
*/
type UserBindService struct {
rdb *redis.Client
etcdRegistry *etcd.Registry[etcd.ServiceNode]
}
func NewUserBindService(rdb *redis.Client, etcdRegistry *etcd.Registry[etcd.ServiceNode]) *UserBindService {
return &UserBindService{
rdb: rdb,
etcdRegistry: etcdRegistry,
}
}
func (m *UserBindService) makeRedisKey(userId int64, typeId pb.ServiceTypeId) string {
return fmt.Sprintf("%s_%d:%d", prefix, userId, int(typeId))
}
// 从redis中加载玩家曾经访问过的服务节点
func (m *UserBindService) loadFromRedis(userId int64, typeId pb.ServiceTypeId) string {
k := m.makeRedisKey(userId, typeId)
if sName, err := m.rdb.Get(context.Background(), k).Result(); err != nil {
log.Error(err.Error())
return ""
} else {
return sName
}
}
// 从redis中解除玩家与节点的绑定关系
func (m *UserBindService) DelUserService(userId int64, typeId pb.ServiceTypeId) {
k := m.makeRedisKey(userId, typeId)
_, _ = m.rdb.Del(context.Background(), k).Result()
}
// 从etcd中检查节点是否有效如果有game1(旧服),game2(新服),都算有效,但是旧服会拒绝新玩家进入,
// 此时旧服不止要拒绝新玩家还要删除redis中的绑定关系。方便客户端重新发消息时路由到新的服务。
func (m *UserBindService) serviceIsValid(serviceName string) bool {
valid := false
m.etcdRegistry.GetNodes().Range(func(k, v interface{}) bool {
if node, ok := v.(etcd.ServiceNode); ok {
if node.Name == serviceName {
valid = true
return false
}
}
return true
})
return valid
}
// 从etcd中找可用服务节点随机选择一个
func (m *UserBindService) RandServiceNode(typeId pb.ServiceTypeId) (*etcd.ServiceNode, error) {
var nodes []etcd.ServiceNode
var version string
m.etcdRegistry.GetNodes().Range(func(_, value any) bool {
if node, ok := value.(etcd.ServiceNode); ok && node.TypeId == int(typeId) {
if version < node.Version {
version = node.Version
}
}
return true
})
m.etcdRegistry.GetNodes().Range(func(_, value any) bool {
if node, ok := value.(etcd.ServiceNode); ok && node.TypeId == int(typeId) {
if version == node.Version {
nodes = append(nodes, node)
}
}
return true
})
if len(nodes) == 0 {
return nil, fmt.Errorf("not found service node.type id: %v", typeId)
}
n := xrand.IntN(len(nodes))
return &nodes[n], nil
}
// 根据服务类型,路由到对应的服务节点
func (m *UserBindService) FindServiceName(userId int64, typeId pb.ServiceTypeId) (string, error) {
// 内存中没有向redis中查询。redis中保留的服务节点不一定是可用的还需要向etcd中验证
if sName := m.loadFromRedis(userId, typeId); sName != "" && m.serviceIsValid(sName) {
return sName, nil
}
// redis也没有玩家的服务节点信息从etcd中找可用服务节点随机选择一个
node, err := m.RandServiceNode(typeId)
if err != nil {
return "", err
}
m.rdb.Set(context.Background(), m.makeRedisKey(userId, typeId), node.Name, 2*24*time.Hour)
return node.Name, nil
}

View File

@ -6,6 +6,7 @@ import (
"game/common/proto/pb"
"game/common/serviceName"
"game/common/topicName"
"game/common/userBindService"
"game/common/utils"
"game/server/gate/config"
"game/server/gate/model"
@ -14,7 +15,6 @@ import (
"github.com/fox/fox/processor"
"github.com/fox/fox/service"
"github.com/fox/fox/ws"
"github.com/fox/fox/xrand"
"github.com/golang/protobuf/proto"
)
@ -25,6 +25,7 @@ type GateService struct {
etcdService *etcd.Registry[etcd.ServiceNode]
wss *ws.WsServer
processor *processor.Processor
bindService *userBindService.UserBindService
}
func Init() {
@ -77,6 +78,8 @@ func newGateService(serviceId int) *GateService {
return nil
}
s.bindService = userBindService.NewUserBindService(model.UserRedis, s.etcdService)
s.processor = processor.NewProcessor()
s.initProcessor()
s.OnInit()
@ -125,42 +128,17 @@ func (s *GateService) OnMessage(data []byte) error {
return nil
}
func (s *GateService) findService(serviceTypeId pb.ServiceTypeId) *etcd.ServiceNode {
var nodes []*etcd.ServiceNode
var newVer string
s.etcdService.GetNodes().Range(func(_, value interface{}) bool {
if node, ok := value.(*etcd.ServiceNode); ok && node.TypeId == int(serviceTypeId) {
if newVer < node.Version {
newVer = node.Version
}
nodes = append(nodes, node)
}
return true
})
var newNodes []*etcd.ServiceNode
for _, node := range nodes {
if node.Version == newVer {
newNodes = append(newNodes, node)
}
}
if len(newNodes) == 0 {
return nil
}
return nodes[xrand.IntN(len(nodes))]
}
/*
查找topic,根据serviceTypeId以及玩家id查找玩家过往访问该服务的节点优先使用原节点
*/
func (s *GateService) findTopic(userId int64, serviceTypeId pb.ServiceTypeId) string {
if userId != 0 {
if sName, ok := userServiceMgr.FindServiceName(userId, serviceTypeId); ok {
if sName, err := s.bindService.FindServiceName(userId, serviceTypeId); err == nil {
return service.TopicEx(sName)
} else {
log.Error(err.Error())
}
}
if sNode := s.findService(serviceTypeId); sNode != nil {
return service.TopicEx(sNode.Name)
}
return ""
}
@ -171,10 +149,16 @@ func (s *GateService) WsOnMessage(conn ws.IConn, data []byte) {
log.Error(err.Error())
return
}
if topic := s.findTopic(conn.UserId(), msg.ServiceId); topic != "" {
iMsg := &ipb.InternalMsg{ConnId: conn.Id(), UserId: conn.UserId(), MsgId: msg.MsgId, Msg: msg.Data}
dMsg, _ := proto.Marshal(iMsg)
_ = s.Send(topic, dMsg)
var topic string
if msg.SubServiceName != "" {
topic = service.TopicEx(msg.SubServiceName)
} else {
topic = s.findTopic(conn.UserId(), msg.ServiceTid)
}
if topic != "" {
s.SendServiceData(topic, conn, msg.MsgId, msg.Data)
} else {
log.Error(s.Log("topic:%v not exist.user:%v", topic, conn.UserId()))
}
log.Debug(s.Log("client to gate:%v", utils.Marshal(msg)))
}
@ -211,7 +195,7 @@ func (s *GateService) SendClientMsg(conn ws.IConn, msgId int32, msg proto.Messag
func (s *GateService) WsOnDisconnect(conn ws.IConn) {
if conn.UserId() > 0 {
userServiceMgr.CleanUser(conn.UserId())
s.bindService.DelUserService(conn.UserId(), pb.ServiceTypeId_STI_Gate)
ug, err := model.NewUserGate().Get(conn.UserId(), s.Name())
if err != nil {
log.Error(err.Error())

View File

@ -1,54 +0,0 @@
package server
import (
"fmt"
"game/common/proto/pb"
"strings"
"sync"
)
var userServiceMgr userServiceManager
/*
记录玩家访问过的节点信息玩家下线后清除相关信息比如玩家在某个玩法服里掉线重连后要回到该房间应由对应的服务节点主动将玩家拉回房间
*/
type userServiceManager struct {
inService sync.Map // key:userId+service_type_id value:service_name
}
func (m *userServiceManager) makeKey(userId int64, typeId pb.ServiceTypeId) string {
return fmt.Sprintf("%s_%d", userId, typeId)
}
func (m *userServiceManager) Add(userId int64, typeId pb.ServiceTypeId, serviceName string) {
k := m.makeKey(userId, typeId)
m.inService.Store(k, serviceName)
}
// 玩家下线,清除他呆过的所有服务节点信息
func (m *userServiceManager) CleanUser(userId int64) {
var del []string
m.inService.Range(func(k, v interface{}) bool {
userServiceType := k.(string)
if strings.Contains(userServiceType, fmt.Sprintf("%d", userId)) {
del = append(del, userServiceType)
}
return true
})
for _, k := range del {
m.inService.Delete(k)
}
}
// todo:要考虑到旧服务可能已关闭这里要访问etcd中是否有该服务节点最好采用订阅机制让etcd变动时清除对应服务节点
func (m *userServiceManager) FindServiceName(userId int64, typeId pb.ServiceTypeId) (serviceName string, ok bool) {
var v any
if v, ok = m.inService.Load(m.makeKey(userId, typeId)); ok {
serviceName = v.(string)
}
return
}
func (m *userServiceManager) Del(userId int64, typeId pb.ServiceTypeId) {
m.inService.Delete(m.makeKey(userId, typeId))
}