package service import ( "fmt" "github.com/rabbitmq/amqp091-go" "samba/pkg/ksync" "samba/pkg/log" "samba/pkg/rmq" "sync/atomic" "time" ) type HandlerMessage func(IService, *amqp091.Delivery) type HandlerRpcMessage func(string, ...interface{}) ([]interface{}, error) type HandlerFunc func() type service struct { *Timer mq *rmq.Rmq mqUrl string consumeMsg <-chan amqp091.Delivery opts *options type_ string name string //msgQueue *Queue[*Message] //rpcMsgQueue *Queue[*RpcMessage] //msg chan *Message //rpcMsg chan *RpcMessage stop int32 handlerMessage HandlerMessage //handlerRpcMessage HandlerRpcMessage handlerFunc chan HandlerFunc } func newOption(opt ...Option) *options { opts := &options{ capacity: 256, onInit: defaultOnInit, onStop: defaultOnStop, onNotifyStop: defaultOnNotifyStop, } for _, f := range opt { f(opts) } return opts } func NewService(type_, name, mqUrl string, handler HandlerMessage, opt ...Option) IService { op := newOption(opt...) s := &service{ opts: op, type_: type_, name: name, mq: nil, mqUrl: mqUrl, consumeMsg: nil, //msgQueue: NewQueue[*Message](), //msg: make(chan *Message, op.capacity), //rpcMsgQueue: mq.NewQueue[*mq.RpcMessage](), //rpcMsg: make(chan *mq.RpcMessage, op.capacity), Timer: NewTimer(), stop: 0, handlerMessage: handler, //handlerRpcMessage: handlerRpcMessage, handlerFunc: make(chan HandlerFunc, 16), } var err error s.mq, err = rmq.NewRmq(mqUrl) if err != nil { log.Error(err.Error()) return nil } if s.mq != nil { s.opts.onInit(s) s.Run() registerService(s) } else { log.Error(fmt.Sprintf("rmq is nil")) } return s } func (s *service) Name() string { return s.name } func (s *service) Type() string { return s.type_ } func (s *service) ExchangeDeclare(exchangeName, typ string) error { return s.mq.ExchangeDeclare(exchangeName, typ) } func (s *service) Consume(queueName string) error { var err error s.consumeMsg, err = s.mq.Consume(queueName) return err } func (s *service) ConsumeDelete() error { return s.mq.ConsumeDelete() } func (s *service) QueueDeclare(queueName string) error { return s.mq.QueueDeclare(queueName) } func (s *service) QueueDelete(queueName string) error { return s.mq.QueueDelete(queueName) } func (s *service) QueueBind(queueName, routerKey, exchangeName string) error { //log.Debug(fmt.Sprintf("exchange:%v bind router key:%v to queue:%v", exchangeName, routerKey, queueName)) return s.mq.QueueBind(queueName, routerKey, exchangeName) } func (s *service) QueueUnbind(queueName, routerKey, exchangeName string) error { return s.mq.QueueUnbind(queueName, routerKey, exchangeName) } func (s *service) Publish(exchangeName, routerKey string, msg []byte) error { return s.mq.Publish(exchangeName, routerKey, msg) } func (s *service) PublishRpc(d *amqp091.Delivery, msg []byte) error { return s.mq.PublishRpc(d, msg) } func (s *service) PublishRaw(exchangeName, routerKey string, data amqp091.Publishing) error { return s.mq.PublishRaw(exchangeName, routerKey, data) } func (s *service) WaitStop() { ticker := time.NewTicker(100 * time.Millisecond) defer ticker.Stop() isBreak := false for { select { case <-ticker.C: isBreak = s.opts.canStop(s) //default: } if isBreak { break } } atomic.AddInt32(&s.stop, 1) s.Timer.Close() s.opts.onStop(s) if _, ok := mgr.services.Get(s.name); ok { mgr.services.Remove(s.name) } // log.Debug(fmt.Sprintf("%v service stop", s.name)) } func (s *service) NotifyStop() { s.opts.onNotifyStop(s) // log.Debug(fmt.Sprintf("notify %v service stop", s.name)) } func (s *service) isEmpty() bool { return true //return s.msgQueue.IsEmpty() && s.rpcMsgQueue.IsEmpty() } func (s *service) isStop() bool { return atomic.LoadInt32(&s.stop) > 0 && s.isEmpty() } func (s *service) runRpc() { //if m, err := s.rpcMsgQueue.Pop(); err == nil && m != nil { // rets, err := s.handlerRpcMessage(m.FuncName, m.Args...) // if srcM, ok := mgr.rpcMsg.Load(m.No); ok { // srcM.Rets = rets // srcM.Err = err // srcM.Ch <- true // } else { // if remote, ok := findRemoteServiceByName(m.Src[0]); ok { // m.Rets = rets // if msg, err := json.Marshal(m); err == nil { // _ = remote.WriteMsg(rpcReceiveMsgId, msg) // } else { // log.Error(fmt.Sprintf("return rpc data.err:%v. rpc src:%v", err, m.Src[0])) // } // } else { // log.Error(fmt.Sprintf("remote service:%v not exist", m.Src[0])) // } // } //} } func (s *service) ProcFuncOnce(cb HandlerFunc) { s.handlerFunc <- cb } func (s *service) Run() { ksync.GoSafe(func() { for { if s.isStop() { break } select { case d, ok := <-s.consumeMsg: if ok { //s.handlerMessage(d.Body) s.handlerMessage(s, &d) } else { log.Error("consume was closed because queue or exchange error") _ = s.ConsumeDelete() if err := s.Consume(s.Name()); err != nil { log.Error(err.Error()) if s.mq != nil { s.mq.Close() } s.mq, err = rmq.NewRmq(s.mqUrl) if err != nil { log.Error(err.Error()) } else { if s.mq != nil { if !s.opts.onInit(s) { s.mq.Close() } } else { } } } } case cb, ok := <-s.handlerFunc: if ok && cb != nil { cb() } //case m := <-s.msg: // s.msgQueue.Push(m) //case r := <-s.rpcMsg: // s.rpcMsgQueue.Push(r) case t, ok := <-s.Timer.chTimer: if ok && t != nil && t.cb != nil { t.cb() } //default: } //if m, err := s.msgQueue.Pop(); err == nil && m != nil { // s.handlerMessage(m) //} //s.runRpc() } }, s.Run) }