fox/service/natsService.go

49 lines
1.1 KiB
Go
Raw Normal View History

2025-05-25 20:02:15 +08:00
package service
import (
"fmt"
"github.com/fox/fox/nat"
"time"
)
type NatsService struct {
*BaseService
nats *nat.Nats
}
func NewNatsService(type_, name string, onFunc IOnFunc, natsAddress ...string) (*NatsService, error) {
s := new(NatsService)
s.BaseService = NewBaseService(type_, name, onFunc, s)
s.nats = nat.NewNats(fmt.Sprintf("%v-%v", type_, name), natsAddress...)
if err := s.nats.Connect(); err != nil {
// log.Error(err.Error())
s.BaseService.NotifyStop()
return nil, err
}
return s, nil
}
func (n *NatsService) Subscribe(topic string) error {
return n.nats.Subscribe(topic, n.msg)
}
// 队列订阅,队列中只会有一个消费者消费该消息
func (n *NatsService) QueueSubscribe(topic string, queue string) error {
return n.nats.QueueSubscribe(topic, queue, n.msg)
}
func (s *NatsService) OnStop() {
s.nats.Close()
}
func (s *NatsService) Send(topic string, msg []byte) error {
return s.nats.Publish(topic, msg)
}
func (s *NatsService) Call(topic string, timeout time.Duration, msg []byte) ([]byte, error) {
_ = topic
_ = timeout
_ = msg
return nil, nil
}