fox/nsq/producer.go

77 lines
1.4 KiB
Go
Raw Normal View History

2025-05-25 20:02:15 +08:00
package nsq
import (
"errors"
"fmt"
"github.com/nsqio/go-nsq"
"time"
)
// Producer publishes messages to NSQ through a single underlying
// *nsq.Producer that is bound to one address out of a configured list.
type Producer struct {
	// nsqAddr is the list of candidate nsqd addresses.
	nsqAddr []string
	// config is the go-nsq configuration used whenever an underlying
	// producer is created.
	config *nsq.Config
	// producer is the currently active underlying producer; nil when none
	// is available (for example after Close).
	producer *nsq.Producer
	// size is the number of entries in nsqAddr; pos is the index in nsqAddr
	// of the address currently in use.
	size, pos int
}
// initProducer builds an underlying nsq producer for nsqAddr using p.config
// and stores it in p.producer. On success the producer's logger is set to the
// package logger at error level. The creation error, if any, is returned
// unchanged.
func initProducer(p *Producer, nsqAddr string) error {
	created, err := nsq.NewProducer(nsqAddr, p.config)
	// Store the result unconditionally so p.producer always reflects the
	// outcome of the most recent creation attempt.
	p.producer = created
	if err != nil {
		return err
	}

	const level = nsq.LogLevelError
	created.SetLogger(newNSQLogger(level), level)
	return nil
}
// NewProducer creates a Producer for one or more nsqd addresses, e.g.
// NewProducer("127.0.0.1:4150", "127.0.0.2:4150").
//
// The underlying producer is created for the first address; the remaining
// addresses are kept so Publish can switch to them. The heartbeat interval is
// set to 30 seconds.
//
// It returns an error if no address is given or if the underlying producer
// cannot be created. On error the returned *Producer is nil.
func NewProducer(nsqAddr ...string) (*Producer, error) {
	// Validate input before allocating anything.
	if len(nsqAddr) == 0 {
		return nil, errors.New("nsqAddr is empty")
	}

	cfg := nsq.NewConfig()
	// Heartbeat setting.
	cfg.HeartbeatInterval = 30 * time.Second

	p := &Producer{
		config: cfg,
		// Copy the addresses: a caller passing addrs... would otherwise share
		// its backing array with the Producer and could mutate it later.
		nsqAddr: append([]string(nil), nsqAddr...),
		size:    len(nsqAddr),
		pos:     0,
	}

	if err := initProducer(p, p.nsqAddr[0]); err != nil {
		// Do not hand back a half-initialised Producer together with an error.
		return nil, err
	}
	return p, nil
}
// Publish sends data to the given topic.
//
// It starts with the address currently in use and, on failure, stops the
// underlying producer and moves on to the next configured address. Each
// address is tried at most once per call, wrapping around the list. It
// returns nil as soon as one publish succeeds; otherwise it returns an error
// that names all configured addresses and wraps the last failure.
//
// A producer that failed is stopped and discarded, so a later call creates a
// fresh one instead of reusing a stopped instance. For the same reason a call
// made after Close creates a new underlying producer.
//
// Publish updates p.pos and p.producer without any locking, so concurrent
// calls on the same Producer must be serialized by the caller.
func (p *Producer) Publish(topic string, data []byte) error {
	if p.size == 0 {
		// Nothing to try; keep the original failure message.
		return fmt.Errorf("连接nsq%v 失败", p.nsqAddr)
	}

	var lastErr error
	for attempt := 0; attempt < p.size; attempt++ {
		// Lazily (re)create the producer for the current address.
		if p.producer == nil {
			if err := initProducer(p, p.nsqAddr[p.pos]); err != nil {
				lastErr = err
				p.producer = nil
				p.pos = (p.pos + 1) % p.size
				continue
			}
		}

		err := p.producer.Publish(topic, data)
		if err == nil {
			return nil
		}
		lastErr = err

		// Switch address and resend. The stopped producer is dropped so it is
		// never reused by a later attempt or a later call.
		p.producer.Stop()
		p.producer = nil
		p.pos = (p.pos + 1) % p.size
	}

	return fmt.Errorf("连接nsq%v 失败: %w", p.nsqAddr, lastErr)
}
// Close stops the underlying nsq producer, if there is one, and clears the
// reference to it. Calling Close when no producer is held is a no-op.
func (p *Producer) Close() {
	active := p.producer
	if active == nil {
		return
	}
	active.Stop()
	p.producer = nil
}