优化节点call及send能力

This commit is contained in:
liuxiaobo 2025-06-16 22:30:58 +08:00
parent 5dd6f84197
commit 5b96d43038
6 changed files with 74 additions and 34 deletions

View File

@ -14,12 +14,36 @@ type INode interface {
MapKey() string MapKey() string
} }
type ServiceNodeStateType int
const (
SNST_Stateless = 0 // 无状态服务,如大厅,聊天服,匹配服,登陆服等
SNST_Stateful = 1 // 有状态服务,主要是各类玩法服,需要记录玩家在哪个玩法服里面,以便消息不会发到同玩法的其它节点服上
SNST_Ordered = 2 // 有序服务主要是db服这类需要根据玩家id hash到固定的节点上保证数据一致性
)
func (s ServiceNodeStateType) String() string {
switch s {
case SNST_Stateless:
return "stateless"
case SNST_Stateful:
return "stateful"
case SNST_Ordered:
return "ordered"
default:
return "unknown"
}
}
// 服务节点信息TypeId及Type都是标记同类型的节点Name是区别该节点与其它节点的字段 // 服务节点信息TypeId及Type都是标记同类型的节点Name是区别该节点与其它节点的字段
type ServiceNode struct { type ServiceNode struct {
TypeId int `json:"type_id"` // 服务类型id与Type字段功能一样 由proto定义gate通过该字段找到这类服务的所有节点信息 TypeId int `json:"type_id"` // 服务类型id与Type字段功能一样 由proto定义gate通过该字段找到这类服务的所有节点信息
Name string `json:"name"` // 服务名 多个同类服务依赖name区分:lobby1,lobby2,lobby3等等 Name string `json:"name"` // 服务名 多个同类服务依赖name区分:lobby1,lobby2,lobby3等等
Type string `json:"type"` // 服务类型:lobby, game, gate等等 Type string `json:"type"` // 服务类型:lobby, game, gate等等
Version string `json:"version"` // 版本号 Version string `json:"version"` // 版本号
MinMsgId int `json:"min_msg_id"` // 能处理的最小消息id
MaxMsgId int `json:"max_msg_id"` // 能处理的最大消息id
ServiceType ServiceNodeStateType `json:"service_type"` // 无状态节点或有状态节点或有序节点
} }
func (s ServiceNode) EtcdKey() string { func (s ServiceNode) EtcdKey() string {

View File

@ -1,19 +0,0 @@
package etcd
type ServiceType int
const (
Unique ServiceType = 1 // 唯一
Multiple ServiceType = 2 // 多个
)
func (s ServiceType) String() string {
switch s {
case Unique:
return "unique"
case Multiple:
return "multiple"
default:
return "unknown"
}
}

View File

@ -95,15 +95,30 @@ func (s *BaseService) RunWait(cb func() (retValue any)) (retValue any, err error
} }
} }
func (s *BaseService) Send(topic string, msg *ipb.InternalMsg) error { func (s *BaseService) SendByTopic(topic string, msg *ipb.InternalMsg) error {
if s.sender != nil { if s.sender != nil {
return s.sender.Send(topic, msg) return s.sender.SendByTopic(topic, msg)
} }
return s.Err("send is nil") return s.Err("send is nil")
} }
func (s *BaseService) Call(rpcTopic string, timeout time.Duration, msg *ipb.InternalMsg) (*ipb.InternalMsg, error) {
func (s *BaseService) SendByServiceId(serviceId int, msg *ipb.InternalMsg) error {
if s.sender != nil { if s.sender != nil {
return s.sender.Call(rpcTopic, timeout, msg) return s.sender.SendByServiceId(serviceId, msg)
}
return s.Err("send is nil")
}
func (s *BaseService) CallByTopic(rpcTopic string, timeout time.Duration, msg *ipb.InternalMsg) (*ipb.InternalMsg, error) {
if s.sender != nil {
return s.sender.CallByTopic(rpcTopic, timeout, msg)
}
return nil, s.Err("call is nil")
}
func (s *BaseService) CallByServiceId(serviceId int, timeout time.Duration, msg *ipb.InternalMsg) (*ipb.InternalMsg, error) {
if s.sender != nil {
return s.sender.CallByServiceId(serviceId, timeout, msg)
} }
return nil, s.Err("call is nil") return nil, s.Err("call is nil")
} }

View File

