From 63f13fa409ac27c8d5e72688fdf62d74e5723f18 Mon Sep 17 00:00:00 2001 From: liuxiaobo <1224730913@qq.com> Date: Wed, 3 Sep 2025 22:21:30 +0800 Subject: [PATCH] rabbitmq --- go.mod | 4 ++-- ipb/helper.go | 2 +- processor/processor.go | 2 +- processor/processor_test.go | 2 +- rmq/config.go | 23 ++++++++++------------- rmq/connection.go | 28 ++++++++++++++-------------- rmq/consumer.go | 4 ++-- rmq/producer.go | 4 ++-- rmq/rmq_docker.txt | 12 ++++++++++++ rmq/rmq_test.go | 2 +- service/natsService.go | 2 +- ws/wsConn.go | 2 +- 12 files changed, 48 insertions(+), 39 deletions(-) create mode 100644 rmq/rmq_docker.txt diff --git a/go.mod b/go.mod index ef6378e..2795c8d 100644 --- a/go.mod +++ b/go.mod @@ -6,8 +6,6 @@ require ( github.com/ClickHouse/clickhouse-go/v2 v2.36.0 github.com/go-redis/redis/v8 v8.11.5 github.com/golang-module/carbon/v2 v2.6.5 - github.com/golang/protobuf v1.5.4 - github.com/google/uuid v1.6.0 github.com/gorilla/websocket v1.5.3 github.com/natefinch/lumberjack v2.0.0+incompatible github.com/nats-io/nats.go v1.42.0 @@ -38,7 +36,9 @@ require ( github.com/go-faster/errors v0.7.1 // indirect github.com/go-sql-driver/mysql v1.7.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang/protobuf v1.5.4 // indirect github.com/golang/snappy v0.0.1 // indirect + github.com/google/uuid v1.6.0 // indirect github.com/hashicorp/go-version v1.7.0 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect diff --git a/ipb/helper.go b/ipb/helper.go index 9e21f41..5c08fee 100644 --- a/ipb/helper.go +++ b/ipb/helper.go @@ -2,7 +2,7 @@ package ipb import ( "encoding/json" - "github.com/golang/protobuf/proto" + "google.golang.org/protobuf/proto" ) func MakeMsg(serviceName string, connId uint32, userId int64, msgId int32, data []byte) *InternalMsg { diff --git a/processor/processor.go b/processor/processor.go index 27376c1..f7d1096 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -3,7 +3,7 @@ package processor import ( "fmt" "github.com/fox/fox/log" - "github.com/golang/protobuf/proto" + "google.golang.org/protobuf/proto" "google.golang.org/protobuf/reflect/protoreflect" "reflect" ) diff --git a/processor/processor_test.go b/processor/processor_test.go index 3ed3776..8f51326 100644 --- a/processor/processor_test.go +++ b/processor/processor_test.go @@ -5,7 +5,7 @@ import ( "github.com/fox/fox/ipb" "github.com/fox/fox/log" "github.com/fox/fox/timer" - "github.com/golang/protobuf/proto" + "google.golang.org/protobuf/proto" "testing" ) diff --git a/rmq/config.go b/rmq/config.go index f2a01d5..59893b2 100644 --- a/rmq/config.go +++ b/rmq/config.go @@ -1,26 +1,23 @@ package rmq -import ( - "time" -) - type RabbitMQConfig struct { - URL string - ExchangeName string - QueueName string - RoutingKey string - ReconnectInterval time.Duration - MaxRetries int - PrefetchCount int + URL string // url + ExchangeName string // 交换器 + QueueName string // 队列名 + RoutingKey string // 路由名 + ReconnectInterval int64 // 重连间隔(s) + MaxRetries int // 重连次数 + PrefetchCount int // 预取数量 防止队列内消息过多,冲垮消费者 + Durable bool // 消息持久化至硬盘 } func LoadRabbitMQConfig() *RabbitMQConfig { return &RabbitMQConfig{ - URL: "amqp://guest:guest@localhost:5672/", + URL: "amqp://admin:password@114.132.124.145:5672/vh_game_dev", ExchangeName: "app_exchange", QueueName: "app_queue", RoutingKey: "app.routing.key", - ReconnectInterval: 5 * time.Second, + ReconnectInterval: 5, MaxRetries: 3, PrefetchCount: 10, } diff --git a/rmq/connection.go b/rmq/connection.go index e6df35e..2b3f0bd 100644 --- a/rmq/connection.go +++ b/rmq/connection.go @@ -2,8 +2,8 @@ package rmq import ( "context" + "errors" "github.com/fox/fox/log" - "github.com/pkg/errors" amqp "github.com/rabbitmq/amqp091-go" "time" ) @@ -37,19 +37,19 @@ func (c *Connection) Connect() error { // 建立连接 c.conn, err = amqp.Dial(c.config.URL) if err != nil { - return errors.Wrap(err, "failed to connect to RabbitMQ") + return errors.Join(err, errors.New("failed to connect to RabbitMQ")) } // 创建通道 c.channel, err = c.conn.Channel() if err != nil { - return errors.Wrap(err, "failed to open channel") + return errors.Join(err, errors.New("failed to open channel")) } // 设置QoS err = c.channel.Qos(c.config.PrefetchCount, 0, false) if err != nil { - return errors.Wrap(err, "failed to set QoS") + return errors.Join(err, errors.New("failed to set QoS")) } // 声明交换器 @@ -63,20 +63,20 @@ func (c *Connection) Connect() error { nil, ) if err != nil { - return errors.Wrap(err, "failed to declare exchange") + return errors.Join(err, errors.New("failed to declare exchange")) } // 声明队列 _, err = c.channel.QueueDeclare( - c.config.QueueName, - true, - false, - false, - false, + c.config.QueueName, // 队列名称 + c.config.Durable, // 持久化 true:队列元数据(名称、属性)和消息会写入磁盘,RabbitMQ 重启后仍存在。false:队列存在于内存,重启后丢失。 + false, // 不自动删除 true:当最后一个消费者断开连接后,队列自动删除。 + false, // 非独占队列 + false, // 等待服务器确认 nil, ) if err != nil { - return errors.Wrap(err, "failed to declare queue") + return errors.Join(err, errors.New("failed to declare queue")) } // 绑定队列 @@ -88,7 +88,7 @@ func (c *Connection) Connect() error { nil, ) if err != nil { - return errors.Wrap(err, "failed to bind queue") + return errors.Join(err, errors.New("failed to bind queue")) } // 设置确认通道 @@ -118,8 +118,8 @@ func (c *Connection) Reconnect(ctx context.Context) { // 重连逻辑 for { if err := c.Connect(); err != nil { - log.ErrorF("Failed to reconnect: %v. Retrying in %v", err, c.config.ReconnectInterval) - time.Sleep(c.config.ReconnectInterval) + log.ErrorF("Failed to reconnect: %v. Retrying in %vs", err, c.config.ReconnectInterval) + time.Sleep(time.Duration(c.config.ReconnectInterval) * time.Second) continue } break diff --git a/rmq/consumer.go b/rmq/consumer.go index eed54b0..6542c79 100644 --- a/rmq/consumer.go +++ b/rmq/consumer.go @@ -2,8 +2,8 @@ package rmq import ( "context" + "errors" "github.com/fox/fox/log" - "github.com/pkg/errors" amqp "github.com/rabbitmq/amqp091-go" "time" ) @@ -39,7 +39,7 @@ func (c *Consumer) StartConsuming(ctx context.Context) error { nil, ) if err != nil { - return errors.Wrap(err, "failed to start consuming") + return errors.Join(err, errors.New("failed to start consuming")) } go c.consumeMessages(ctx, msgs) diff --git a/rmq/producer.go b/rmq/producer.go index 1c2b1a8..cfee1ca 100644 --- a/rmq/producer.go +++ b/rmq/producer.go @@ -2,8 +2,8 @@ package rmq import ( "context" + "errors" "github.com/fox/fox/log" - "github.com/pkg/errors" amqp "github.com/rabbitmq/amqp091-go" "time" ) @@ -59,7 +59,7 @@ func (p *Producer) publishWithRetry(ctx context.Context, body []byte) error { ) if err != nil { - return errors.Wrap(err, "failed to publish message") + return errors.Join(err, errors.New("failed to publish message")) } // 等待确认 diff --git a/rmq/rmq_docker.txt b/rmq/rmq_docker.txt new file mode 100644 index 0000000..09615a8 --- /dev/null +++ b/rmq/rmq_docker.txt @@ -0,0 +1,12 @@ +docker run -d \ + --name rabbitmq \ + --hostname my-rabbit-host \ # 设置固定主机名 + -p 5672:5672 \ # AMQP 协议端口 + -p 15672:15672 \ # 管理界面端口 + -v rabbitmq_data:/var/lib/rabbitmq \ # 持久化数据卷 + -e RABBITMQ_DEFAULT_USER=admin \ # 设置默认用户名 + -e RABBITMQ_DEFAULT_PASS=password \ # 设置默认密码 + rabbitmq:3-management # 使用带管理插件的镜像 + + + docker run -d --name rabbitmq --hostname my-rabbit-host -p 5672:5672 -p 15672:15672 -v rabbitmq_data:/var/lib/rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=password rabbitmq:3-management \ No newline at end of file diff --git a/rmq/rmq_test.go b/rmq/rmq_test.go index 57368ce..9197ea0 100644 --- a/rmq/rmq_test.go +++ b/rmq/rmq_test.go @@ -10,7 +10,7 @@ import ( "time" ) -//const url = "amqp://samba:samba@testbuild.shoa.com:5672/vh_samba" +//const url = "amqp://admin:password@114.132.124.145:5672/vh_game_dev" //const exchangeName = "test_e" //const queueName = "test_q" diff --git a/service/natsService.go b/service/natsService.go index 95f31af..370f8b3 100644 --- a/service/natsService.go +++ b/service/natsService.go @@ -8,8 +8,8 @@ import ( "github.com/fox/fox/log" "github.com/fox/fox/nat" "github.com/fox/fox/processor" - "github.com/golang/protobuf/proto" "github.com/nats-io/nats.go" + "google.golang.org/protobuf/proto" "os" "time" ) diff --git a/ws/wsConn.go b/ws/wsConn.go index 21e5477..54100ba 100644 --- a/ws/wsConn.go +++ b/ws/wsConn.go @@ -5,8 +5,8 @@ import ( "github.com/fox/fox/ipb" "github.com/fox/fox/log" "github.com/fox/fox/safeChan" - "github.com/golang/protobuf/proto" "github.com/gorilla/websocket" + "google.golang.org/protobuf/proto" "sync" "time" )