88 lines
1.9 KiB
Go
Raw Permalink Normal View History

2025-06-04 09:51:39 +08:00
package monitor
import (
"fmt"
"math"
"samba/pkg/log"
"samba/pkg/servername"
"samba/pkg/task"
"samba/pkg/xtime"
"samba/proto"
"samba/server/other/handler"
"samba/util/model"
"samba/util/routingKey"
"samba/util/util"
"time"
)
// OnlineMonitor is the online-player-count anomaly monitor.
//
// It embeds *Ding, so Ding's exported methods are promoted onto the monitor.
// NOTE(review): SendMessageText, used by Do, is presumably one of those
// promoted methods — confirm against Ding's definition.
type OnlineMonitor struct {
	*Ding
}
// Init configures the task that drives this monitor by requesting a ticker
// with a ten-minute period on t.
//
// NOTE(review): presumably the task framework then invokes Do once per tick —
// confirm against the task package.
func (m *OnlineMonitor) Init(t *task.Task) {
	const checkInterval = 10 * time.Minute

	t.Ticker(checkInterval)
}
// getYesterdayData requests yesterday's online-flow records from the
// ClickHouse service and hands the outcome to fn.
//
// The request carries yesterday's date formatted as "20060102" and a timestamp
// for the same wall-clock time yesterday with seconds and nanoseconds zeroed.
//
// fn is called exactly once per reply:
//   - with (nil, err) when the RPC itself fails;
//   - with (nil, "query error:..." error) when the response reports an error;
//   - with (flows, nil) otherwise.
func (m *OnlineMonitor) getYesterdayData(fn func(flows []proto.OnlineFlow, err error)) {
	yesterday := xtime.Now().Yesterday()
	std := yesterday.StdTime()

	// Rebuild the instant at minute precision so it lines up with the
	// per-minute samples being queried.
	minuteStart := time.Date(
		yesterday.Year(),
		time.Month(yesterday.Month()),
		yesterday.Day(),
		yesterday.Hour(),
		yesterday.Minute(),
		0,
		0,
		std.Location(),
	)

	req := &proto.ReqGetOnlineFlow{
		Date:      std.Format("20060102"),
		TimeStamp: xtime.ConvertToCarbon(minuteStart).Timestamp(),
	}

	// Translate the RPC reply into the (flows, err) shape expected by fn.
	onReply := func(rsp *proto.RspGetOnlineFlow, err error) {
		switch {
		case err != nil:
			fn(nil, err)
		case rsp.Err != "":
			fn(nil, fmt.Errorf("query error:%v", rsp.Err))
		default:
			fn(rsp.Flows, nil)
		}
	}

	handler.CallRpc(
		proto.ReqGetOnlineFlowId,
		util.Direct(servername.ClickHouse),
		routingKey.ClickHouseKey(0),
		req,
		onReply,
	)
}
// Name returns the monitor's display name (Chinese for "online player count
// anomaly detection").
func (m *OnlineMonitor) Name() string {
	const displayName = "在线人数异常检测"

	return displayName
}
// Do performs one anomaly check: it compares the current online count with
// the total online count recorded at the same minute yesterday and sends a
// text alert when the two differ by more than 50% of yesterday's value.
//
// Any failure (fetching yesterday's data, reading the current count, sending
// the alert) is logged and ends the check.
//
// NOTE(review): when yesterday's total is 0 (for example, no rows returned),
// any non-zero current count exceeds the threshold and triggers an alert —
// confirm this is the intended behavior.
func (m *OnlineMonitor) Do(*task.Task) {
	m.getYesterdayData(func(flows []proto.OnlineFlow, err error) {
		if err != nil {
			log.Error(err.Error())
			return
		}

		// Sum every record returned for yesterday's matching minute.
		var baseline int64
		for i := range flows {
			baseline += flows[i].Online
		}

		now, err := model.NewUserOnlineOp().Current()
		if err != nil {
			log.Error(err.Error())
			return
		}

		// Alert only when the swing exceeds ±50% of yesterday's value.
		delta := math.Abs(float64(baseline - now))
		limit := float64(baseline) * 0.5
		if delta <= limit {
			return
		}

		text := fmt.Sprintf("在线人数波动,昨日同时刻%d人今日%d人", baseline, now)
		if err := m.SendMessageText(text); err != nil {
			log.Error(err.Error())
		}
	})
}