fox/nsq/nsq_test.go
2025-05-25 20:02:15 +08:00

94 lines
1.8 KiB
Go

package nsq
import (
"fmt"
"github.com/fox/fox/log"
"github.com/nsqio/go-nsq"
"go.uber.org/zap"
"testing"
"time"
)
// Fixtures shared by the helpers in this test file. The addresses are
// hard-coded, so the helpers only work where those hosts are reachable.
const (
	// testAddress is the address handed to NewProducer and NewConsumerByNsqD.
	testAddress = "192.168.232.128:4150"

	// testLookupAddress is the address handed to NewConsumer.
	// NOTE(review): presumably an nsqlookupd endpoint — confirm.
	testLookupAddress = "192.168.232.128:4161"

	// testTopic is the topic the producer publishes to and both consumers
	// subscribe to.
	testTopic = "test_topic"

	// testChannel is the channel used by testConsumers.
	testChannel = "test_channel"

	// testChannelD is the channel used by testConsumerByLookupD.
	testChannelD = "test_channelD"
)
// initLog opens the project logger with the file name "test.log" and the
// log.DebugL level argument.
func initLog() {
	const logFile = "test.log"
	log.Open(logFile, log.DebugL)
}
// toString returns the bytes of an NSQ message ID as a string, for logging.
//
// The ID is converted with a single slice-to-string conversion, so every byte
// is copied verbatim. Converting byte by byte with string(b) would instead
// treat each byte as a Unicode code point and expand any value >= 0x80 into
// two UTF-8 bytes; for ASCII-only IDs both forms give the same result.
func toString(id nsq.MessageID) string {
	return string(id[:])
}
// testProducer creates a producer for testAddress and publishes the messages
// "hello nsq 0" through "hello nsq 20" to testTopic.
//
// It returns the producer once every publish has succeeded. If creating the
// producer or any publish fails, the error text is passed to log.Fatal and
// nil is returned (the producer is not closed on that path).
func testProducer() *Producer {
	// Create the producer.
	p, err := NewProducer(testAddress)
	if err != nil {
		log.Fatal(err.Error())
		return nil
	}

	// The bound is inclusive, so 21 messages are published in total.
	const lastIndex = 20
	for i := 0; i <= lastIndex; i++ {
		payload := []byte(fmt.Sprintf("hello nsq %d", i))
		if pubErr := p.Publish(testTopic, payload); pubErr != nil {
			log.Fatal(pubErr.Error())
			return nil
		}
	}
	return p
}
// testConsumers creates a consumer via NewConsumerByNsqD for testTopic and
// testChannel at testAddress, waits two seconds, and then logs the ID and
// body of every message received from the consumer's Read channel.
//
// If the consumer cannot be created, the error text is passed to log.Fatal
// and the function returns. Otherwise the receive loop has no exit condition,
// so the function does not return on its own.
func testConsumers() {
	c, err := NewConsumerByNsqD(testAddress, testTopic, testChannel)
	if err != nil {
		log.Fatal(err.Error())
		return
	}
	defer c.Close()

	time.Sleep(2 * time.Second)

	for {
		// Read is called again on every iteration before receiving.
		m := <-c.Read()
		log.Debug(c.Name(), zap.String("id", toString(m.ID)), zap.String("body", string(m.Body)))
	}
}
// testConsumerByLookupD creates a consumer via NewConsumer for testTopic and
// testChannelD at testLookupAddress, waits two seconds, and then logs the ID
// and body of every message received from the consumer's Read channel.
//
// If the consumer cannot be created, the error text is passed to log.Fatal
// and the function returns. Otherwise the receive loop has no exit condition,
// so the function does not return on its own.
func testConsumerByLookupD() {
	c, err := NewConsumer(testLookupAddress, testTopic, testChannelD)
	if err != nil {
		log.Fatal(err.Error())
		return
	}
	defer c.Close()

	time.Sleep(2 * time.Second)

	for {
		// Read is called again on every iteration before receiving.
		m := <-c.Read()
		// NOTE(review): this reads the name field directly, whereas
		// testConsumers calls Name() — confirm whether they are equivalent.
		log.Debug(c.name, zap.String("id", toString(m.ID)), zap.String("body", string(m.Body)))
	}
}
// TestNsq opens the log, publishes the test messages through testProducer,
// waits five seconds, and logs "shutdown".
//
// The test makes no assertions through t; a nil producer is tolerated and
// simply skips the deferred Close.
// NOTE(review): presumably needs a reachable NSQ server at testAddress — confirm.
func TestNsq(t *testing.T) {
	initLog()

	// The consumer helpers are currently disabled; uncomment to read the
	// published messages back:
	// go testConsumers()
	// go testConsumerByLookupD()

	if p := testProducer(); p != nil {
		defer p.Close()
	}

	time.Sleep(5 * time.Second)
	log.Debug("shutdown")
}