// Package service contains BaseService, a building block that funnels raw
// messages, queued jobs and timer callbacks through a single run loop.
package service

import (
	"context"
	"fmt"
	"time"

	"github.com/fox/fox/ipb"
	"github.com/fox/fox/ksync"
	"github.com/fox/fox/log"
	"github.com/fox/fox/safeChan"
	"github.com/fox/fox/timer"
)

// BaseService multiplexes three event sources (incoming messages, queued jobs
// and timers) onto one loop (see run), and exposes a two-phase shutdown:
// NotifyStop requests it, WaitStop blocks until the loop has finished.
type BaseService struct {
	*timer.Timer

	type_  string
	name   string
	onFunc IOnFunc // callbacks invoked by the run loop: OnMessage, CanStop, OnStop
	sender ISender // optional transport used by Send and Call; may be nil

	msg *safeChan.SafeChan[[]byte] // raw messages queued by Write
	job *safeChan.SafeChan[func()] // closures queued by RunOnce and RunWait

	readyStop     context.Context    // its Done channel signals that shutdown was requested
	readyStopFunc context.CancelFunc // called by NotifyStop to move the service into its shutdown flow
	waitStop      context.Context    // its Done channel signals that the run loop has finished shutting down
	waitStopFunc  context.CancelFunc // called by the run loop once shutdown is complete
}

// NewBaseService builds a BaseService with the given type, name, callback set
// and sender. Both queues are created with NewSafeChan(128) (presumably the
// buffer capacity; confirm against safeChan). The run loop is NOT started
// here; the caller has to invoke Run.
func NewBaseService(type_, name string, onFunc IOnFunc, sender ISender) *BaseService {
	s := &BaseService{
		type_:  type_,
		name:   name,
		Timer:  timer.NewTimer(),
		onFunc: onFunc,
		sender: sender,
		msg:    safeChan.NewSafeChan[[]byte](128),
		job:    safeChan.NewSafeChan[func()](128),
	}
	s.readyStop, s.readyStopFunc = context.WithCancel(context.Background())
	s.waitStop, s.waitStopFunc = context.WithCancel(context.Background())
	//s.Run()
	return s
}

// Name returns the name the service was created with.
func (s *BaseService) Name() string {
	return s.name
}

// Type returns the type string the service was created with.
func (s *BaseService) Type() string {
	return s.type_
}

// Write queues a raw message for the run loop, which hands it to
// onFunc.OnMessage. The error of the underlying queue write is returned as is.
func (s *BaseService) Write(msg []byte) error {
	return s.msg.Write(msg)
}

// RunOnce queues cb for execution by the run loop and returns without waiting
// for it. Once shutdown has been requested the call is rejected and an error
// is logged. A failure to queue the job is logged as well, because the method
// has no way to return it.
func (s *BaseService) RunOnce(cb func()) {
	if s.canStop() {
		log.Error(s.Log("want readyStop, can not call RunOnce function"))
		return
	}
	if err := s.job.Write(cb); err != nil {
		log.Error(s.Log("RunOnce: enqueue job failed: %v", err))
	}
}

// RunWait queues cb for execution by the run loop and blocks until cb has
// produced its result or 30 seconds have passed.
//
// It returns cb's result and a nil error on success. It returns a nil result
// and a non-nil error when cb is nil, when shutdown has already been
// requested, when the job cannot be queued, or on timeout. After a timeout cb
// may still run later; its result is then discarded.
func (s *BaseService) RunWait(cb func() (retValue any)) (retValue any, err error) {
	const timeout = 30 * time.Second

	if cb == nil {
		// A nil callback would otherwise panic inside the run loop.
		return nil, s.Err("RunWait callback is nil")
	}
	if s.canStop() {
		err = s.Err("want readyStop, can not call RunWait function")
		log.Error(err.Error())
		return nil, err
	}

	// Capacity 1 lets the job deliver its result and finish even when the
	// caller has already given up. The closure only touches the channel, so
	// nothing is shared between the run loop and the caller's return values.
	result := make(chan any, 1)
	if err = s.job.Write(func() { result <- cb() }); err != nil {
		return nil, err
	}

	deadline := time.NewTimer(timeout)
	defer deadline.Stop() // release the timer as soon as the result arrives

	select {
	case v := <-result:
		return v, nil
	case <-deadline.C:
		return nil, fmt.Errorf("timeout fail")
	}
}

