fox/rmq/consumer.go
2025-09-03 22:21:30 +08:00

100 lines
2.1 KiB
Go

package rmq
import (
"context"
"errors"
"github.com/fox/fox/log"
amqp "github.com/rabbitmq/amqp091-go"
"time"
)
// MessageHandler is the callback a Consumer invokes for each delivery.
//
// ctx is the per-message context and message is the delivery body converted
// to a string. A nil return makes the consumer acknowledge the delivery; a
// non-nil return makes it reject the delivery.
type MessageHandler func(ctx context.Context, message string) error
// Consumer reads deliveries from the queue named in its config and passes each
// one to a MessageHandler.
type Consumer struct {
	// connection is queried for connectivity, supplies the channel used for
	// consuming, and is closed by Close.
	connection *Connection
	// config provides QueueName, the queue that is consumed.
	config *RabbitMQConfig
	// handler is called once per delivery with the body as a string.
	handler MessageHandler
}
// NewConsumer returns a Consumer that uses conn, config and handler as given.
//
// The arguments are stored without validation or copying; nothing is sent to
// the broker until StartConsuming is called.
func NewConsumer(conn *Connection, config *RabbitMQConfig, handler MessageHandler) *Consumer {
	consumer := new(Consumer)
	consumer.connection = conn
	consumer.config = config
	consumer.handler = handler
	return consumer
}
// StartConsuming registers a consumer on the configured queue and processes
// its deliveries on a background goroutine.
//
// It returns an error if the connection reports that it is not connected, or
// if the Consume call fails (the underlying error is joined with "failed to
// start consuming"). On success it returns nil immediately; the background
// goroutine runs consumeMessages, which stops when ctx is cancelled or the
// delivery channel is closed.
func (c *Consumer) StartConsuming(ctx context.Context) error {
	if connected := c.connection.IsConnected(); !connected {
		return errors.New("not connected to RabbitMQ")
	}

	channel := c.connection.GetChannel()
	queue := c.config.QueueName

	// Argument labels follow amqp091-go's Channel.Consume signature, assuming
	// GetChannel returns such a channel — TODO confirm.
	deliveries, consumeErr := channel.Consume(
		queue,
		"",    // consumer tag: empty
		false, // auto-ack: off, deliveries are settled in processMessage
		false, // exclusive
		false, // no-local
		false, // no-wait
		nil,   // extra arguments
	)
	if consumeErr != nil {
		return errors.Join(consumeErr, errors.New("failed to start consuming"))
	}

	go c.consumeMessages(ctx, deliveries)

	log.InfoF("Started consuming messages from queue: %s", queue)
	return nil
}
// consumeMessages is the receive loop started by StartConsuming.
//
// It hands each delivery read from msgs to processMessage, one at a time, and
// returns when ctx is cancelled or msgs is closed.
//
// NOTE(review): on cancellation this only stops reading from msgs; nothing in
// this function cancels the consumer on the channel — confirm the owner closes
// the channel or connection afterwards.
func (c *Consumer) consumeMessages(ctx context.Context, msgs <-chan amqp.Delivery) {
	done := ctx.Done()
	for {
		select {
		case <-done:
			log.Info("Stopping consumer due to context cancellation")
			return
		case delivery, open := <-msgs:
			if open {
				c.processMessage(ctx, delivery)
				continue
			}
			// A closed delivery channel ends the loop for good.
			log.Error("Message channel closed")
			return
		}
	}
}
// processMessage runs the handler for one delivery and then settles that
// delivery with the broker.
//
// The body is passed to the handler as a string, under a context derived from
// ctx that expires after 30 seconds. If the handler returns nil the delivery
// is acknowledged. If it returns an error the delivery is rejected, and it is
// requeued only when
//
//   - this is its first delivery attempt (delivery.Redelivered is false), or
//   - ctx itself is already cancelled, i.e. the consumer is shutting down and
//     the failure is most likely a side effect of that.
//
// Unconditionally requeueing, as was done before, makes a message that always
// fails bounce between broker and consumer forever. A delivery rejected
// without requeue is discarded by the broker, or dead-lettered if the queue is
// configured for it. Note that Redelivered is also set when an earlier
// attempt ended for other reasons, so this allows at most one retry rather
// than exactly one.
//
// Errors from Ack/Reject are logged and not returned.
func (c *Consumer) processMessage(ctx context.Context, delivery amqp.Delivery) {
	message := string(delivery.Body)

	// Bound the time the handler may spend on a single message.
	msgCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	if err := c.handler(msgCtx, message); err != nil {
		log.ErrorF("Failed to process message %s: %v", message, err)

		// Retry once; always hand the message back while shutting down.
		requeue := !delivery.Redelivered || ctx.Err() != nil
		c.rejectMessage(delivery, requeue)
		return
	}

	// Acknowledge only this delivery (multiple=false).
	if err := delivery.Ack(false); err != nil {
		log.ErrorF("Failed to acknowledge message: %v", err)
		return
	}
	log.InfoF("Successfully processed message: %s", message)
}
// rejectMessage rejects delivery, passing requeue through to Reject.
//
// A failure to reject is logged and otherwise ignored.
func (c *Consumer) rejectMessage(delivery amqp.Delivery, requeue bool) {
	rejectErr := delivery.Reject(requeue)
	if rejectErr == nil {
		return
	}
	log.ErrorF("Failed to reject message: %v", rejectErr)
}
// Close closes the connection this consumer was created with and returns the
// result of that call.
//
// The connection is the one passed to NewConsumer, so anything else sharing it
// is affected as well.
func (c *Consumer) Close() error {
	conn := c.connection
	return conn.Close()
}