94 lines
1.8 KiB
Go
94 lines
1.8 KiB
Go
package nsq
|
|
|
|
import (
|
|
"fmt"
|
|
"github.com/fox/fox/log"
|
|
"github.com/nsqio/go-nsq"
|
|
"go.uber.org/zap"
|
|
"testing"
|
|
"time"
|
|
)
|
|
|
|
const (
|
|
testTopic = "test_topic"
|
|
testChannel = "test_channel"
|
|
testChannelD = "test_channelD"
|
|
testAddress = "192.168.232.128:4150"
|
|
testLookupAddress = "192.168.232.128:4161"
|
|
)
|
|
|
|
func initLog() {
|
|
log.Open("test.log", log.DebugL)
|
|
}
|
|
|
|
func toString(id nsq.MessageID) string {
|
|
sid := ""
|
|
for _, b := range id {
|
|
sid += string(b)
|
|
}
|
|
return sid
|
|
}
|
|
|
|
func testProducer() *Producer {
|
|
// 初始化一个生产者
|
|
producer, err := NewProducer(testAddress)
|
|
if err != nil {
|
|
log.Fatal(err.Error())
|
|
return nil
|
|
}
|
|
|
|
for n := 0; n <= 20; n++ {
|
|
err = producer.Publish(testTopic, []byte(fmt.Sprintf("hello nsq %d", n)))
|
|
if err != nil {
|
|
log.Fatal(err.Error())
|
|
return nil
|
|
}
|
|
// time.Sleep(1 * time.Second)
|
|
}
|
|
return producer
|
|
}
|
|
|
|
func testConsumers() {
|
|
consumer, err := NewConsumerByNsqD(testAddress, testTopic, testChannel)
|
|
if err != nil {
|
|
log.Fatal(err.Error())
|
|
return
|
|
}
|
|
defer consumer.Close()
|
|
time.Sleep(2 * time.Second)
|
|
for {
|
|
select {
|
|
case msg := <-consumer.Read():
|
|
log.Debug(consumer.Name(), zap.String("id", toString(msg.ID)), zap.String("body", string(msg.Body)))
|
|
}
|
|
}
|
|
}
|
|
|
|
func testConsumerByLookupD() {
|
|
consumer, err := NewConsumer(testLookupAddress, testTopic, testChannelD)
|
|
if err != nil {
|
|
log.Fatal(err.Error())
|
|
return
|
|
}
|
|
defer consumer.Close()
|
|
time.Sleep(2 * time.Second)
|
|
for {
|
|
select {
|
|
case msg := <-consumer.Read():
|
|
log.Debug(consumer.name, zap.String("id", toString(msg.ID)), zap.String("body", string(msg.Body)))
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestNsq(t *testing.T) {
|
|
initLog()
|
|
// go testConsumers()
|
|
// go testConsumerByLookupD()
|
|
producer := testProducer()
|
|
if producer != nil {
|
|
defer producer.Close()
|
|
}
|
|
time.Sleep(5 * time.Second)
|
|
log.Debug("shutdown")
|
|
}
|