fox/nsq/consumer.go

129 lines
3.0 KiB
Go
Raw Permalink Normal View History

2025-05-25 20:02:15 +08:00
package nsq
import (
"fmt"
"github.com/fox/fox/log"
"github.com/nsqio/go-nsq"
"time"
)
const (
chanLen = 20 // 消息队列容量
)
type chanState int
const (
idle chanState = 0
busy chanState = 1
)
func (ch chanState) String() string {
switch ch {
case idle:
return "idle"
case busy:
return "busy"
default:
return "unknown"
}
}
type Consumer struct {
name string
addr string
topic string
channel string
msgChan chan *nsq.Message
consumer *nsq.Consumer
state chanState
}
func newConsumer(nsqAddr, topic, channel string) (*Consumer, error) {
var err error
config := nsq.NewConfig()
// 心跳设置
config.HeartbeatInterval = 30 * time.Second
// 关键配置项
// config.DialTimeout = 10 * time.Second // TCP 连接超时(默认系统级)
// config.ReadTimeout = 60 * time.Second // 读取数据超时
// config.WriteTimeout = 60 * time.Second // 写入数据超时
//
// // 如果是通过 nsqlookupd 发现服务,调整 HTTP 客户端超时
// config.LookupdPollInterval = 5 * time.Second // 轮询间隔
// config.LookupdPollTimeout = 10 * time.Second // 单个 HTTP 请求超时
c := &Consumer{}
c.addr = nsqAddr
c.state = idle
c.topic = topic
c.channel = channel
c.name = fmt.Sprintf("%v-%v", topic, channel)
c.msgChan = make(chan *nsq.Message, chanLen)
c.consumer, err = nsq.NewConsumer(topic, channel, config)
if err != nil {
return nil, err
}
c.consumer.AddHandler(c)
c.consumer.SetLogger(newNSQLogger(nsq.LogLevelError), nsq.LogLevelError)
return c, nil
}
// 创建消费者直连NSQd发现服务
func NewConsumerByNsqD(nsqDAddr, topic, channel string) (*Consumer, error) {
c, err := newConsumer(nsqDAddr, topic, channel)
if err != nil {
return nil, err
}
// 连接方式选择直连NSQd
if err = c.consumer.ConnectToNSQD(nsqDAddr); err != nil {
return nil, err
}
log.DebugF("consumer %v-%v 注册成功", topic, channel)
return c, err
}
// 创建消费者连接NSQLookupD发现服务
func NewConsumer(lookupAddr, topic, channel string) (*Consumer, error) {
c, err := newConsumer(lookupAddr, topic, channel)
if err != nil {
return nil, err
}
// 连接NSQLookupD发现服务
err = c.consumer.ConnectToNSQLookupd(c.addr)
if err != nil {
return nil, err
}
log.DebugF("consumer %v-%v 注册成功", topic, channel)
return c, err
}
func (c *Consumer) HandleMessage(msg *nsq.Message) error {
num := len(c.msgChan)
if num > chanLen/2 && c.state == idle {
c.state = busy
log.WarnF("%v-%v 通道已从闲时转为繁忙状态。当前积压消息数:%v", c.topic, c.channel, len(c.msgChan))
} else if c.state == busy && num < chanLen/4 {
c.state = idle
log.WarnF("%v-%v 通道已从繁忙转为闲时状态。当前积压消息数:%v", c.topic, c.channel, len(c.msgChan))
}
c.msgChan <- msg
msg.Finish()
return nil
}
func (c *Consumer) Name() string {
return c.name
}
func (c *Consumer) Read() <-chan *nsq.Message {
return c.msgChan
}
func (c *Consumer) Close() {
if c.consumer != nil {
c.consumer.Stop()
}
}