@ -17,16 +17,20 @@ type IService interface {
// 向服务内部消息管道写入消息 // 向服务内部消息管道写入消息
Write(msg []byte) error Write(msg []byte) error
Send(topic string, msg *ipb.InternalMsg) error SendByTopic(topic string, msg *ipb.InternalMsg) error
Call(rpcTopic string, timeout time.Duration, msg *ipb.InternalMsg) (*ipb.InternalMsg, error) SendByServiceId(serviceId int, msg *ipb.InternalMsg) error
CallByTopic(rpcTopic string, timeout time.Duration, msg *ipb.InternalMsg) (*ipb.InternalMsg, error)
CallByServiceId(serviceId int, timeout time.Duration, msg *ipb.InternalMsg) (*ipb.InternalMsg, error)
WaitStop() WaitStop()
NotifyStop() NotifyStop()
} }
type ISender interface { type ISender interface {
Send(topic string, msg *ipb.InternalMsg) error SendByTopic(topic string, msg *ipb.InternalMsg) error
Call(rpcTopic string, timeout time.Duration, msg *ipb.InternalMsg) (*ipb.InternalMsg, error) SendByServiceId(serviceId int, msg *ipb.InternalMsg) error
CallByTopic(rpcTopic string, timeout time.Duration, msg *ipb.InternalMsg) (*ipb.InternalMsg, error)
CallByServiceId(serviceId int, timeout time.Duration, msg *ipb.InternalMsg) (*ipb.InternalMsg, error)
} }
type IOnFunc interface { type IOnFunc interface {

View File

@ -2,6 +2,7 @@ package service
import ( import (
"encoding/json" "encoding/json"
"fmt"
"github.com/fox/fox/etcd" "github.com/fox/fox/etcd"
"github.com/fox/fox/ipb" "github.com/fox/fox/ipb"
"github.com/fox/fox/log" "github.com/fox/fox/log"
@ -136,15 +137,22 @@ func (s *NatsService) OnStop() {
s.nats.Close() s.nats.Close()
} }
func (s *NatsService) Send(topic string, msg *ipb.InternalMsg) error { func (s *NatsService) SendByTopic(topic string, msg *ipb.InternalMsg) error {
data, _ := proto.Marshal(msg) data, _ := proto.Marshal(msg)
return s.nats.Publish(topic, data) return s.nats.Publish(topic, data)
} }
// 依赖子类实现通过节点id获取topic名再调用SendByTopic
func (s *NatsService) SendByServiceId(serviceId int, msg *ipb.InternalMsg) error {
_ = serviceId
_ = msg
return nil
}
// call会阻塞主协程需要起新协程去等待返回值业务逻辑则转回主协程处理。 // call会阻塞主协程需要起新协程去等待返回值业务逻辑则转回主协程处理。
// 不要将处理返回值的业务逻辑放到新协程,避免业务逻辑中有数据并发问题 // 不要将处理返回值的业务逻辑放到新协程,避免业务逻辑中有数据并发问题
// 比如并发call拉用户数据然后操作本地的user map // 比如并发call拉用户数据然后操作本地的user map
func (s *NatsService) Call(rpcTopic string, timeout time.Duration, msg *ipb.InternalMsg) (*ipb.InternalMsg, error) { func (s *NatsService) CallByTopic(rpcTopic string, timeout time.Duration, msg *ipb.InternalMsg) (*ipb.InternalMsg, error) {
data, _ := proto.Marshal(msg) data, _ := proto.Marshal(msg)
response, err := s.nats.Rpc(rpcTopic, timeout, data) response, err := s.nats.Rpc(rpcTopic, timeout, data)
if err != nil { if err != nil {
@ -155,6 +163,14 @@ func (s *NatsService) Call(rpcTopic string, timeout time.Duration, msg *ipb.Inte
return rspMsg, nil return rspMsg, nil
} }
// 依赖子类实现通过节点id获取topic名再调用CallByTopic
func (s *NatsService) CallByServiceId(serviceId int, timeout time.Duration, msg *ipb.InternalMsg) (*ipb.InternalMsg, error) {
_ = serviceId
_ = msg
_ = timeout
return nil, fmt.Errorf("需要子类实现")
}
func (s *NatsService) ServiceEtcd() *etcd.Registry[etcd.ServiceNode] { func (s *NatsService) ServiceEtcd() *etcd.Registry[etcd.ServiceNode] {
return s.registry return s.registry
} }

View File

@ -78,7 +78,7 @@ func TestGameService(t *testing.T) {
log.Open("test.log", log.DebugL) log.Open("test.log", log.DebugL)
s := newGameService() s := newGameService()
msg := ipb.MakeMsg(GameSrv, 0, 1, 2, []byte("hello world")) msg := ipb.MakeMsg(GameSrv, 0, 1, 2, []byte("hello world"))
if err := s.Send(Topic(s), msg); err != nil { if err := s.SendByTopic(Topic(s), msg); err != nil {
log.Error(err.Error()) log.Error(err.Error())
} }
s.GetServiceNodes().Range(func(key, value interface{}) bool { s.GetServiceNodes().Range(func(key, value interface{}) bool {