diff --git a/service/natsService.go b/service/natsService.go index b08959c..5070994 100644 --- a/service/natsService.go +++ b/service/natsService.go @@ -52,6 +52,8 @@ func NewNatsService(param *InitNatsServiceParams) (*NatsService, error) { return nil, err } s.registry.WatchServices() + // 广播服务上线 + s.publishUpdateService() s.nats = nat.NewNats(param.ServiceName, param.NatsAddress...) if err = s.nats.Connect(); err != nil { @@ -65,6 +67,11 @@ func NewNatsService(param *InitNatsServiceParams) (*NatsService, error) { return s, nil } +func (n *NatsService) publishUpdateService() { + jsonData, _ := json.Marshal(n.node) + _ = n.nats.Publish(updateTopic, jsonData) +} + func (n *NatsService) subscribeUpdateService() error { return n.nats.SubscribeCb(updateTopic, func(m *nats.Msg) { var node = &etcd.ServiceNode{} @@ -83,6 +90,11 @@ func (n *NatsService) subscribeUpdateService() error { }) } +// 订阅回调,极少用。调用方自行保证并发性问题 +func (n *NatsService) SubscribeCb(topic string, cb func(m *nats.Msg)) error { + return n.nats.SubscribeCb(topic, cb) +} + func (n *NatsService) Subscribe(topic string) error { return n.nats.Subscribe(topic, n.msg) } diff --git a/ws/wsServer.go b/ws/wsServer.go index 7afa4be..0df0f73 100644 --- a/ws/wsServer.go +++ b/ws/wsServer.go @@ -83,3 +83,7 @@ func (s *WsServer) FindConnByUserId(userId int64) (IConn, bool) { func (s *WsServer) FindConnByConnId(connId uint32) (IConn, bool) { return wsMgr.Get(connId) } + +func (s *WsServer) Rang(cb func(conn IConn) bool) { + wsMgr.Rang(cb) +}