fox/service/baseService.go

176 lines
3.6 KiB
Go
Raw Normal View History

2025-05-25 20:02:15 +08:00
package service
import (
"context"
"fmt"
"github.com/fox/fox/ksync"
"github.com/fox/fox/log"
"github.com/fox/fox/safeChan"
"github.com/fox/fox/timer"
"time"
)
type BaseService struct {
*timer.Timer
type_ string
name string
onFunc IOnFunc
sender ISender
msg *safeChan.SafeChan[[]byte]
job *safeChan.SafeChan[func()]
readyStop context.Context // 通知关闭的chan
readyStopFunc context.CancelFunc // NotifyStop会调用该方法触发服务进入关闭流程
waitStop context.Context
waitStopFunc context.CancelFunc
2025-05-25 20:02:15 +08:00
}
func NewBaseService(type_, name string, onFunc IOnFunc, sender ISender) *BaseService {
s := &BaseService{
type_: type_,
name: name,
Timer: timer.NewTimer(),
onFunc: onFunc,
sender: sender,
msg: safeChan.NewSafeChan[[]byte](128),
job: safeChan.NewSafeChan[func()](128),
}
s.readyStop, s.readyStopFunc = context.WithCancel(context.Background())
2025-05-25 20:02:15 +08:00
s.waitStop, s.waitStopFunc = context.WithCancel(context.Background())
//s.Run()
2025-05-25 20:02:15 +08:00
return s
}
// Name returns the service name given to NewBaseService.
func (s *BaseService) Name() string {
	return s.name
}
// Type returns the service type given to NewBaseService.
func (s *BaseService) Type() string {
	return s.type_
}
// Write queues a raw message for the run loop, which passes it to
// onFunc.OnMessage. The error reported by the underlying queue is returned
// unchanged.
func (s *BaseService) Write(msg []byte) error {
	err := s.msg.Write(msg)
	return err
}
func (s *BaseService) RunOnce(cb func()) {
select {
case <-s.readyStop.Done():
log.Error(s.Log("want readyStop, can not call RunOnce function"))
2025-05-25 20:02:15 +08:00
return
default:
_ = s.job.Write(cb)
}
}
func (s *BaseService) RunWait(cb func() (retValue any)) (retValue any, err error) {
select {
case <-s.readyStop.Done():
err = fmt.Errorf(s.Log("want readyStop, can not call RunOnce function"))
2025-05-25 20:02:15 +08:00
log.Error(err.Error())
return nil, err
default:
wait := make(chan any, 2)
err = s.job.Write(func() {
retValue = cb()
wait <- retValue
})
if err == nil {
select {
case retValue = <-wait:
return retValue, nil
case <-time.After(time.Second * time.Duration(30)):
return nil, fmt.Errorf("timeout fail")
}
}
return nil, err
}
}
// Send forwards msg on topic through the configured sender. It returns an
// error when the service was built without a sender.
func (s *BaseService) Send(topic string, msg []byte) error {
	if s.sender == nil {
		return s.Err("send is nil")
	}
	return s.sender.Send(topic, msg)
}
// Call forwards msg on topic through the configured sender's Call method,
// passing timeout along, and returns whatever the sender returns. It returns
// an error when the service was built without a sender.
func (s *BaseService) Call(topic string, timeout time.Duration, msg []byte) ([]byte, error) {
	if s.sender == nil {
		return nil, s.Err("call is nil")
	}
	return s.sender.Call(topic, timeout, msg)
}
// WaitStop blocks until the run loop has completed its shutdown sequence,
// i.e. until waitStop has been cancelled.
func (s *BaseService) WaitStop() {
	<-s.waitStop.Done()
}
func (s *BaseService) NotifyStop() {
s.readyStopFunc()
// log.Debug(fmt.Sprintf("notify %v service readyStop", s.name))
2025-05-25 20:02:15 +08:00
}
//func (s *BaseService) allChanEmpty() bool {
// if s.job.Size() == 0 && s.msg.Size() == 0 {
// return true
// }
// return false
//}
2025-05-25 20:02:15 +08:00
func (s *BaseService) canStop() bool {
select {
case <-s.readyStop.Done():
return true
2025-05-25 20:02:15 +08:00
default:
return false
}
}
func (s *BaseService) run() {
for {
if s.onFunc.CanStop() && s.canStop() {
s.msg.Close()
s.job.Close()
s.Timer.CancelAllTimer()
s.Timer.Close()
s.onFunc.OnStop()
s.waitStopFunc()
2025-05-25 20:02:15 +08:00
break
}
select {
case msg, ok := <-s.msg.Reader():
if ok {
2025-05-25 20:02:15 +08:00
_ = s.onFunc.OnMessage(msg)
}
case cb, ok := <-s.job.Reader():
if ok && cb != nil {
cb()
}
case t, ok := <-s.Timer.Reader():
if ok && t != nil && t.Func != nil {
t.Func()
}
}
}
}
// Run launches the service loop through ksync.GoSafe.
//
// NOTE(review): s.Run is passed as GoSafe's second argument — presumably a
// callback that restarts the loop after a recovered panic; confirm against
// ksync.GoSafe.
func (s *BaseService) Run() {
	ksync.GoSafe(s.run, s.Run)
}
func (s *BaseService) Log(format string, a ...any) string {
2025-05-26 16:02:54 +08:00
head := fmt.Sprintf("service:%v ", s.name)
2025-05-25 20:02:15 +08:00
return head + fmt.Sprintf(format, a...)
}
func (s *BaseService) Err(format string, a ...any) error {
2025-05-26 16:02:54 +08:00
head := fmt.Sprintf("service:%v ", s.name)
2025-05-25 20:02:15 +08:00
return fmt.Errorf(head + fmt.Sprintf(format, a...))
}