rabbitmq
This commit is contained in:
parent
7934253651
commit
8f7a4f8c58
27
rmq/config.go
Normal file
27
rmq/config.go
Normal file
@ -0,0 +1,27 @@
|
||||
package rmq
|
||||
|
||||
import (
|
||||
"time"
|
||||
)
|
||||
|
||||
type RabbitMQConfig struct {
|
||||
URL string
|
||||
ExchangeName string
|
||||
QueueName string
|
||||
RoutingKey string
|
||||
ReconnectInterval time.Duration
|
||||
MaxRetries int
|
||||
PrefetchCount int
|
||||
}
|
||||
|
||||
func LoadRabbitMQConfig() *RabbitMQConfig {
|
||||
return &RabbitMQConfig{
|
||||
URL: "amqp://guest:guest@localhost:5672/",
|
||||
ExchangeName: "app_exchange",
|
||||
QueueName: "app_queue",
|
||||
RoutingKey: "app.routing.key",
|
||||
ReconnectInterval: 5 * time.Second,
|
||||
MaxRetries: 3,
|
||||
PrefetchCount: 10,
|
||||
}
|
||||
}
|
158
rmq/connection.go
Normal file
158
rmq/connection.go
Normal file
@ -0,0 +1,158 @@
|
||||
package rmq
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/fox/fox/log"
|
||||
"github.com/pkg/errors"
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Connection struct {
|
||||
conn *amqp.Connection
|
||||
channel *amqp.Channel
|
||||
config *RabbitMQConfig
|
||||
isConnected bool
|
||||
done chan bool
|
||||
notifyClose chan *amqp.Error
|
||||
notifyConfirm chan amqp.Confirmation
|
||||
}
|
||||
|
||||
func NewConnection(config *RabbitMQConfig) (*Connection, error) {
|
||||
conn := &Connection{
|
||||
config: config,
|
||||
done: make(chan bool),
|
||||
}
|
||||
|
||||
if err := conn.Connect(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
func (c *Connection) Connect() error {
|
||||
var err error
|
||||
|
||||
// 建立连接
|
||||
c.conn, err = amqp.Dial(c.config.URL)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to connect to RabbitMQ")
|
||||
}
|
||||
|
||||
// 创建通道
|
||||
c.channel, err = c.conn.Channel()
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to open channel")
|
||||
}
|
||||
|
||||
// 设置QoS
|
||||
err = c.channel.Qos(c.config.PrefetchCount, 0, false)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to set QoS")
|
||||
}
|
||||
|
||||
// 声明交换器
|
||||
err = c.channel.ExchangeDeclare(
|
||||
c.config.ExchangeName,
|
||||
"direct",
|
||||
true,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
nil,
|
||||
)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to declare exchange")
|
||||
}
|
||||
|
||||
// 声明队列
|
||||
_, err = c.channel.QueueDeclare(
|
||||
c.config.QueueName,
|
||||
true,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
nil,
|
||||
)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to declare queue")
|
||||
}
|
||||
|
||||
// 绑定队列
|
||||
err = c.channel.QueueBind(
|
||||
c.config.QueueName,
|
||||
c.config.RoutingKey,
|
||||
c.config.ExchangeName,
|
||||
false,
|
||||
nil,
|
||||
)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to bind queue")
|
||||
}
|
||||
|
||||
// 设置确认通道
|
||||
c.notifyClose = make(chan *amqp.Error)
|
||||
c.notifyConfirm = make(chan amqp.Confirmation)
|
||||
c.channel.NotifyClose(c.notifyClose)
|
||||
c.channel.NotifyPublish(c.notifyConfirm)
|
||||
|
||||
c.isConnected = true
|
||||
log.Info("RabbitMQ connection established successfully")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Connection) Reconnect(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-c.done:
|
||||
return
|
||||
case err := <-c.notifyClose:
|
||||
if err != nil {
|
||||
log.ErrorF("RabbitMQ connection closed: %v", err)
|
||||
c.isConnected = false
|
||||
|
||||
// 重连逻辑
|
||||
for {
|
||||
if err := c.Connect(); err != nil {
|
||||
log.ErrorF("Failed to reconnect: %v. Retrying in %v", err, c.config.ReconnectInterval)
|
||||
time.Sleep(c.config.ReconnectInterval)
|
||||
continue
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Connection) Close() error {
|
||||
close(c.done)
|
||||
|
||||
if c.channel != nil {
|
||||
if err := c.channel.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if c.conn != nil {
|
||||
if err := c.conn.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
c.isConnected = false
|
||||
log.Info("RabbitMQ connection closed")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Connection) IsConnected() bool {
|
||||
return c.isConnected
|
||||
}
|
||||
|
||||
func (c *Connection) GetChannel() *amqp.Channel {
|
||||
return c.channel
|
||||
}
|
99
rmq/consumer.go
Normal file
99
rmq/consumer.go
Normal file
@ -0,0 +1,99 @@
|
||||
package rmq
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/fox/fox/log"
|
||||
"github.com/pkg/errors"
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
"time"
|
||||
)
|
||||
|
||||
type MessageHandler func(context.Context, string) error
|
||||
|
||||
type Consumer struct {
|
||||
connection *Connection
|
||||
config *RabbitMQConfig
|
||||
handler MessageHandler
|
||||
}
|
||||
|
||||
func NewConsumer(conn *Connection, config *RabbitMQConfig, handler MessageHandler) *Consumer {
|
||||
return &Consumer{
|
||||
connection: conn,
|
||||
config: config,
|
||||
handler: handler,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Consumer) StartConsuming(ctx context.Context) error {
|
||||
if !c.connection.IsConnected() {
|
||||
return errors.New("not connected to RabbitMQ")
|
||||
}
|
||||
|
||||
msgs, err := c.connection.GetChannel().Consume(
|
||||
c.config.QueueName,
|
||||
"",
|
||||
false, // auto-ack
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
nil,
|
||||
)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to start consuming")
|
||||
}
|
||||
|
||||
go c.consumeMessages(ctx, msgs)
|
||||
log.InfoF("Started consuming messages from queue: %s", c.config.QueueName)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Consumer) consumeMessages(ctx context.Context, msgs <-chan amqp.Delivery) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Info("Stopping consumer due to context cancellation")
|
||||
return
|
||||
case msg, ok := <-msgs:
|
||||
if !ok {
|
||||
log.Error("Message channel closed")
|
||||
return
|
||||
}
|
||||
|
||||
c.processMessage(ctx, msg)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Consumer) processMessage(ctx context.Context, delivery amqp.Delivery) {
|
||||
message := string(delivery.Body)
|
||||
// 创建带超时的上下文
|
||||
msgCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
err := c.handler(msgCtx, message)
|
||||
if err != nil {
|
||||
log.ErrorF("Failed to process message %s: %v", message, err)
|
||||
|
||||
// 根据重试次数决定是否重试
|
||||
c.rejectMessage(delivery, true)
|
||||
return
|
||||
}
|
||||
|
||||
// 确认消息
|
||||
if err := delivery.Ack(false); err != nil {
|
||||
log.ErrorF("Failed to acknowledge message: %v", err)
|
||||
} else {
|
||||
log.InfoF("Successfully processed message: %s", message)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Consumer) rejectMessage(delivery amqp.Delivery, requeue bool) {
|
||||
if err := delivery.Reject(requeue); err != nil {
|
||||
log.ErrorF("Failed to reject message: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Consumer) Close() error {
|
||||
return c.connection.Close()
|
||||
}
|
80
rmq/producer.go
Normal file
80
rmq/producer.go
Normal file
@ -0,0 +1,80 @@
|
||||
package rmq
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/fox/fox/log"
|
||||
"github.com/pkg/errors"
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Producer struct {
|
||||
connection *Connection
|
||||
config *RabbitMQConfig
|
||||
}
|
||||
|
||||
func NewProducer(conn *Connection, config *RabbitMQConfig) *Producer {
|
||||
return &Producer{
|
||||
connection: conn,
|
||||
config: config,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Producer) Publish(ctx context.Context, message string) error {
|
||||
if !p.connection.IsConnected() {
|
||||
return errors.New("not connected to RabbitMQ")
|
||||
}
|
||||
|
||||
// 重试机制
|
||||
for attempt := 1; attempt <= p.config.MaxRetries; attempt++ {
|
||||
err := p.publishWithRetry(ctx, []byte(message))
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
log.WarnF("Publish attempt %d failed: %v", attempt, err)
|
||||
if attempt < p.config.MaxRetries {
|
||||
time.Sleep(time.Duration(attempt) * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
return errors.New("failed to publish message after all retries")
|
||||
}
|
||||
|
||||
func (p *Producer) publishWithRetry(ctx context.Context, body []byte) error {
|
||||
channel := p.connection.GetChannel()
|
||||
|
||||
err := channel.PublishWithContext(
|
||||
ctx,
|
||||
p.config.ExchangeName,
|
||||
p.config.RoutingKey,
|
||||
false,
|
||||
false,
|
||||
amqp.Publishing{
|
||||
ContentType: "application/json",
|
||||
Body: body,
|
||||
DeliveryMode: amqp.Persistent,
|
||||
Timestamp: time.Now(),
|
||||
},
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to publish message")
|
||||
}
|
||||
|
||||
// 等待确认
|
||||
select {
|
||||
case confirm := <-p.connection.notifyConfirm:
|
||||
if !confirm.Ack {
|
||||
return errors.New("message not acknowledged by broker")
|
||||
}
|
||||
case <-time.After(5 * time.Second):
|
||||
return errors.New("message confirmation timeout")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Producer) Close() error {
|
||||
return p.connection.Close()
|
||||
}
|
110
rmq/rmq.go
110
rmq/rmq.go
@ -1,111 +1 @@
|
||||
package rmq
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/fox/fox/log"
|
||||
"github.com/google/uuid"
|
||||
"github.com/rabbitmq/amqp091-go"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
ExchangeDirect = "direct"
|
||||
// ExchangeFanout = "fanout"
|
||||
ExchangeTopic = "topic"
|
||||
)
|
||||
|
||||
type Rmq struct {
|
||||
conn *amqp091.Connection
|
||||
ch *amqp091.Channel
|
||||
consumerTag string
|
||||
}
|
||||
|
||||
// url:amqp://guest:guest@localhost:5672/
|
||||
func NewRmq(url string) (*Rmq, error) {
|
||||
rmq := &Rmq{consumerTag: uuid.NewString()}
|
||||
|
||||
retries := 0
|
||||
retryDelay := 1 * time.Second
|
||||
|
||||
var err error
|
||||
for {
|
||||
rmq.conn, err = amqp091.DialConfig(url, amqp091.Config{Heartbeat: 10 * time.Second})
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
retries++
|
||||
time.Sleep(retryDelay)
|
||||
if retryDelay < 30*time.Second {
|
||||
retryDelay *= 2
|
||||
}
|
||||
log.ErrorF("amqp connection failed after %v reconnect.err:%v url:%v", retryDelay, err, url)
|
||||
}
|
||||
|
||||
rmq.ch, err = rmq.conn.Channel()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return rmq, nil
|
||||
}
|
||||
|
||||
func (r *Rmq) Close() {
|
||||
if r.conn != nil {
|
||||
_ = r.conn.Close()
|
||||
}
|
||||
if r.ch != nil {
|
||||
_ = r.ch.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Rmq) ExchangeDeclare(exchangeName, typ string) error {
|
||||
return r.ch.ExchangeDeclare(exchangeName, typ, true, false, false, false, nil)
|
||||
}
|
||||
|
||||
func (r *Rmq) QueueDeclare(queueName string) error {
|
||||
_, err := r.ch.QueueDeclare(queueName, false, false, false, false, nil)
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *Rmq) QueueDelete(queueName string) error {
|
||||
_, err := r.ch.QueueDelete(queueName, false, false, false)
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *Rmq) QueueBind(queueName, routerKey, exchangeName string) error {
|
||||
return r.ch.QueueBind(queueName, routerKey, exchangeName, false, nil)
|
||||
}
|
||||
|
||||
func (r *Rmq) QueueUnbind(queueName, routerKey, exchangeName string) error {
|
||||
return r.ch.QueueUnbind(queueName, routerKey, exchangeName, nil)
|
||||
}
|
||||
|
||||
// 发布消息到交换机,带有指定的路由键
|
||||
func (r *Rmq) Publish(exchangeName, routerKey string, msg []byte) error {
|
||||
return r.ch.Publish(exchangeName, routerKey, true, false, amqp091.Publishing{
|
||||
ContentType: "text/plain",
|
||||
Body: msg,
|
||||
})
|
||||
}
|
||||
|
||||
// 发布消息到交换机,带有指定的路由键
|
||||
func (r *Rmq) PublishRpc(d *amqp091.Delivery, msg []byte) error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
return r.ch.PublishWithContext(ctx, "", d.ReplyTo, false, false, amqp091.Publishing{
|
||||
ContentType: "text/plain",
|
||||
CorrelationId: d.CorrelationId,
|
||||
Body: msg,
|
||||
})
|
||||
}
|
||||
|
||||
func (r *Rmq) PublishRaw(exchangeName, routerKey string, data amqp091.Publishing) error {
|
||||
return r.ch.Publish(exchangeName, routerKey, true, false, data)
|
||||
}
|
||||
|
||||
func (r *Rmq) Consume(queueName string) (<-chan amqp091.Delivery, error) {
|
||||
return r.ch.Consume(queueName, r.consumerTag, true, false, false, false, nil)
|
||||
}
|
||||
|
||||
func (r *Rmq) ConsumeDelete() error {
|
||||
return r.ch.Cancel(r.consumerTag, true)
|
||||
}
|
||||
|
108
rmq/rmq_test.go
108
rmq/rmq_test.go
@ -1,53 +1,79 @@
|
||||
package rmq
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/fox/fox/log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
const url = "amqp://samba:samba@testbuild.shoa.com:5672/vh_samba"
|
||||
const exchangeName = "test_e"
|
||||
const queueName = "test_q"
|
||||
//const url = "amqp://samba:samba@testbuild.shoa.com:5672/vh_samba"
|
||||
//const exchangeName = "test_e"
|
||||
//const queueName = "test_q"
|
||||
|
||||
func testCreateMq(t *testing.T) *Rmq {
|
||||
mq, err := NewRmq(url)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
err = mq.ExchangeDeclare(exchangeName, ExchangeDirect)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
err = mq.QueueDeclare(queueName)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
return mq
|
||||
}
|
||||
|
||||
func testPublish(t *testing.T, mq *Rmq) {
|
||||
err := mq.Publish(exchangeName, queueName, []byte("hello world"))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func testConsume(t *testing.T, mq *Rmq) {
|
||||
msgs, err := mq.Consume(queueName)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
for msg := range msgs {
|
||||
t.Log(string(msg.Body))
|
||||
}
|
||||
return
|
||||
func initLog() {
|
||||
log.Open("test.log", log.DebugL)
|
||||
}
|
||||
|
||||
func TestRabbitmq(t *testing.T) {
|
||||
mq := testCreateMq(t)
|
||||
defer mq.Close()
|
||||
go testConsume(t, mq)
|
||||
testPublish(t, mq)
|
||||
time.Sleep(2 * time.Second)
|
||||
initLog()
|
||||
// 加载配置
|
||||
rabbitConfig := LoadRabbitMQConfig()
|
||||
|
||||
// 创建连接
|
||||
conn, err := NewConnection(rabbitConfig)
|
||||
if err != nil {
|
||||
log.ErrorF("Failed to create RabbitMQ connection: %v", err)
|
||||
return
|
||||
}
|
||||
defer func() { _ = conn.Close() }()
|
||||
|
||||
// 启动重连监控
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
go conn.Reconnect(ctx)
|
||||
|
||||
// 创建生产者
|
||||
producer := NewProducer(conn, rabbitConfig)
|
||||
defer func() { _ = producer.Close() }()
|
||||
|
||||
// 创建消费者处理器
|
||||
handler := func(ctx context.Context, msg string) error {
|
||||
log.InfoF("Processing message: %s", msg)
|
||||
// 这里实现具体的业务逻辑
|
||||
time.Sleep(1 * time.Second) // 模拟处理时间
|
||||
return nil
|
||||
}
|
||||
|
||||
// 创建消费者
|
||||
consumer := NewConsumer(conn, rabbitConfig, handler)
|
||||
defer func() { _ = consumer.Close() }()
|
||||
|
||||
// 启动消费者
|
||||
if err = consumer.StartConsuming(ctx); err != nil {
|
||||
log.ErrorF("Failed to start consumer: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// 示例:发送测试消息
|
||||
go func() {
|
||||
time.Sleep(2 * time.Second) // 等待消费者启动
|
||||
testMessage := "hello world"
|
||||
if err = producer.Publish(ctx, testMessage); err != nil {
|
||||
log.ErrorF("Failed to publish test message: %v", err)
|
||||
} else {
|
||||
log.Info("Test message published successfully")
|
||||
}
|
||||
}()
|
||||
|
||||
// 等待中断信号
|
||||
sigChan := make(chan os.Signal, 1)
|
||||
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
|
||||
<-sigChan
|
||||
|
||||
log.Info("Shutting down...")
|
||||
cancel()
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user