diff --git a/safeChan/safeChan.go b/safeChan/safeChan.go index d9dc645..b314e04 100644 --- a/safeChan/safeChan.go +++ b/safeChan/safeChan.go @@ -15,6 +15,7 @@ type SafeChan[T any] struct { once sync.Once } +// size为0会创建无缓冲队列,读chan时,没有数据会阻塞到有数据可读 func NewSafeChan[T any](size int) *SafeChan[T] { ch := &SafeChan[T]{} ch.ctx, ch.cancel = context.WithCancel(context.Background()) @@ -40,6 +41,7 @@ func (s *SafeChan[T]) Size() int { } func (s *SafeChan[T]) Reader() <-chan T { + //log.Debug("reader channel") return s.ch } diff --git a/service/baseService.go b/service/baseService.go index 842195b..f0c2399 100644 --- a/service/baseService.go +++ b/service/baseService.go @@ -57,6 +57,7 @@ func (s *BaseService) Write(msg []byte) error { return s.msg.Write(msg) } +// 执行一次回调 func (s *BaseService) RunOnce(cb func()) { select { case <-s.readyStop.Done(): @@ -67,6 +68,7 @@ func (s *BaseService) RunOnce(cb func()) { } } +// 执行一次回调并等待返回值 func (s *BaseService) RunWait(cb func() (retValue any)) (retValue any, err error) { select { case <-s.readyStop.Done(): @@ -113,7 +115,7 @@ func (s *BaseService) WaitStop() { func (s *BaseService) NotifyStop() { s.readyStopFunc() - // log.Debug(fmt.Sprintf("notify %v service readyStop", s.name)) + //log.Debug(s.Log("notify %v service readyStop", s.name)) } //func (s *BaseService) allChanEmpty() bool { @@ -126,8 +128,10 @@ func (s *BaseService) NotifyStop() { func (s *BaseService) canStop() bool { select { case <-s.readyStop.Done(): + //log.Error(s.Log("want readyStop")) return true default: + //log.Error(s.Log("can not Stop")) return false } } @@ -135,6 +139,7 @@ func (s *BaseService) canStop() bool { func (s *BaseService) run() { for { if s.onFunc.CanStop() && s.canStop() { + //log.Error(s.Log("stop service")) s.msg.Close() s.job.Close() s.Timer.CancelAllTimer() @@ -144,19 +149,26 @@ func (s *BaseService) run() { s.waitStopFunc() break } + // select无default会阻塞在case的chan中直到有可读chan select { case msg, ok := <-s.msg.Reader(): + //log.Debug(s.Log("msg reader")) if ok { _ = s.onFunc.OnMessage(msg) } case cb, ok := <-s.job.Reader(): + //log.Debug(s.Log("job reader")) if ok && cb != nil { cb() } case t, ok := <-s.Timer.Reader(): + //log.Debug(s.Log("timer reader")) if ok && t != nil && t.Func != nil { t.Func() } + default: + // 休眠50微秒,避免cpu占用过高 + time.Sleep(50 * time.Microsecond) } } }