package service

import (
	"encoding/json"
	"os"
	"sync"

	"github.com/fox/fox/etcd"
	"github.com/fox/fox/ipb"
	"github.com/fox/fox/log"
	"github.com/fox/fox/nat"
	"github.com/fox/fox/processor"
	"github.com/golang/protobuf/proto"
	"github.com/nats-io/nats.go"
)

const (
	// updateTopic is the broadcast topic on which services announce themselves.
	updateTopic = "update.service.topic"
)

// InitNatsServiceParams holds the parameters needed to initialise a NatsService.
type InitNatsServiceParams struct {
	EtcdAddress  []string
	EtcdUsername string
	EtcdPassword string
	NatsAddress  []string
	ServiceType  string  // service type of this service
	ServiceName  string  // name of this service; unique index
	OnFunc       IOnFunc // bundles init, stop and message handling for the service
	TypeId       int     // maps one-to-one to ServiceType
	Version      string
}

// NatsService is a BaseService that is registered in etcd and exchanges
// messages over NATS.
type NatsService struct {
	*BaseService
	nats     *nat.Nats
	registry *etcd.Registry[etcd.ServiceNode]
	// NOTE(review): rpcProcessor is never assigned in the code visible here,
	// yet subscribeRpc and Call use it — confirm it is initialised elsewhere.
	rpcProcessor *processor.Processor
	node         etcd.ServiceNode // node information of this service
}

// NewNatsService builds a NatsService from param: it registers the node in
// etcd, starts watching services, connects to NATS and sets up the service's
// subscriptions.
//
// An error is returned when the registry cannot be created, the node cannot be
// registered, or the NATS connection fails (in the last case the service is
// unregistered again first). Subscription failures are logged and do not abort
// construction.
func NewNatsService(param *InitNatsServiceParams) (*NatsService, error) {
	var err error
	s := new(NatsService)
	s.BaseService = NewBaseService(param.ServiceType, param.ServiceName, param.OnFunc, s)

	if s.registry, err = etcd.NewRegistry[etcd.ServiceNode](param.EtcdAddress, param.EtcdUsername, param.EtcdPassword); err != nil {
		return nil, err
	}
	s.node = etcd.ServiceNode{
		TypeId:  param.TypeId,
		Name:    s.Name(),
		Type:    s.Type(),
		Version: param.Version,
	}
	if err = s.registry.Register(s.node); err != nil {
		return nil, err
	}
	s.registry.WatchServices()

	s.nats = nat.NewNats(param.ServiceName, param.NatsAddress...)
	if err = s.nats.Connect(); err != nil {
		s.registry.UnregisterService()
		return nil, err
	}

	// Subscribe to the topic named after this service.
	if err = s.Subscribe(Topic(s)); err != nil {
		log.Error(err.Error())
	}
	// Subscribe to the "service came online" broadcast, then announce ourselves.
	if err = s.subscribeUpdateService(); err != nil {
		log.Error(err.Error())
	}
	s.publishUpdateService()
	// Subscribe to RPC replies.
	if err = s.subscribeRpc(); err != nil {
		log.Error(err.Error())
	}
	return s, nil
}

// publishUpdateService broadcasts this service's node information on
// updateTopic. Failures are logged; the announcement is best-effort.
func (s *NatsService) publishUpdateService() {
	jsonData, err := json.Marshal(s.node)
	if err != nil {
		log.Error(err.Error())
		return
	}
	if err = s.nats.Publish(updateTopic, jsonData); err != nil {
		log.Error(err.Error())
	}
}

