diff --git a/nat/nats.go b/nat/nats.go index a7f18e7..f1fcc4f 100644 --- a/nat/nats.go +++ b/nat/nats.go @@ -122,8 +122,8 @@ func (n *Nats) Publish(topic string, msg []byte) error { return n.nc.Publish(topic, msg) } -func (n *Nats) Rpc(topic string, msg []byte) ([]byte, error) { - rsp, err := n.nc.Request(topic, msg, 30*time.Second) +func (n *Nats) Rpc(topic string, timeout time.Duration, msg []byte) ([]byte, error) { + rsp, err := n.nc.Request(topic, msg, timeout) if err != nil { return nil, err } diff --git a/service/baseService.go b/service/baseService.go index f0c2399..188e917 100644 --- a/service/baseService.go +++ b/service/baseService.go @@ -99,11 +99,11 @@ func (s *BaseService) Send(topic string, msg *ipb.InternalMsg) error { } return s.Err("send is nil") } -func (s *BaseService) Call(rpcTopic string, msg *ipb.InternalMsg, subMsg, cb any) error { +func (s *BaseService) Call(rpcTopic string, timeout time.Duration, msg *ipb.InternalMsg) (*ipb.InternalMsg, error) { if s.sender != nil { - return s.sender.Call(rpcTopic, msg, subMsg, cb) + return s.sender.Call(rpcTopic, timeout, msg) } - return s.Err("call is nil") + return nil, s.Err("call is nil") } func (s *BaseService) WaitStop() { diff --git a/service/iservice.go b/service/iservice.go index ea45d34..6068e95 100644 --- a/service/iservice.go +++ b/service/iservice.go @@ -18,7 +18,7 @@ type IService interface { // 向服务内部消息管道写入消息 Write(msg []byte) error Send(topic string, msg *ipb.InternalMsg) error - Call(rpcTopic string, msg *ipb.InternalMsg, subMsg, cb any) error + Call(rpcTopic string, timeout time.Duration, msg *ipb.InternalMsg) (*ipb.InternalMsg, error) WaitStop() NotifyStop() @@ -26,7 +26,7 @@ type IService interface { type ISender interface { Send(topic string, msg *ipb.InternalMsg) error - Call(rpcTopic string, msg *ipb.InternalMsg, subMsg, cb any) error + Call(rpcTopic string, timeout time.Duration, msg *ipb.InternalMsg) (*ipb.InternalMsg, error) } type IOnFunc interface { diff --git a/service/natsService.go b/service/natsService.go index fcb289a..8974b8d 100644 --- a/service/natsService.go +++ b/service/natsService.go @@ -11,6 +11,7 @@ import ( "github.com/nats-io/nats.go" "os" "sync" + "time" ) const ( @@ -104,9 +105,15 @@ func (n *NatsService) subscribeUpdateService() error { func (n *NatsService) subscribeRpc() error { return n.SubscribeCb(RpcTopic(n), func(m *nats.Msg) { var iMsg = &ipb.InternalMsg{} + // 承担接收返回值的任务 + var rsp = &ipb.InternalMsg{} _ = proto.Unmarshal(m.Data, iMsg) if req, err := n.rpcProcessor.Unmarshal(iMsg.RetRpcMsgId, iMsg.Msg); err == nil { - err = n.rpcProcessor.Dispatch(iMsg.RetRpcMsgId, iMsg, req) + if err = n.rpcProcessor.Dispatch(iMsg.RetRpcMsgId, iMsg, req, rsp); err != nil { + log.Error(err.Error()) + } + rspData, _ := proto.Marshal(rsp) + _ = m.Respond(rspData) } n.rpcProcessor.UnregisterMessage(iMsg.RetRpcMsgId) }) @@ -144,16 +151,15 @@ func (s *NatsService) RegisterRpcMessage(cmd int32, subMsg, delegate any) { s.rpcProcessor.RegisterMessage(cmd, subMsg, delegate) } -// subMsg为ipb.InternalMsg.Data解出来的proto类型 -// cb回调函数参数为InternalMsg, subMsg -func (s *NatsService) Call(rpcTopic string, msg *ipb.InternalMsg, subMsg, cb any) error { - data, _ := proto.Marshal(ipb.MakeRpcMsg(msg.ServiceName, msg.ConnId, msg.UserId, msg.MsgId, msg.Msg)) - err := s.nats.Publish(rpcTopic, data) +func (s *NatsService) Call(rpcTopic string, timeout time.Duration, msg *ipb.InternalMsg) (*ipb.InternalMsg, error) { + data, _ := proto.Marshal(msg) + response, err := s.nats.Rpc(rpcTopic, timeout, data) if err != nil { - return err + return nil, err } - s.RegisterRpcMessage(msg.RetRpcMsgId, subMsg, cb) - return nil + rspMsg := &ipb.InternalMsg{} + _ = proto.Unmarshal(response, rspMsg) + return rspMsg, nil } func (s *NatsService) ServiceEtcd() *etcd.Registry[etcd.ServiceNode] {