package handler import ( "fmt" "github.com/golang-module/carbon/v2" "github.com/rabbitmq/amqp091-go" "samba/pkg/log" "samba/pkg/xtime" "samba/proto" "samba/stub" "samba/util/model" "samba/util/routingKey" "samba/util/util" "strconv" "time" ) func onUserOnline(_ *amqp091.Delivery, msg map[string]interface{}) { _, _, _, data := ParseMsg(msg) req, err := util.MapToStructT[proto.UserOnline](data) if err != nil { log.Error(err.Error()) return } uid, _ := strconv.ParseInt(req.Uid, 10, 64) log.Debug(fmt.Sprintf("玩家上线:%v", uid)) model.AddOnlineUser(uid) SendMsgToGate(uid, proto.RspLoginSuccessId, &proto.RspLoginSuccess{}) model.RecordUserOnline(uid) // 玩法服如需处理玩家上线消息,自己单独监听该消息就OK //gs := GameServerMgr.AllServer() //for _, s := range gs { // SendMsg(util.Direct(s.Type), s.RouterKey, "", uid, proto.NtfUserOnlineId, req) //} } func onUserOffline(_ *amqp091.Delivery, msg map[string]interface{}) { _, _, _, data := ParseMsg(msg) req, err := util.MapToStructT[proto.UserOnline](data) if err != nil { log.Error(err.Error()) return } uid, _ := strconv.ParseInt(req.Uid, 10, 64) log.Debug(fmt.Sprintf("玩家下线:%v", uid)) model.DelOnlineUser(uid) model.RecordUserOffline(uid, MatchService) } func onUpdateService(_ *amqp091.Delivery, msg map[string]interface{}) { _, _, _, data := ParseMsg(msg) ntf, err := util.MapToStructT[proto.NtfNewService](data) if err != nil { log.Error(err.Error()) return } GameServerMgr.Update(ntf) for _, gs := range GameServerMgr.gameServers { if gs.Type == ntf.Type && gs.IsClub == ntf.IsClub { SendMsg(util.Direct(gs.Type), gs.RouterKey, "", 0, proto.NtfNewServiceId, ntf) } } log.Debug(fmt.Sprintf("更新服务:%+v", ntf)) } func onUserReconnect(_ *amqp091.Delivery, msg map[string]interface{}) { _, _, uid, data := ParseMsg(msg) req, err := util.MapToStructT[proto.ReqReconnect](data) if err != nil { log.Error(err.Error()) return } gs := GameServerMgr.FindByRoomId(req.RoomId) if gs != nil { SendMsg(util.Direct(gs.Type), gs.RouterKey, "", uid, proto.ReqReconnectId, req) } } // 重连主动进入房间 func onEnterRoom(_ *amqp091.Delivery, msg map[string]interface{}) { _, _, uid, data := ParseMsg(msg) req, err := util.MapToStructT[proto.ReqEnterRoom](data) if err != nil { log.Error(err.Error()) return } if gs := GameServerMgr.FindByRoomId(req.RoomId); gs != nil { SendMsg(util.Direct(gs.Type), gs.RouterKey, "", uid, proto.ReqEnterRoomId, req) } } func onLeaveRoom(_ *amqp091.Delivery, msg map[string]interface{}) { _, _, uid, data := ParseMsg(msg) req, err := util.MapToStructT[proto.ReqLeaveRoom](data) if err != nil { log.Error(err.Error()) return } if gs := GameServerMgr.FindByRoomId(req.RoomId); gs != nil { SendMsg(util.Direct(gs.Type), gs.RouterKey, "", uid, proto.ReqLeaveRoomId, req) } } func onDisbandRoom(_ *amqp091.Delivery, msg map[string]interface{}) { _, _, uid, data := ParseMsg(msg) req, err := util.MapToStructT[proto.ReqDisbandRoom](data) if err != nil { log.Error(err.Error()) return } if gs := GameServerMgr.FindByRoomId(req.RoomId); gs != nil { SendMsg(util.Direct(gs.Type), gs.RouterKey, "", uid, proto.ReqDisbandRoomId, req) } } func onUpdateConfig(_ *amqp091.Delivery, msg map[string]interface{}) { _, _, uid, _ := ParseMsg(msg) model.InitStub() log.Debug("更新配置") gs := GameServerMgr.AllServer() for _, s := range gs { SendMsg(util.Direct(s.Type), s.RouterKey, "", uid, proto.NtfUpdateConfigId, proto.NtfUpdateConfig{}) } } func onBankruptSubsidy(_ *amqp091.Delivery, msg map[string]interface{}) { _, _, uid, _ := ParseMsg(msg) rsp := &proto.RspBankruptSubsidy{} // 不管成功与否都需要返回结果给客户端 defer SendMsgToGate(uid, proto.RspBankruptSubsidyId, rsp) uInfo, err := model.NewUserInfoOp().Load(uid) if err != nil { rsp.Code = proto.Internal log.Error(fmt.Sprintf("Load用户信息错误: %v", err)) return } brConf := stub.GetBankruptConf(model.GetVipLevelByUserId(uid)) if !xtime.IsTodayTimestamp(uInfo.BankruptTime) { // 时间戳非当天 uInfo.Bankrupt = 0 // 重置为0 } if uInfo.Bankrupt >= brConf.FreeCount { // 领取次数达到限额 rsp.Code = proto.SubsidyNotEnough return } SendMsgToDb(routingKey.DbKey(uInfo.UID), uInfo.UID, proto.ReqAddResourceId, &proto.ReqAddResource{ UserId: uInfo.UID, ResValue: int64(brConf.Quota), ResType: model.ResCoins, Reason: model.ReasonSubsidy, IsNotify: true, }) err = model.NewUserInfoOp().UpdateBankruptCount(uInfo.UID, uInfo.Bankrupt+1) if err != nil { rsp.Code = proto.Internal log.Error(fmt.Sprintf("Update破产信息错误: %v", err)) return } log.Debug(fmt.Sprintf("领取破产补助 uid: %d ", uInfo.UID)) } func onPlayingNum(_ *amqp091.Delivery, msg map[string]interface{}) { _, _, uid, _ := ParseMsg(msg) rsp := &proto.RspPlayingNum{} defer SendMsgToGate(uid, proto.RspPlayingNumId, rsp) counts, err := model.GetAllRoomPlayingNum() if err != nil { log.Error(err.Error()) rsp.Code = proto.Internal return } for k, v := range counts { counts[k] = int(model.CalculateFakeCount(int64(v))) } rsp.Counts = counts } func onUserOnlineDurationTicker(lastTime carbon.Carbon, lastCount int) { now := xtime.Now() count := len(model.UserOnlineRepo) defer func() { MatchService.NewTimer(10*time.Second, func() { onUserOnlineDurationTicker(now, count) }, false) }() if count != lastCount { // 人数有变动,写入redis err := model.NewUserOnlineOp().SetCurrent(int64(count)) if err != nil { log.Error(err.Error()) } } if lastTime.Day() == now.Day() { return } for uid, onlineTime := range model.UserOnlineRepo { dur := model.UserOnlineDuration{ UID: uid, OnlineTime: onlineTime.StdTime(), Duration: onlineTime.DiffInSeconds(now), RecordTime: lastTime.StdTime(), // 记录时间为昨天 } if err := dur.Flush(MatchService); err != nil { log.Error(fmt.Sprintf("failed to flush user uid=%d online duration %s", dur.UID, err)) } } }