package rmq

import (
	"context"
	"sync"
	"time"

	"github.com/fox/fox/log"
	"github.com/pkg/errors"
	amqp "github.com/rabbitmq/amqp091-go"
)

// Connection owns one AMQP connection and one channel on it, together with
// the exchange/queue topology described by its RabbitMQConfig.
//
// The fields conn, channel, isConnected, notifyClose and notifyConfirm are
// replaced by Connect (including when Connect is called from the Reconnect
// loop) and are therefore guarded by mu.
type Connection struct {
	conn          *amqp.Connection
	channel       *amqp.Channel
	config        *RabbitMQConfig
	isConnected   bool
	done          chan bool
	notifyClose   chan *amqp.Error
	notifyConfirm chan amqp.Confirmation

	mu        sync.RWMutex // guards conn, channel, isConnected, notifyClose, notifyConfirm
	closeOnce sync.Once    // makes closing done safe when Close is called repeatedly
}

// NewConnection builds a Connection from config and performs the initial
// Connect. It returns an error if config is nil or if connecting fails.
func NewConnection(config *RabbitMQConfig) (*Connection, error) {
	if config == nil {
		return nil, errors.New("rabbitmq config is nil")
	}

	conn := &Connection{
		config: config,
		done:   make(chan bool),
	}

	if err := conn.Connect(); err != nil {
		return nil, err
	}

	return conn, nil
}

// Connect dials the broker, opens a channel, applies QoS, declares the
// exchange and queue, binds them, and registers close/confirm listeners.
//
// Everything is built on local variables first and only published to the
// Connection once the whole setup succeeded, so a failed attempt never leaves
// the Connection half-initialised and never leaks the freshly dialed
// connection. A previously held connection is closed after the swap.
func (c *Connection) Connect() error {
	// Establish the connection.
	conn, err := amqp.Dial(c.config.URL)
	if err != nil {
		return errors.Wrap(err, "failed to connect to RabbitMQ")
	}

	ch, err := c.setupChannel(conn)
	if err != nil {
		// Closing the connection also tears down any channel opened on it.
		// The close error is ignored: the setup error is the one worth reporting.
		_ = conn.Close()
		return err
	}

	// Register the notification channels. They are buffered so the library
	// can deliver a close error (or a confirmation) even when nobody is
	// receiving at that moment.
	// NOTE(review): a capacity of 1 for confirmations assumes at most one
	// outstanding publish at a time — confirm against the publisher code.
	notifyClose := ch.NotifyClose(make(chan *amqp.Error, 1))
	notifyConfirm := ch.NotifyPublish(make(chan amqp.Confirmation, 1))

	c.mu.Lock()
	old := c.conn
	c.conn = conn
	c.channel = ch
	c.notifyClose = notifyClose
	c.notifyConfirm = notifyConfirm
	c.isConnected = true
	c.mu.Unlock()

	if old != nil {
		// Release the connection being replaced. The error is ignored on
		// purpose: on a reconnect the old connection is usually already dead.
		_ = old.Close()
	}

	log.Info("RabbitMQ connection established successfully")
	return nil
}

// setupChannel opens a channel on conn and declares the configured topology
// (QoS, durable direct exchange, durable queue, binding). The caller is
// responsible for closing conn when an error is returned.
func (c *Connection) setupChannel(conn *amqp.Connection) (*amqp.Channel, error) {
	// Open the channel.
	ch, err := conn.Channel()
	if err != nil {
		return nil, errors.Wrap(err, "failed to open channel")
	}

	// Set QoS.
	if err := ch.Qos(c.config.PrefetchCount, 0, false); err != nil {
		return nil, errors.Wrap(err, "failed to set QoS")
	}

	// Declare the exchange.
	err = ch.ExchangeDeclare(
		c.config.ExchangeName,
		"direct",
		true,
		false,
		false,
		false,
		nil,
	)
	if err != nil {
		return nil, errors.Wrap(err, "failed to declare exchange")
	}

	// Declare the queue.
	_, err = ch.QueueDeclare(
		c.config.QueueName,
		true,
		false,
		false,
		false,
		nil,
	)
	if err != nil {
		return nil, errors.Wrap(err, "failed to declare queue")
	}

	// Bind the queue to the exchange.
	err = ch.QueueBind(
		c.config.QueueName,
		c.config.RoutingKey,
		c.config.ExchangeName,
		false,
		nil,
	)
	if err != nil {
		return nil, errors.Wrap(err, "failed to bind queue")
	}

	return ch, nil
}

