package rmq import ( "context" "errors" "github.com/fox/fox/log" amqp "github.com/rabbitmq/amqp091-go" "time" ) type MessageHandler func(context.Context, string) error type Consumer struct { connection *Connection config *RabbitMQConfig handler MessageHandler } func NewConsumer(conn *Connection, config *RabbitMQConfig, handler MessageHandler) *Consumer { return &Consumer{ connection: conn, config: config, handler: handler, } } func (c *Consumer) StartConsuming(ctx context.Context) error { if !c.connection.IsConnected() { return errors.New("not connected to RabbitMQ") } msgs, err := c.connection.GetChannel().Consume( c.config.QueueName, "", false, // auto-ack false, false, false, nil, ) if err != nil { return errors.Join(err, errors.New("failed to start consuming")) } go c.consumeMessages(ctx, msgs) log.InfoF("Started consuming messages from queue: %s", c.config.QueueName) return nil } func (c *Consumer) consumeMessages(ctx context.Context, msgs <-chan amqp.Delivery) { for { select { case <-ctx.Done(): log.Info("Stopping consumer due to context cancellation") return case msg, ok := <-msgs: if !ok { log.Error("Message channel closed") return } c.processMessage(ctx, msg) } } } func (c *Consumer) processMessage(ctx context.Context, delivery amqp.Delivery) { message := string(delivery.Body) // 创建带超时的上下文 msgCtx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() err := c.handler(msgCtx, message) if err != nil { log.ErrorF("Failed to process message %s: %v", message, err) // 根据重试次数决定是否重试 c.rejectMessage(delivery, true) return } // 确认消息 if err := delivery.Ack(false); err != nil { log.ErrorF("Failed to acknowledge message: %v", err) } else { log.InfoF("Successfully processed message: %s", message) } } func (c *Consumer) rejectMessage(delivery amqp.Delivery, requeue bool) { if err := delivery.Reject(requeue); err != nil { log.ErrorF("Failed to reject message: %v", err) } } func (c *Consumer) Close() error { return c.connection.Close() }