fox/rmq/connection.go

162 lines
3.4 KiB
Go
Raw Permalink Normal View History

2025-08-27 23:02:26 +08:00
package rmq
import (
"context"
2025-09-03 22:21:30 +08:00
"errors"
2025-08-27 23:02:26 +08:00
"github.com/fox/fox/log"
amqp "github.com/rabbitmq/amqp091-go"
"time"
)
// Connection bundles one AMQP connection with the single channel opened on
// it, together with the bookkeeping used to supervise and shut it down.
type Connection struct {
	// conn is the broker connection created by Connect.
	conn *amqp.Connection
	// channel is the channel opened on conn; GetChannel exposes it.
	channel *amqp.Channel
	// config holds the settings Connect reads (URL, exchange, queue, ...).
	config *RabbitMQConfig
	// isConnected is set by Connect and cleared by Reconnect and Close.
	// NOTE(review): it is accessed without synchronization in this file.
	isConnected bool
	// done is closed by Close to tell Reconnect to stop.
	done chan bool
	// notifyClose is registered with channel.NotifyClose in Connect.
	notifyClose chan *amqp.Error
	// notifyConfirm is registered with channel.NotifyPublish in Connect.
	notifyConfirm chan amqp.Confirmation
}
// NewConnection builds a Connection for config and immediately connects it.
//
// It returns an error when config is nil or when Connect fails; in both cases
// the returned *Connection is nil.
func NewConnection(config *RabbitMQConfig) (*Connection, error) {
	// Guard here so a missing config is reported as an error instead of a
	// nil-pointer panic when Connect reads config.URL.
	if config == nil {
		return nil, errors.New("rabbitmq config is nil")
	}

	conn := &Connection{
		config: config,
		done:   make(chan bool),
	}
	if err := conn.Connect(); err != nil {
		return nil, err
	}
	return conn, nil
}
func (c *Connection) Connect() error {
var err error
// 建立连接
c.conn, err = amqp.Dial(c.config.URL)
if err != nil {
2025-09-03 22:21:30 +08:00
return errors.Join(err, errors.New("failed to connect to RabbitMQ"))
2025-08-27 23:02:26 +08:00
}
// 创建通道
c.channel, err = c.conn.Channel()
if err != nil {
2025-09-03 22:21:30 +08:00
return errors.Join(err, errors.New("failed to open channel"))
2025-08-27 23:02:26 +08:00
}
// 设置QoS
err = c.channel.Qos(c.config.PrefetchCount, 0, false)
if err != nil {
2025-09-03 22:21:30 +08:00
return errors.Join(err, errors.New("failed to set QoS"))
2025-08-27 23:02:26 +08:00
}
// 声明交换器
err = c.channel.ExchangeDeclare(
c.config.ExchangeName,
"direct",
true,
false,
false,
false,
nil,
)
if err != nil {
2025-09-03 22:21:30 +08:00
return errors.Join(err, errors.New("failed to declare exchange"))
2025-08-27 23:02:26 +08:00
}
// 声明队列
_, err = c.channel.QueueDeclare(
2025-09-03 22:21:30 +08:00
c.config.QueueName, // 队列名称
c.config.Durable, // 持久化 true队列元数据名称、属性和消息会写入磁盘RabbitMQ 重启后仍存在。false队列存在于内存重启后丢失。
false, // 不自动删除 true当最后一个消费者断开连接后队列自动删除。
false, // 非独占队列
false, // 等待服务器确认
2025-08-27 23:02:26 +08:00
nil,
)
if err != nil {
2025-09-03 22:21:30 +08:00
return errors.Join(err, errors.New("failed to declare queue"))
2025-08-27 23:02:26 +08:00
}
// 绑定队列
err = c.channel.QueueBind(
c.config.QueueName,
c.config.RoutingKey,
c.config.ExchangeName,
false,
nil,
)
if err != nil {
2025-09-03 22:21:30 +08:00
return errors.Join(err, errors.New("failed to bind queue"))
2025-08-27 23:02:26 +08:00
}
// 设置确认通道
c.notifyClose = make(chan *amqp.Error)
2025-09-03 22:40:27 +08:00
if err = c.channel.Confirm(false); err != nil {
return errors.Join(err, errors.New("failed to set confirm"))
}
2025-08-27 23:02:26 +08:00
c.notifyConfirm = make(chan amqp.Confirmation)
c.channel.NotifyClose(c.notifyClose)
c.channel.NotifyPublish(c.notifyConfirm)
c.isConnected = true
log.Info("RabbitMQ connection established successfully")
return nil
}
func (c *Connection) Reconnect(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-c.done:
return
case err := <-c.notifyClose:
if err != nil {
log.ErrorF("RabbitMQ connection closed: %v", err)
c.isConnected = false
// 重连逻辑
for {
if err := c.Connect(); err != nil {
2025-09-03 22:21:30 +08:00
log.ErrorF("Failed to reconnect: %v. Retrying in %vs", err, c.config.ReconnectInterval)
time.Sleep(time.Duration(c.config.ReconnectInterval) * time.Second)
2025-08-27 23:02:26 +08:00
continue
}
break
}
}
}
}
}
// Close signals Reconnect to stop and closes the channel and the connection.
//
// Both resources are always attempted, even if closing the channel fails, and
// the connection is marked as disconnected in every case. Resources that are
// already closed (amqp.ErrClosed) are not reported as errors. Calling Close
// more than once does not panic.
//
// It returns the joined errors from closing the channel and the connection,
// or nil if both closed cleanly.
func (c *Connection) Close() error {
	// Closing an already-closed (or nil) channel panics, so only close done
	// the first time.
	if c.done != nil {
		select {
		case <-c.done:
		default:
			close(c.done)
		}
	}

	var errs []error
	if c.channel != nil {
		if err := c.channel.Close(); err != nil && !errors.Is(err, amqp.ErrClosed) {
			errs = append(errs, err)
		}
	}
	if c.conn != nil {
		if err := c.conn.Close(); err != nil && !errors.Is(err, amqp.ErrClosed) {
			errs = append(errs, err)
		}
	}
	c.isConnected = false

	if err := errors.Join(errs...); err != nil {
		return err
	}
	log.Info("RabbitMQ connection closed")
	return nil
}
// IsConnected reports the connection flag: true after a successful Connect,
// false once Reconnect has observed a closure or Close has been called.
func (c *Connection) IsConnected() bool {
	connected := c.isConnected
	return connected
}
// GetChannel returns the AMQP channel currently held by the connection. The
// result is nil if no channel has been opened yet.
func (c *Connection) GetChannel() *amqp.Channel {
	ch := c.channel
	return ch
}