diff --git a/rmq/connection.go b/rmq/connection.go index 2b3f0bd..43456d1 100644 --- a/rmq/connection.go +++ b/rmq/connection.go @@ -93,6 +93,9 @@ func (c *Connection) Connect() error { // 设置确认通道 c.notifyClose = make(chan *amqp.Error) + if err = c.channel.Confirm(false); err != nil { + return errors.Join(err, errors.New("failed to set confirm")) + } c.notifyConfirm = make(chan amqp.Confirmation) c.channel.NotifyClose(c.notifyClose) c.channel.NotifyPublish(c.notifyConfirm) diff --git a/rmq/rmq_test.go b/rmq/rmq_test.go index 9197ea0..43a6c47 100644 --- a/rmq/rmq_test.go +++ b/rmq/rmq_test.go @@ -2,6 +2,7 @@ package rmq import ( "context" + "fmt" "github.com/fox/fox/log" "os" "os/signal" @@ -61,12 +62,15 @@ func TestRabbitmq(t *testing.T) { // 示例:发送测试消息 go func() { time.Sleep(2 * time.Second) // 等待消费者启动 - testMessage := "hello world" - if err = producer.Publish(ctx, testMessage); err != nil { - log.ErrorF("Failed to publish test message: %v", err) - } else { - log.Info("Test message published successfully") + for i := 0; i < 10; i++ { + testMessage := fmt.Sprintf("hello world %d", i) + if err = producer.Publish(ctx, testMessage); err != nil { + log.ErrorF("Failed to publish test message: %v", err) + } else { + log.Info("Test message published successfully") + } } + }() // 等待中断信号