package handler import ( "encoding/json" "fmt" "github.com/rabbitmq/amqp091-go" "reflect" "samba/pkg/log" "samba/pkg/rmq" "samba/pkg/servername" "samba/pkg/service" "samba/pkg/task" "samba/util/config" "samba/util/routingKey" "samba/util/util" "strconv" "strings" ) var OtherService service.IService var Env = map[string]any{} func SetEnv(k string, v any) { Env[strings.ToLower(k)] = v //log.Info(fmt.Sprintf("SetEnv key=%s,value=%v", k, v)) } func GetEnv[T any](k string) (v T, ok bool) { val, ok := Env[strings.ToLower(k)] if !ok { return v, ok } v, ok = val.(T) return } func ParseMsg(msg map[string]interface{}) (msgId string, roomId int, uid int64, data interface{}) { var v int64 msgId, _ = msg["a"].(string) if iv, ok := msg["r"]; ok { tv := reflect.TypeOf(iv) switch tv.Kind() { case reflect.Float64: roomId = int(iv.(float64)) case reflect.Int: roomId = iv.(int) case reflect.Int64: roomId = int(iv.(int64)) case reflect.String: v, _ = strconv.ParseInt(msg["r"].(string), 10, 64) roomId = int(v) default: log.Error(fmt.Sprintf("map:%+v, room:%v type:%v", msg, iv, tv.Kind())) } } if iv, ok := msg["uid"]; ok { tv := reflect.TypeOf(iv) switch tv.Kind() { case reflect.Float64: uid = int64(iv.(float64)) case reflect.Int64: uid = iv.(int64) default: log.Error(fmt.Sprintf("map:%+v, uid:%v type:%v", msg, iv, tv.Kind())) } } data = msg["p"] return } func handlerMessage(_ service.IService, d *amqp091.Delivery) { var msg map[string]interface{} if err := json.Unmarshal(d.Body, &msg); err != nil { log.Error(fmt.Sprintf("consume message error: %v.body:%v", err, string(d.Body))) return } msgId, _, uid, _ := ParseMsg(msg) if fn, ok := MsgHandler[msgId]; ok { fn(d, msg) } else { log.Error(fmt.Sprintf("user :%v msgId:%v not exist", uid, msgId)) } } func InitService() { opts := []service.Option{ service.SetOnInit(func(s service.IService) bool { if err := s.ExchangeDeclare(util.Direct(servername.Other), rmq.ExchangeDirect); err != nil { log.Error(err.Error()) return false } if err := s.ExchangeDeclare(util.Topic(servername.Other), rmq.ExchangeTopic); err != nil { log.Error(err.Error()) return false } if err := s.QueueDeclare(QueueName()); err != nil { log.Error(err.Error()) } if !RegisterMsgHandler(s) { return false } if err := s.Consume(QueueName()); err != nil { log.Error(err.Error()) return false } m := task.NewManager(s) RegisterTask(m) m.Submit() log.Info(fmt.Sprintf("service:%v init", s.Name())) return true }), service.SetOnNotifyStop(func(s service.IService) { }), service.SetCanStop(func(s service.IService) bool { return true }), service.SetOnStop(func(s service.IService) { if err := s.ConsumeDelete(); err != nil { log.Error(err.Error()) } else { log.Info(fmt.Sprintf("delete consume channle")) } if err := s.QueueDelete(QueueName()); err != nil { log.Error(err.Error()) } else { log.Info(fmt.Sprintf("delete queue:%v", QueueName())) } log.Info(fmt.Sprintf("service:%v stop", s.Name())) }), } // init here OtherService = service.NewService(servername.Other, QueueName(), config.RabbitmqUrl(), handlerMessage, opts...) } func StopService() { OtherService.NotifyStop() OtherService.WaitStop() } func QueueName() string { return fmt.Sprintf("%v-%v", servername.Other, config.Cmd.ChildId) } func SendMsg(exchange, router, token string, uid int64, msgId string, msg interface{}) { data := service.Message{ MsgId: msgId, Data: msg, Token: token, Uid: uid, } byteData, err := json.Marshal(data) if err != nil { log.Error(err.Error()) return } log.Debug(fmt.Sprintf("send msg:%v to exchange:%v, routKey:%v, data:%v", msgId, exchange, router, string(byteData))) if err = OtherService.Publish(exchange, router, byteData); err != nil { log.Error(err.Error()) } return } func SendMsgToGate(uid int64, msgId string, msg interface{}) { data := service.RspMessage{ Fun: "user_transfer", Data: service.RspData{ User: fmt.Sprintf("%v", uid), Data: service.MsgData{ MsgId: msgId, Data: msg, }, }, } byteData, err := json.Marshal(data) if err != nil { log.Error(fmt.Sprintf("uid:%v send msg error:%v", uid, err)) return } // `{"a":"user_transfer","p":{"u":"1008","d":{"t":0,"c":3,"p":[[413,113,301],[]],"d":1}}}` //log.Debug(fmt.Sprintf("send msg:%v to gate:%v", msgId, string(byteData))) err = OtherService.Publish(util.Direct(servername.User), routingKey.GateKey(uid), byteData) if err != nil { log.Error(fmt.Sprintf("uid:%v send msg error:%v", uid, err)) return } } func SendMsgToDb(routingKey string, uid int64, msgId string, msg interface{}) { data := service.Message{ MsgId: msgId, Data: msg, Uid: uid, } byteData, err := json.Marshal(data) if err != nil { log.Error(fmt.Sprintf("uid:%v send msg to db error:%v", uid, err)) return } log.Debug(fmt.Sprintf("send msg:%v to exchange:%v, routKey:%v, data:%v", msgId, util.Direct(servername.Money), routingKey, string(byteData))) if err = OtherService.Publish(util.Direct(servername.Money), routingKey, byteData); err != nil { log.Error(fmt.Sprintf("uid:%v send msg to db error:%v", uid, err)) } } func SendMsgToRPC(d *amqp091.Delivery, data any, methodName ...string) { rsp := map[string]any{ "r": "0", "p": data, } buf, err := json.Marshal(rsp) if err == nil { err = OtherService.PublishRpc(d, buf) } if err != nil { log.Error(fmt.Sprintf("response RPC fail method=%s,err:%v", util.TieOrFn(len(methodName) <= 0, "unknown", func() string { return methodName[0] }), err)) } } var rid uint func CallRpc[REQ, RSP any](id, exchange, routeKey string, req *REQ, future func(*RSP, error)) { rid++ ridStr := strconv.Itoa(int(rid)) msg := map[string]any{ "a": id, "p": req, "rid": ridStr, } buf, err := json.Marshal(msg) if err != nil { future(nil, err) return } svr, ok := OtherService.(interface { PublishRaw(exchangeName, routerKey string, data amqp091.Publishing) error }) if !ok { future(nil, fmt.Errorf("not support PublishRaw")) return } err = svr.PublishRaw(exchange, routeKey, amqp091.Publishing{ ContentType: "text/plain", Body: buf, ReplyTo: QueueName(), }) if err != nil { future(nil, err) return } MsgHandler[ridStr] = func(d *amqp091.Delivery, msg map[string]interface{}) { delete(MsgHandler, ridStr) _, _, _, data := ParseMsg(msg) rsp := new(RSP) future(rsp, util.MapToStruct(data, rsp)) } }