2025-08-27 23:02:26 +08:00
|
|
|
|
package rmq
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
2025-09-03 22:21:30 +08:00
|
|
|
|
"errors"
|
2025-08-27 23:02:26 +08:00
|
|
|
|
"github.com/fox/fox/log"
|
|
|
|
|
amqp "github.com/rabbitmq/amqp091-go"
|
|
|
|
|
"time"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
type Connection struct {
|
|
|
|
|
conn *amqp.Connection
|
|
|
|
|
channel *amqp.Channel
|
|
|
|
|
config *RabbitMQConfig
|
|
|
|
|
isConnected bool
|
|
|
|
|
done chan bool
|
|
|
|
|
notifyClose chan *amqp.Error
|
|
|
|
|
notifyConfirm chan amqp.Confirmation
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func NewConnection(config *RabbitMQConfig) (*Connection, error) {
|
|
|
|
|
conn := &Connection{
|
|
|
|
|
config: config,
|
|
|
|
|
done: make(chan bool),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if err := conn.Connect(); err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return conn, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *Connection) Connect() error {
|
|
|
|
|
var err error
|
|
|
|
|
|
|
|
|
|
// 建立连接
|
|
|
|
|
c.conn, err = amqp.Dial(c.config.URL)
|
|
|
|
|
if err != nil {
|
2025-09-03 22:21:30 +08:00
|
|
|
|
return errors.Join(err, errors.New("failed to connect to RabbitMQ"))
|
2025-08-27 23:02:26 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 创建通道
|
|
|
|
|
c.channel, err = c.conn.Channel()
|
|
|
|
|
if err != nil {
|
2025-09-03 22:21:30 +08:00
|
|
|
|
return errors.Join(err, errors.New("failed to open channel"))
|
2025-08-27 23:02:26 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 设置QoS
|
|
|
|
|
err = c.channel.Qos(c.config.PrefetchCount, 0, false)
|
|
|
|
|
if err != nil {
|
2025-09-03 22:21:30 +08:00
|
|
|
|
return errors.Join(err, errors.New("failed to set QoS"))
|
2025-08-27 23:02:26 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 声明交换器
|
|
|
|
|
err = c.channel.ExchangeDeclare(
|
|
|
|
|
c.config.ExchangeName,
|
|
|
|
|
"direct",
|
|
|
|
|
true,
|
|
|
|
|
false,
|
|
|
|
|
false,
|
|
|
|
|
false,
|
|
|
|
|
nil,
|
|
|
|
|
)
|
|
|
|
|
if err != nil {
|
2025-09-03 22:21:30 +08:00
|
|
|
|
return errors.Join(err, errors.New("failed to declare exchange"))
|
2025-08-27 23:02:26 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 声明队列
|
|
|
|
|
_, err = c.channel.QueueDeclare(
|
2025-09-03 22:21:30 +08:00
|
|
|
|
c.config.QueueName, // 队列名称
|
|
|
|
|
c.config.Durable, // 持久化 true:队列元数据(名称、属性)和消息会写入磁盘,RabbitMQ 重启后仍存在。false:队列存在于内存,重启后丢失。
|
|
|
|
|
false, // 不自动删除 true:当最后一个消费者断开连接后,队列自动删除。
|
|
|
|
|
false, // 非独占队列
|
|
|
|
|
false, // 等待服务器确认
|
2025-08-27 23:02:26 +08:00
|
|
|
|
nil,
|
|
|
|
|
)
|
|
|
|
|
if err != nil {
|
2025-09-03 22:21:30 +08:00
|
|
|
|
return errors.Join(err, errors.New("failed to declare queue"))
|
2025-08-27 23:02:26 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 绑定队列
|
|
|
|
|
err = c.channel.QueueBind(
|
|
|
|
|
c.config.QueueName,
|
|
|
|
|
c.config.RoutingKey,
|
|
|
|
|
c.config.ExchangeName,
|
|
|
|
|
false,
|
|
|
|
|
nil,
|
|
|
|
|
)
|
|
|
|
|
if err != nil {
|
2025-09-03 22:21:30 +08:00
|
|
|
|
return errors.Join(err, errors.New("failed to bind queue"))
|
2025-08-27 23:02:26 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 设置确认通道
|
|
|
|
|
c.notifyClose = make(chan *amqp.Error)
|
2025-09-03 22:40:27 +08:00
|
|
|
|
if err = c.channel.Confirm(false); err != nil {
|
|
|
|
|
return errors.Join(err, errors.New("failed to set confirm"))
|
|
|
|
|
}
|
2025-08-27 23:02:26 +08:00
|
|
|
|
c.notifyConfirm = make(chan amqp.Confirmation)
|
|
|
|
|
c.channel.NotifyClose(c.notifyClose)
|
|
|
|
|
c.channel.NotifyPublish(c.notifyConfirm)
|
|
|
|
|
|
|
|
|
|
c.isConnected = true
|
|
|
|
|
log.Info("RabbitMQ connection established successfully")
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *Connection) Reconnect(ctx context.Context) {
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
return
|
|
|
|
|
case <-c.done:
|
|
|
|
|
return
|
|
|
|
|
case err := <-c.notifyClose:
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.ErrorF("RabbitMQ connection closed: %v", err)
|
|
|
|
|
c.isConnected = false
|
|
|
|
|
|
|
|
|
|
// 重连逻辑
|
|
|
|
|
for {
|
|
|
|
|
if err := c.Connect(); err != nil {
|
2025-09-03 22:21:30 +08:00
|
|
|
|
log.ErrorF("Failed to reconnect: %v. Retrying in %vs", err, c.config.ReconnectInterval)
|
|
|
|
|
time.Sleep(time.Duration(c.config.ReconnectInterval) * time.Second)
|
2025-08-27 23:02:26 +08:00
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *Connection) Close() error {
|
|
|
|
|
close(c.done)
|
|
|
|
|
|
|
|
|
|
if c.channel != nil {
|
|
|
|
|
if err := c.channel.Close(); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if c.conn != nil {
|
|
|
|
|
if err := c.conn.Close(); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
c.isConnected = false
|
|
|
|
|
log.Info("RabbitMQ connection closed")
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *Connection) IsConnected() bool {
|
|
|
|
|
return c.isConnected
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *Connection) GetChannel() *amqp.Channel {
|
|
|
|
|
return c.channel
|
|
|
|
|
}
|