// subscribeUpdateService listens on updateTopic. When a different node of the
// same service type announces a greater Version, this service stops itself and
// exits the process with status 0.
func (s *NatsService) subscribeUpdateService() error {
	return s.SubscribeCb(updateTopic, func(m *nats.Msg) {
		node := &etcd.ServiceNode{}
		if err := json.Unmarshal(m.Data, node); err != nil {
			// Never act on a payload that could not be decoded: a partially
			// filled node must not be able to trigger a shutdown.
			log.Error(err.Error())
			return
		}
		// Ignore services of another type and our own announcement.
		if node.Type != s.Type() || node.Name == s.Name() {
			return
		}
		// A newer instance came online: shut this one down.
		// NOTE(review): Version is compared as a string, so the ordering is
		// lexicographic ("10" < "9") — confirm the version format.
		if s.node.Version < node.Version {
			s.NotifyStop()
			s.WaitStop()
			log.InfoF("%v auto exit, initiating shutdown...", s.Name())
			os.Exit(0)
		}
	})
}

// subscribeRpc listens on this service's RPC topic and dispatches every decoded
// message to rpcProcessor, keyed by the message's RetRpcMsgId.
func (s *NatsService) subscribeRpc() error {
	return s.SubscribeCb(RpcTopic(s), func(m *nats.Msg) {
		iMsg := &ipb.InternalMsg{}
		if err := proto.Unmarshal(m.Data, iMsg); err != nil {
			// Drop undecodable payloads instead of dispatching a zero message.
			log.Error(err.Error())
			return
		}
		// The dispatch result was already discarded by the original code.
		_ = s.rpcProcessor.Dispatch(iMsg.RetRpcMsgId, iMsg)
	})
}

// SubscribeCb subscribes to topic; each received message is handed to cb
// through RunOnce.
func (s *NatsService) SubscribeCb(topic string, cb func(m *nats.Msg)) error {
	return s.nats.SubscribeCb(topic, func(m *nats.Msg) {
		s.RunOnce(func() {
			cb(m)
		})
	})
}

// Subscribe subscribes to topic using the service's msg handler.
func (s *NatsService) Subscribe(topic string) error {
	return s.nats.Subscribe(topic, s.msg)
}

// QueueSubscribe subscribes to topic as a member of queue using the service's
// msg handler; only one consumer in the queue consumes a given message.
func (s *NatsService) QueueSubscribe(topic string, queue string) error {
	return s.nats.QueueSubscribe(topic, queue, s.msg)
}

// OnStop closes the NATS connection.
func (s *NatsService) OnStop() {
	s.nats.Close()
}

// Send serialises msg and publishes it on topic. It returns the marshalling
// error, if any, without publishing; otherwise the result of the publish.
func (s *NatsService) Send(topic string, msg *ipb.InternalMsg) error {
	data, err := proto.Marshal(msg)
	if err != nil {
		return err
	}
	return s.nats.Publish(topic, data)
}

// Call publishes an RPC message built from msg on rpcTopic and, once the
// publish succeeded, registers cb with rpcProcessor under msg.RetRpcMsgId.
// It returns the marshalling or publish error, in which case cb is not
// registered.
func (s *NatsService) Call(rpcTopic string, msg *ipb.InternalMsg, cb func(msg *ipb.InternalMsg)) error {
	data, err := proto.Marshal(ipb.MakeRpcMsg(msg.ServiceName, msg.ConnId, msg.UserId, msg.MsgId, msg.Msg))
	if err != nil {
		return err
	}
	if err = s.nats.Publish(rpcTopic, data); err != nil {
		return err
	}
	// NOTE(review): the handler is registered after publishing, so a very fast
	// reply could in principle arrive before it exists — confirm whether the
	// registration should happen first.
	s.rpcProcessor.RegisterMessage(msg.RetRpcMsgId, ipb.InternalMsg{}, cb)
	return nil
}

// ServiceEtcd returns the etcd registry used by this service.
func (s *NatsService) ServiceEtcd() *etcd.Registry[etcd.ServiceNode] {
	return s.registry
}

// GetServiceNodes returns all service nodes known to the etcd registry.
func (s *NatsService) GetServiceNodes() *sync.Map {
	return s.registry.GetNodes()
}

// FindServiceNode looks up the node information of the service named
// serviceName.
func (s *NatsService) FindServiceNode(serviceName string) (etcd.ServiceNode, error) {
	return s.registry.FindNode(serviceName)
}