package service import ( "encoding/json" "github.com/fox/fox/etcd" "github.com/fox/fox/ipb" "github.com/fox/fox/log" "github.com/fox/fox/nat" "github.com/fox/fox/processor" "github.com/golang/protobuf/proto" "github.com/nats-io/nats.go" "os" "sync" "time" ) const ( updateTopic = "update.service.topic" ) // 初始化服务所需要的参数 type InitNatsServiceParams struct { EtcdAddress []string EtcdUsername string EtcdPassword string NatsAddress []string ServiceType string // 本服务的服务类型 ServiceName string // 本服务的服务名,唯一索引 OnFunc IOnFunc // 包含初始化,停止,服务的消息处理 TypeId int // 与ServiceType一一对应 Version string } type NatsService struct { *BaseService nats *nat.Nats registry *etcd.Registry[etcd.ServiceNode] RpcProcessor *processor.RpcProcessor node etcd.ServiceNode // 本服务节点信息 } func NewNatsService(param *InitNatsServiceParams) (*NatsService, error) { var err error s := new(NatsService) s.BaseService = NewBaseService(param.ServiceType, param.ServiceName, param.OnFunc, s) if s.registry, err = etcd.NewRegistry[etcd.ServiceNode](param.EtcdAddress, param.EtcdUsername, param.EtcdPassword); err != nil { return nil, err } s.RpcProcessor = processor.NewRpcProcessor() s.node = etcd.ServiceNode{ TypeId: param.TypeId, Name: s.Name(), Type: s.Type(), Version: param.Version, } if err = s.registry.Register(s.node); err != nil { return nil, err } s.registry.WatchServices() s.nats = nat.NewNats(param.ServiceName, param.NatsAddress...) if err = s.nats.Connect(); err != nil { s.registry.UnregisterService() return nil, err } // 订阅本服务名的topic if err = s.Subscribe(Topic(s)); err != nil { log.Error(err.Error()) } // 订阅广播服务上线 _ = s.subscribeUpdateService() s.publishUpdateService() // 订阅rpc回调 _ = s.subscribeRpc() return s, nil } func (n *NatsService) publishUpdateService() { jsonData, _ := json.Marshal(n.node) _ = n.nats.Publish(updateTopic, jsonData) //log.Debug(n.Log("publishUpdateService:%v", string(jsonData))) } func (n *NatsService) subscribeUpdateService() error { return n.SubscribeCb(updateTopic, func(m *nats.Msg) { var node = &etcd.ServiceNode{} _ = json.Unmarshal(m.Data, node) //log.Debug(n.Log("发现新节点:%v", string(m.Data))) // 不是同类服务不处理,是自己发出来的更新,也不处理 if node.Type != n.Type() || node.Name == n.Name() { //log.Debug(n.Log("与本节点不匹配.本节点:%v", n.node)) return } // 有新服务上线,本服务准备退出 if n.node.Version < node.Version { log.InfoF(n.Log("有新服务:%v,版本:%v.本服务即将关闭", node.Name, node.Version)) n.NotifyStop() //n.WaitStop() os.Exit(0) } }) } func (n *NatsService) subscribeRpc() error { return n.SubscribeCb(RpcTopic(n), func(m *nats.Msg) { var iMsg = &ipb.InternalMsg{} _ = proto.Unmarshal(m.Data, iMsg) rsp, err := n.RpcProcessor.Dispatch(iMsg.RpcMsgId, iMsg) if err != nil { log.Error(err.Error()) return } rspData, _ := proto.Marshal(rsp) _ = m.Respond(rspData) }) } // 订阅回调 cb会切回service协程执行,避免在cb中有阻塞式请求 func (n *NatsService) SubscribeCb(topic string, cb func(m *nats.Msg)) error { return n.nats.SubscribeCb(topic, func(m *nats.Msg) { n.RunOnce(func() { cb(m) }) }) } func (n *NatsService) Subscribe(topic string) error { return n.nats.Subscribe(topic, n.msg) } // 队列订阅,队列中只会有一个消费者消费该消息 func (n *NatsService) QueueSubscribe(topic string, queue string) error { return n.nats.QueueSubscribe(topic, queue, n.msg) } func (s *NatsService) OnStop() { s.nats.Close() } func (s *NatsService) Send(topic string, msg *ipb.InternalMsg) error { data, _ := proto.Marshal(msg) return s.nats.Publish(topic, data) } // call会阻塞主协程,需要起新协程去等待返回值,业务逻辑则转回主协程处理。 // 不要将处理返回值的业务逻辑放到新协程,避免业务逻辑中有数据并发问题 // 比如并发call拉用户数据,然后操作本地的user map func (s *NatsService) Call(rpcTopic string, timeout time.Duration, msg *ipb.InternalMsg) (*ipb.InternalMsg, error) { data, _ := proto.Marshal(msg) response, err := s.nats.Rpc(rpcTopic, timeout, data) if err != nil { return nil, err } rspMsg := &ipb.InternalMsg{} _ = proto.Unmarshal(response, rspMsg) return rspMsg, nil } func (s *NatsService) ServiceEtcd() *etcd.Registry[etcd.ServiceNode] { return s.registry } // 从etcd中获取所有服务节点 func (s *NatsService) GetServiceNodes() *sync.Map { return s.registry.GetNodes() } // 查找指定的服务节点信息 func (s *NatsService) FindServiceNode(serviceName string) (etcd.ServiceNode, error) { return s.registry.FindNode(serviceName) }