diff --git a/internalPb/internal.proto b/internalPb/internal.proto index bb4edf8..0d778eb 100644 --- a/internalPb/internal.proto +++ b/internalPb/internal.proto @@ -22,7 +22,7 @@ message InternalMsg int32 msg_id = 4; // 消息id bytes msg = 5; // 消息 MsgType type = 6; // 消息类型 - int32 ret_rpc_msg_id = 7; // 依赖本字段把数据回传给service_name节点的rpc processor去处理 + string rpc_msg_id = 7; // rpc msg id } diff --git a/ipb/internal.pb.go b/ipb/internal.pb.go index e99ffc9..715b284 100644 --- a/ipb/internal.pb.go +++ b/ipb/internal.pb.go @@ -115,13 +115,13 @@ func (MsgType) EnumDescriptor() ([]byte, []int) { type InternalMsg struct { state protoimpl.MessageState `protogen:"open.v1"` - ServiceName string `protobuf:"bytes,1,opt,name=service_name,json=serviceName,proto3" json:"service_name,omitempty"` // 该服务类型下的具体的服务节点名,需要保证该消息是该服务节点发的。否则可能会导致客户端出现路由错误 - ConnId uint32 `protobuf:"varint,2,opt,name=conn_id,json=connId,proto3" json:"conn_id,omitempty"` // 刚登陆时没有user_id,只有conn_id - UserId int64 `protobuf:"varint,3,opt,name=user_id,json=userId,proto3" json:"user_id,omitempty"` // 玩家id - MsgId int32 `protobuf:"varint,4,opt,name=msg_id,json=msgId,proto3" json:"msg_id,omitempty"` // 消息id - Msg []byte `protobuf:"bytes,5,opt,name=msg,proto3" json:"msg,omitempty"` // 消息 - Type MsgType `protobuf:"varint,6,opt,name=type,proto3,enum=ipb.MsgType" json:"type,omitempty"` // 消息类型 - RetRpcMsgId int32 `protobuf:"varint,7,opt,name=ret_rpc_msg_id,json=retRpcMsgId,proto3" json:"ret_rpc_msg_id,omitempty"` // 依赖本字段把数据回传给service_name节点的rpc processor去处理 + ServiceName string `protobuf:"bytes,1,opt,name=service_name,json=serviceName,proto3" json:"service_name,omitempty"` // 该服务类型下的具体的服务节点名,需要保证该消息是该服务节点发的。否则可能会导致客户端出现路由错误 + ConnId uint32 `protobuf:"varint,2,opt,name=conn_id,json=connId,proto3" json:"conn_id,omitempty"` // 刚登陆时没有user_id,只有conn_id + UserId int64 `protobuf:"varint,3,opt,name=user_id,json=userId,proto3" json:"user_id,omitempty"` // 玩家id + MsgId int32 `protobuf:"varint,4,opt,name=msg_id,json=msgId,proto3" json:"msg_id,omitempty"` // 消息id + Msg []byte `protobuf:"bytes,5,opt,name=msg,proto3" json:"msg,omitempty"` // 消息 + Type MsgType `protobuf:"varint,6,opt,name=type,proto3,enum=ipb.MsgType" json:"type,omitempty"` // 消息类型 + RpcMsgId string `protobuf:"bytes,7,opt,name=rpc_msg_id,json=rpcMsgId,proto3" json:"rpc_msg_id,omitempty"` // rpc msg id unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -198,26 +198,27 @@ func (x *InternalMsg) GetType() MsgType { return MsgType_NormalMsg } -func (x *InternalMsg) GetRetRpcMsgId() int32 { +func (x *InternalMsg) GetRpcMsgId() string { if x != nil { - return x.RetRpcMsgId + return x.RpcMsgId } - return 0 + return "" } var File_internal_proto protoreflect.FileDescriptor const file_internal_proto_rawDesc = "" + "\n" + - "\x0einternal.proto\x12\x03ipb\"\xd2\x01\n" + + "\x0einternal.proto\x12\x03ipb\"\xcb\x01\n" + "\vInternalMsg\x12!\n" + "\fservice_name\x18\x01 \x01(\tR\vserviceName\x12\x17\n" + "\aconn_id\x18\x02 \x01(\rR\x06connId\x12\x17\n" + "\auser_id\x18\x03 \x01(\x03R\x06userId\x12\x15\n" + "\x06msg_id\x18\x04 \x01(\x05R\x05msgId\x12\x10\n" + "\x03msg\x18\x05 \x01(\fR\x03msg\x12 \n" + - "\x04type\x18\x06 \x01(\x0e2\f.ipb.MsgTypeR\x04type\x12#\n" + - "\x0eret_rpc_msg_id\x18\a \x01(\x05R\vretRpcMsgId*+\n" + + "\x04type\x18\x06 \x01(\x0e2\f.ipb.MsgTypeR\x04type\x12\x1c\n" + + "\n" + + "rpc_msg_id\x18\a \x01(\tR\brpcMsgId*+\n" + "\x05MsgId\x12\v\n" + "\aUnknown\x10\x00\x12\x15\n" + "\bInternal\x10\xff\xff\xff\xff\xff\xff\xff\xff\xff\x01*$\n" + diff --git a/processor/pb/internal.pb.go b/processor/pb/internal.pb.go deleted file mode 100644 index a043c86..0000000 --- a/processor/pb/internal.pb.go +++ /dev/null @@ -1,277 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// versions: -// protoc-gen-go v1.36.6 -// protoc v6.31.0 -// source: internal.proto - -package pb - -import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" - reflect "reflect" - sync "sync" - unsafe "unsafe" -) - -const ( - // Verify that this generated code is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) - // Verify that runtime/protoimpl is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) -) - -type MsgId int32 - -const ( - MsgId_Unknown MsgId = 0 - MsgId_Internal MsgId = -1 // 内部消息id -) - -// Enum value maps for MsgId. -var ( - MsgId_name = map[int32]string{ - 0: "Unknown", - -1: "Internal", - } - MsgId_value = map[string]int32{ - "Unknown": 0, - "Internal": -1, - } -) - -func (x MsgId) Enum() *MsgId { - p := new(MsgId) - *p = x - return p -} - -func (x MsgId) String() string { - return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) -} - -func (MsgId) Descriptor() protoreflect.EnumDescriptor { - return file_internal_proto_enumTypes[0].Descriptor() -} - -func (MsgId) Type() protoreflect.EnumType { - return &file_internal_proto_enumTypes[0] -} - -func (x MsgId) Number() protoreflect.EnumNumber { - return protoreflect.EnumNumber(x) -} - -// Deprecated: Use MsgId.Descriptor instead. -func (MsgId) EnumDescriptor() ([]byte, []int) { - return file_internal_proto_rawDescGZIP(), []int{0} -} - -type InternalMsg struct { - state protoimpl.MessageState `protogen:"open.v1"` - ConnId uint32 `protobuf:"varint,1,opt,name=conn_id,json=connId,proto3" json:"conn_id,omitempty"` // 刚登陆时没有user_id,只有conn_id - UserId int64 `protobuf:"varint,2,opt,name=user_id,json=userId,proto3" json:"user_id,omitempty"` // 玩家id - MsgId int32 `protobuf:"varint,3,opt,name=msg_id,json=msgId,proto3" json:"msg_id,omitempty"` // 消息id - Msg []byte `protobuf:"bytes,4,opt,name=msg,proto3" json:"msg,omitempty"` // 消息 - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache -} - -func (x *InternalMsg) Reset() { - *x = InternalMsg{} - mi := &file_internal_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) -} - -func (x *InternalMsg) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*InternalMsg) ProtoMessage() {} - -func (x *InternalMsg) ProtoReflect() protoreflect.Message { - mi := &file_internal_proto_msgTypes[0] - if x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use InternalMsg.ProtoReflect.Descriptor instead. -func (*InternalMsg) Descriptor() ([]byte, []int) { - return file_internal_proto_rawDescGZIP(), []int{0} -} - -func (x *InternalMsg) GetConnId() uint32 { - if x != nil { - return x.ConnId - } - return 0 -} - -func (x *InternalMsg) GetUserId() int64 { - if x != nil { - return x.UserId - } - return 0 -} - -func (x *InternalMsg) GetMsgId() int32 { - if x != nil { - return x.MsgId - } - return 0 -} - -func (x *InternalMsg) GetMsg() []byte { - if x != nil { - return x.Msg - } - return nil -} - -// 网关解包客户端消息 -type C2SMessage struct { - state protoimpl.MessageState `protogen:"open.v1"` - ServiceType int32 `protobuf:"varint,1,opt,name=service_type,json=serviceType,proto3" json:"service_type,omitempty"` // 服务类型,通过该值判断发往lobby,game,chat等内部服务 - MsgId int32 `protobuf:"varint,2,opt,name=msg_id,json=msgId,proto3" json:"msg_id,omitempty"` // 消息id - UserId int64 `protobuf:"varint,3,opt,name=user_id,json=userId,proto3" json:"user_id,omitempty"` // 玩家id - Msg []byte `protobuf:"bytes,4,opt,name=msg,proto3" json:"msg,omitempty"` // 消息 - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache -} - -func (x *C2SMessage) Reset() { - *x = C2SMessage{} - mi := &file_internal_proto_msgTypes[1] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) -} - -func (x *C2SMessage) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*C2SMessage) ProtoMessage() {} - -func (x *C2SMessage) ProtoReflect() protoreflect.Message { - mi := &file_internal_proto_msgTypes[1] - if x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use C2SMessage.ProtoReflect.Descriptor instead. -func (*C2SMessage) Descriptor() ([]byte, []int) { - return file_internal_proto_rawDescGZIP(), []int{1} -} - -func (x *C2SMessage) GetServiceType() int32 { - if x != nil { - return x.ServiceType - } - return 0 -} - -func (x *C2SMessage) GetMsgId() int32 { - if x != nil { - return x.MsgId - } - return 0 -} - -func (x *C2SMessage) GetUserId() int64 { - if x != nil { - return x.UserId - } - return 0 -} - -func (x *C2SMessage) GetMsg() []byte { - if x != nil { - return x.Msg - } - return nil -} - -var File_internal_proto protoreflect.FileDescriptor - -const file_internal_proto_rawDesc = "" + - "\n" + - "\x0einternal.proto\x12\x03ipb\"h\n" + - "\vInternalMsg\x12\x17\n" + - "\aconn_id\x18\x01 \x01(\rR\x06connId\x12\x17\n" + - "\auser_id\x18\x02 \x01(\x03R\x06userId\x12\x15\n" + - "\x06msg_id\x18\x03 \x01(\x05R\x05msgId\x12\x10\n" + - "\x03msg\x18\x04 \x01(\fR\x03msg\"q\n" + - "\n" + - "C2SMessage\x12!\n" + - "\fservice_type\x18\x01 \x01(\x05R\vserviceType\x12\x15\n" + - "\x06msg_id\x18\x02 \x01(\x05R\x05msgId\x12\x17\n" + - "\auser_id\x18\x03 \x01(\x03R\x06userId\x12\x10\n" + - "\x03msg\x18\x04 \x01(\fR\x03msg*+\n" + - "\x05MsgId\x12\v\n" + - "\aUnknown\x10\x00\x12\x15\n" + - "\bInternal\x10\xff\xff\xff\xff\xff\xff\xff\xff\xff\x01B\x12Z\x10common/proto/ipbb\x06proto3" - -var ( - file_internal_proto_rawDescOnce sync.Once - file_internal_proto_rawDescData []byte -) - -func file_internal_proto_rawDescGZIP() []byte { - file_internal_proto_rawDescOnce.Do(func() { - file_internal_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_internal_proto_rawDesc), len(file_internal_proto_rawDesc))) - }) - return file_internal_proto_rawDescData -} - -var file_internal_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_internal_proto_msgTypes = make([]protoimpl.MessageInfo, 2) -var file_internal_proto_goTypes = []any{ - (MsgId)(0), // 0: ipb.MsgId - (*InternalMsg)(nil), // 1: ipb.InternalMsg - (*C2SMessage)(nil), // 2: ipb.C2SMessage -} -var file_internal_proto_depIdxs = []int32{ - 0, // [0:0] is the sub-list for method output_type - 0, // [0:0] is the sub-list for method input_type - 0, // [0:0] is the sub-list for extension type_name - 0, // [0:0] is the sub-list for extension extendee - 0, // [0:0] is the sub-list for field type_name -} - -func init() { file_internal_proto_init() } -func file_internal_proto_init() { - if File_internal_proto != nil { - return - } - type x struct{} - out := protoimpl.TypeBuilder{ - File: protoimpl.DescBuilder{ - GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: unsafe.Slice(unsafe.StringData(file_internal_proto_rawDesc), len(file_internal_proto_rawDesc)), - NumEnums: 1, - NumMessages: 2, - NumExtensions: 0, - NumServices: 0, - }, - GoTypes: file_internal_proto_goTypes, - DependencyIndexes: file_internal_proto_depIdxs, - EnumInfos: file_internal_proto_enumTypes, - MessageInfos: file_internal_proto_msgTypes, - }.Build() - File_internal_proto = out.File - file_internal_proto_goTypes = nil - file_internal_proto_depIdxs = nil -} diff --git a/processor/processor_test.go b/processor/processor_test.go index d70946e..dec8499 100644 --- a/processor/processor_test.go +++ b/processor/processor_test.go @@ -2,12 +2,12 @@ package processor import ( "fmt" - "github.com/fox/fox/processor/pb" + "github.com/fox/fox/ipb" "github.com/golang/protobuf/proto" "testing" ) -func onChat(userId int64, req *pb.InternalMsg) { +func onChat(userId int64, req *ipb.InternalMsg) { _ = userId fmt.Println("onChat.", string(req.Msg)) } @@ -15,14 +15,14 @@ func onChat(userId int64, req *pb.InternalMsg) { func TestProcessor(t *testing.T) { p := NewProcessor() p.RegisterMessages(RegisterMetas{ - pb.MsgId_Internal: {pb.InternalMsg{}, onChat}, + ipb.MsgId_Internal: {ipb.InternalMsg{}, onChat}, }) - tmp := &pb.InternalMsg{UserId: 1, ConnId: 1, MsgId: int32(pb.MsgId_Internal), Msg: []byte("hello world")} + tmp := &ipb.InternalMsg{UserId: 1, ConnId: 1, MsgId: int32(ipb.MsgId_Internal), Msg: []byte("hello world")} data, _ := proto.Marshal(tmp) - req, _ := p.Unmarshal(int32(pb.MsgId_Internal), data) - if err := p.Dispatch(int32(pb.MsgId_Internal), int64(1), req); err != nil { + req, _ := p.Unmarshal(int32(ipb.MsgId_Internal), data) + if err := p.Dispatch(int32(ipb.MsgId_Internal), int64(1), req); err != nil { t.Error(err) } } @@ -34,15 +34,15 @@ func registerMessage(p *Processor, cmd int32, msg any, cb any) { func TestProcessorAny(t *testing.T) { p := NewProcessor() p.RegisterMessages(RegisterMetas{ - pb.MsgId_Internal: {pb.InternalMsg{}, onChat}, + ipb.MsgId_Internal: {ipb.InternalMsg{}, onChat}, }) - registerMessage(p, int32(pb.MsgId_Internal), pb.InternalMsg{}, onChat) + registerMessage(p, int32(ipb.MsgId_Internal), ipb.InternalMsg{}, onChat) - tmp := &pb.InternalMsg{UserId: 1, ConnId: 1, MsgId: int32(pb.MsgId_Internal), Msg: []byte("hello world")} + tmp := &ipb.InternalMsg{UserId: 1, ConnId: 1, MsgId: int32(ipb.MsgId_Internal), Msg: []byte("hello world")} data, _ := proto.Marshal(tmp) - req, _ := p.Unmarshal(int32(pb.MsgId_Internal), data) - if err := p.Dispatch(int32(pb.MsgId_Internal), int64(1), req); err != nil { + req, _ := p.Unmarshal(int32(ipb.MsgId_Internal), data) + if err := p.Dispatch(int32(ipb.MsgId_Internal), int64(1), req); err != nil { t.Error(err) } } diff --git a/processor/rpcProcessor.go b/processor/rpcProcessor.go new file mode 100644 index 0000000..932fa91 --- /dev/null +++ b/processor/rpcProcessor.go @@ -0,0 +1,43 @@ +package processor + +import ( + "fmt" + "github.com/fox/fox/ipb" + "github.com/fox/fox/log" +) + +type RpcHandler func(*ipb.InternalMsg) *ipb.InternalMsg + +func NewRpcProcessor() *RpcProcessor { + return &RpcProcessor{ + delegates: make(map[string]RpcHandler), + } +} + +type RpcProcessor struct { + delegates map[string]RpcHandler +} + +func (h *RpcProcessor) RegisterMessages(metas map[string]RpcHandler) { + for cmd, handler := range metas { + h.RegisterMessage(cmd, handler) + } +} + +func (h *RpcProcessor) RegisterMessage(cmd string, delegate RpcHandler) { + h.delegates[cmd] = delegate + log.DebugF("processor register message %v", cmd) +} + +func (h *RpcProcessor) UnregisterMessage(cmd string) { + delete(h.delegates, cmd) + log.DebugF("processor unregister message %v", cmd) +} + +func (h *RpcProcessor) Dispatch(cmd string, msg *ipb.InternalMsg) (*ipb.InternalMsg, error) { + delegate, ok := h.delegates[cmd] + if !ok { + return nil, fmt.Errorf("cmd %v delegates not found", cmd) + } + return delegate(msg), nil +} diff --git a/service/natsService.go b/service/natsService.go index 8974b8d..483e5bb 100644 --- a/service/natsService.go +++ b/service/natsService.go @@ -35,7 +35,7 @@ type NatsService struct { *BaseService nats *nat.Nats registry *etcd.Registry[etcd.ServiceNode] - rpcProcessor *processor.Processor + RpcProcessor *processor.RpcProcessor node etcd.ServiceNode // 本服务节点信息 } @@ -46,6 +46,7 @@ func NewNatsService(param *InitNatsServiceParams) (*NatsService, error) { if s.registry, err = etcd.NewRegistry[etcd.ServiceNode](param.EtcdAddress, param.EtcdUsername, param.EtcdPassword); err != nil { return nil, err } + s.RpcProcessor = processor.NewRpcProcessor() s.node = etcd.ServiceNode{ TypeId: param.TypeId, Name: s.Name(), @@ -105,17 +106,14 @@ func (n *NatsService) subscribeUpdateService() error { func (n *NatsService) subscribeRpc() error { return n.SubscribeCb(RpcTopic(n), func(m *nats.Msg) { var iMsg = &ipb.InternalMsg{} - // 承担接收返回值的任务 - var rsp = &ipb.InternalMsg{} _ = proto.Unmarshal(m.Data, iMsg) - if req, err := n.rpcProcessor.Unmarshal(iMsg.RetRpcMsgId, iMsg.Msg); err == nil { - if err = n.rpcProcessor.Dispatch(iMsg.RetRpcMsgId, iMsg, req, rsp); err != nil { - log.Error(err.Error()) - } - rspData, _ := proto.Marshal(rsp) - _ = m.Respond(rspData) + rsp, err := n.RpcProcessor.Dispatch(iMsg.RpcMsgId, iMsg) + if err != nil { + log.Error(err.Error()) + return } - n.rpcProcessor.UnregisterMessage(iMsg.RetRpcMsgId) + rspData, _ := proto.Marshal(rsp) + _ = m.Respond(rspData) }) } @@ -146,11 +144,6 @@ func (s *NatsService) Send(topic string, msg *ipb.InternalMsg) error { return s.nats.Publish(topic, data) } -// 注册rpc响应方法,参数固定为func(*ipb.InternalMsg) -func (s *NatsService) RegisterRpcMessage(cmd int32, subMsg, delegate any) { - s.rpcProcessor.RegisterMessage(cmd, subMsg, delegate) -} - func (s *NatsService) Call(rpcTopic string, timeout time.Duration, msg *ipb.InternalMsg) (*ipb.InternalMsg, error) { data, _ := proto.Marshal(msg) response, err := s.nats.Rpc(rpcTopic, timeout, data)