From 5b96d43038838b1c5806bdddd47dd0c9eda52ca7 Mon Sep 17 00:00:00 2001 From: liuxiaobo <1224730913@qq.com> Date: Mon, 16 Jun 2025 22:30:58 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E8=8A=82=E7=82=B9call?= =?UTF-8?q?=E5=8F=8Asend=E8=83=BD=E5=8A=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- etcd/inode.go | 32 ++++++++++++++++++++++++++++---- etcd/serviceType.go | 19 ------------------- service/baseService.go | 23 +++++++++++++++++++---- service/iservice.go | 12 ++++++++---- service/natsService.go | 20 ++++++++++++++++++-- service/natsService_test.go | 2 +- 6 files changed, 74 insertions(+), 34 deletions(-) delete mode 100644 etcd/serviceType.go diff --git a/etcd/inode.go b/etcd/inode.go index 52ba877..db261af 100644 --- a/etcd/inode.go +++ b/etcd/inode.go @@ -14,12 +14,36 @@ type INode interface { MapKey() string } +type ServiceNodeStateType int + +const ( + SNST_Stateless = 0 // 无状态服务,如大厅,聊天服,匹配服,登陆服等 + SNST_Stateful = 1 // 有状态服务,主要是各类玩法服,需要记录玩家在哪个玩法服里面,以便消息不会发到同玩法的其它节点服上 + SNST_Ordered = 2 // 有序服务,主要是db服这类需要根据玩家id hash到固定的节点上,保证数据一致性 +) + +func (s ServiceNodeStateType) String() string { + switch s { + case SNST_Stateless: + return "stateless" + case SNST_Stateful: + return "stateful" + case SNST_Ordered: + return "ordered" + default: + return "unknown" + } +} + // 服务节点信息,TypeId及Type都是标记同类型的节点,Name是区别该节点与其它节点的字段 type ServiceNode struct { - TypeId int `json:"type_id"` // 服务类型id,与Type字段功能一样, 由proto定义,gate通过该字段找到这类服务的所有节点信息 - Name string `json:"name"` // 服务名 多个同类服务依赖name区分:lobby1,lobby2,lobby3等等 - Type string `json:"type"` // 服务类型:lobby, game, gate等等 - Version string `json:"version"` // 版本号 + TypeId int `json:"type_id"` // 服务类型id,与Type字段功能一样, 由proto定义,gate通过该字段找到这类服务的所有节点信息 + Name string `json:"name"` // 服务名 多个同类服务依赖name区分:lobby1,lobby2,lobby3等等 + Type string `json:"type"` // 服务类型:lobby, game, gate等等 + Version string `json:"version"` // 版本号 + MinMsgId int `json:"min_msg_id"` // 能处理的最小消息id + MaxMsgId int `json:"max_msg_id"` // 能处理的最大消息id + ServiceType ServiceNodeStateType `json:"service_type"` // 无状态节点或有状态节点或有序节点 } func (s ServiceNode) EtcdKey() string { diff --git a/etcd/serviceType.go b/etcd/serviceType.go deleted file mode 100644 index ae1ba5e..0000000 --- a/etcd/serviceType.go +++ /dev/null @@ -1,19 +0,0 @@ -package etcd - -type ServiceType int - -const ( - Unique ServiceType = 1 // 唯一 - Multiple ServiceType = 2 // 多个 -) - -func (s ServiceType) String() string { - switch s { - case Unique: - return "unique" - case Multiple: - return "multiple" - default: - return "unknown" - } -} diff --git a/service/baseService.go b/service/baseService.go index 760529f..bf3bda9 100644 --- a/service/baseService.go +++ b/service/baseService.go @@ -95,15 +95,30 @@ func (s *BaseService) RunWait(cb func() (retValue any)) (retValue any, err error } } -func (s *BaseService) Send(topic string, msg *ipb.InternalMsg) error { +func (s *BaseService) SendByTopic(topic string, msg *ipb.InternalMsg) error { if s.sender != nil { - return s.sender.Send(topic, msg) + return s.sender.SendByTopic(topic, msg) } return s.Err("send is nil") } -func (s *BaseService) Call(rpcTopic string, timeout time.Duration, msg *ipb.InternalMsg) (*ipb.InternalMsg, error) { + +func (s *BaseService) SendByServiceId(serviceId int, msg *ipb.InternalMsg) error { if s.sender != nil { - return s.sender.Call(rpcTopic, timeout, msg) + return s.sender.SendByServiceId(serviceId, msg) + } + return s.Err("send is nil") +} + +func (s *BaseService) CallByTopic(rpcTopic string, timeout time.Duration, msg *ipb.InternalMsg) (*ipb.InternalMsg, error) { + if s.sender != nil { + return s.sender.CallByTopic(rpcTopic, timeout, msg) + } + return nil, s.Err("call is nil") +} + +func (s *BaseService) CallByServiceId(serviceId int, timeout time.Duration, msg *ipb.InternalMsg) (*ipb.InternalMsg, error) { + if s.sender != nil { + return s.sender.CallByServiceId(serviceId, timeout, msg) } return nil, s.Err("call is nil") } diff --git a/service/iservice.go b/service/iservice.go index 6068e95..b9af172 100644 --- a/service/iservice.go +++ b/service/iservice.go @@ -17,16 +17,20 @@ type IService interface { // 向服务内部消息管道写入消息 Write(msg []byte) error - Send(topic string, msg *ipb.InternalMsg) error - Call(rpcTopic string, timeout time.Duration, msg *ipb.InternalMsg) (*ipb.InternalMsg, error) + SendByTopic(topic string, msg *ipb.InternalMsg) error + SendByServiceId(serviceId int, msg *ipb.InternalMsg) error + CallByTopic(rpcTopic string, timeout time.Duration, msg *ipb.InternalMsg) (*ipb.InternalMsg, error) + CallByServiceId(serviceId int, timeout time.Duration, msg *ipb.InternalMsg) (*ipb.InternalMsg, error) WaitStop() NotifyStop() } type ISender interface { - Send(topic string, msg *ipb.InternalMsg) error - Call(rpcTopic string, timeout time.Duration, msg *ipb.InternalMsg) (*ipb.InternalMsg, error) + SendByTopic(topic string, msg *ipb.InternalMsg) error + SendByServiceId(serviceId int, msg *ipb.InternalMsg) error + CallByTopic(rpcTopic string, timeout time.Duration, msg *ipb.InternalMsg) (*ipb.InternalMsg, error) + CallByServiceId(serviceId int, timeout time.Duration, msg *ipb.InternalMsg) (*ipb.InternalMsg, error) } type IOnFunc interface { diff --git a/service/natsService.go b/service/natsService.go index 4c9c11f..a12c92e 100644 --- a/service/natsService.go +++ b/service/natsService.go @@ -2,6 +2,7 @@ package service import ( "encoding/json" + "fmt" "github.com/fox/fox/etcd" "github.com/fox/fox/ipb" "github.com/fox/fox/log" @@ -136,15 +137,22 @@ func (s *NatsService) OnStop() { s.nats.Close() } -func (s *NatsService) Send(topic string, msg *ipb.InternalMsg) error { +func (s *NatsService) SendByTopic(topic string, msg *ipb.InternalMsg) error { data, _ := proto.Marshal(msg) return s.nats.Publish(topic, data) } +// 依赖子类实现,通过节点id获取topic名再调用SendByTopic +func (s *NatsService) SendByServiceId(serviceId int, msg *ipb.InternalMsg) error { + _ = serviceId + _ = msg + return nil +} + // call会阻塞主协程,需要起新协程去等待返回值,业务逻辑则转回主协程处理。 // 不要将处理返回值的业务逻辑放到新协程,避免业务逻辑中有数据并发问题 // 比如并发call拉用户数据,然后操作本地的user map -func (s *NatsService) Call(rpcTopic string, timeout time.Duration, msg *ipb.InternalMsg) (*ipb.InternalMsg, error) { +func (s *NatsService) CallByTopic(rpcTopic string, timeout time.Duration, msg *ipb.InternalMsg) (*ipb.InternalMsg, error) { data, _ := proto.Marshal(msg) response, err := s.nats.Rpc(rpcTopic, timeout, data) if err != nil { @@ -155,6 +163,14 @@ func (s *NatsService) Call(rpcTopic string, timeout time.Duration, msg *ipb.Inte return rspMsg, nil } +// 依赖子类实现,通过节点id获取topic名再调用CallByTopic +func (s *NatsService) CallByServiceId(serviceId int, timeout time.Duration, msg *ipb.InternalMsg) (*ipb.InternalMsg, error) { + _ = serviceId + _ = msg + _ = timeout + return nil, fmt.Errorf("需要子类实现") +} + func (s *NatsService) ServiceEtcd() *etcd.Registry[etcd.ServiceNode] { return s.registry } diff --git a/service/natsService_test.go b/service/natsService_test.go index 5c4c48b..cacf004 100644 --- a/service/natsService_test.go +++ b/service/natsService_test.go @@ -78,7 +78,7 @@ func TestGameService(t *testing.T) { log.Open("test.log", log.DebugL) s := newGameService() msg := ipb.MakeMsg(GameSrv, 0, 1, 2, []byte("hello world")) - if err := s.Send(Topic(s), msg); err != nil { + if err := s.SendByTopic(Topic(s), msg); err != nil { log.Error(err.Error()) } s.GetServiceNodes().Range(func(key, value interface{}) bool {