// Reconnect blocks, watching the channel's close notifications, and calls
// Connect again (retrying every config.ReconnectInterval) whenever the
// channel is closed with an error. It returns when ctx is cancelled or Close
// has been called.
//
// A close without an error (the notification channel is simply closed) is
// treated as deliberate: the Connection is marked disconnected and the loop
// keeps waiting for ctx/Close instead of spinning on the closed channel.
func (c *Connection) Reconnect(ctx context.Context) {
	for {
		// Re-read on every iteration: Connect installs a new channel.
		// A nil channel blocks forever in select, which is what we want
		// after a graceful close.
		notifyClose := c.closeNotifier()

		select {
		case <-ctx.Done():
			return
		case <-c.done:
			return
		case amqpErr := <-notifyClose:
			if amqpErr == nil {
				c.handleGracefulClose(notifyClose)
				continue
			}

			log.ErrorF("RabbitMQ connection closed: %v", amqpErr)
			c.markDisconnected()

			// Reconnect logic.
			if !c.redial(ctx) {
				return
			}
		}
	}
}

// redial retries Connect until it succeeds. It reports false when ctx was
// cancelled or Close was called before a usable connection was established.
func (c *Connection) redial(ctx context.Context) bool {
	for {
		err := c.Connect()
		if err == nil {
			select {
			case <-c.done:
				// Close ran while we were dialing; do not leave the new
				// connection open behind its back. Best effort, so the
				// error is ignored.
				_ = c.Close()
				return false
			default:
				return true
			}
		}

		log.ErrorF("Failed to reconnect: %v. Retrying in %v", err, c.config.ReconnectInterval)

		// Wait for the retry interval, but stay responsive to shutdown.
		timer := time.NewTimer(c.config.ReconnectInterval)
		select {
		case <-ctx.Done():
			timer.Stop()
			return false
		case <-c.done:
			timer.Stop()
			return false
		case <-timer.C:
		}
	}
}

// closeNotifier returns the close-notification channel currently installed.
func (c *Connection) closeNotifier() chan *amqp.Error {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.notifyClose
}

// handleGracefulClose records that the channel watched through closed has
// shut down without an error. If Connect has installed a newer notification
// channel in the meantime, the state is left untouched.
func (c *Connection) handleGracefulClose(closed chan *amqp.Error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.notifyClose == closed {
		c.notifyClose = nil
		c.isConnected = false
	}
}

// markDisconnected clears the connected flag.
func (c *Connection) markDisconnected() {
	c.mu.Lock()
	c.isConnected = false
	c.mu.Unlock()
}

// Close stops the Reconnect loop and closes the channel and the connection.
//
// Both resources are always attempted, even if closing the channel fails;
// the first error encountered is returned. Calling Close more than once is
// safe: the done channel is closed only once, and later calls return
// whatever the underlying library reports for an already-closed resource.
func (c *Connection) Close() error {
	c.closeOnce.Do(func() { close(c.done) })

	c.mu.Lock()
	ch, conn := c.channel, c.conn
	c.isConnected = false
	c.mu.Unlock()

	var firstErr error
	if ch != nil {
		if err := ch.Close(); err != nil {
			firstErr = err
		}
	}
	if conn != nil {
		if err := conn.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	if firstErr != nil {
		return firstErr
	}

	log.Info("RabbitMQ connection closed")
	return nil
}

// IsConnected reports whether the most recent Connect succeeded and no close
// has been observed since.
func (c *Connection) IsConnected() bool {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.isConnected
}

// GetChannel returns the current AMQP channel. The returned channel is
// replaced on every successful Connect, so callers should not cache it
// across reconnects.
func (c *Connection) GetChannel() *amqp.Channel {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.channel
}