package server import ( "fmt" "game/common/proto/pb" "game/common/serviceName" "game/common/topicName" "game/common/userBindService" "game/server/gate/config" "game/server/gate/model" "github.com/fox/fox/ipb" "github.com/fox/fox/log" "github.com/fox/fox/processor" "github.com/fox/fox/service" "github.com/fox/fox/ws" "github.com/golang/protobuf/proto" "github.com/nats-io/nats.go" ) var Gates []*GateService type GateService struct { *service.NatsService wss *ws.WsServer processor *processor.Processor bindService *userBindService.UserBindService } func Init() { for i := 0; i < config.Command.ServiceNum; i++ { sid := config.Command.ServiceId + i if gate := newGateService(sid); gate != nil { Gates = append(Gates, gate) } } } func Stop() { for _, gate := range Gates { gate.NotifyStop() } for _, gate := range Gates { gate.WaitStop() } } func newGateService(serviceId int) *GateService { var err error s := new(GateService) sName := fmt.Sprintf("%v-%d", serviceName.Gate, serviceId) if s.NatsService, err = service.NewNatsService(&service.InitNatsServiceParams{ EtcdAddress: config.Cfg.Etcd.Address, EtcdUsername: "", EtcdPassword: "", NatsAddress: config.Cfg.Nats.Address, ServiceType: serviceName.Gate, ServiceName: sName, OnFunc: s, TypeId: int(pb.ServiceTypeId_STI_Gate), Version: config.Cfg.BuildDate, }); err != nil { log.Fatal(err.Error()) return nil } addressPos := serviceId - config.Command.ServiceId if len(config.GateCfg.Address) <= addressPos { log.FatalF("Special address number must be greater than %d", addressPos) return nil } wsAddress := config.GateCfg.Address[addressPos] s.wss = ws.NewWsServer(wsAddress, s.WsOnMessage, s.WsOnDisconnect) s.bindService = userBindService.NewUserBindService(model.UserBindServiceRedis, s.ServiceEtcd()) s.processor = processor.NewProcessor() s.initProcessor() s.OnInit() return s } func (s *GateService) OnInit() { err := s.NatsService.SubscribeCb(topicName.WorldMessage, func(m *nats.Msg) { var iMsg = &ipb.InternalMsg{} if err := proto.Unmarshal(m.Data, iMsg); err != nil { return } s.RunOnce(func() { s.wss.Rang(func(conn ws.IConn) bool { s.connMessage(conn, iMsg) return true }) }) }) if err != nil { log.Error(err.Error()) } // if err := s.NatsService.QueueSubscribe(service.GroupTopic(s), service.GroupQueue(s)); err != nil { // log.Error(err.Error()) // } s.NatsService.Run() s.wss.Run() log.Debug("onInit") } func (s *GateService) CanStop() bool { return true } func (s *GateService) OnStop() { s.NatsService.OnStop() log.Debug("OnStop") } func (s *GateService) findConn(msg *ipb.InternalMsg) ws.IConn { switch msg.MsgId { case int32(pb.MsgId_RspUserLoginId): conn, _ := s.wss.FindConnByConnId(msg.ConnId) return conn default: conn, _ := s.wss.FindConnByUserId(msg.UserId) return conn } } // 处理消息 func (s *GateService) connMessage(conn ws.IConn, iMsg *ipb.InternalMsg) { if req, err := s.processor.Unmarshal(iMsg.MsgId, iMsg.Msg); err == nil { err = s.processor.Dispatch(iMsg.MsgId, iMsg, conn, req) } else { s.SendClientData(conn, iMsg.ServiceName, iMsg.MsgId, iMsg.Msg) } //log.Debug(s.Log("received service message:%v", iMsg.MsgId)) } // 处理其它服发送过来的消息。大部分是将消息转发给玩家 func (s *GateService) OnMessage(data []byte) error { var iMsg = &ipb.InternalMsg{} var err error if err = proto.Unmarshal(data, iMsg); err != nil { log.Error(err.Error()) return err } // conn不能为空,全服广播消息走WorldMessage var conn = s.findConn(iMsg) if conn == nil { log.WarnF(s.Log("client not exist.msg:%v", iMsg.MsgId)) return nil } s.connMessage(conn, iMsg) return nil } // 运行在conn的read协程里。将消息转发给内部服务 func (s *GateService) WsOnMessage(conn ws.IConn, data []byte) { msg := &pb.ClientMsg{} if err := proto.Unmarshal(data, msg); err != nil { log.Error(err.Error()) return } if msg.MsgId < 0 { return } var topic string if msg.ServiceName != "" { topic = service.TopicEx(msg.ServiceName) } else { if node, err := s.bindService.FindServiceNode(msg.ServiceTid, conn.UserId()); err == nil { topic, msg.ServiceName = service.TopicEx(node.Name), node.Name } else { log.ErrorF("uid:%v 查找节点:%v 失败:%v", conn.UserId(), msg.ServiceTid, err.Error()) } } if topic != "" { if msg.MsgId == int32(pb.MsgId_ReqUserLoginId) { req := &pb.ReqUserLogin{} _ = proto.Unmarshal(msg.Data, req) req.Ip = conn.Addr() msg.Data, _ = proto.Marshal(req) } s.SendServiceData(topic, conn, msg.MsgId, msg.Data) } else { log.Error(s.Log("topic:%v not exist.user:%v", topic, conn.UserId())) } log.Debug(s.Log("received conn:%d user:%v message:%v", conn.Id(), conn.UserId(), pb.MsgId(msg.MsgId))) } // 向内部服务发送消息 func (s *GateService) SendServiceData(topic string, conn ws.IConn, msgId int32, data []byte) { iMsg := ipb.MakeMsg(s.Name(), conn.Id(), conn.UserId(), msgId, data) _ = s.Send(topic, iMsg) } // 向内部服务发送消息 func (s *GateService) SendServiceMsg(topic string, conn ws.IConn, msgId int32, msg proto.Message) { log.DebugF("user:%v send to service:%v msg id:%v, msg:%v", conn.UserId(), topic, pb.MsgId(msgId), msg.String()) data, _ := proto.Marshal(msg) s.SendServiceData(topic, conn, msgId, data) } // 向玩家发送消息 func (s *GateService) SendClientData(conn ws.IConn, serviceName string, msgId int32, data []byte) { cMsg := &pb.ClientMsg{ServiceName: serviceName, MsgId: msgId, Data: data} dMsg, _ := proto.Marshal(cMsg) _ = conn.SendMsg(dMsg) } // 向玩家发送消息 func (s *GateService) SendClientMsg(conn ws.IConn, serviceName string, msgId int32, msg proto.Message) { log.DebugF("send to user:%v msg id:%v, msg:%v", conn.UserId(), pb.MsgId(msgId), msg.String()) data, _ := proto.Marshal(msg) s.SendClientData(conn, serviceName, msgId, data) } func (s *GateService) WsOnDisconnect(conn ws.IConn) { if conn.UserId() > 0 { sName := s.bindService.LoadFromRedis(conn.UserId(), pb.ServiceTypeId_STI_Gate) // 相同网关,则为正常下线,删除redis信息然后向内网广播下线 // 不同网关,异地登陆顶号 由其它网关通知本网关向玩家发送顶号消息(processor中处理),并主动关闭连接,不广播下线消息 if sName == s.Name() { s.bindService.DelUserService(conn.UserId(), pb.ServiceTypeId_STI_Gate) s.SendServiceMsg(topicName.UserOffline, conn, int32(pb.MsgId_NtfUserOfflineId), &pb.NtfUserOffline{UserId: conn.UserId()}) } log.Debug(s.Log("user:%v disconnect", conn.UserId())) } }