package nsq

import (
	"fmt"
	"testing"
	"time"

	"github.com/fox/fox/log"
	"github.com/nsqio/go-nsq"
	"go.uber.org/zap"
)

// Fixture values shared by the helpers in this file. The addresses are
// hard-coded, so the test needs NSQ services reachable at exactly these
// endpoints.
const (
	testTopic         = "test_topic"
	testChannel       = "test_channel"
	testChannelD      = "test_channelD"
	testAddress       = "192.168.232.128:4150"
	testLookupAddress = "192.168.232.128:4161"
)

// initLog opens the log output "test.log" at debug level.
func initLog() {
	log.Open("test.log", log.DebugL)
}

// toString returns the bytes of id as a string.
//
// The array is sliced and converted in one step. Converting each byte with
// string(b) would UTF-8 encode the byte value as a code point (turning every
// byte >= 0x80 into two bytes), and appending in a loop is quadratic.
func toString(id nsq.MessageID) string {
	return string(id[:])
}

// testProducer creates a producer for testAddress and publishes the messages
// "hello nsq 0" through "hello nsq 20" (21 in total) to testTopic.
//
// It returns the producer so the caller can close it, or nil when creating
// the producer or publishing a message fails.
func testProducer() *Producer {
	// Initialize a producer.
	producer, err := NewProducer(testAddress)
	if err != nil {
		log.Fatal(err.Error())
		return nil
	}

	for n := 0; n <= 20; n++ {
		err = producer.Publish(testTopic, []byte(fmt.Sprintf("hello nsq %d", n)))
		if err != nil {
			log.Fatal(err.Error())
			return nil
		}
		// time.Sleep(1 * time.Second)
	}
	return producer
}

// testConsumers creates a consumer via NewConsumerByNsqD for testTopic and
// testChannel and logs the ID and body of every message it receives.
//
// The receive loop has no exit condition, so this function only returns when
// the consumer cannot be created; the deferred Close is not reached otherwise.
func testConsumers() {
	consumer, err := NewConsumerByNsqD(testAddress, testTopic, testChannel)
	if err != nil {
		log.Fatal(err.Error())
		return
	}
	defer consumer.Close()

	time.Sleep(2 * time.Second)
	for {
		// Read is called again on every iteration, as in the original loop.
		msg := <-consumer.Read()
		log.Debug(consumer.Name(), zap.String("id", toString(msg.ID)), zap.String("body", string(msg.Body)))
	}
}

// testConsumerByLookupD creates a consumer via NewConsumer for
// testLookupAddress, testTopic and testChannelD and logs the ID and body of
// every message it receives.
//
// The receive loop has no exit condition, so this function only returns when
// the consumer cannot be created; the deferred Close is not reached otherwise.
func testConsumerByLookupD() {
	consumer, err := NewConsumer(testLookupAddress, testTopic, testChannelD)
	if err != nil {
		log.Fatal(err.Error())
		return
	}
	defer consumer.Close()

	time.Sleep(2 * time.Second)
	for {
		// Read is called again on every iteration, as in the original loop.
		msg := <-consumer.Read()
		// NOTE(review): this reads the name field directly while
		// testConsumers calls Name(); confirm both constructors return the
		// same type and switch to the accessor for consistency.
		log.Debug(consumer.name, zap.String("id", toString(msg.ID)), zap.String("body", string(msg.Body)))
	}
}

// TestNsq publishes the test messages through testProducer, waits five
// seconds and logs "shutdown". The two consumer helpers are currently
// disabled; re-enable the goroutines below to exercise them.
func TestNsq(t *testing.T) {
	initLog()
	// go testConsumers()
	// go testConsumerByLookupD()
	producer := testProducer()
	if producer != nil {
		defer producer.Close()
	}
	time.Sleep(5 * time.Second)
	log.Debug("shutdown")
}