112 lines
2.7 KiB
Go
112 lines
2.7 KiB
Go
package rmq
|
|
|
|
import (
|
|
"context"
|
|
"github.com/fox/fox/log"
|
|
"github.com/google/uuid"
|
|
"github.com/rabbitmq/amqp091-go"
|
|
"time"
|
|
)
|
|
|
|
const (
|
|
ExchangeDirect = "direct"
|
|
// ExchangeFanout = "fanout"
|
|
ExchangeTopic = "topic"
|
|
)
|
|
|
|
type Rmq struct {
|
|
conn *amqp091.Connection
|
|
ch *amqp091.Channel
|
|
consumerTag string
|
|
}
|
|
|
|
// url:amqp://guest:guest@localhost:5672/
|
|
func NewRmq(url string) (*Rmq, error) {
|
|
rmq := &Rmq{consumerTag: uuid.NewString()}
|
|
|
|
retries := 0
|
|
retryDelay := 1 * time.Second
|
|
|
|
var err error
|
|
for {
|
|
rmq.conn, err = amqp091.DialConfig(url, amqp091.Config{Heartbeat: 10 * time.Second})
|
|
if err == nil {
|
|
break
|
|
}
|
|
retries++
|
|
time.Sleep(retryDelay)
|
|
if retryDelay < 30*time.Second {
|
|
retryDelay *= 2
|
|
}
|
|
log.ErrorF("amqp connection failed after %v reconnect.err:%v url:%v", retryDelay, err, url)
|
|
}
|
|
|
|
rmq.ch, err = rmq.conn.Channel()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return rmq, nil
|
|
}
|
|
|
|
func (r *Rmq) Close() {
|
|
if r.conn != nil {
|
|
_ = r.conn.Close()
|
|
}
|
|
if r.ch != nil {
|
|
_ = r.ch.Close()
|
|
}
|
|
}
|
|
|
|
func (r *Rmq) ExchangeDeclare(exchangeName, typ string) error {
|
|
return r.ch.ExchangeDeclare(exchangeName, typ, true, false, false, false, nil)
|
|
}
|
|
|
|
func (r *Rmq) QueueDeclare(queueName string) error {
|
|
_, err := r.ch.QueueDeclare(queueName, false, false, false, false, nil)
|
|
return err
|
|
}
|
|
|
|
func (r *Rmq) QueueDelete(queueName string) error {
|
|
_, err := r.ch.QueueDelete(queueName, false, false, false)
|
|
return err
|
|
}
|
|
|
|
func (r *Rmq) QueueBind(queueName, routerKey, exchangeName string) error {
|
|
return r.ch.QueueBind(queueName, routerKey, exchangeName, false, nil)
|
|
}
|
|
|
|
func (r *Rmq) QueueUnbind(queueName, routerKey, exchangeName string) error {
|
|
return r.ch.QueueUnbind(queueName, routerKey, exchangeName, nil)
|
|
}
|
|
|
|
// 发布消息到交换机,带有指定的路由键
|
|
func (r *Rmq) Publish(exchangeName, routerKey string, msg []byte) error {
|
|
return r.ch.Publish(exchangeName, routerKey, true, false, amqp091.Publishing{
|
|
ContentType: "text/plain",
|
|
Body: msg,
|
|
})
|
|
}
|
|
|
|
// 发布消息到交换机,带有指定的路由键
|
|
func (r *Rmq) PublishRpc(d *amqp091.Delivery, msg []byte) error {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
return r.ch.PublishWithContext(ctx, "", d.ReplyTo, false, false, amqp091.Publishing{
|
|
ContentType: "text/plain",
|
|
CorrelationId: d.CorrelationId,
|
|
Body: msg,
|
|
})
|
|
}
|
|
|
|
func (r *Rmq) PublishRaw(exchangeName, routerKey string, data amqp091.Publishing) error {
|
|
return r.ch.Publish(exchangeName, routerKey, true, false, data)
|
|
}
|
|
|
|
func (r *Rmq) Consume(queueName string) (<-chan amqp091.Delivery, error) {
|
|
return r.ch.Consume(queueName, r.consumerTag, true, false, false, false, nil)
|
|
}
|
|
|
|
func (r *Rmq) ConsumeDelete() error {
|
|
return r.ch.Cancel(r.consumerTag, true)
|
|
}
|