// Send forwards msg on topic through the configured sender. It returns an
// error when the service was created without a sender.
func (s *BaseService) Send(topic string, msg *ipb.InternalMsg) error {
	if s.sender == nil {
		return s.Err("send is nil")
	}
	return s.sender.Send(topic, msg)
}

// Call forwards an RPC-style request through the configured sender, passing
// rpcTopic, msg, subMsg and cb on unchanged. It returns an error when the
// service was created without a sender.
func (s *BaseService) Call(rpcTopic string, msg *ipb.InternalMsg, subMsg, cb any) error {
	if s.sender == nil {
		return s.Err("call is nil")
	}
	return s.sender.Call(rpcTopic, msg, subMsg, cb)
}

// WaitStop blocks until the run loop has completed its shutdown sequence.
func (s *BaseService) WaitStop() {
	<-s.waitStop.Done()
}

// NotifyStop requests shutdown. The run loop performs the actual teardown as
// soon as onFunc.CanStop reports true.
func (s *BaseService) NotifyStop() {
	s.readyStopFunc()
	// log.Debug(fmt.Sprintf("notify %v service readyStop", s.name))
}

//func (s *BaseService) allChanEmpty() bool {
//	if s.job.Size() == 0 && s.msg.Size() == 0 {
//		return true
//	}
//	return false
//}

// canStop reports, without blocking, whether shutdown has been requested.
func (s *BaseService) canStop() bool {
	select {
	case <-s.readyStop.Done():
		return true
	default:
		return false
	}
}

// run is the service loop. Each iteration first checks whether shutdown was
// requested and onFunc.CanStop allows it; if so it closes both queues and the
// timer, calls onFunc.OnStop, releases WaitStop callers and returns.
// Otherwise it waits for one event and dispatches it.
func (s *BaseService) run() {
	// The stop request is watched in the select so that NotifyStop wakes an
	// idle loop. The channel stays readable forever once closed, so it is
	// replaced by nil after the first wake-up: a nil channel never becomes
	// ready, which keeps the loop from spinning while CanStop is still false.
	stopRequested := s.readyStop.Done()

	for {
		if s.onFunc.CanStop() && s.canStop() {
			s.msg.Close()
			s.job.Close()
			s.Timer.CancelAllTimer()
			s.Timer.Close()
			s.onFunc.OnStop()
			s.waitStopFunc()
			return
		}

		select {
		case msg, ok := <-s.msg.Reader():
			if ok {
				// The handler's result is intentionally discarded here.
				_ = s.onFunc.OnMessage(msg)
			}
		case cb, ok := <-s.job.Reader():
			if ok && cb != nil {
				cb()
			}
		case t, ok := <-s.Timer.Reader():
			if ok && t != nil && t.Func != nil {
				t.Func()
			}
		case <-stopRequested:
			stopRequested = nil // re-evaluate the stop condition at the top of the loop
		}
	}
}

// Run starts the run loop through ksync.GoSafe, passing Run itself as the
// second argument (presumably a restart hook invoked after a panic; confirm
// against ksync).
func (s *BaseService) Run() {
	ksync.GoSafe(s.run, s.Run)
}

// Log formats a message and prefixes it with "service:<name> ".
func (s *BaseService) Log(format string, a ...any) string {
	head := fmt.Sprintf("service:%v ", s.name)
	return head + fmt.Sprintf(format, a...)
}

// Err builds an error whose text is the same as Log(format, a...). The
// finished text is passed through "%s" so that a literal '%' in the service
// name or in an argument is not interpreted as a formatting verb a second
// time.
func (s *BaseService) Err(format string, a ...any) error {
	return fmt.Errorf("%s", s.Log(format, a...))
}