From b07cff2f5d9cfa6dfb7fbd8c784e37c21506e8ae Mon Sep 17 00:00:00 2001 From: liuxiaobo <1224730913@qq.com> Date: Mon, 2 Jun 2025 21:37:23 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ipb/helper.go | 25 ++++++++++++++----------- ipb/helper_test.go | 2 ++ service/natsService.go | 3 +++ 3 files changed, 19 insertions(+), 11 deletions(-) diff --git a/ipb/helper.go b/ipb/helper.go index 9301bc0..f10c4a9 100644 --- a/ipb/helper.go +++ b/ipb/helper.go @@ -1,5 +1,7 @@ package ipb +import "encoding/json" + func MakeMsg(serviceName string, connId uint32, userId int64, msgId int32, msg []byte) *InternalMsg { return &InternalMsg{ ServiceName: serviceName, @@ -10,14 +12,15 @@ func MakeMsg(serviceName string, connId uint32, userId int64, msgId int32, msg [ } } -//func MakeRpcMsg(serviceName string, connId uint32, userId int64, msgId int32, msg []byte) *InternalMsg { -// return &InternalMsg{ -// ServiceName: serviceName, -// ConnId: connId, -// UserId: userId, -// MsgId: msgId, -// Msg: msg, -// Type: MsgType_RpcMsg, -// RetRpcMsgId: genRpcId(), -// } -//} +func MakeRpcMsg[T any](rpcMsgId string, userId int64, msg *T) *InternalMsg { + data, _ := json.Marshal(msg) + return &InternalMsg{ + ServiceName: "", + ConnId: 0, + UserId: userId, + MsgId: 0, + Msg: data, + Type: MsgType_RpcMsg, + RpcMsgId: rpcMsgId, + } +} diff --git a/ipb/helper_test.go b/ipb/helper_test.go index 36eb894..181afae 100644 --- a/ipb/helper_test.go +++ b/ipb/helper_test.go @@ -13,4 +13,6 @@ func TestWipeWarn(t *testing.T) { password := fmt.Sprintf("%x", md5.Sum([]byte(username))) t.Log(password) + s := "hello world" + t.Log(MakeRpcMsg[string]("test.rpc", 0, &s)) } diff --git a/service/natsService.go b/service/natsService.go index 483e5bb..a4275d4 100644 --- a/service/natsService.go +++ b/service/natsService.go @@ -144,6 +144,9 @@ func (s *NatsService) Send(topic string, msg *ipb.InternalMsg) error { return s.nats.Publish(topic, data) } +// call会阻塞主协程,需要起新协程去等待返回值,业务逻辑则转回主协程处理。 +// 不要将处理返回值的业务逻辑放到新协程,避免业务逻辑中有数据并发问题 +// 比如并发call拉用户数据,然后操作本地的user map func (s *NatsService) Call(rpcTopic string, timeout time.Duration, msg *ipb.InternalMsg) (*ipb.InternalMsg, error) { data, _ := proto.Marshal(msg) response, err := s.nats.Rpc(rpcTopic, timeout, data)