package sender

import (
	"context"
	"encoding/json"
	"fmt"
	"reflect"
	"strconv"
	"time"

	"github.com/rabbitmq/amqp091-go"

	"samba/pkg/log"
	"samba/pkg/service"
	"samba/util/util"
)

const (
	// serviceName is the name passed to service.NewService.
	serviceName = "test"
	// queueName is the queue this helper declares and consumes from; it is
	// also sent as the ReplyTo of every published message.
	queueName = "test-0"
)

// serviceRaw is the part of the service API the Sender relies on: the regular
// service.IService plus the ability to publish a pre-built AMQP message.
type serviceRaw interface {
	service.IService
	PublishRaw(exchangeName, routerKey string, data amqp091.Publishing) error
}

// Sender publishes JSON requests to a fixed exchange/routing key and hands the
// JSON messages consumed from queueName back to the caller through Recv.
type Sender struct {
	svr      serviceRaw
	rspCh    chan map[string]any // decoded responses; buffered with capacity 1
	exchange string
	routeKey string
}

// rabbitmqUrl returns the AMQP URL of the broker used by this helper.
//
// NOTE(review): user, password, host, port and vhost are hard-coded literals;
// confirm this is acceptable for a test-only helper.
func rabbitmqUrl() string {
	return fmt.Sprintf("amqp://%s:%s@%s:%s/%s", "samba", "samba", "testbuild.shoa.com", "5672", "vh_samba_dev")
}

// NewMQSender creates an MQ producer used for testing.
//
// Messages are published to exchange with routeKey. The on-init hook declares
// queueName and calls Consume on it; it reports failure (false) and logs the
// error if either step fails.
func NewMQSender(exchange string, routeKey string) *Sender {
	s := &Sender{
		rspCh:    make(chan map[string]any, 1),
		exchange: exchange,
		routeKey: routeKey,
	}

	onInit := service.SetOnInit(func(svc service.IService) bool {
		if err := svc.QueueDeclare(queueName); err != nil {
			log.Error(err.Error())
			return false
		}
		if err := svc.Consume(queueName); err != nil {
			log.Error(err.Error())
			return false
		}
		return true
	})

	svr := service.NewService(serviceName, queueName, rabbitmqUrl(), s.handleMsg, onInit)
	// NOTE(review): this assertion panics if the value returned by
	// service.NewService does not implement PublishRaw.
	s.svr = svr.(serviceRaw)
	return s
}

// parseMsg extracts the well-known fields of a decoded message:
//
//	"a"   -> msgId  (string; "" when missing or not a string)
//	"r"   -> roomId (float64, int, int64 or a base-10 string)
//	"uid" -> uid    (float64 or int64)
//	"p"   -> data   (returned as-is)
//
// Values of any other type, including JSON null, are logged and leave the
// corresponding result at its zero value.
func (*Sender) parseMsg(msg map[string]interface{}) (msgId string, roomId int, uid int64, data interface{}) {
	msgId, _ = msg["a"].(string) // a missing or non-string "a" intentionally yields ""

	if raw, ok := msg["r"]; ok {
		switch v := raw.(type) {
		case float64:
			roomId = int(v)
		case int:
			roomId = v
		case int64:
			roomId = int(v)
		case string:
			n, err := strconv.ParseInt(v, 10, 64)
			if err != nil {
				log.Error(fmt.Sprintf("map:%+v, room:%v parse error:%v", msg, raw, err))
			} else {
				roomId = int(n)
			}
		default:
			// reflect.ValueOf(nil).Kind() is reflect.Invalid, so a null value
			// is logged here instead of panicking.
			log.Error(fmt.Sprintf("map:%+v, room:%v type:%v", msg, raw, reflect.ValueOf(raw).Kind()))
		}
	}

	if raw, ok := msg["uid"]; ok {
		switch v := raw.(type) {
		case float64:
			uid = int64(v)
		case int64:
			uid = v
		default:
			log.Error(fmt.Sprintf("map:%+v, uid:%v type:%v", msg, raw, reflect.ValueOf(raw).Kind()))
		}
	}

	data = msg["p"]
	return msgId, roomId, uid, data
}

// handleMsg is the delivery callback passed to service.NewService. It decodes
// the delivery body as a JSON object and forwards it to rspCh; bodies that
// fail to decode are logged and dropped.
//
// The send blocks while rspCh already holds an unread message.
func (s *Sender) handleMsg(_ service.IService, d *amqp091.Delivery) {
	var msg map[string]interface{}
	if err := json.Unmarshal(d.Body, &msg); err != nil {
		log.Error(fmt.Sprintf("consume message error: %v.body:%v", err, string(d.Body)))
		return
	}
	s.rspCh <- msg
}

// Call sends req under the given id and then waits up to three seconds for
// the next message on the response channel, decoding its payload into rsp.
//
// The response is not matched against id: whatever message arrives next is
// used.
func (s *Sender) Call(id string, req, rsp any) error {
	if err := s.Send(id, req); err != nil {
		return err
	}
	return s.Recv(rsp, time.Second*3)
}

// Recv waits up to timeout for the next received message and converts its "p"
// payload into val via util.MapToStruct. It returns the context error when
// the timeout expires first.
func (s *Sender) Recv(val any, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.TODO(), timeout)
	defer cancel()

	select {
	case <-ctx.Done():
		return ctx.Err()
	case msg := <-s.rspCh:
		_, _, _, payload := s.parseMsg(msg)
		return util.MapToStruct(payload, val)
	}
}

// Send publishes {"a": id, "p": req} as JSON to the configured exchange and
// routing key, with ReplyTo set to queueName.
func (s *Sender) Send(id string, req any) error {
	return s.publish(map[string]any{
		"a": id,
		"p": req,
	})
}

// SendWithUId is like Send but also includes "uid" in the message.
func (s *Sender) SendWithUId(id string, uid int64, req any) error {
	return s.publish(map[string]any{
		"a":   id,
		"p":   req,
		"uid": uid,
	})
}

// publish JSON-encodes msg and publishes it to the configured exchange and
// routing key, asking for replies on queueName.
func (s *Sender) publish(msg map[string]any) error {
	buf, err := json.Marshal(msg)
	if err != nil {
		return err
	}
	return s.svr.PublishRaw(s.exchange, s.routeKey, amqp091.Publishing{
		ContentType: "text/plain",
		Body:        buf,
		ReplyTo:     queueName,
	})
}