package nsq import ( "fmt" "github.com/fox/fox/log" "github.com/nsqio/go-nsq" "time" ) const ( chanLen = 20 // 消息队列容量 ) type chanState int const ( idle chanState = 0 busy chanState = 1 ) func (ch chanState) String() string { switch ch { case idle: return "idle" case busy: return "busy" default: return "unknown" } } type Consumer struct { name string addr string topic string channel string msgChan chan *nsq.Message consumer *nsq.Consumer state chanState } func newConsumer(nsqAddr, topic, channel string) (*Consumer, error) { var err error config := nsq.NewConfig() // 心跳设置 config.HeartbeatInterval = 30 * time.Second // 关键配置项 // config.DialTimeout = 10 * time.Second // TCP 连接超时(默认系统级) // config.ReadTimeout = 60 * time.Second // 读取数据超时 // config.WriteTimeout = 60 * time.Second // 写入数据超时 // // // 如果是通过 nsqlookupd 发现服务,调整 HTTP 客户端超时 // config.LookupdPollInterval = 5 * time.Second // 轮询间隔 // config.LookupdPollTimeout = 10 * time.Second // 单个 HTTP 请求超时 c := &Consumer{} c.addr = nsqAddr c.state = idle c.topic = topic c.channel = channel c.name = fmt.Sprintf("%v-%v", topic, channel) c.msgChan = make(chan *nsq.Message, chanLen) c.consumer, err = nsq.NewConsumer(topic, channel, config) if err != nil { return nil, err } c.consumer.AddHandler(c) c.consumer.SetLogger(newNSQLogger(nsq.LogLevelError), nsq.LogLevelError) return c, nil } // 创建消费者,直连NSQd发现服务 func NewConsumerByNsqD(nsqDAddr, topic, channel string) (*Consumer, error) { c, err := newConsumer(nsqDAddr, topic, channel) if err != nil { return nil, err } // 连接方式选择:直连NSQd if err = c.consumer.ConnectToNSQD(nsqDAddr); err != nil { return nil, err } log.DebugF("consumer %v-%v 注册成功", topic, channel) return c, err } // 创建消费者,连接NSQLookupD发现服务 func NewConsumer(lookupAddr, topic, channel string) (*Consumer, error) { c, err := newConsumer(lookupAddr, topic, channel) if err != nil { return nil, err } // 连接NSQLookupD发现服务 err = c.consumer.ConnectToNSQLookupd(c.addr) if err != nil { return nil, err } log.DebugF("consumer %v-%v 注册成功", topic, channel) return c, err } func (c *Consumer) HandleMessage(msg *nsq.Message) error { num := len(c.msgChan) if num > chanLen/2 && c.state == idle { c.state = busy log.WarnF("%v-%v 通道已从闲时转为繁忙状态。当前积压消息数:%v", c.topic, c.channel, len(c.msgChan)) } else if c.state == busy && num < chanLen/4 { c.state = idle log.WarnF("%v-%v 通道已从繁忙转为闲时状态。当前积压消息数:%v", c.topic, c.channel, len(c.msgChan)) } c.msgChan <- msg msg.Finish() return nil } func (c *Consumer) Name() string { return c.name } func (c *Consumer) Read() <-chan *nsq.Message { return c.msgChan } func (c *Consumer) Close() { if c.consumer != nil { c.consumer.Stop() } }