118 lines
2.0 KiB
Go
118 lines
2.0 KiB
Go
![]() |
package nat
|
||
|
|
||
|
import (
|
||
|
"fmt"
|
||
|
"github.com/fox/fox/ksync"
|
||
|
"github.com/fox/fox/log"
|
||
|
"github.com/fox/fox/safeChan"
|
||
|
"sync/atomic"
|
||
|
"testing"
|
||
|
"time"
|
||
|
)
|
||
|
|
||
|
// Addresses of the NATS servers that the integration tests in this file
// connect to.
const (
	// nats1 is a server on a private (192.168.x.x) network.
	nats1 = "nats://192.168.232.128:4222"
	// nats2 is the server TestNats connects to.
	nats2 = "nats://114.132.124.145:4222"
)
|
||
|
|
||
|
func initLog() {
|
||
|
_ = nats1
|
||
|
_ = nats2
|
||
|
log.Open("test.log", log.DebugL)
|
||
|
}
|
||
|
|
||
|
/*
To start a NATS server with Docker:

	docker pull nats:latest
	docker run -d --name my-nats -p 4222:4222 -p 8222:8222 nats
*/
|
||
|
func TestNats(t *testing.T) {
|
||
|
initLog()
|
||
|
n := NewNats("test", nats2)
|
||
|
if err := n.Connect(); err != nil {
|
||
|
t.Log(err)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
topic := "test.topic"
|
||
|
for i := 0; i < 2; i++ {
|
||
|
msgChan := safeChan.NewSafeChan[[]byte](128)
|
||
|
if err := n.Subscribe(topic, msgChan); err != nil {
|
||
|
t.Log(err)
|
||
|
return
|
||
|
}
|
||
|
ksync.GoSafe(func() {
|
||
|
for {
|
||
|
select {
|
||
|
case msg, ok := <-msgChan.Reader():
|
||
|
if !ok {
|
||
|
return
|
||
|
}
|
||
|
t.Log("consumer:", i, string(msg))
|
||
|
}
|
||
|
}
|
||
|
}, nil)
|
||
|
}
|
||
|
count := 0
|
||
|
for {
|
||
|
count++
|
||
|
_ = n.Publish(topic, []byte(fmt.Sprintf("hello nats:%v", count)))
|
||
|
if count > 4 {
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
time.Sleep(3 * time.Second)
|
||
|
n.Close()
|
||
|
}
|
||
|
|
||
|
func TestQueue(t *testing.T) {
|
||
|
initLog()
|
||
|
n := NewNats("test", "nats://192.168.232.128:4222")
|
||
|
if err := n.Connect(); err != nil {
|
||
|
t.Log(err)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
topic := "test.group"
|
||
|
queue := "test.queue"
|
||
|
|
||
|
count2 := int32(0)
|
||
|
for i := 0; i < 3; i++ {
|
||
|
msgChan := safeChan.NewSafeChan[[]byte](128)
|
||
|
if err := n.QueueSubscribe(topic, queue, msgChan); err != nil {
|
||
|
t.Log(err)
|
||
|
return
|
||
|
}
|
||
|
ksync.GoSafe(func() {
|
||
|
for {
|
||
|
select {
|
||
|
case msg, ok := <-msgChan.Reader():
|
||
|
if !ok {
|
||
|
return
|
||
|
}
|
||
|
_ = atomic.AddInt32(&count2, 1)
|
||
|
_ = msg
|
||
|
// t.Log("consumer:", i, string(msg))
|
||
|
}
|
||
|
}
|
||
|
}, nil)
|
||
|
}
|
||
|
|
||
|
count := int32(0)
|
||
|
for {
|
||
|
count++
|
||
|
_ = n.Publish(topic, []byte(fmt.Sprintf("hello nats:%v", count)))
|
||
|
if count > 900000 {
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
time.Sleep(10 * time.Second)
|
||
|
c := atomic.LoadInt32(&count2)
|
||
|
if c == count {
|
||
|
t.Log("count==count2==", c)
|
||
|
} else {
|
||
|
t.Log("count:", count, " count2:", count2)
|
||
|
}
|
||
|
|
||
|
n.Close()
|
||
|
}
|