修改bug

This commit is contained in:
liuxiaobo 2025-05-29 17:06:54 +08:00
parent e5f3086cfd
commit 6460a4613e
4 changed files with 17 additions and 9 deletions

View File

@ -2,6 +2,7 @@ package processor
import ( import (
"fmt" "fmt"
"github.com/fox/fox/log"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect" "google.golang.org/protobuf/reflect/protoreflect"
"reflect" "reflect"
@ -33,11 +34,13 @@ func (h *Processor) RegisterMessages(metas RegisterMetas) {
func (h *Processor) RegisterMessage(cmd int32, msg, delegate any) { func (h *Processor) RegisterMessage(cmd int32, msg, delegate any) {
h.types[cmd] = reflect.TypeOf(msg) h.types[cmd] = reflect.TypeOf(msg)
h.delegates[cmd] = reflect.ValueOf(delegate) h.delegates[cmd] = reflect.ValueOf(delegate)
log.DebugF("Unregister message %v", cmd)
} }
func (h *Processor) UnregisterMessage(cmd int32) { func (h *Processor) UnregisterMessage(cmd int32) {
delete(h.types, cmd) delete(h.types, cmd)
delete(h.delegates, cmd) delete(h.delegates, cmd)
log.DebugF("Unregister message %v", cmd)
} }
func (h *Processor) GetMessageType(cmd int32) reflect.Type { func (h *Processor) GetMessageType(cmd int32) reflect.Type {

View File

@ -97,9 +97,9 @@ func (s *BaseService) Send(topic string, msg *ipb.InternalMsg) error {
} }
return s.Err("send is nil") return s.Err("send is nil")
} }
func (s *BaseService) Call(rpcTopic string, msg *ipb.InternalMsg, cb func(msg *ipb.InternalMsg)) error { func (s *BaseService) Call(rpcTopic string, msg *ipb.InternalMsg, subMsg, cb any) error {
if s.sender != nil { if s.sender != nil {
return s.sender.Call(rpcTopic, msg, cb) return s.sender.Call(rpcTopic, msg, subMsg, cb)
} }
return s.Err("call is nil") return s.Err("call is nil")
} }

View File

@ -18,7 +18,7 @@ type IService interface {
// 向服务内部消息管道写入消息 // 向服务内部消息管道写入消息
Write(msg []byte) error Write(msg []byte) error
Send(topic string, msg *ipb.InternalMsg) error Send(topic string, msg *ipb.InternalMsg) error
Call(rpcTopic string, msg *ipb.InternalMsg, cb func(msg *ipb.InternalMsg)) error Call(rpcTopic string, msg *ipb.InternalMsg, subMsg, cb any) error
WaitStop() WaitStop()
NotifyStop() NotifyStop()
@ -26,7 +26,7 @@ type IService interface {
type ISender interface { type ISender interface {
Send(topic string, msg *ipb.InternalMsg) error Send(topic string, msg *ipb.InternalMsg) error
Call(rpcTopic string, msg *ipb.InternalMsg, cb func(msg *ipb.InternalMsg)) error Call(rpcTopic string, msg *ipb.InternalMsg, subMsg, cb any) error
} }
type IOnFunc interface { type IOnFunc interface {

View File

@ -101,7 +101,10 @@ func (n *NatsService) subscribeRpc() error {
return n.SubscribeCb(RpcTopic(n), func(m *nats.Msg) { return n.SubscribeCb(RpcTopic(n), func(m *nats.Msg) {
var iMsg = &ipb.InternalMsg{} var iMsg = &ipb.InternalMsg{}
_ = proto.Unmarshal(m.Data, iMsg) _ = proto.Unmarshal(m.Data, iMsg)
_ = n.rpcProcessor.Dispatch(iMsg.RetRpcMsgId, iMsg) if req, err := n.rpcProcessor.Unmarshal(iMsg.RetRpcMsgId, iMsg.Msg); err == nil {
err = n.rpcProcessor.Dispatch(iMsg.RetRpcMsgId, iMsg, req)
}
n.rpcProcessor.UnregisterMessage(iMsg.RetRpcMsgId)
}) })
} }
@ -133,17 +136,19 @@ func (s *NatsService) Send(topic string, msg *ipb.InternalMsg) error {
} }
// 注册rpc响应方法参数固定为func(*ipb.InternalMsg) // 注册rpc响应方法参数固定为func(*ipb.InternalMsg)
func (s *NatsService) RegisterRpcMessage(cmd int32, delegate any) { func (s *NatsService) RegisterRpcMessage(cmd int32, subMsg, delegate any) {
s.rpcProcessor.RegisterMessage(cmd, ipb.InternalMsg{}, delegate) s.rpcProcessor.RegisterMessage(cmd, subMsg, delegate)
} }
func (s *NatsService) Call(rpcTopic string, msg *ipb.InternalMsg, cb func(msg *ipb.InternalMsg)) error { // subMsg为ipb.InternalMsg.Data解出来的proto类型
// cb回调函数参数为InternalMsg, subMsg
func (s *NatsService) Call(rpcTopic string, msg *ipb.InternalMsg, subMsg, cb any) error {
data, _ := proto.Marshal(ipb.MakeRpcMsg(msg.ServiceName, msg.ConnId, msg.UserId, msg.MsgId, msg.Msg)) data, _ := proto.Marshal(ipb.MakeRpcMsg(msg.ServiceName, msg.ConnId, msg.UserId, msg.MsgId, msg.Msg))
err := s.nats.Publish(rpcTopic, data) err := s.nats.Publish(rpcTopic, data)
if err != nil { if err != nil {
return err return err
} }
s.RegisterRpcMessage(msg.RetRpcMsgId, cb) s.RegisterRpcMessage(msg.RetRpcMsgId, subMsg, cb)
return nil return nil
} }