fox/nsq/producer.go
2025-05-25 20:02:15 +08:00

77 lines
1.4 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package nsq
import (
"errors"
"fmt"
"github.com/nsqio/go-nsq"
"time"
)
type Producer struct {
nsqAddr []string
config *nsq.Config
producer *nsq.Producer
size int
pos int
}
func initProducer(p *Producer, nsqAddr string) error {
var err error
p.producer, err = nsq.NewProducer(nsqAddr, p.config)
if err != nil {
return err
}
p.producer.SetLogger(newNSQLogger(nsq.LogLevelError), nsq.LogLevelError)
return nil
}
// 创建建生产者 nsqAddr:127.0.0.1:4150, 127.0.0.2:4150
func NewProducer(nsqAddr ...string) (*Producer, error) {
var err error
p := new(Producer)
p.config = nsq.NewConfig()
// 心跳设置
p.config.HeartbeatInterval = 30 * time.Second
p.nsqAddr = nsqAddr
p.size = len(nsqAddr)
p.pos = 0
if len(nsqAddr) == 0 {
return nil, errors.New("nsqAddr is empty")
}
err = initProducer(p, p.nsqAddr[0])
return p, err
}
// 向主题发布消息
func (p *Producer) Publish(topic string, data []byte) error {
var err error
for {
if p.producer != nil {
err = p.producer.Publish(topic, data)
if err == nil {
break
}
// 切换ip重发
p.producer.Stop()
}
p.pos = p.pos + 1
if p.pos < p.size {
err = initProducer(p, p.nsqAddr[p.pos])
} else {
p.pos = 0
return fmt.Errorf("连接nsq%v 失败", p.nsqAddr)
}
}
return nil
}
func (p *Producer) Close() {
if p.producer != nil {
p.producer.Stop()
p.producer = nil
}
}