177 lines
3.6 KiB
Go
177 lines
3.6 KiB
Go
package service
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"github.com/fox/fox/ipb"
|
||
"github.com/fox/fox/ksync"
|
||
"github.com/fox/fox/log"
|
||
"github.com/fox/fox/safeChan"
|
||
"github.com/fox/fox/timer"
|
||
"time"
|
||
)
|
||
|
||
type BaseService struct {
|
||
*timer.Timer
|
||
type_ string
|
||
name string
|
||
|
||
onFunc IOnFunc
|
||
sender ISender
|
||
|
||
msg *safeChan.SafeChan[[]byte]
|
||
job *safeChan.SafeChan[func()]
|
||
readyStop context.Context // 通知关闭的chan
|
||
readyStopFunc context.CancelFunc // NotifyStop会调用该方法,触发服务进入关闭流程
|
||
waitStop context.Context
|
||
waitStopFunc context.CancelFunc
|
||
}
|
||
|
||
func NewBaseService(type_, name string, onFunc IOnFunc, sender ISender) *BaseService {
|
||
s := &BaseService{
|
||
type_: type_,
|
||
name: name,
|
||
Timer: timer.NewTimer(),
|
||
onFunc: onFunc,
|
||
sender: sender,
|
||
|
||
msg: safeChan.NewSafeChan[[]byte](128),
|
||
job: safeChan.NewSafeChan[func()](128),
|
||
}
|
||
s.readyStop, s.readyStopFunc = context.WithCancel(context.Background())
|
||
s.waitStop, s.waitStopFunc = context.WithCancel(context.Background())
|
||
|
||
//s.Run()
|
||
return s
|
||
}
|
||
|
||
func (s *BaseService) Name() string {
|
||
return s.name
|
||
}
|
||
|
||
func (s *BaseService) Type() string {
|
||
return s.type_
|
||
}
|
||
|
||
func (s *BaseService) Write(msg []byte) error {
|
||
return s.msg.Write(msg)
|
||
}
|
||
|
||
func (s *BaseService) RunOnce(cb func()) {
|
||
select {
|
||
case <-s.readyStop.Done():
|
||
log.Error(s.Log("want readyStop, can not call RunOnce function"))
|
||
return
|
||
default:
|
||
_ = s.job.Write(cb)
|
||
}
|
||
}
|
||
|
||
func (s *BaseService) RunWait(cb func() (retValue any)) (retValue any, err error) {
|
||
select {
|
||
case <-s.readyStop.Done():
|
||
err = fmt.Errorf(s.Log("want readyStop, can not call RunOnce function"))
|
||
log.Error(err.Error())
|
||
return nil, err
|
||
default:
|
||
wait := make(chan any, 2)
|
||
err = s.job.Write(func() {
|
||
retValue = cb()
|
||
wait <- retValue
|
||
})
|
||
if err == nil {
|
||
select {
|
||
case retValue = <-wait:
|
||
return retValue, nil
|
||
case <-time.After(time.Second * time.Duration(30)):
|
||
return nil, fmt.Errorf("timeout fail")
|
||
}
|
||
}
|
||
return nil, err
|
||
}
|
||
}
|
||
|
||
func (s *BaseService) Send(topic string, msg *ipb.InternalMsg) error {
|
||
if s.sender != nil {
|
||
return s.sender.Send(topic, msg)
|
||
}
|
||
return s.Err("send is nil")
|
||
}
|
||
func (s *BaseService) Call(rpcTopic string, msg *ipb.InternalMsg, cb func(msg *ipb.InternalMsg)) error {
|
||
if s.sender != nil {
|
||
return s.sender.Call(rpcTopic, msg, cb)
|
||
}
|
||
return s.Err("call is nil")
|
||
}
|
||
|
||
func (s *BaseService) WaitStop() {
|
||
select {
|
||
case <-s.waitStop.Done():
|
||
return
|
||
}
|
||
}
|
||
|
||
func (s *BaseService) NotifyStop() {
|
||
s.readyStopFunc()
|
||
// log.Debug(fmt.Sprintf("notify %v service readyStop", s.name))
|
||
}
|
||
|
||
//func (s *BaseService) allChanEmpty() bool {
|
||
// if s.job.Size() == 0 && s.msg.Size() == 0 {
|
||
// return true
|
||
// }
|
||
// return false
|
||
//}
|
||
|
||
func (s *BaseService) canStop() bool {
|
||
select {
|
||
case <-s.readyStop.Done():
|
||
return true
|
||
default:
|
||
return false
|
||
}
|
||
}
|
||
|
||
func (s *BaseService) run() {
|
||
for {
|
||
if s.onFunc.CanStop() && s.canStop() {
|
||
s.msg.Close()
|
||
s.job.Close()
|
||
s.Timer.CancelAllTimer()
|
||
s.Timer.Close()
|
||
|
||
s.onFunc.OnStop()
|
||
s.waitStopFunc()
|
||
break
|
||
}
|
||
select {
|
||
case msg, ok := <-s.msg.Reader():
|
||
if ok {
|
||
_ = s.onFunc.OnMessage(msg)
|
||
}
|
||
case cb, ok := <-s.job.Reader():
|
||
if ok && cb != nil {
|
||
cb()
|
||
}
|
||
case t, ok := <-s.Timer.Reader():
|
||
if ok && t != nil && t.Func != nil {
|
||
t.Func()
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
func (s *BaseService) Run() {
|
||
ksync.GoSafe(s.run, s.Run)
|
||
}
|
||
|
||
func (s *BaseService) Log(format string, a ...any) string {
|
||
head := fmt.Sprintf("service:%v ", s.name)
|
||
return head + fmt.Sprintf(format, a...)
|
||
}
|
||
|
||
func (s *BaseService) Err(format string, a ...any) error {
|
||
head := fmt.Sprintf("service:%v ", s.name)
|
||
return fmt.Errorf(head + fmt.Sprintf(format, a...))
|
||
}
|