diff --git a/processor/processor.go b/processor/processor.go index 7861edd..c79373e 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -2,6 +2,7 @@ package processor import ( "fmt" + "github.com/fox/fox/log" "github.com/golang/protobuf/proto" "google.golang.org/protobuf/reflect/protoreflect" "reflect" @@ -33,11 +34,13 @@ func (h *Processor) RegisterMessages(metas RegisterMetas) { func (h *Processor) RegisterMessage(cmd int32, msg, delegate any) { h.types[cmd] = reflect.TypeOf(msg) h.delegates[cmd] = reflect.ValueOf(delegate) + log.DebugF("Unregister message %v", cmd) } func (h *Processor) UnregisterMessage(cmd int32) { delete(h.types, cmd) delete(h.delegates, cmd) + log.DebugF("Unregister message %v", cmd) } func (h *Processor) GetMessageType(cmd int32) reflect.Type { diff --git a/service/baseService.go b/service/baseService.go index c056e47..842195b 100644 --- a/service/baseService.go +++ b/service/baseService.go @@ -97,9 +97,9 @@ func (s *BaseService) Send(topic string, msg *ipb.InternalMsg) error { } return s.Err("send is nil") } -func (s *BaseService) Call(rpcTopic string, msg *ipb.InternalMsg, cb func(msg *ipb.InternalMsg)) error { +func (s *BaseService) Call(rpcTopic string, msg *ipb.InternalMsg, subMsg, cb any) error { if s.sender != nil { - return s.sender.Call(rpcTopic, msg, cb) + return s.sender.Call(rpcTopic, msg, subMsg, cb) } return s.Err("call is nil") } diff --git a/service/iservice.go b/service/iservice.go index 4e0b8f3..ea45d34 100644 --- a/service/iservice.go +++ b/service/iservice.go @@ -18,7 +18,7 @@ type IService interface { // 向服务内部消息管道写入消息 Write(msg []byte) error Send(topic string, msg *ipb.InternalMsg) error - Call(rpcTopic string, msg *ipb.InternalMsg, cb func(msg *ipb.InternalMsg)) error + Call(rpcTopic string, msg *ipb.InternalMsg, subMsg, cb any) error WaitStop() NotifyStop() @@ -26,7 +26,7 @@ type IService interface { type ISender interface { Send(topic string, msg *ipb.InternalMsg) error - Call(rpcTopic string, msg *ipb.InternalMsg, cb func(msg *ipb.InternalMsg)) error + Call(rpcTopic string, msg *ipb.InternalMsg, subMsg, cb any) error } type IOnFunc interface { diff --git a/service/natsService.go b/service/natsService.go index 77e5bdf..27a8712 100644 --- a/service/natsService.go +++ b/service/natsService.go @@ -101,7 +101,10 @@ func (n *NatsService) subscribeRpc() error { return n.SubscribeCb(RpcTopic(n), func(m *nats.Msg) { var iMsg = &ipb.InternalMsg{} _ = proto.Unmarshal(m.Data, iMsg) - _ = n.rpcProcessor.Dispatch(iMsg.RetRpcMsgId, iMsg) + if req, err := n.rpcProcessor.Unmarshal(iMsg.RetRpcMsgId, iMsg.Msg); err == nil { + err = n.rpcProcessor.Dispatch(iMsg.RetRpcMsgId, iMsg, req) + } + n.rpcProcessor.UnregisterMessage(iMsg.RetRpcMsgId) }) } @@ -133,17 +136,19 @@ func (s *NatsService) Send(topic string, msg *ipb.InternalMsg) error { } // 注册rpc响应方法,参数固定为func(*ipb.InternalMsg) -func (s *NatsService) RegisterRpcMessage(cmd int32, delegate any) { - s.rpcProcessor.RegisterMessage(cmd, ipb.InternalMsg{}, delegate) +func (s *NatsService) RegisterRpcMessage(cmd int32, subMsg, delegate any) { + s.rpcProcessor.RegisterMessage(cmd, subMsg, delegate) } -func (s *NatsService) Call(rpcTopic string, msg *ipb.InternalMsg, cb func(msg *ipb.InternalMsg)) error { +// subMsg为ipb.InternalMsg.Data解出来的proto类型 +// cb回调函数参数为InternalMsg, subMsg +func (s *NatsService) Call(rpcTopic string, msg *ipb.InternalMsg, subMsg, cb any) error { data, _ := proto.Marshal(ipb.MakeRpcMsg(msg.ServiceName, msg.ConnId, msg.UserId, msg.MsgId, msg.Msg)) err := s.nats.Publish(rpcTopic, data) if err != nil { return err } - s.RegisterRpcMessage(msg.RetRpcMsgId, cb) + s.RegisterRpcMessage(msg.RetRpcMsgId, subMsg, cb) return nil }