From 8f7a4f8c585245bcc35f7d992061c580603c3858 Mon Sep 17 00:00:00 2001 From: liuxiaobo <1224730913@qq.com> Date: Wed, 27 Aug 2025 23:02:26 +0800 Subject: [PATCH] rabbitmq --- rmq/config.go | 27 ++++++++ rmq/connection.go | 158 ++++++++++++++++++++++++++++++++++++++++++++++ rmq/consumer.go | 99 +++++++++++++++++++++++++++++ rmq/producer.go | 80 +++++++++++++++++++++++ rmq/rmq.go | 110 -------------------------------- rmq/rmq_test.go | 108 +++++++++++++++++++------------ 6 files changed, 431 insertions(+), 151 deletions(-) create mode 100644 rmq/config.go create mode 100644 rmq/connection.go create mode 100644 rmq/consumer.go create mode 100644 rmq/producer.go diff --git a/rmq/config.go b/rmq/config.go new file mode 100644 index 0000000..f2a01d5 --- /dev/null +++ b/rmq/config.go @@ -0,0 +1,27 @@ +package rmq + +import ( + "time" +) + +type RabbitMQConfig struct { + URL string + ExchangeName string + QueueName string + RoutingKey string + ReconnectInterval time.Duration + MaxRetries int + PrefetchCount int +} + +func LoadRabbitMQConfig() *RabbitMQConfig { + return &RabbitMQConfig{ + URL: "amqp://guest:guest@localhost:5672/", + ExchangeName: "app_exchange", + QueueName: "app_queue", + RoutingKey: "app.routing.key", + ReconnectInterval: 5 * time.Second, + MaxRetries: 3, + PrefetchCount: 10, + } +} diff --git a/rmq/connection.go b/rmq/connection.go new file mode 100644 index 0000000..e6df35e --- /dev/null +++ b/rmq/connection.go @@ -0,0 +1,158 @@ +package rmq + +import ( + "context" + "github.com/fox/fox/log" + "github.com/pkg/errors" + amqp "github.com/rabbitmq/amqp091-go" + "time" +) + +type Connection struct { + conn *amqp.Connection + channel *amqp.Channel + config *RabbitMQConfig + isConnected bool + done chan bool + notifyClose chan *amqp.Error + notifyConfirm chan amqp.Confirmation +} + +func NewConnection(config *RabbitMQConfig) (*Connection, error) { + conn := &Connection{ + config: config, + done: make(chan bool), + } + + if err := conn.Connect(); err != nil { + return nil, err + } + + return conn, nil +} + +func (c *Connection) Connect() error { + var err error + + // 建立连接 + c.conn, err = amqp.Dial(c.config.URL) + if err != nil { + return errors.Wrap(err, "failed to connect to RabbitMQ") + } + + // 创建通道 + c.channel, err = c.conn.Channel() + if err != nil { + return errors.Wrap(err, "failed to open channel") + } + + // 设置QoS + err = c.channel.Qos(c.config.PrefetchCount, 0, false) + if err != nil { + return errors.Wrap(err, "failed to set QoS") + } + + // 声明交换器 + err = c.channel.ExchangeDeclare( + c.config.ExchangeName, + "direct", + true, + false, + false, + false, + nil, + ) + if err != nil { + return errors.Wrap(err, "failed to declare exchange") + } + + // 声明队列 + _, err = c.channel.QueueDeclare( + c.config.QueueName, + true, + false, + false, + false, + nil, + ) + if err != nil { + return errors.Wrap(err, "failed to declare queue") + } + + // 绑定队列 + err = c.channel.QueueBind( + c.config.QueueName, + c.config.RoutingKey, + c.config.ExchangeName, + false, + nil, + ) + if err != nil { + return errors.Wrap(err, "failed to bind queue") + } + + // 设置确认通道 + c.notifyClose = make(chan *amqp.Error) + c.notifyConfirm = make(chan amqp.Confirmation) + c.channel.NotifyClose(c.notifyClose) + c.channel.NotifyPublish(c.notifyConfirm) + + c.isConnected = true + log.Info("RabbitMQ connection established successfully") + + return nil +} + +func (c *Connection) Reconnect(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case <-c.done: + return + case err := <-c.notifyClose: + if err != nil { + log.ErrorF("RabbitMQ connection closed: %v", err) + c.isConnected = false + + // 重连逻辑 + for { + if err := c.Connect(); err != nil { + log.ErrorF("Failed to reconnect: %v. Retrying in %v", err, c.config.ReconnectInterval) + time.Sleep(c.config.ReconnectInterval) + continue + } + break + } + } + } + } +} + +func (c *Connection) Close() error { + close(c.done) + + if c.channel != nil { + if err := c.channel.Close(); err != nil { + return err + } + } + + if c.conn != nil { + if err := c.conn.Close(); err != nil { + return err + } + } + + c.isConnected = false + log.Info("RabbitMQ connection closed") + return nil +} + +func (c *Connection) IsConnected() bool { + return c.isConnected +} + +func (c *Connection) GetChannel() *amqp.Channel { + return c.channel +} diff --git a/rmq/consumer.go b/rmq/consumer.go new file mode 100644 index 0000000..eed54b0 --- /dev/null +++ b/rmq/consumer.go @@ -0,0 +1,99 @@ +package rmq + +import ( + "context" + "github.com/fox/fox/log" + "github.com/pkg/errors" + amqp "github.com/rabbitmq/amqp091-go" + "time" +) + +type MessageHandler func(context.Context, string) error + +type Consumer struct { + connection *Connection + config *RabbitMQConfig + handler MessageHandler +} + +func NewConsumer(conn *Connection, config *RabbitMQConfig, handler MessageHandler) *Consumer { + return &Consumer{ + connection: conn, + config: config, + handler: handler, + } +} + +func (c *Consumer) StartConsuming(ctx context.Context) error { + if !c.connection.IsConnected() { + return errors.New("not connected to RabbitMQ") + } + + msgs, err := c.connection.GetChannel().Consume( + c.config.QueueName, + "", + false, // auto-ack + false, + false, + false, + nil, + ) + if err != nil { + return errors.Wrap(err, "failed to start consuming") + } + + go c.consumeMessages(ctx, msgs) + log.InfoF("Started consuming messages from queue: %s", c.config.QueueName) + + return nil +} + +func (c *Consumer) consumeMessages(ctx context.Context, msgs <-chan amqp.Delivery) { + for { + select { + case <-ctx.Done(): + log.Info("Stopping consumer due to context cancellation") + return + case msg, ok := <-msgs: + if !ok { + log.Error("Message channel closed") + return + } + + c.processMessage(ctx, msg) + } + } +} + +func (c *Consumer) processMessage(ctx context.Context, delivery amqp.Delivery) { + message := string(delivery.Body) + // 创建带超时的上下文 + msgCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + + err := c.handler(msgCtx, message) + if err != nil { + log.ErrorF("Failed to process message %s: %v", message, err) + + // 根据重试次数决定是否重试 + c.rejectMessage(delivery, true) + return + } + + // 确认消息 + if err := delivery.Ack(false); err != nil { + log.ErrorF("Failed to acknowledge message: %v", err) + } else { + log.InfoF("Successfully processed message: %s", message) + } +} + +func (c *Consumer) rejectMessage(delivery amqp.Delivery, requeue bool) { + if err := delivery.Reject(requeue); err != nil { + log.ErrorF("Failed to reject message: %v", err) + } +} + +func (c *Consumer) Close() error { + return c.connection.Close() +} diff --git a/rmq/producer.go b/rmq/producer.go new file mode 100644 index 0000000..1c2b1a8 --- /dev/null +++ b/rmq/producer.go @@ -0,0 +1,80 @@ +package rmq + +import ( + "context" + "github.com/fox/fox/log" + "github.com/pkg/errors" + amqp "github.com/rabbitmq/amqp091-go" + "time" +) + +type Producer struct { + connection *Connection + config *RabbitMQConfig +} + +func NewProducer(conn *Connection, config *RabbitMQConfig) *Producer { + return &Producer{ + connection: conn, + config: config, + } +} + +func (p *Producer) Publish(ctx context.Context, message string) error { + if !p.connection.IsConnected() { + return errors.New("not connected to RabbitMQ") + } + + // 重试机制 + for attempt := 1; attempt <= p.config.MaxRetries; attempt++ { + err := p.publishWithRetry(ctx, []byte(message)) + if err == nil { + return nil + } + + log.WarnF("Publish attempt %d failed: %v", attempt, err) + if attempt < p.config.MaxRetries { + time.Sleep(time.Duration(attempt) * time.Second) + } + } + + return errors.New("failed to publish message after all retries") +} + +func (p *Producer) publishWithRetry(ctx context.Context, body []byte) error { + channel := p.connection.GetChannel() + + err := channel.PublishWithContext( + ctx, + p.config.ExchangeName, + p.config.RoutingKey, + false, + false, + amqp.Publishing{ + ContentType: "application/json", + Body: body, + DeliveryMode: amqp.Persistent, + Timestamp: time.Now(), + }, + ) + + if err != nil { + return errors.Wrap(err, "failed to publish message") + } + + // 等待确认 + select { + case confirm := <-p.connection.notifyConfirm: + if !confirm.Ack { + return errors.New("message not acknowledged by broker") + } + case <-time.After(5 * time.Second): + return errors.New("message confirmation timeout") + } + + return nil +} + +func (p *Producer) Close() error { + return p.connection.Close() +} diff --git a/rmq/rmq.go b/rmq/rmq.go index 7642226..dc14add 100644 --- a/rmq/rmq.go +++ b/rmq/rmq.go @@ -1,111 +1 @@ package rmq - -import ( - "context" - "github.com/fox/fox/log" - "github.com/google/uuid" - "github.com/rabbitmq/amqp091-go" - "time" -) - -const ( - ExchangeDirect = "direct" - // ExchangeFanout = "fanout" - ExchangeTopic = "topic" -) - -type Rmq struct { - conn *amqp091.Connection - ch *amqp091.Channel - consumerTag string -} - -// url:amqp://guest:guest@localhost:5672/ -func NewRmq(url string) (*Rmq, error) { - rmq := &Rmq{consumerTag: uuid.NewString()} - - retries := 0 - retryDelay := 1 * time.Second - - var err error - for { - rmq.conn, err = amqp091.DialConfig(url, amqp091.Config{Heartbeat: 10 * time.Second}) - if err == nil { - break - } - retries++ - time.Sleep(retryDelay) - if retryDelay < 30*time.Second { - retryDelay *= 2 - } - log.ErrorF("amqp connection failed after %v reconnect.err:%v url:%v", retryDelay, err, url) - } - - rmq.ch, err = rmq.conn.Channel() - if err != nil { - return nil, err - } - return rmq, nil -} - -func (r *Rmq) Close() { - if r.conn != nil { - _ = r.conn.Close() - } - if r.ch != nil { - _ = r.ch.Close() - } -} - -func (r *Rmq) ExchangeDeclare(exchangeName, typ string) error { - return r.ch.ExchangeDeclare(exchangeName, typ, true, false, false, false, nil) -} - -func (r *Rmq) QueueDeclare(queueName string) error { - _, err := r.ch.QueueDeclare(queueName, false, false, false, false, nil) - return err -} - -func (r *Rmq) QueueDelete(queueName string) error { - _, err := r.ch.QueueDelete(queueName, false, false, false) - return err -} - -func (r *Rmq) QueueBind(queueName, routerKey, exchangeName string) error { - return r.ch.QueueBind(queueName, routerKey, exchangeName, false, nil) -} - -func (r *Rmq) QueueUnbind(queueName, routerKey, exchangeName string) error { - return r.ch.QueueUnbind(queueName, routerKey, exchangeName, nil) -} - -// 发布消息到交换机,带有指定的路由键 -func (r *Rmq) Publish(exchangeName, routerKey string, msg []byte) error { - return r.ch.Publish(exchangeName, routerKey, true, false, amqp091.Publishing{ - ContentType: "text/plain", - Body: msg, - }) -} - -// 发布消息到交换机,带有指定的路由键 -func (r *Rmq) PublishRpc(d *amqp091.Delivery, msg []byte) error { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - return r.ch.PublishWithContext(ctx, "", d.ReplyTo, false, false, amqp091.Publishing{ - ContentType: "text/plain", - CorrelationId: d.CorrelationId, - Body: msg, - }) -} - -func (r *Rmq) PublishRaw(exchangeName, routerKey string, data amqp091.Publishing) error { - return r.ch.Publish(exchangeName, routerKey, true, false, data) -} - -func (r *Rmq) Consume(queueName string) (<-chan amqp091.Delivery, error) { - return r.ch.Consume(queueName, r.consumerTag, true, false, false, false, nil) -} - -func (r *Rmq) ConsumeDelete() error { - return r.ch.Cancel(r.consumerTag, true) -} diff --git a/rmq/rmq_test.go b/rmq/rmq_test.go index 108fb6c..57368ce 100644 --- a/rmq/rmq_test.go +++ b/rmq/rmq_test.go @@ -1,53 +1,79 @@ package rmq import ( + "context" + "github.com/fox/fox/log" + "os" + "os/signal" + "syscall" "testing" "time" ) -const url = "amqp://samba:samba@testbuild.shoa.com:5672/vh_samba" -const exchangeName = "test_e" -const queueName = "test_q" +//const url = "amqp://samba:samba@testbuild.shoa.com:5672/vh_samba" +//const exchangeName = "test_e" +//const queueName = "test_q" -func testCreateMq(t *testing.T) *Rmq { - mq, err := NewRmq(url) - if err != nil { - t.Error(err) - } - err = mq.ExchangeDeclare(exchangeName, ExchangeDirect) - if err != nil { - t.Error(err) - } - err = mq.QueueDeclare(queueName) - if err != nil { - t.Error(err) - } - return mq -} - -func testPublish(t *testing.T, mq *Rmq) { - err := mq.Publish(exchangeName, queueName, []byte("hello world")) - if err != nil { - t.Error(err) - } - return -} - -func testConsume(t *testing.T, mq *Rmq) { - msgs, err := mq.Consume(queueName) - if err != nil { - t.Error(err) - } - for msg := range msgs { - t.Log(string(msg.Body)) - } - return +func initLog() { + log.Open("test.log", log.DebugL) } func TestRabbitmq(t *testing.T) { - mq := testCreateMq(t) - defer mq.Close() - go testConsume(t, mq) - testPublish(t, mq) - time.Sleep(2 * time.Second) + initLog() + // 加载配置 + rabbitConfig := LoadRabbitMQConfig() + + // 创建连接 + conn, err := NewConnection(rabbitConfig) + if err != nil { + log.ErrorF("Failed to create RabbitMQ connection: %v", err) + return + } + defer func() { _ = conn.Close() }() + + // 启动重连监控 + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go conn.Reconnect(ctx) + + // 创建生产者 + producer := NewProducer(conn, rabbitConfig) + defer func() { _ = producer.Close() }() + + // 创建消费者处理器 + handler := func(ctx context.Context, msg string) error { + log.InfoF("Processing message: %s", msg) + // 这里实现具体的业务逻辑 + time.Sleep(1 * time.Second) // 模拟处理时间 + return nil + } + + // 创建消费者 + consumer := NewConsumer(conn, rabbitConfig, handler) + defer func() { _ = consumer.Close() }() + + // 启动消费者 + if err = consumer.StartConsuming(ctx); err != nil { + log.ErrorF("Failed to start consumer: %v", err) + return + } + + // 示例:发送测试消息 + go func() { + time.Sleep(2 * time.Second) // 等待消费者启动 + testMessage := "hello world" + if err = producer.Publish(ctx, testMessage); err != nil { + log.ErrorF("Failed to publish test message: %v", err) + } else { + log.Info("Test message published successfully") + } + }() + + // 等待中断信号 + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + <-sigChan + + log.Info("Shutting down...") + cancel() }