2025-06-04 09:51:39 +08:00

270 lines
6.4 KiB
Go

package handler
import (
"encoding/json"
"fmt"
"github.com/rabbitmq/amqp091-go"
"reflect"
"samba/pkg/log"
"samba/pkg/rmq"
"samba/pkg/servername"
"samba/pkg/service"
"samba/pkg/task"
"samba/util/config"
"samba/util/routingKey"
"samba/util/util"
"strconv"
"strings"
)
// Package-level state shared by the helpers in this file.
var (
	// OtherService is the service instance built by InitService; the Send*
	// helpers and CallRpc publish through it.
	OtherService service.IService

	// Env is a process-wide key/value store. SetEnv and GetEnv lower-case the
	// key before touching it, so access through them is case-insensitive.
	Env = make(map[string]any)
)
// SetEnv stores v in Env under the lower-cased form of k, overwriting any
// value already stored under that key.
func SetEnv(k string, v any) {
	key := strings.ToLower(k)
	Env[key] = v
	//log.Info(fmt.Sprintf("SetEnv key=%s,value=%v", k, v))
}
// GetEnv looks up the lower-cased form of k in Env and returns the stored
// value as a T.
//
// ok is false, and v is the zero value of T, when the key is absent or when
// the stored value is not a T.
func GetEnv[T any](k string) (v T, ok bool) {
	raw, found := Env[strings.ToLower(k)]
	if found {
		v, ok = raw.(T)
	}
	return v, ok
}
// ParseMsg extracts the routing fields from a decoded message envelope.
//
// Fields read from msg:
//   - "a":   message id; left empty when absent or not a string.
//   - "r":   room id; accepts float64 (what encoding/json yields for JSON
//     numbers), int, int64, or a base-10 numeric string.
//   - "uid": user id; accepts float64, int or int64.
//   - "p":   payload, returned untouched as data.
//
// A missing field keeps its zero value. A field that is present but null, of
// an unsupported type, or (for a string room id) not parseable is logged and
// also keeps its zero value.
func ParseMsg(msg map[string]interface{}) (msgId string, roomId int, uid int64, data interface{}) {
	msgId, _ = msg["a"].(string)

	if raw, ok := msg["r"]; ok {
		// A type switch is used instead of reflect.TypeOf(raw).Kind():
		// reflect.TypeOf returns a nil Type for a JSON null, and calling Kind
		// on it panics. It also avoids asserting a named type to its
		// underlying type, which panics as well.
		switch room := raw.(type) {
		case float64:
			roomId = int(room)
		case int:
			roomId = room
		case int64:
			roomId = int(room)
		case string:
			n, err := strconv.ParseInt(room, 10, 64)
			if err != nil {
				log.Error(fmt.Sprintf("map:%+v, room:%v parse error:%v", msg, raw, err))
			} else {
				roomId = int(n)
			}
		default:
			// reflect.ValueOf(nil).Kind() is reflect.Invalid, so this is safe
			// for null values too.
			log.Error(fmt.Sprintf("map:%+v, room:%v type:%v", msg, raw, reflect.ValueOf(raw).Kind()))
		}
	}

	if raw, ok := msg["uid"]; ok {
		switch id := raw.(type) {
		case float64:
			uid = int64(id)
		case int:
			// Accepted for parity with the room id.
			uid = int64(id)
		case int64:
			uid = id
		default:
			log.Error(fmt.Sprintf("map:%+v, uid:%v type:%v", msg, raw, reflect.ValueOf(raw).Kind()))
		}
	}

	data = msg["p"]
	return msgId, roomId, uid, data
}
// handlerMessage is the delivery callback passed to service.NewService.
//
// It decodes the delivery body as a JSON object, looks the message id (field
// "a") up in MsgHandler and calls the registered handler with the delivery
// and the decoded map. A body that cannot be decoded, or a message id with no
// registered handler, is logged and dropped.
func handlerMessage(_ service.IService, d *amqp091.Delivery) {
	var envelope map[string]interface{}
	err := json.Unmarshal(d.Body, &envelope)
	if err != nil {
		log.Error(fmt.Sprintf("consume message error: %v.body:%v", err, string(d.Body)))
		return
	}

	msgId, _, uid, _ := ParseMsg(envelope)
	handle, known := MsgHandler[msgId]
	if !known {
		log.Error(fmt.Sprintf("user :%v msgId:%v not exist", uid, msgId))
		return
	}
	handle(d, envelope)
}
// InitService builds the service for servername.Other, wires its lifecycle
// callbacks and stores the result in OtherService.
//
// Callbacks:
//   - init: declares the direct and topic exchanges and the queue named by
//     QueueName, calls RegisterMsgHandler, starts consuming, then builds a
//     task manager, passes it to RegisterTask and submits it. Every failing
//     step is logged and makes the callback return false.
//   - notify-stop: nothing to do.
//   - can-stop: always true.
//   - stop: deletes the consumer and the queue, logging each outcome.
func InitService() {
	opts := []service.Option{
		service.SetOnInit(func(s service.IService) bool {
			if err := s.ExchangeDeclare(util.Direct(servername.Other), rmq.ExchangeDirect); err != nil {
				log.Error(err.Error())
				return false
			}
			if err := s.ExchangeDeclare(util.Topic(servername.Other), rmq.ExchangeTopic); err != nil {
				log.Error(err.Error())
				return false
			}
			if err := s.QueueDeclare(QueueName()); err != nil {
				// Abort like every other step instead of carrying on to
				// consume from a queue that was never declared.
				log.Error(err.Error())
				return false
			}
			if !RegisterMsgHandler(s) {
				return false
			}
			if err := s.Consume(QueueName()); err != nil {
				log.Error(err.Error())
				return false
			}

			m := task.NewManager(s)
			RegisterTask(m)
			m.Submit()

			log.Info(fmt.Sprintf("service:%v init", s.Name()))
			return true
		}),
		service.SetOnNotifyStop(func(s service.IService) {
		}),
		service.SetCanStop(func(s service.IService) bool {
			return true
		}),
		service.SetOnStop(func(s service.IService) {
			// Both deletions are attempted even if the first one fails.
			if err := s.ConsumeDelete(); err != nil {
				log.Error(err.Error())
			} else {
				log.Info("delete consume channle")
			}
			if err := s.QueueDelete(QueueName()); err != nil {
				log.Error(err.Error())
			} else {
				log.Info(fmt.Sprintf("delete queue:%v", QueueName()))
			}
			log.Info(fmt.Sprintf("service:%v stop", s.Name()))
		}),
	}
	// init here
	OtherService = service.NewService(servername.Other, QueueName(), config.RabbitmqUrl(), handlerMessage, opts...)
}
// StopService calls NotifyStop and then WaitStop on OtherService.
//
// It does nothing when OtherService is still nil (InitService has not run),
// so a shutdown path that fires before start-up does not panic on a nil
// interface.
func StopService() {
	if OtherService == nil {
		return
	}
	OtherService.NotifyStop()
	OtherService.WaitStop()
}
// QueueName returns the name of this process's queue, formatted as
// "<servername.Other>-<config.Cmd.ChildId>".
func QueueName() string {
	name, child := servername.Other, config.Cmd.ChildId
	return fmt.Sprintf("%v-%v", name, child)
}
// SendMsg wraps msg in a service.Message (message id, payload, token, uid),
// JSON-encodes it and publishes it through OtherService to exchange with
// routing key router.
//
// Encoding and publish failures are only logged; nothing is reported back to
// the caller.
func SendMsg(exchange, router, token string, uid int64, msgId string, msg interface{}) {
	payload, err := json.Marshal(service.Message{
		MsgId: msgId,
		Data:  msg,
		Token: token,
		Uid:   uid,
	})
	if err != nil {
		log.Error(err.Error())
		return
	}

	log.Debug(fmt.Sprintf("send msg:%v to exchange:%v, routKey:%v, data:%v", msgId, exchange, router, string(payload)))

	err = OtherService.Publish(exchange, router, payload)
	if err != nil {
		log.Error(err.Error())
	}
}
// SendMsgToGate sends msg to the gate on behalf of user uid.
//
// The payload is wrapped in a "user_transfer" service.RspMessage whose user
// field is uid rendered as a decimal string, then published to the direct
// exchange of servername.User with routingKey.GateKey(uid) as routing key.
// Encoding and publish failures are only logged.
func SendMsgToGate(uid int64, msgId string, msg interface{}) {
	inner := service.MsgData{
		MsgId: msgId,
		Data:  msg,
	}
	envelope := service.RspMessage{
		Fun: "user_transfer",
		Data: service.RspData{
			User: fmt.Sprint(uid),
			Data: inner,
		},
	}

	payload, err := json.Marshal(envelope)
	if err != nil {
		log.Error(fmt.Sprintf("uid:%v send msg error:%v", uid, err))
		return
	}

	// Example of the encoded form:
	// `{"a":"user_transfer","p":{"u":"1008","d":{"t":0,"c":3,"p":[[413,113,301],[]],"d":1}}}`
	//log.Debug(fmt.Sprintf("send msg:%v to gate:%v", msgId, string(byteData)))
	if err = OtherService.Publish(util.Direct(servername.User), routingKey.GateKey(uid), payload); err != nil {
		log.Error(fmt.Sprintf("uid:%v send msg error:%v", uid, err))
	}
}
// SendMsgToDb wraps msg in a service.Message (message id, payload, uid),
// JSON-encodes it and publishes it to the direct exchange of
// servername.Money with the given routing key.
//
// Note that the routingKey parameter shadows the imported routingKey package
// inside this function. Encoding and publish failures are only logged.
func SendMsgToDb(routingKey string, uid int64, msgId string, msg interface{}) {
	payload, err := json.Marshal(service.Message{
		MsgId: msgId,
		Data:  msg,
		Uid:   uid,
	})
	if err != nil {
		log.Error(fmt.Sprintf("uid:%v send msg to db error:%v", uid, err))
		return
	}

	log.Debug(fmt.Sprintf("send msg:%v to exchange:%v, routKey:%v, data:%v", msgId, util.Direct(servername.Money), routingKey, string(payload)))

	err = OtherService.Publish(util.Direct(servername.Money), routingKey, payload)
	if err == nil {
		return
	}
	log.Error(fmt.Sprintf("uid:%v send msg to db error:%v", uid, err))
}
// SendMsgToRPC replies to the RPC request carried by d.
//
// The reply is the JSON object {"r": "0", "p": data}, sent with
// OtherService.PublishRpc. methodName is optional; only its first element is
// used, and only to label the log line written when encoding or publishing
// fails ("unknown" when it is omitted).
func SendMsgToRPC(d *amqp091.Delivery, data any, methodName ...string) {
	reply := map[string]any{
		"r": "0",
		"p": data,
	}

	body, err := json.Marshal(reply)
	if err == nil {
		err = OtherService.PublishRpc(d, body)
	}
	if err == nil {
		return
	}

	// The label is resolved only on the failure path.
	method := util.TieOrFn(len(methodName) <= 0, "unknown", func() string {
		return methodName[0]
	})
	log.Error(fmt.Sprintf("response RPC fail method=%s,err:%v", method, err))
}
// rid is the last request id handed out by CallRpc.
//
// NOTE(review): it is incremented without synchronisation, and MsgHandler is
// written by CallRpc and read by handlerMessage; this is only safe if both run
// on the same goroutine — confirm against the callers.
var rid uint

