154 lines
4.5 KiB
Go
154 lines
4.5 KiB
Go
package handler
|
||
|
||
import (
|
||
"encoding/json"
|
||
"fmt"
|
||
"github.com/rabbitmq/amqp091-go"
|
||
"os"
|
||
"samba/pkg/ksync"
|
||
"samba/pkg/log"
|
||
"samba/pkg/rmq"
|
||
"samba/pkg/servername"
|
||
"samba/pkg/service"
|
||
"samba/proto"
|
||
. "samba/server/cacheta/service"
|
||
"samba/util/playtype"
|
||
|
||
. "samba/server/game/baseroom"
|
||
"samba/util/config"
|
||
"samba/util/routingKey"
|
||
"samba/util/util"
|
||
"time"
|
||
)
|
||
|
||
var MsgHandler = map[string]MessageHandler{
|
||
proto.ReqCachetaReconnectId: onReconnect,
|
||
proto.NtfUserOfflineId: onUserOffline,
|
||
proto.ReqMatchRoomId: onMatchRoom,
|
||
proto.ReqLeaveRoomId: onLeaveRoom,
|
||
proto.ReqEnterRoomId: onEnterRoom,
|
||
proto.ReqSetPokersId: onSetPokers,
|
||
proto.NtfUpdateConfigId: onUpdateConfig,
|
||
proto.NtfNewServiceId: onUpdateService,
|
||
proto.ReqDisbandRoomId: onDisbandRoom,
|
||
}
|
||
|
||
type MessageHandler func(d *amqp091.Delivery, msg map[string]interface{})
|
||
|
||
func RegisterMsgHandler(s service.IService) bool {
|
||
//if err := s.QueueBind(QueueName(), routingKey.UserOnline, util.Topic(servername.User)); err != nil {
|
||
// log.Error(err.Error())
|
||
// return false
|
||
//}
|
||
if err := s.QueueBind(QueueName(), routingKey.UserOffline, util.Topic(servername.User)); err != nil {
|
||
log.Error(err.Error())
|
||
return false
|
||
}
|
||
|
||
if err := s.QueueBind(QueueName(), QueueName(), util.Direct(servername.Cacheta)); err != nil {
|
||
log.Error(err.Error())
|
||
return false
|
||
}
|
||
return true
|
||
}
|
||
|
||
func handlerMessage(_ service.IService, d *amqp091.Delivery) {
|
||
var msg map[string]interface{}
|
||
if err := json.Unmarshal(d.Body, &msg); err != nil {
|
||
log.Error(fmt.Sprintf("consume message error: %v.body:%v", err, string(d.Body)))
|
||
return
|
||
}
|
||
msgId, roomId, uid, _ := ParseMsg(msg)
|
||
if fn, ok := MsgHandler[msgId]; ok && fn != nil {
|
||
fn(d, msg)
|
||
} else {
|
||
rm := RoomMgr.Find(roomId)
|
||
if rm != nil {
|
||
log.Debug(fmt.Sprintf("player:%v recv msgId:%v msg:%v", uid, msgId, string(d.Body)))
|
||
rm.OnMessage(msgId, msg)
|
||
} else {
|
||
log.Error(fmt.Sprintf("room id:%v not exist.msgId:%v", roomId, msgId))
|
||
}
|
||
}
|
||
}
|
||
|
||
func InitService() {
|
||
RoomMgr.Init(&CathetaService)
|
||
opts := []service.Option{
|
||
service.SetOnInit(func(s service.IService) bool {
|
||
if err := s.ExchangeDeclare(util.Direct(servername.Cacheta), rmq.ExchangeDirect); err != nil {
|
||
log.Error(err.Error())
|
||
return false
|
||
}
|
||
if err := s.ExchangeDeclare(util.Topic(servername.Cacheta), rmq.ExchangeTopic); err != nil {
|
||
log.Error(err.Error())
|
||
return false
|
||
}
|
||
if err := s.QueueDeclare(QueueName()); err != nil {
|
||
log.Error(err.Error())
|
||
}
|
||
if !RegisterMsgHandler(s) {
|
||
return false
|
||
}
|
||
if err := s.Consume(QueueName()); err != nil {
|
||
log.Error(err.Error())
|
||
return false
|
||
}
|
||
log.Info(fmt.Sprintf("service:%v init.is club:%v", s.Name(), config.Cmd.IsClub > 0))
|
||
return true
|
||
}),
|
||
service.SetOnNotifyStop(func(s service.IService) {
|
||
RoomMgr.NotifyStop()
|
||
}),
|
||
service.SetCanStop(func(s service.IService) bool {
|
||
//log.Info(fmt.Sprintf("room count is zero:%v", RoomMgr.IsStopped()))
|
||
return RoomMgr.IsStopped()
|
||
}),
|
||
service.SetOnStop(func(s service.IService) {
|
||
if err := s.ConsumeDelete(); err != nil {
|
||
log.Error(err.Error())
|
||
} else {
|
||
log.Info(fmt.Sprintf("delete consume channle"))
|
||
}
|
||
if err := s.QueueDelete(QueueName()); err != nil {
|
||
log.Error(err.Error())
|
||
} else {
|
||
log.Info(fmt.Sprintf("delete queue:%v", QueueName()))
|
||
}
|
||
log.Info(fmt.Sprintf("service:%v stop", s.Name()))
|
||
}),
|
||
}
|
||
CathetaService.IService = service.NewService(servername.Cacheta, QueueName(), config.RabbitmqUrl(), handlerMessage, opts...)
|
||
|
||
cfg := &config.Cmd
|
||
newServiceNtf := &proto.NtfNewService{Type: CathetaService.Type(), RouterKey: QueueName(),
|
||
RoomMaxId: cfg.MaxRoomId, RoomMinId: cfg.MinRoomId, IsClub: cfg.IsClub > 0}
|
||
newServiceNtf.PlayType = append(newServiceNtf.PlayType, int(playtype.PtCacheta))
|
||
SendMsg(util.Direct(servername.Hall), routingKey.Hall, 0, "", 0, proto.NtfNewServiceId, newServiceNtf)
|
||
logCurrentRoomCount(CathetaService)
|
||
|
||
//stub.Rooms[10061].MinPlayers = 2
|
||
}
|
||
|
||
func StopService() {
|
||
CathetaService.NotifyStop()
|
||
CathetaService.WaitStop()
|
||
}
|
||
|
||
func logCurrentRoomCount(s service.IService) {
|
||
log.Info(fmt.Sprintf("current room num:%v", RoomMgr.Count()))
|
||
if CathetaService.AutoWaitStop {
|
||
RoomMgr.DelEmptyRoom()
|
||
}
|
||
if CathetaService.AutoWaitStop && RoomMgr.Count() == 0 {
|
||
log.Info("旧服务,且房间数为0,自动关闭")
|
||
ksync.GoSafe(func() {
|
||
StopService()
|
||
log.Info("auto exit, initiating shutdown...")
|
||
os.Exit(0)
|
||
}, nil)
|
||
|
||
}
|
||
s.NewTimer(15*time.Minute, func() { logCurrentRoomCount(s) }, false)
|
||
}
|