修改rpc机制

This commit is contained in:
liuxiaobo 2025-06-01 09:57:01 +08:00
parent ea6d9745af
commit 028190fc5b
4 changed files with 22 additions and 16 deletions

View File

@ -122,8 +122,8 @@ func (n *Nats) Publish(topic string, msg []byte) error {
return n.nc.Publish(topic, msg) return n.nc.Publish(topic, msg)
} }
func (n *Nats) Rpc(topic string, msg []byte) ([]byte, error) { func (n *Nats) Rpc(topic string, timeout time.Duration, msg []byte) ([]byte, error) {
rsp, err := n.nc.Request(topic, msg, 30*time.Second) rsp, err := n.nc.Request(topic, msg, timeout)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -99,11 +99,11 @@ func (s *BaseService) Send(topic string, msg *ipb.InternalMsg) error {
} }
return s.Err("send is nil") return s.Err("send is nil")
} }
func (s *BaseService) Call(rpcTopic string, msg *ipb.InternalMsg, subMsg, cb any) error { func (s *BaseService) Call(rpcTopic string, timeout time.Duration, msg *ipb.InternalMsg) (*ipb.InternalMsg, error) {
if s.sender != nil { if s.sender != nil {
return s.sender.Call(rpcTopic, msg, subMsg, cb) return s.sender.Call(rpcTopic, timeout, msg)
} }
return s.Err("call is nil") return nil, s.Err("call is nil")
} }
func (s *BaseService) WaitStop() { func (s *BaseService) WaitStop() {

View File

@ -18,7 +18,7 @@ type IService interface {
// 向服务内部消息管道写入消息 // 向服务内部消息管道写入消息
Write(msg []byte) error Write(msg []byte) error
Send(topic string, msg *ipb.InternalMsg) error Send(topic string, msg *ipb.InternalMsg) error
Call(rpcTopic string, msg *ipb.InternalMsg, subMsg, cb any) error Call(rpcTopic string, timeout time.Duration, msg *ipb.InternalMsg) (*ipb.InternalMsg, error)
WaitStop() WaitStop()
NotifyStop() NotifyStop()
@ -26,7 +26,7 @@ type IService interface {
type ISender interface { type ISender interface {
Send(topic string, msg *ipb.InternalMsg) error Send(topic string, msg *ipb.InternalMsg) error
Call(rpcTopic string, msg *ipb.InternalMsg, subMsg, cb any) error Call(rpcTopic string, timeout time.Duration, msg *ipb.InternalMsg) (*ipb.InternalMsg, error)
} }
type IOnFunc interface { type IOnFunc interface {

View File

@ -11,6 +11,7 @@ import (
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
"os" "os"
"sync" "sync"
"time"
) )
const ( const (
@ -104,9 +105,15 @@ func (n *NatsService) subscribeUpdateService() error {
func (n *NatsService) subscribeRpc() error { func (n *NatsService) subscribeRpc() error {
return n.SubscribeCb(RpcTopic(n), func(m *nats.Msg) { return n.SubscribeCb(RpcTopic(n), func(m *nats.Msg) {
var iMsg = &ipb.InternalMsg{} var iMsg = &ipb.InternalMsg{}
// 承担接收返回值的任务
var rsp = &ipb.InternalMsg{}
_ = proto.Unmarshal(m.Data, iMsg) _ = proto.Unmarshal(m.Data, iMsg)
if req, err := n.rpcProcessor.Unmarshal(iMsg.RetRpcMsgId, iMsg.Msg); err == nil { if req, err := n.rpcProcessor.Unmarshal(iMsg.RetRpcMsgId, iMsg.Msg); err == nil {
err = n.rpcProcessor.Dispatch(iMsg.RetRpcMsgId, iMsg, req) if err = n.rpcProcessor.Dispatch(iMsg.RetRpcMsgId, iMsg, req, rsp); err != nil {
log.Error(err.Error())
}
rspData, _ := proto.Marshal(rsp)
_ = m.Respond(rspData)
} }
n.rpcProcessor.UnregisterMessage(iMsg.RetRpcMsgId) n.rpcProcessor.UnregisterMessage(iMsg.RetRpcMsgId)
}) })
@ -144,16 +151,15 @@ func (s *NatsService) RegisterRpcMessage(cmd int32, subMsg, delegate any) {
s.rpcProcessor.RegisterMessage(cmd, subMsg, delegate) s.rpcProcessor.RegisterMessage(cmd, subMsg, delegate)
} }
// subMsg为ipb.InternalMsg.Data解出来的proto类型 func (s *NatsService) Call(rpcTopic string, timeout time.Duration, msg *ipb.InternalMsg) (*ipb.InternalMsg, error) {
// cb回调函数参数为InternalMsg, subMsg data, _ := proto.Marshal(msg)
func (s *NatsService) Call(rpcTopic string, msg *ipb.InternalMsg, subMsg, cb any) error { response, err := s.nats.Rpc(rpcTopic, timeout, data)
data, _ := proto.Marshal(ipb.MakeRpcMsg(msg.ServiceName, msg.ConnId, msg.UserId, msg.MsgId, msg.Msg))
err := s.nats.Publish(rpcTopic, data)
if err != nil { if err != nil {
return err return nil, err
} }
s.RegisterRpcMessage(msg.RetRpcMsgId, subMsg, cb) rspMsg := &ipb.InternalMsg{}
return nil _ = proto.Unmarshal(response, rspMsg)
return rspMsg, nil
} }
func (s *NatsService) ServiceEtcd() *etcd.Registry[etcd.ServiceNode] { func (s *NatsService) ServiceEtcd() *etcd.Registry[etcd.ServiceNode] {