// CallRpc publishes an RPC request and arranges for future to receive the
// reply.
//
// The request is the JSON object {"a": id, "p": req, "rid": <request id>},
// published to exchange/routeKey with ReplyTo set to QueueName. A one-shot
// handler is stored in MsgHandler under the request id; when it fires it
// removes itself, converts the reply payload ("p") into an RSP with
// util.MapToStruct and calls future with the result and the conversion error.
// NOTE(review): handlerMessage dispatches on the reply's "a" field, so the
// responder presumably echoes the request id there — confirm.
//
// future is called with a nil response and an error when the request cannot
// be encoded, when OtherService does not implement PublishRaw, or when the
// publish fails. If no reply ever arrives, future is never called and the
// handler stays registered.
func CallRpc[REQ, RSP any](id, exchange, routeKey string, req *REQ, future func(*RSP, error)) {
	rid++
	ridStr := strconv.Itoa(int(rid))

	buf, err := json.Marshal(map[string]any{
		"a":   id,
		"p":   req,
		"rid": ridStr,
	})
	if err != nil {
		future(nil, err)
		return
	}

	svr, ok := OtherService.(interface {
		PublishRaw(exchangeName, routerKey string, data amqp091.Publishing) error
	})
	if !ok {
		future(nil, fmt.Errorf("not support PublishRaw"))
		return
	}

	// Register the reply handler before publishing so that a reply arriving
	// immediately after the publish cannot be dispatched while no handler
	// exists for it.
	MsgHandler[ridStr] = func(d *amqp091.Delivery, msg map[string]interface{}) {
		delete(MsgHandler, ridStr)
		_, _, _, data := ParseMsg(msg)
		rsp := new(RSP)
		future(rsp, util.MapToStruct(data, rsp))
	}

	err = svr.PublishRaw(exchange, routeKey, amqp091.Publishing{
		ContentType: "text/plain",
		Body:        buf,
		ReplyTo:     QueueName(),
	})
	if err != nil {
		// Nothing was sent, so no reply can arrive: drop the handler again.
		delete(MsgHandler, ridStr)
		future(nil, err)
	}
}