fox/rmq/rmq.go

112 lines
2.7 KiB
Go
Raw Normal View History

2025-05-25 20:02:15 +08:00
package rmq
import (
"context"
"github.com/fox/fox/log"
"github.com/google/uuid"
"github.com/rabbitmq/amqp091-go"
"time"
)
const (
ExchangeDirect = "direct"
// ExchangeFanout = "fanout"
ExchangeTopic = "topic"
)
type Rmq struct {
conn *amqp091.Connection
ch *amqp091.Channel
consumerTag string
}
// url:amqp://guest:guest@localhost:5672/
func NewRmq(url string) (*Rmq, error) {
rmq := &Rmq{consumerTag: uuid.NewString()}
retries := 0
retryDelay := 1 * time.Second
var err error
for {
rmq.conn, err = amqp091.DialConfig(url, amqp091.Config{Heartbeat: 10 * time.Second})
if err == nil {
break
}
retries++
time.Sleep(retryDelay)
if retryDelay < 30*time.Second {
retryDelay *= 2
}
log.ErrorF("amqp connection failed after %v reconnect.err:%v url:%v", retryDelay, err, url)
}
rmq.ch, err = rmq.conn.Channel()
if err != nil {
return nil, err
}
return rmq, nil
}
func (r *Rmq) Close() {
if r.conn != nil {
_ = r.conn.Close()
}
if r.ch != nil {
_ = r.ch.Close()
}
}
func (r *Rmq) ExchangeDeclare(exchangeName, typ string) error {
return r.ch.ExchangeDeclare(exchangeName, typ, true, false, false, false, nil)
}
func (r *Rmq) QueueDeclare(queueName string) error {
_, err := r.ch.QueueDeclare(queueName, false, false, false, false, nil)
return err
}
func (r *Rmq) QueueDelete(queueName string) error {
_, err := r.ch.QueueDelete(queueName, false, false, false)
return err
}
func (r *Rmq) QueueBind(queueName, routerKey, exchangeName string) error {
return r.ch.QueueBind(queueName, routerKey, exchangeName, false, nil)
}
func (r *Rmq) QueueUnbind(queueName, routerKey, exchangeName string) error {
return r.ch.QueueUnbind(queueName, routerKey, exchangeName, nil)
}
// 发布消息到交换机,带有指定的路由键
func (r *Rmq) Publish(exchangeName, routerKey string, msg []byte) error {
return r.ch.Publish(exchangeName, routerKey, true, false, amqp091.Publishing{
ContentType: "text/plain",
Body: msg,
})
}
// 发布消息到交换机,带有指定的路由键
func (r *Rmq) PublishRpc(d *amqp091.Delivery, msg []byte) error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return r.ch.PublishWithContext(ctx, "", d.ReplyTo, false, false, amqp091.Publishing{
ContentType: "text/plain",
CorrelationId: d.CorrelationId,
Body: msg,
})
}
func (r *Rmq) PublishRaw(exchangeName, routerKey string, data amqp091.Publishing) error {
return r.ch.Publish(exchangeName, routerKey, true, false, data)
}
func (r *Rmq) Consume(queueName string) (<-chan amqp091.Delivery, error) {
return r.ch.Consume(queueName, r.consumerTag, true, false, false, false, nil)
}
func (r *Rmq) ConsumeDelete() error {
return r.ch.Cancel(r.consumerTag, true)
}