129 lines
3.0 KiB
Go
129 lines
3.0 KiB
Go
![]() |
package nsq
|
|||
|
|
|||
|
import (
|
|||
|
"fmt"
|
|||
|
"github.com/fox/fox/log"
|
|||
|
"github.com/nsqio/go-nsq"
|
|||
|
"time"
|
|||
|
)
|
|||
|
|
|||
|
// chanLen is the capacity of the buffered message queue.
const chanLen = 20
|
|||
|
|
|||
|
// chanState labels the load level recorded for a message channel.
type chanState int

// Known channel states; any other value is reported as "unknown" by String.
const (
	idle chanState = iota // 0
	busy                  // 1
)

// String returns the lowercase name of the state ("idle" or "busy"), or
// "unknown" for any value that is not one of the declared constants.
func (ch chanState) String() string {
	if ch == idle {
		return "idle"
	}
	if ch == busy {
		return "busy"
	}
	return "unknown"
}
|
|||
|
|
|||
|
type Consumer struct {
|
|||
|
name string
|
|||
|
addr string
|
|||
|
topic string
|
|||
|
channel string
|
|||
|
msgChan chan *nsq.Message
|
|||
|
consumer *nsq.Consumer
|
|||
|
state chanState
|
|||
|
}
|
|||
|
|
|||
|
func newConsumer(nsqAddr, topic, channel string) (*Consumer, error) {
|
|||
|
var err error
|
|||
|
config := nsq.NewConfig()
|
|||
|
// 心跳设置
|
|||
|
config.HeartbeatInterval = 30 * time.Second
|
|||
|
// 关键配置项
|
|||
|
// config.DialTimeout = 10 * time.Second // TCP 连接超时(默认系统级)
|
|||
|
// config.ReadTimeout = 60 * time.Second // 读取数据超时
|
|||
|
// config.WriteTimeout = 60 * time.Second // 写入数据超时
|
|||
|
//
|
|||
|
// // 如果是通过 nsqlookupd 发现服务,调整 HTTP 客户端超时
|
|||
|
// config.LookupdPollInterval = 5 * time.Second // 轮询间隔
|
|||
|
// config.LookupdPollTimeout = 10 * time.Second // 单个 HTTP 请求超时
|
|||
|
|
|||
|
c := &Consumer{}
|
|||
|
c.addr = nsqAddr
|
|||
|
c.state = idle
|
|||
|
c.topic = topic
|
|||
|
c.channel = channel
|
|||
|
c.name = fmt.Sprintf("%v-%v", topic, channel)
|
|||
|
c.msgChan = make(chan *nsq.Message, chanLen)
|
|||
|
c.consumer, err = nsq.NewConsumer(topic, channel, config)
|
|||
|
if err != nil {
|
|||
|
return nil, err
|
|||
|
}
|
|||
|
c.consumer.AddHandler(c)
|
|||
|
|
|||
|
c.consumer.SetLogger(newNSQLogger(nsq.LogLevelError), nsq.LogLevelError)
|
|||
|
return c, nil
|
|||
|
}
|
|||
|
|
|||
|
// 创建消费者,直连NSQd发现服务
|
|||
|
func NewConsumerByNsqD(nsqDAddr, topic, channel string) (*Consumer, error) {
|
|||
|
c, err := newConsumer(nsqDAddr, topic, channel)
|
|||
|
if err != nil {
|
|||
|
return nil, err
|
|||
|
}
|
|||
|
// 连接方式选择:直连NSQd
|
|||
|
if err = c.consumer.ConnectToNSQD(nsqDAddr); err != nil {
|
|||
|
return nil, err
|
|||
|
}
|
|||
|
log.DebugF("consumer %v-%v 注册成功", topic, channel)
|
|||
|
return c, err
|
|||
|
}
|
|||
|
|
|||
|
// 创建消费者,连接NSQLookupD发现服务
|
|||
|
func NewConsumer(lookupAddr, topic, channel string) (*Consumer, error) {
|
|||
|
c, err := newConsumer(lookupAddr, topic, channel)
|
|||
|
if err != nil {
|
|||
|
return nil, err
|
|||
|
}
|
|||
|
// 连接NSQLookupD发现服务
|
|||
|
err = c.consumer.ConnectToNSQLookupd(c.addr)
|
|||
|
if err != nil {
|
|||
|
return nil, err
|
|||
|
}
|
|||
|
log.DebugF("consumer %v-%v 注册成功", topic, channel)
|
|||
|
return c, err
|
|||
|
}
|
|||
|
|
|||
|
func (c *Consumer) HandleMessage(msg *nsq.Message) error {
|
|||
|
num := len(c.msgChan)
|
|||
|
if num > chanLen/2 && c.state == idle {
|
|||
|
c.state = busy
|
|||
|
log.WarnF("%v-%v 通道已从闲时转为繁忙状态。当前积压消息数:%v", c.topic, c.channel, len(c.msgChan))
|
|||
|
} else if c.state == busy && num < chanLen/4 {
|
|||
|
c.state = idle
|
|||
|
log.WarnF("%v-%v 通道已从繁忙转为闲时状态。当前积压消息数:%v", c.topic, c.channel, len(c.msgChan))
|
|||
|
}
|
|||
|
c.msgChan <- msg
|
|||
|
msg.Finish()
|
|||
|
return nil
|
|||
|
}
|
|||
|
|
|||
|
func (c *Consumer) Name() string {
|
|||
|
return c.name
|
|||
|
}
|
|||
|
|
|||
|
func (c *Consumer) Read() <-chan *nsq.Message {
|
|||
|
return c.msgChan
|
|||
|
}
|
|||
|
|
|||
|
func (c *Consumer) Close() {
|
|||
|
if c.consumer != nil {
|
|||
|
c.consumer.Stop()
|
|||
|
}
|
|||
|
}
|