package monitor import ( "fmt" "math" "samba/pkg/log" "samba/pkg/servername" "samba/pkg/task" "samba/pkg/xtime" "samba/proto" "samba/server/other/handler" "samba/util/model" "samba/util/routingKey" "samba/util/util" "time" ) // OnlineMonitor // 在线人数异常检测 type OnlineMonitor struct { *Ding } func (m *OnlineMonitor) Init(t *task.Task) { t.Ticker(10 * time.Minute) } // 获取昨天的数据 func (m *OnlineMonitor) getYesterdayData(fn func(flows []proto.OnlineFlow, err error)) { ys := xtime.Now().Yesterday() stamp := xtime.ConvertToCarbon(time.Date(ys.Year(), time.Month(ys.Month()), ys.Day(), ys.Hour(), ys.Minute(), 0, 0, ys.StdTime().Location())) req := &proto.ReqGetOnlineFlow{ Date: ys.StdTime().Format("20060102"), TimeStamp: stamp.Timestamp(), } handler.CallRpc(proto.ReqGetOnlineFlowId, util.Direct(servername.ClickHouse), routingKey.ClickHouseKey(0), req, func(rsp *proto.RspGetOnlineFlow, err error) { if err != nil { fn(nil, err) return } if rsp.Err != "" { fn(nil, fmt.Errorf("query error:%v", rsp.Err)) return } fn(rsp.Flows, nil) }) } func (m *OnlineMonitor) Name() string { return "在线人数异常检测" } func (m *OnlineMonitor) Do(*task.Task) { m.getYesterdayData(func(flows []proto.OnlineFlow, err error) { if err != nil { log.Error(err.Error()) return } var yesterdayOnline int64 for _, flow := range flows { // 找到昨天当前分钟的数据, yesterdayOnline += flow.Online } current, err := model.NewUserOnlineOp().Current() if err != nil { log.Error(err.Error()) return } if math.Abs(float64(yesterdayOnline-current)) > (float64(yesterdayOnline) * 0.5) { // 波动大于 正负50% err := m.SendMessageText(fmt.Sprintf("在线人数波动,昨日同时刻%d人,今日%d人", yesterdayOnline, current)) if err != nil { log.Error(err.Error()) return } } }) }