240 lines
7.3 KiB
Go
240 lines
7.3 KiB
Go
package server
|
||
|
||
import (
|
||
"fmt"
|
||
"game/common/gameService"
|
||
"game/common/proto/pb"
|
||
"game/common/serviceName"
|
||
"game/common/topicName"
|
||
"game/server/gate/config"
|
||
"github.com/fox/fox/ipb"
|
||
"github.com/fox/fox/log"
|
||
"github.com/fox/fox/service"
|
||
"github.com/fox/fox/ws"
|
||
"github.com/golang/protobuf/proto"
|
||
"github.com/nats-io/nats.go"
|
||
"reflect"
|
||
)
|
||
|
||
var Gates []*GateService
|
||
|
||
type GateService struct {
|
||
*gameService.GameService
|
||
wss *ws.WsServer
|
||
}
|
||
|
||
func Init() {
|
||
for i := 0; i < config.Command.ServiceNum; i++ {
|
||
sid := config.Command.ServiceId + i
|
||
if gate := newGateService(sid); gate != nil {
|
||
Gates = append(Gates, gate)
|
||
}
|
||
}
|
||
}
|
||
|
||
func Stop() {
|
||
for _, gate := range Gates {
|
||
gate.NotifyStop()
|
||
}
|
||
for _, gate := range Gates {
|
||
gate.WaitStop()
|
||
}
|
||
}
|
||
|
||
func newGateService(serviceId int) *GateService {
|
||
s := new(GateService)
|
||
|
||
sName := fmt.Sprintf("%v-%d", serviceName.Gate, serviceId)
|
||
s.GameService = gameService.NewGameService(&service.InitNatsServiceParams{
|
||
EtcdAddress: config.Cfg.Etcd.Address,
|
||
EtcdUsername: "",
|
||
EtcdPassword: "",
|
||
NatsAddress: config.Cfg.Nats.Address,
|
||
ServiceType: serviceName.Gate,
|
||
ServiceName: sName,
|
||
OnFunc: s,
|
||
TypeId: int(pb.ServiceTypeId_STI_Gate),
|
||
Version: config.Cfg.BuildDate,
|
||
}, &config.Cfg.Redis)
|
||
_ = s.GameService.Subscribe(topicName.UserOnline)
|
||
|
||
addressPos := serviceId - config.Command.ServiceId
|
||
if len(config.GateCfg.Address) <= addressPos {
|
||
log.FatalF("Special address number must be greater than %d", addressPos)
|
||
return nil
|
||
}
|
||
wsAddress := config.GateCfg.Address[addressPos]
|
||
s.wss = ws.NewWsServer(wsAddress, s.WsOnMessage, s.WsOnDisconnect)
|
||
s.initProcessor()
|
||
s.OnInit()
|
||
return s
|
||
}
|
||
|
||
func (s *GateService) OnInit() {
|
||
err := s.NatsService.SubscribeCb(topicName.WorldMessage, func(m *nats.Msg) {
|
||
var iMsg = &ipb.InternalMsg{}
|
||
if err := proto.Unmarshal(m.Data, iMsg); err != nil {
|
||
return
|
||
}
|
||
s.RunOnce(func() {
|
||
s.wss.Rang(func(conn ws.IConn) bool {
|
||
s.connMessage(conn, iMsg)
|
||
return true
|
||
})
|
||
})
|
||
})
|
||
if err != nil {
|
||
log.Error(err.Error())
|
||
}
|
||
// if err := s.NatsService.QueueSubscribe(service.GroupTopic(s), service.GroupQueue(s)); err != nil {
|
||
// log.Error(err.Error())
|
||
// }
|
||
|
||
s.NatsService.Run()
|
||
s.wss.Run()
|
||
log.Debug("onInit")
|
||
}
|
||
|
||
func (s *GateService) CanStop() bool {
|
||
return true
|
||
}
|
||
|
||
func (s *GateService) OnStop() {
|
||
s.wss.Rang(func(conn ws.IConn) bool {
|
||
if conn.UserId() > 0 {
|
||
sName := s.BindService().LoadFromRedis(conn.UserId(), pb.ServiceTypeId_STI_Gate)
|
||
// 相同网关,则为正常下线,删除redis信息然后向内网广播下线
|
||
// 不同网关,异地登陆顶号 由其它网关通知本网关向玩家发送顶号消息(processor中处理),并主动关闭连接,不广播下线消息
|
||
if sName == s.Name() {
|
||
log.Debug(s.Log("玩家:%v 解绑网关:%v", conn.UserId(), s.Name()))
|
||
s.BindService().DelUserService(conn.UserId(), pb.ServiceTypeId_STI_Gate)
|
||
s.SendServiceMsg(topicName.UserOffline, conn, int32(pb.MsgId_NtfUserOfflineId), &pb.NtfUserOffline{UserId: conn.UserId()})
|
||
}
|
||
}
|
||
return true
|
||
|
||
})
|
||
s.NatsService.OnStop()
|
||
log.Debug("OnStop")
|
||
}
|
||
|
||
func (s *GateService) findConn(msg *ipb.InternalMsg) ws.IConn {
|
||
switch msg.MsgId {
|
||
case int32(pb.MsgId_RspUserLoginId):
|
||
conn, _ := s.wss.FindConnByConnId(msg.ConnId)
|
||
return conn
|
||
default:
|
||
conn, _ := s.wss.FindConnByUserId(msg.UserId)
|
||
return conn
|
||
}
|
||
}
|
||
|
||
// 处理消息
|
||
func (s *GateService) connMessage(conn ws.IConn, iMsg *ipb.InternalMsg) {
|
||
if req, err := s.Processor().Unmarshal(iMsg.MsgId, iMsg.Msg); err == nil {
|
||
err = s.Processor().Dispatch(iMsg.MsgId, iMsg, conn, req)
|
||
} else {
|
||
s.SendClientData(conn, iMsg.ServiceName, iMsg.MsgId, iMsg.Msg)
|
||
}
|
||
//log.Debug(s.Log("received service message:%v", pb.MsgId(iMsg.MsgId)))
|
||
}
|
||
|
||
// 处理其它服发送过来的消息。大部分是将消息转发给玩家
|
||
func (s *GateService) OnMessage(data []byte) error {
|
||
var iMsg = &ipb.InternalMsg{}
|
||
var err error
|
||
if err = proto.Unmarshal(data, iMsg); err != nil {
|
||
log.Error(err.Error())
|
||
return err
|
||
}
|
||
// conn不能为空,全服广播消息走WorldMessage
|
||
var conn = s.findConn(iMsg)
|
||
if conn == nil || reflect.ValueOf(conn).IsNil() {
|
||
//log.Error(s.Log("玩家:%v 连接:%v 不存在.msg:%v", iMsg.UserId, iMsg.ConnId, iMsg.MsgId))
|
||
return nil
|
||
}
|
||
s.connMessage(conn, iMsg)
|
||
return nil
|
||
}
|
||
|
||
// 运行在conn的read协程里。将消息转发给内部服务
|
||
func (s *GateService) WsOnMessage(conn ws.IConn, data []byte) {
|
||
msg := &pb.ClientMsg{}
|
||
if err := proto.Unmarshal(data, msg); err != nil {
|
||
log.Error(err.Error())
|
||
return
|
||
}
|
||
if msg.MsgId < 0 {
|
||
return
|
||
}
|
||
var topic string
|
||
if msg.ServiceName != "" {
|
||
topic = service.TopicEx(msg.ServiceName)
|
||
} else {
|
||
if node, err := s.BindService().FindServiceNode(msg.ServiceTid, conn.UserId()); err == nil {
|
||
topic, msg.ServiceName = service.TopicEx(node.Name), node.Name
|
||
} else {
|
||
log.ErrorF("uid:%v 查找节点:%v 失败:%v", conn.UserId(), msg.ServiceTid, err.Error())
|
||
}
|
||
}
|
||
if topic != "" {
|
||
if msg.MsgId == int32(pb.MsgId_ReqUserLoginId) {
|
||
// 加入登陆ip
|
||
req := &pb.ReqUserLogin{}
|
||
_ = proto.Unmarshal(msg.Data, req)
|
||
req.Ip = conn.Addr()
|
||
msg.Data, _ = proto.Marshal(req)
|
||
} else {
|
||
if conn.UserId() == 0 {
|
||
log.Error(s.Log("玩家uid不能为0,连接:%v", conn.Id()))
|
||
return
|
||
}
|
||
}
|
||
s.SendServiceData(topic, conn, msg.MsgId, msg.Data)
|
||
} else {
|
||
log.Error(s.Log("topic:%v not exist.user:%v", topic, conn.UserId()))
|
||
}
|
||
log.Debug(s.Log("received conn:%d user:%v message:%v", conn.Id(), conn.UserId(), pb.MsgId(msg.MsgId)))
|
||
}
|
||
|
||
// 向内部服务发送消息
|
||
func (s *GateService) SendServiceData(topic string, conn ws.IConn, msgId int32, data []byte) {
|
||
iMsg := ipb.MakeMsg(s.Name(), conn.Id(), conn.UserId(), msgId, data)
|
||
_ = s.SendByTopic(topic, iMsg)
|
||
}
|
||
|
||
// 向内部服务发送消息
|
||
func (s *GateService) SendServiceMsg(topic string, conn ws.IConn, msgId int32, msg proto.Message) {
|
||
log.Debug(s.Log("user:%v send to service:%v msg id:%v, msg:%v", conn.UserId(), topic, pb.MsgId(msgId), msg.String()))
|
||
data, _ := proto.Marshal(msg)
|
||
s.SendServiceData(topic, conn, msgId, data)
|
||
}
|
||
|
||
// 向玩家发送消息
|
||
func (s *GateService) SendClientData(conn ws.IConn, serviceName string, msgId int32, data []byte) {
|
||
cMsg := &pb.ClientMsg{ServiceName: serviceName, MsgId: msgId, Data: data}
|
||
dMsg, _ := proto.Marshal(cMsg)
|
||
_ = conn.SendMsg(dMsg)
|
||
}
|
||
|
||
// 向玩家发送消息
|
||
func (s *GateService) SendClientMsg(conn ws.IConn, serviceName string, msgId int32, msg proto.Message) {
|
||
log.Debug(s.Log("send to user:%v msg id:%v, msg:%v", conn.UserId(), pb.MsgId(msgId), msg.String()))
|
||
data, _ := proto.Marshal(msg)
|
||
s.SendClientData(conn, serviceName, msgId, data)
|
||
}
|
||
|
||
func (s *GateService) WsOnDisconnect(conn ws.IConn) {
|
||
if conn.UserId() > 0 {
|
||
sName := s.BindService().LoadFromRedis(conn.UserId(), pb.ServiceTypeId_STI_Gate)
|
||
// 相同网关,则为正常下线,删除redis信息然后向内网广播下线
|
||
// 不同网关,异地登陆顶号 由其它网关通知本网关向玩家发送顶号消息(processor中处理),并主动关闭连接,不广播下线消息
|
||
if sName == s.Name() {
|
||
log.Debug(s.Log("玩家:%v 解绑网关:%v", conn.UserId(), s.Name()))
|
||
s.BindService().DelUserService(conn.UserId(), pb.ServiceTypeId_STI_Gate)
|
||
s.SendServiceMsg(topicName.UserOffline, conn, int32(pb.MsgId_NtfUserOfflineId), &pb.NtfUserOffline{UserId: conn.UserId()})
|
||
}
|
||
log.Debug(s.Log("user:%v disconnect", conn.UserId()))
|
||
}
|
||
}
|