game/server/gate/server/service.go
2025-05-29 20:30:31 +08:00

235 lines
6.5 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package server
import (
"fmt"
"game/common/proto/pb"
"game/common/serviceName"
"game/common/topicName"
"game/common/userBindService"
"game/common/utils"
"game/server/gate/config"
"game/server/gate/model"
"github.com/fox/fox/ipb"
"github.com/fox/fox/log"
"github.com/fox/fox/processor"
"github.com/fox/fox/service"
"github.com/fox/fox/ws"
"github.com/golang/protobuf/proto"
"github.com/nats-io/nats.go"
)
var Gates []*GateService
type GateService struct {
*service.NatsService
wss *ws.WsServer
processor *processor.Processor
bindService *userBindService.UserBindService
}
func Init() {
for i := 0; i < config.Command.ServiceNum; i++ {
sid := config.Command.ServiceId + i
if gate := newGateService(sid); gate != nil {
Gates = append(Gates, gate)
}
}
}
func Stop() {
for _, gate := range Gates {
gate.NotifyStop()
}
for _, gate := range Gates {
gate.WaitStop()
}
}
func newGateService(serviceId int) *GateService {
var err error
s := new(GateService)
sName := fmt.Sprintf("%v-%d", serviceName.Gate, serviceId)
if s.NatsService, err = service.NewNatsService(&service.InitNatsServiceParams{
EtcdAddress: config.Cfg.Etcd.Address,
EtcdUsername: "",
EtcdPassword: "",
NatsAddress: config.Cfg.Nats.Address,
ServiceType: serviceName.Gate,
ServiceName: sName,
OnFunc: s,
TypeId: int(pb.ServiceTypeId_STI_Gate),
Version: config.Cfg.BuildDate,
}); err != nil {
log.Fatal(err.Error())
return nil
}
addressPos := serviceId - config.Command.ServiceId
if len(config.Cfg.Special.Address) <= addressPos {
log.FatalF("Special address number must be greater than %d", addressPos)
return nil
}
wsAddress := config.Cfg.Special.Address[addressPos]
s.wss = ws.NewWsServer(wsAddress, s.WsOnMessage, s.WsOnDisconnect)
s.bindService = userBindService.NewUserBindService(model.UserRedis, s.ServiceEtcd())
s.processor = processor.NewProcessor()
s.initProcessor()
s.OnInit()
return s
}
func (s *GateService) OnInit() {
err := s.NatsService.SubscribeCb(topicName.WorldMessage, func(m *nats.Msg) {
var iMsg = &ipb.InternalMsg{}
if err := proto.Unmarshal(m.Data, iMsg); err != nil {
return
}
s.RunOnce(func() {
s.wss.Rang(func(conn ws.IConn) bool {
s.connMessage(conn, iMsg)
return true
})
})
})
if err != nil {
log.Error(err.Error())
}
// if err := s.NatsService.QueueSubscribe(service.GroupTopic(s), service.GroupQueue(s)); err != nil {
// log.Error(err.Error())
// }
s.NatsService.Run()
s.wss.Run()
log.Debug("onInit")
}
func (s *GateService) CanStop() bool {
return true
}
func (s *GateService) OnStop() {
s.NatsService.OnStop()
log.Debug("OnStop")
}
func (s *GateService) findConn(msg *ipb.InternalMsg) ws.IConn {
switch msg.MsgId {
case int32(pb.MsgId_S2CUserLoginId):
conn, _ := s.wss.FindConnByConnId(msg.ConnId)
return conn
default:
conn, _ := s.wss.FindConnByUserId(msg.UserId)
return conn
}
}
// 处理消息
func (s *GateService) connMessage(conn ws.IConn, iMsg *ipb.InternalMsg) {
if req, err := s.processor.Unmarshal(iMsg.MsgId, iMsg.Msg); err == nil {
err = s.processor.Dispatch(iMsg.MsgId, iMsg, conn, req)
} else {
s.SendClientData(conn, iMsg.ServiceName, iMsg.MsgId, iMsg.Msg)
}
//log.Debug(s.Log("on message:%v", string(msg)))
}
// 处理其它服发送过来的消息。大部分是将消息转发给玩家
func (s *GateService) OnMessage(data []byte) error {
var iMsg = &ipb.InternalMsg{}
var err error
if err = proto.Unmarshal(data, iMsg); err != nil {
log.Error(err.Error())
return err
}
// conn不能为空全服广播消息走WorldMessage
var conn = s.findConn(iMsg)
if conn == nil {
log.WarnF(s.Log("client not exist.msg:%v", iMsg.MsgId))
return nil
}
s.connMessage(conn, iMsg)
return nil
}
/*
查找topic,根据serviceTypeId以及玩家id查找玩家过往访问该服务的节点优先使用原节点
*/
func (s *GateService) findTopic(userId int64, serviceTypeId pb.ServiceTypeId) (topic, sName string) {
if userId != 0 {
var err error
if sName, err = s.bindService.FindServiceName(userId, serviceTypeId); err == nil {
return service.TopicEx(sName), sName
} else {
log.Error(err.Error())
}
}
return "", sName
}
// 运行在conn的read协程里。将消息转发给内部服务
func (s *GateService) WsOnMessage(conn ws.IConn, data []byte) {
msg := &pb.ClientMsg{}
if err := proto.Unmarshal(data, msg); err != nil {
log.Error(err.Error())
return
}
var topic string
if msg.ServiceName != "" {
topic = service.TopicEx(msg.ServiceName)
} else {
topic, msg.ServiceName = s.findTopic(conn.UserId(), msg.ServiceTid)
}
if topic != "" {
if msg.MsgId == int32(pb.MsgId_C2SUserLoginId) {
req := &pb.C2SUserLogin{}
_ = proto.Unmarshal(msg.Data, req)
req.Ip = conn.Addr()
msg.Data, _ = proto.Marshal(req)
}
s.SendServiceData(topic, conn, msg.MsgId, msg.Data)
} else {
log.Error(s.Log("topic:%v not exist.user:%v", topic, conn.UserId()))
}
log.Debug(s.Log("client to gate:%v", utils.Marshal(msg)))
}
// 向内部服务发送消息
func (s *GateService) SendServiceData(topic string, conn ws.IConn, msgId int32, data []byte) {
iMsg := ipb.MakeMsg(s.Name(), conn.Id(), conn.UserId(), msgId, data)
_ = s.Send(topic, iMsg)
}
// 向内部服务发送消息
func (s *GateService) SendServiceMsg(topic string, conn ws.IConn, msgId int32, msg proto.Message) {
data, _ := proto.Marshal(msg)
s.SendServiceData(topic, conn, msgId, data)
}
// 向玩家发送消息
func (s *GateService) SendClientData(conn ws.IConn, serviceName string, msgId int32, data []byte) {
cMsg := &pb.ClientMsg{ServiceName: serviceName, MsgId: msgId, Data: data}
dMsg, _ := proto.Marshal(cMsg)
_ = conn.SendMsg(dMsg)
}
// 向玩家发送消息
func (s *GateService) SendClientMsg(conn ws.IConn, serviceName string, msgId int32, msg proto.Message) {
data, _ := proto.Marshal(msg)
s.SendClientData(conn, serviceName, msgId, data)
}
func (s *GateService) WsOnDisconnect(conn ws.IConn) {
if conn.UserId() > 0 {
sName := s.bindService.LoadFromRedis(conn.UserId(), pb.ServiceTypeId_STI_Gate)
// 相同网关则为正常下线删除redis信息然后向内网广播下线
// 不同网关,异地登陆顶号 由其它网关通知本网关向玩家发送顶号消息(processor中处理),并主动关闭连接,不广播下线消息
if sName == s.Name() {
s.bindService.DelUserService(conn.UserId(), pb.ServiceTypeId_STI_Gate)
s.SendServiceMsg(topicName.UserOffline, conn, int32(pb.MsgId_NtfUserOfflineId), &pb.NtfUserOffline{UserId: conn.UserId()})
}
log.Debug(s.Log("user:%v disconnect", conn.UserId()))
}
}