package nat import ( "fmt" "github.com/fox/fox/ksync" "github.com/fox/fox/log" "github.com/fox/fox/safeChan" "sync/atomic" "testing" "time" ) const ( nats1 = "nats://192.168.232.128:4222" nats2 = "nats://114.132.124.145:4222" ) func initLog() { _ = nats1 _ = nats2 log.Open("test.log", log.DebugL) } /* docker pull nats:latest docker run -d --name my-nats -p 4222:4222 -p 8222:8222 nats */ func TestNats(t *testing.T) { initLog() n := NewNats("test", nats2) if err := n.Connect(); err != nil { t.Log(err) return } topic := "test.topic" for i := 0; i < 2; i++ { msgChan := safeChan.NewSafeChan[[]byte](128) if err := n.Subscribe(topic, msgChan); err != nil { t.Log(err) return } ksync.GoSafe(func() { for { select { case msg, ok := <-msgChan.Reader(): if !ok { return } t.Log("consumer:", i, string(msg)) } } }, nil) } count := 0 for { count++ _ = n.Publish(topic, []byte(fmt.Sprintf("hello nats:%v", count))) if count > 4 { break } } time.Sleep(3 * time.Second) n.Close() } func TestQueue(t *testing.T) { initLog() n := NewNats("test", "nats://192.168.232.128:4222") if err := n.Connect(); err != nil { t.Log(err) return } topic := "test.group" queue := "test.queue" count2 := int32(0) for i := 0; i < 3; i++ { msgChan := safeChan.NewSafeChan[[]byte](128) if err := n.QueueSubscribe(topic, queue, msgChan); err != nil { t.Log(err) return } ksync.GoSafe(func() { for { select { case msg, ok := <-msgChan.Reader(): if !ok { return } _ = atomic.AddInt32(&count2, 1) _ = msg // t.Log("consumer:", i, string(msg)) } } }, nil) } count := int32(0) for { count++ _ = n.Publish(topic, []byte(fmt.Sprintf("hello nats:%v", count))) if count > 900000 { break } } time.Sleep(10 * time.Second) c := atomic.LoadInt32(&count2) if c == count { t.Log("count==count2==", c) } else { t.Log("count:", count, " count2:", count2) } n.Close() }