fox/nsq/consumer.go
2025-05-25 20:02:15 +08:00

129 lines
3.0 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package nsq
import (
"fmt"
"github.com/fox/fox/log"
"github.com/nsqio/go-nsq"
"time"
)
const (
chanLen = 20 // 消息队列容量
)
type chanState int
const (
idle chanState = 0
busy chanState = 1
)
func (ch chanState) String() string {
switch ch {
case idle:
return "idle"
case busy:
return "busy"
default:
return "unknown"
}
}
type Consumer struct {
name string
addr string
topic string
channel string
msgChan chan *nsq.Message
consumer *nsq.Consumer
state chanState
}
func newConsumer(nsqAddr, topic, channel string) (*Consumer, error) {
var err error
config := nsq.NewConfig()
// 心跳设置
config.HeartbeatInterval = 30 * time.Second
// 关键配置项
// config.DialTimeout = 10 * time.Second // TCP 连接超时(默认系统级)
// config.ReadTimeout = 60 * time.Second // 读取数据超时
// config.WriteTimeout = 60 * time.Second // 写入数据超时
//
// // 如果是通过 nsqlookupd 发现服务,调整 HTTP 客户端超时
// config.LookupdPollInterval = 5 * time.Second // 轮询间隔
// config.LookupdPollTimeout = 10 * time.Second // 单个 HTTP 请求超时
c := &Consumer{}
c.addr = nsqAddr
c.state = idle
c.topic = topic
c.channel = channel
c.name = fmt.Sprintf("%v-%v", topic, channel)
c.msgChan = make(chan *nsq.Message, chanLen)
c.consumer, err = nsq.NewConsumer(topic, channel, config)
if err != nil {
return nil, err
}
c.consumer.AddHandler(c)
c.consumer.SetLogger(newNSQLogger(nsq.LogLevelError), nsq.LogLevelError)
return c, nil
}
// 创建消费者直连NSQd发现服务
func NewConsumerByNsqD(nsqDAddr, topic, channel string) (*Consumer, error) {
c, err := newConsumer(nsqDAddr, topic, channel)
if err != nil {
return nil, err
}
// 连接方式选择直连NSQd
if err = c.consumer.ConnectToNSQD(nsqDAddr); err != nil {
return nil, err
}
log.DebugF("consumer %v-%v 注册成功", topic, channel)
return c, err
}
// 创建消费者连接NSQLookupD发现服务
func NewConsumer(lookupAddr, topic, channel string) (*Consumer, error) {
c, err := newConsumer(lookupAddr, topic, channel)
if err != nil {
return nil, err
}
// 连接NSQLookupD发现服务
err = c.consumer.ConnectToNSQLookupd(c.addr)
if err != nil {
return nil, err
}
log.DebugF("consumer %v-%v 注册成功", topic, channel)
return c, err
}
func (c *Consumer) HandleMessage(msg *nsq.Message) error {
num := len(c.msgChan)
if num > chanLen/2 && c.state == idle {
c.state = busy
log.WarnF("%v-%v 通道已从闲时转为繁忙状态。当前积压消息数:%v", c.topic, c.channel, len(c.msgChan))
} else if c.state == busy && num < chanLen/4 {
c.state = idle
log.WarnF("%v-%v 通道已从繁忙转为闲时状态。当前积压消息数:%v", c.topic, c.channel, len(c.msgChan))
}
c.msgChan <- msg
msg.Finish()
return nil
}
func (c *Consumer) Name() string {
return c.name
}
func (c *Consumer) Read() <-chan *nsq.Message {
return c.msgChan
}
func (c *Consumer) Close() {
if c.consumer != nil {
c.consumer.Stop()
}
}