package rmq import ( "context" "github.com/fox/fox/log" "github.com/google/uuid" "github.com/rabbitmq/amqp091-go" "time" ) const ( ExchangeDirect = "direct" // ExchangeFanout = "fanout" ExchangeTopic = "topic" ) type Rmq struct { conn *amqp091.Connection ch *amqp091.Channel consumerTag string } // url:amqp://guest:guest@localhost:5672/ func NewRmq(url string) (*Rmq, error) { rmq := &Rmq{consumerTag: uuid.NewString()} retries := 0 retryDelay := 1 * time.Second var err error for { rmq.conn, err = amqp091.DialConfig(url, amqp091.Config{Heartbeat: 10 * time.Second}) if err == nil { break } retries++ time.Sleep(retryDelay) if retryDelay < 30*time.Second { retryDelay *= 2 } log.ErrorF("amqp connection failed after %v reconnect.err:%v url:%v", retryDelay, err, url) } rmq.ch, err = rmq.conn.Channel() if err != nil { return nil, err } return rmq, nil } func (r *Rmq) Close() { if r.conn != nil { _ = r.conn.Close() } if r.ch != nil { _ = r.ch.Close() } } func (r *Rmq) ExchangeDeclare(exchangeName, typ string) error { return r.ch.ExchangeDeclare(exchangeName, typ, true, false, false, false, nil) } func (r *Rmq) QueueDeclare(queueName string) error { _, err := r.ch.QueueDeclare(queueName, false, false, false, false, nil) return err } func (r *Rmq) QueueDelete(queueName string) error { _, err := r.ch.QueueDelete(queueName, false, false, false) return err } func (r *Rmq) QueueBind(queueName, routerKey, exchangeName string) error { return r.ch.QueueBind(queueName, routerKey, exchangeName, false, nil) } func (r *Rmq) QueueUnbind(queueName, routerKey, exchangeName string) error { return r.ch.QueueUnbind(queueName, routerKey, exchangeName, nil) } // 发布消息到交换机,带有指定的路由键 func (r *Rmq) Publish(exchangeName, routerKey string, msg []byte) error { return r.ch.Publish(exchangeName, routerKey, true, false, amqp091.Publishing{ ContentType: "text/plain", Body: msg, }) } // 发布消息到交换机,带有指定的路由键 func (r *Rmq) PublishRpc(d *amqp091.Delivery, msg []byte) error { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() return r.ch.PublishWithContext(ctx, "", d.ReplyTo, false, false, amqp091.Publishing{ ContentType: "text/plain", CorrelationId: d.CorrelationId, Body: msg, }) } func (r *Rmq) PublishRaw(exchangeName, routerKey string, data amqp091.Publishing) error { return r.ch.Publish(exchangeName, routerKey, true, false, data) } func (r *Rmq) Consume(queueName string) (<-chan amqp091.Delivery, error) { return r.ch.Consume(queueName, r.consumerTag, true, false, false, false, nil) } func (r *Rmq) ConsumeDelete() error { return r.ch.Cancel(r.consumerTag, true) }