samba/pkg/service/service.go
2025-06-04 09:51:39 +08:00

252 lines
5.7 KiB
Go

package service
import (
"fmt"
"github.com/rabbitmq/amqp091-go"
"samba/pkg/ksync"
"samba/pkg/log"
"samba/pkg/rmq"
"sync/atomic"
"time"
)
// Callback signatures used by a service.
type (
	// HandlerMessage handles one AMQP delivery. Run calls it with the
	// service that received the delivery and a pointer to the delivery.
	HandlerMessage func(IService, *amqp091.Delivery)

	// HandlerRpcMessage takes a string and a variadic argument list and
	// returns a result slice plus an error. The only references to it in
	// this file are commented out; presumably the string is the name of
	// the function to invoke — confirm against callers.
	HandlerRpcMessage func(string, ...interface{}) ([]interface{}, error)

	// HandlerFunc is a parameterless callback. Callbacks of this type are
	// queued with ProcFuncOnce and executed by the Run loop.
	HandlerFunc func()
)
// service is the concrete service built by NewService. It bundles a
// message-queue client, an embedded timer and a callback queue, all of which
// are serviced by the loop started in Run.
type service struct {
	// Embedded timer, created by NewTimer in NewService. Run receives fired
	// entries from its chTimer channel and WaitStop closes it.
	*Timer
	// Message-queue client obtained from rmq.NewRmq. Run assigns a new one
	// when it has to reconnect.
	mq *rmq.Rmq
	// URL handed to rmq.NewRmq; kept so Run can reconnect.
	mqUrl string
	// Delivery channel stored by the most recent Consume call; nil until
	// Consume has been called.
	consumeMsg <-chan amqp091.Delivery
	// Hooks and settings assembled by newOption.
	opts *options
	// Values reported by Type and Name. name is also the key used with
	// mgr.services in WaitStop and the queue name Run re-consumes from.
	type_ string
	name string
	//msgQueue *Queue[*Message]
	//rpcMsgQueue *Queue[*RpcMessage]
	//msg chan *Message
	//rpcMsg chan *RpcMessage
	// Stop counter, accessed atomically: WaitStop increments it and isStop
	// treats any value above zero as "stopping".
	stop int32
	// Invoked by Run for every delivery received on consumeMsg.
	handlerMessage HandlerMessage
	//handlerRpcMessage HandlerRpcMessage
	// Callbacks queued by ProcFuncOnce and executed by Run.
	handlerFunc chan HandlerFunc
}
// newOption assembles the options for a service. It starts from the package
// defaults (capacity 256 and the default onInit, onStop and onNotifyStop
// hooks) and then applies every supplied Option in the order given.
func newOption(opt ...Option) *options {
	cfg := new(options)
	cfg.capacity = 256
	cfg.onInit = defaultOnInit
	cfg.onStop = defaultOnStop
	cfg.onNotifyStop = defaultOnNotifyStop

	for i := range opt {
		opt[i](cfg)
	}
	return cfg
}
// NewService builds a service of the given type and name, connects it to the
// message queue at mqUrl and starts it.
//
// handler is called by the Run loop for every delivery; opt customises the
// defaults produced by newOption.
//
// If rmq.NewRmq reports an error it is logged and nil is returned. If it
// reports no error but yields a nil client, that is logged and the service is
// returned without being initialised, started or registered. Otherwise the
// onInit hook runs, the Run loop is started and the service is registered
// before being returned.
func NewService(type_, name, mqUrl string, handler HandlerMessage, opt ...Option) IService {
	svc := new(service)
	svc.opts = newOption(opt...)
	svc.type_ = type_
	svc.name = name
	svc.mqUrl = mqUrl
	svc.Timer = NewTimer()
	svc.handlerMessage = handler
	// Room for 16 pending ProcFuncOnce callbacks.
	svc.handlerFunc = make(chan HandlerFunc, 16)

	client, err := rmq.NewRmq(mqUrl)
	if err != nil {
		log.Error(err.Error())
		return nil
	}
	svc.mq = client

	if svc.mq == nil {
		log.Error(fmt.Sprintf("rmq is nil"))
		return svc
	}

	svc.opts.onInit(svc)
	svc.Run()
	registerService(svc)
	return svc
}
// Name returns the name the service was created with.
func (s *service) Name() string {
	return s.name
}
// Type returns the type string the service was created with.
func (s *service) Type() string {
	return s.type_
}
// ExchangeDeclare forwards to the message-queue client's ExchangeDeclare with
// the given exchange name and type, returning its error unchanged.
func (s *service) ExchangeDeclare(exchangeName, typ string) error {
	return s.mq.ExchangeDeclare(exchangeName, typ)
}
// Consume asks the message-queue client to consume from queueName and stores
// the channel it returns as the service's delivery channel, which the Run
// loop reads from. The channel is stored even when an error is returned; the
// error is passed back to the caller unchanged.
func (s *service) Consume(queueName string) error {
	deliveries, err := s.mq.Consume(queueName)
	s.consumeMsg = deliveries
	return err
}
// ConsumeDelete forwards to the message-queue client's ConsumeDelete and
// returns its error unchanged.
func (s *service) ConsumeDelete() error {
	return s.mq.ConsumeDelete()
}
// QueueDeclare forwards to the message-queue client's QueueDeclare for
// queueName and returns its error unchanged.
func (s *service) QueueDeclare(queueName string) error { return s.mq.QueueDeclare(queueName) }
// QueueDelete forwards to the message-queue client's QueueDelete for
// queueName and returns its error unchanged.
func (s *service) QueueDelete(queueName string) error {
	return s.mq.QueueDelete(queueName)
}
// QueueBind forwards to the message-queue client's QueueBind, passing the
// queue name, routing key and exchange name in that order, and returns its
// error unchanged.
func (s *service) QueueBind(queueName, routerKey, exchangeName string) error {
	//log.Debug(fmt.Sprintf("exchange:%v bind router key:%v to queue:%v", exchangeName, routerKey, queueName))
	return s.mq.QueueBind(queueName, routerKey, exchangeName)
}
// QueueUnbind forwards to the message-queue client's QueueUnbind, passing the
// queue name, routing key and exchange name in that order, and returns its
// error unchanged.
func (s *service) QueueUnbind(queueName, routerKey, exchangeName string) error {
	return s.mq.QueueUnbind(queueName, routerKey, exchangeName)
}
// Publish forwards msg to the message-queue client's Publish with the given
// exchange name and routing key, returning its error unchanged.
func (s *service) Publish(exchangeName, routerKey string, msg []byte) error {
	return s.mq.Publish(exchangeName, routerKey, msg)
}
// PublishRpc forwards the delivery d and the payload msg to the message-queue
// client's PublishRpc, returning its error unchanged.
// NOTE(review): presumably this sends msg as the reply to d — confirm in rmq.
func (s *service) PublishRpc(d *amqp091.Delivery, msg []byte) error {
	return s.mq.PublishRpc(d, msg)
}
// PublishRaw forwards a caller-built amqp091.Publishing to the message-queue
// client's PublishRaw with the given exchange name and routing key, returning
// its error unchanged.
func (s *service) PublishRaw(exchangeName, routerKey string, data amqp091.Publishing) error {
	return s.mq.PublishRaw(exchangeName, routerKey, data)
}
// WaitStop blocks until the service may stop and then shuts it down.
//
// Every 100ms it consults the canStop hook. Once the hook allows stopping,
// WaitStop raises the stop counter (observed by isStop), closes the embedded
// timer, runs the onStop hook and removes the service from mgr.services if it
// is still registered under its name.
func (s *service) WaitStop() {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	for range ticker.C {
		// newOption installs defaults for the other hooks but not for
		// canStop, so it may be nil here; a missing hook is treated as
		// "nothing to wait for" rather than calling a nil func.
		if s.opts.canStop == nil || s.opts.canStop(s) {
			break
		}
	}

	atomic.AddInt32(&s.stop, 1)
	s.Timer.Close()
	s.opts.onStop(s)
	if _, ok := mgr.services.Get(s.name); ok {
		mgr.services.Remove(s.name)
	}
	// log.Debug(fmt.Sprintf("%v service stop", s.name))
}
// NotifyStop invokes the onNotifyStop hook with this service. It does not
// itself change the stop counter.
func (s *service) NotifyStop() {
	s.opts.onNotifyStop(s)
	// log.Debug(fmt.Sprintf("notify %v service stop", s.name))
}
// isEmpty always reports true. The queue-based check it used to perform is
// kept below as commented-out code.
func (s *service) isEmpty() bool {
	return true
	//return s.msgQueue.IsEmpty() && s.rpcMsgQueue.IsEmpty()
}
// isStop reports whether the Run loop should exit: the stop counter must have
// been raised above zero and isEmpty must report true.
func (s *service) isStop() bool {
	if atomic.LoadInt32(&s.stop) <= 0 {
		return false
	}
	return s.isEmpty()
}
// runRpc currently does nothing: its body consists solely of the
// commented-out former implementation, retained for reference.
func (s *service) runRpc() {
	//if m, err := s.rpcMsgQueue.Pop(); err == nil && m != nil {
	// rets, err := s.handlerRpcMessage(m.FuncName, m.Args...)
	// if srcM, ok := mgr.rpcMsg.Load(m.No); ok {
	// srcM.Rets = rets
	// srcM.Err = err
	// srcM.Ch <- true
	// } else {
	// if remote, ok := findRemoteServiceByName(m.Src[0]); ok {
	// m.Rets = rets
	// if msg, err := json.Marshal(m); err == nil {
	// _ = remote.WriteMsg(rpcReceiveMsgId, msg)
	// } else {
	// log.Error(fmt.Sprintf("return rpc data.err:%v. rpc src:%v", err, m.Src[0]))
	// }
	// } else {
	// log.Error(fmt.Sprintf("remote service:%v not exist", m.Src[0]))
	// }
	// }
	//}
}
// ProcFuncOnce queues cb on the handlerFunc channel; the Run loop receives it
// and calls it once (nil callbacks are skipped there).
//
// The send is unconditional, so it blocks while the channel's buffer (created
// with 16 slots in NewService) is full.
func (s *service) ProcFuncOnce(cb HandlerFunc) {
	s.handlerFunc <- cb
}
// Run starts the service's event loop through ksync.GoSafe.
//
// Until isStop reports true, the loop multiplexes:
//   - deliveries from the consume channel, handed to handlerMessage;
//   - callbacks queued with ProcFuncOnce;
//   - entries received from the embedded Timer's chTimer channel.
//
// When the consume channel is closed the loop tries to restore the consumer
// (see restoreConsumer). If that fails, the consume case is parked and the
// attempt is repeated after a fixed delay, so a broker outage neither spins
// the loop nor stops callbacks and timers from being served.
//
// s.Run is also passed as GoSafe's second argument.
// NOTE(review): presumably GoSafe calls it to restart the loop after a
// recovered panic — confirm against ksync.
func (s *service) Run() {
	// Pause between attempts to restore a lost consumer.
	const retryDelay = time.Second

	ksync.GoSafe(func() {
		// retry is nil (its select case never fires) unless the last
		// recovery attempt failed; then it delivers the next wake-up.
		var retry <-chan time.Time

		recoverConsumer := func() {
			if s.restoreConsumer() {
				retry = nil
				return
			}
			// A closed channel is always ready to receive; park the consume
			// case on a nil channel so the loop cannot spin on it.
			s.consumeMsg = nil
			retry = time.After(retryDelay)
		}

		for !s.isStop() {
			select {
			case d, ok := <-s.consumeMsg:
				if ok {
					s.handlerMessage(s, &d)
				} else {
					log.Error("consume was closed because queue or exchange error")
					recoverConsumer()
				}
			case <-retry:
				recoverConsumer()
			case cb, ok := <-s.handlerFunc:
				if ok && cb != nil {
					cb()
				}
			case t, ok := <-s.Timer.chTimer:
				if ok && t != nil && t.cb != nil {
					t.cb()
				}
			}
		}
	}, s.Run)
}

// restoreConsumer tries to get deliveries flowing again after the consume
// channel was closed and reports whether it succeeded.
//
// It first re-consumes the queue named after the service on the current
// client. If that fails it closes the client, dials a new one with the stored
// URL and re-runs the onInit hook. s.mq is only replaced once the new client
// has been created, so a failed dial never leaves s.mq holding the failed
// result.
func (s *service) restoreConsumer() bool {
	if s.mq != nil {
		// Best effort: the previous consumer may already be gone.
		_ = s.ConsumeDelete()
		err := s.Consume(s.Name())
		if err == nil {
			return true
		}
		log.Error(err.Error())
		s.mq.Close()
	}

	mq, err := rmq.NewRmq(s.mqUrl)
	if err != nil {
		log.Error(err.Error())
		return false
	}
	if mq == nil {
		return false
	}
	s.mq = mq

	if !s.opts.onInit(s) {
		s.mq.Close()
		return false
	}

	// If onInit did not attach a consumer itself, attach one on the queue
	// named after the service, as the first recovery step does.
	if s.consumeMsg == nil {
		if err := s.Consume(s.Name()); err != nil {
			log.Error(err.Error())
			return false
		}
	}
	return true
}