diff --git a/common/pb/client.proto b/common/pb/client.proto index 93ac7ba..6c54b72 100644 --- a/common/pb/client.proto +++ b/common/pb/client.proto @@ -7,9 +7,10 @@ import "service.proto"; message ClientMsg { - ServiceTypeId service_id = 1; // 服务id - int32 msg_id = 2; // 消息id - bytes data = 3; // 消息体 + ServiceTypeId service_tid = 1; // 服务id + string sub_service_name = 2; // 具体的服务节点名(客户端进入新的场景,保存该节点名,提高路由速度) + int32 msg_id = 3; // 消息id + bytes data = 4; // 消息体 } diff --git a/common/proto/pb/client.pb.go b/common/proto/pb/client.pb.go index 7263c76..36a0f5c 100644 --- a/common/proto/pb/client.pb.go +++ b/common/proto/pb/client.pb.go @@ -22,12 +22,13 @@ const ( ) type ClientMsg struct { - state protoimpl.MessageState `protogen:"open.v1"` - ServiceId ServiceTypeId `protobuf:"varint,1,opt,name=service_id,json=serviceId,proto3,enum=pb.ServiceTypeId" json:"service_id,omitempty"` // 服务id - MsgId int32 `protobuf:"varint,2,opt,name=msg_id,json=msgId,proto3" json:"msg_id,omitempty"` // 消息id - Data []byte `protobuf:"bytes,3,opt,name=data,proto3" json:"data,omitempty"` // 消息体 - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + ServiceTid ServiceTypeId `protobuf:"varint,1,opt,name=service_tid,json=serviceTid,proto3,enum=pb.ServiceTypeId" json:"service_tid,omitempty"` // 服务id + SubServiceName string `protobuf:"bytes,2,opt,name=sub_service_name,json=subServiceName,proto3" json:"sub_service_name,omitempty"` // 具体的服务节点名(客户端进入新的场景,保存该节点名,提高路由速度) + MsgId int32 `protobuf:"varint,3,opt,name=msg_id,json=msgId,proto3" json:"msg_id,omitempty"` // 消息id + Data []byte `protobuf:"bytes,4,opt,name=data,proto3" json:"data,omitempty"` // 消息体 + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *ClientMsg) Reset() { @@ -60,13 +61,20 @@ func (*ClientMsg) Descriptor() ([]byte, []int) { return file_client_proto_rawDescGZIP(), []int{0} } -func (x *ClientMsg) GetServiceId() ServiceTypeId { +func (x *ClientMsg) GetServiceTid() ServiceTypeId { if x != nil { - return x.ServiceId + return x.ServiceTid } return ServiceTypeId_STI_Unknown } +func (x *ClientMsg) GetSubServiceName() string { + if x != nil { + return x.SubServiceName + } + return "" +} + func (x *ClientMsg) GetMsgId() int32 { if x != nil { return x.MsgId @@ -85,12 +93,13 @@ var File_client_proto protoreflect.FileDescriptor const file_client_proto_rawDesc = "" + "\n" + - "\fclient.proto\x12\x02pb\x1a\rservice.proto\"h\n" + - "\tClientMsg\x120\n" + - "\n" + - "service_id\x18\x01 \x01(\x0e2\x11.pb.ServiceTypeIdR\tserviceId\x12\x15\n" + - "\x06msg_id\x18\x02 \x01(\x05R\x05msgId\x12\x12\n" + - "\x04data\x18\x03 \x01(\fR\x04dataB\x11Z\x0fcommon/proto/pbb\x06proto3" + "\fclient.proto\x12\x02pb\x1a\rservice.proto\"\x94\x01\n" + + "\tClientMsg\x122\n" + + "\vservice_tid\x18\x01 \x01(\x0e2\x11.pb.ServiceTypeIdR\n" + + "serviceTid\x12(\n" + + "\x10sub_service_name\x18\x02 \x01(\tR\x0esubServiceName\x12\x15\n" + + "\x06msg_id\x18\x03 \x01(\x05R\x05msgId\x12\x12\n" + + "\x04data\x18\x04 \x01(\fR\x04dataB\x11Z\x0fcommon/proto/pbb\x06proto3" var ( file_client_proto_rawDescOnce sync.Once @@ -110,7 +119,7 @@ var file_client_proto_goTypes = []any{ (ServiceTypeId)(0), // 1: pb.ServiceTypeId } var file_client_proto_depIdxs = []int32{ - 1, // 0: pb.ClientMsg.service_id:type_name -> pb.ServiceTypeId + 1, // 0: pb.ClientMsg.service_tid:type_name -> pb.ServiceTypeId 1, // [1:1] is the sub-list for method output_type 1, // [1:1] is the sub-list for method input_type 1, // [1:1] is the sub-list for extension type_name diff --git a/common/userBindService/userService.go b/common/userBindService/userService.go new file mode 100644 index 0000000..7a1c597 --- /dev/null +++ b/common/userBindService/userService.go @@ -0,0 +1,114 @@ +package userBindService + +import ( + "context" + "fmt" + "game/common/proto/pb" + "github.com/fox/fox/etcd" + "github.com/fox/fox/log" + "github.com/fox/fox/xrand" + "github.com/go-redis/redis/v8" + "time" +) + +const ( + prefix = "user_bind_service" +) + +/* +采用服务器与客户端分担路由到对应服务节点的机制。 +比如现有两个麻将房(game1,game2),当客户端有指定路由节点game1,服务器直接将消息路由到game1节点。 +客户端没有指定路由节点,则服务器从redis查找曾经的绑定节点,并验证有效然后转发到对应的节点。 +如果redis信息已经失效(服务有更新),则从etcd中获取该玩法下所有最新版本的节点(game1,game2),然后随机发送到其中一个节点,并在redis中保存绑定关系。 +如果客户端所有消息都不指定具体的节点名,则每次都需要从redis拉取绑定关系,会影响路由速度。 +*/ +type UserBindService struct { + rdb *redis.Client + etcdRegistry *etcd.Registry[etcd.ServiceNode] +} + +func NewUserBindService(rdb *redis.Client, etcdRegistry *etcd.Registry[etcd.ServiceNode]) *UserBindService { + return &UserBindService{ + rdb: rdb, + etcdRegistry: etcdRegistry, + } +} + +func (m *UserBindService) makeRedisKey(userId int64, typeId pb.ServiceTypeId) string { + return fmt.Sprintf("%s_%d:%d", prefix, userId, int(typeId)) +} + +// 从redis中加载玩家曾经访问过的服务节点 +func (m *UserBindService) loadFromRedis(userId int64, typeId pb.ServiceTypeId) string { + k := m.makeRedisKey(userId, typeId) + if sName, err := m.rdb.Get(context.Background(), k).Result(); err != nil { + log.Error(err.Error()) + return "" + } else { + return sName + } +} + +// 从redis中解除玩家与节点的绑定关系 +func (m *UserBindService) DelUserService(userId int64, typeId pb.ServiceTypeId) { + k := m.makeRedisKey(userId, typeId) + _, _ = m.rdb.Del(context.Background(), k).Result() +} + +// 从etcd中检查节点是否有效,如果有game1(旧服),game2(新服),都算有效,但是旧服会拒绝新玩家进入, +// 此时旧服不止要拒绝新玩家,还要删除redis中的绑定关系。方便客户端重新发消息时路由到新的服务。 +func (m *UserBindService) serviceIsValid(serviceName string) bool { + valid := false + m.etcdRegistry.GetNodes().Range(func(k, v interface{}) bool { + if node, ok := v.(etcd.ServiceNode); ok { + if node.Name == serviceName { + valid = true + return false + } + } + return true + }) + return valid +} + +// 从etcd中找可用服务节点随机选择一个 +func (m *UserBindService) RandServiceNode(typeId pb.ServiceTypeId) (*etcd.ServiceNode, error) { + var nodes []etcd.ServiceNode + var version string + m.etcdRegistry.GetNodes().Range(func(_, value any) bool { + if node, ok := value.(etcd.ServiceNode); ok && node.TypeId == int(typeId) { + if version < node.Version { + version = node.Version + } + } + return true + }) + m.etcdRegistry.GetNodes().Range(func(_, value any) bool { + if node, ok := value.(etcd.ServiceNode); ok && node.TypeId == int(typeId) { + if version == node.Version { + nodes = append(nodes, node) + } + } + return true + }) + if len(nodes) == 0 { + return nil, fmt.Errorf("not found service node.type id: %v", typeId) + } + n := xrand.IntN(len(nodes)) + return &nodes[n], nil +} + +// 根据服务类型,路由到对应的服务节点 +func (m *UserBindService) FindServiceName(userId int64, typeId pb.ServiceTypeId) (string, error) { + // 内存中没有,向redis中查询。redis中保留的服务节点不一定是可用的,还需要向etcd中验证 + if sName := m.loadFromRedis(userId, typeId); sName != "" && m.serviceIsValid(sName) { + return sName, nil + } + // redis也没有玩家的服务节点信息,从etcd中找可用服务节点随机选择一个 + node, err := m.RandServiceNode(typeId) + if err != nil { + return "", err + } + m.rdb.Set(context.Background(), m.makeRedisKey(userId, typeId), node.Name, 2*24*time.Hour) + return node.Name, nil +} diff --git a/server/gate/server/service.go b/server/gate/server/service.go index d6d4a7f..30cd6a7 100644 --- a/server/gate/server/service.go +++ b/server/gate/server/service.go @@ -6,6 +6,7 @@ import ( "game/common/proto/pb" "game/common/serviceName" "game/common/topicName" + "game/common/userBindService" "game/common/utils" "game/server/gate/config" "game/server/gate/model" @@ -14,7 +15,6 @@ import ( "github.com/fox/fox/processor" "github.com/fox/fox/service" "github.com/fox/fox/ws" - "github.com/fox/fox/xrand" "github.com/golang/protobuf/proto" ) @@ -25,6 +25,7 @@ type GateService struct { etcdService *etcd.Registry[etcd.ServiceNode] wss *ws.WsServer processor *processor.Processor + bindService *userBindService.UserBindService } func Init() { @@ -77,6 +78,8 @@ func newGateService(serviceId int) *GateService { return nil } + s.bindService = userBindService.NewUserBindService(model.UserRedis, s.etcdService) + s.processor = processor.NewProcessor() s.initProcessor() s.OnInit() @@ -125,42 +128,17 @@ func (s *GateService) OnMessage(data []byte) error { return nil } -func (s *GateService) findService(serviceTypeId pb.ServiceTypeId) *etcd.ServiceNode { - var nodes []*etcd.ServiceNode - var newVer string - s.etcdService.GetNodes().Range(func(_, value interface{}) bool { - if node, ok := value.(*etcd.ServiceNode); ok && node.TypeId == int(serviceTypeId) { - if newVer < node.Version { - newVer = node.Version - } - nodes = append(nodes, node) - } - return true - }) - var newNodes []*etcd.ServiceNode - for _, node := range nodes { - if node.Version == newVer { - newNodes = append(newNodes, node) - } - } - if len(newNodes) == 0 { - return nil - } - return nodes[xrand.IntN(len(nodes))] -} - /* 查找topic,根据serviceTypeId以及玩家id查找玩家过往访问该服务的节点,优先使用原节点 */ func (s *GateService) findTopic(userId int64, serviceTypeId pb.ServiceTypeId) string { if userId != 0 { - if sName, ok := userServiceMgr.FindServiceName(userId, serviceTypeId); ok { + if sName, err := s.bindService.FindServiceName(userId, serviceTypeId); err == nil { return service.TopicEx(sName) + } else { + log.Error(err.Error()) } } - if sNode := s.findService(serviceTypeId); sNode != nil { - return service.TopicEx(sNode.Name) - } return "" } @@ -171,10 +149,16 @@ func (s *GateService) WsOnMessage(conn ws.IConn, data []byte) { log.Error(err.Error()) return } - if topic := s.findTopic(conn.UserId(), msg.ServiceId); topic != "" { - iMsg := &ipb.InternalMsg{ConnId: conn.Id(), UserId: conn.UserId(), MsgId: msg.MsgId, Msg: msg.Data} - dMsg, _ := proto.Marshal(iMsg) - _ = s.Send(topic, dMsg) + var topic string + if msg.SubServiceName != "" { + topic = service.TopicEx(msg.SubServiceName) + } else { + topic = s.findTopic(conn.UserId(), msg.ServiceTid) + } + if topic != "" { + s.SendServiceData(topic, conn, msg.MsgId, msg.Data) + } else { + log.Error(s.Log("topic:%v not exist.user:%v", topic, conn.UserId())) } log.Debug(s.Log("client to gate:%v", utils.Marshal(msg))) } @@ -211,7 +195,7 @@ func (s *GateService) SendClientMsg(conn ws.IConn, msgId int32, msg proto.Messag func (s *GateService) WsOnDisconnect(conn ws.IConn) { if conn.UserId() > 0 { - userServiceMgr.CleanUser(conn.UserId()) + s.bindService.DelUserService(conn.UserId(), pb.ServiceTypeId_STI_Gate) ug, err := model.NewUserGate().Get(conn.UserId(), s.Name()) if err != nil { log.Error(err.Error()) diff --git a/server/gate/server/userService.go b/server/gate/server/userService.go deleted file mode 100644 index 2a1ba39..0000000 --- a/server/gate/server/userService.go +++ /dev/null @@ -1,54 +0,0 @@ -package server - -import ( - "fmt" - "game/common/proto/pb" - "strings" - "sync" -) - -var userServiceMgr userServiceManager - -/* -记录玩家访问过的节点信息,玩家下线后清除相关信息,比如玩家在某个玩法服里掉线,重连后要回到该房间,应由对应的服务节点主动将玩家拉回房间 -*/ -type userServiceManager struct { - inService sync.Map // key:userId+service_type_id value:service_name -} - -func (m *userServiceManager) makeKey(userId int64, typeId pb.ServiceTypeId) string { - return fmt.Sprintf("%s_%d", userId, typeId) -} - -func (m *userServiceManager) Add(userId int64, typeId pb.ServiceTypeId, serviceName string) { - k := m.makeKey(userId, typeId) - m.inService.Store(k, serviceName) -} - -// 玩家下线,清除他呆过的所有服务节点信息 -func (m *userServiceManager) CleanUser(userId int64) { - var del []string - m.inService.Range(func(k, v interface{}) bool { - userServiceType := k.(string) - if strings.Contains(userServiceType, fmt.Sprintf("%d", userId)) { - del = append(del, userServiceType) - } - return true - }) - for _, k := range del { - m.inService.Delete(k) - } -} - -// todo:要考虑到旧服务可能已关闭,这里要访问etcd中是否有该服务节点,最好采用订阅机制,让etcd变动时清除对应服务节点 -func (m *userServiceManager) FindServiceName(userId int64, typeId pb.ServiceTypeId) (serviceName string, ok bool) { - var v any - if v, ok = m.inService.Load(m.makeKey(userId, typeId)); ok { - serviceName = v.(string) - } - return -} - -func (m *userServiceManager) Del(userId int64, typeId pb.ServiceTypeId) { - m.inService.Delete(m.makeKey(userId, typeId)) -}