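Product Anomaly Detection: Implementing the Kats Algorithms
Background
To support the xx campaign, we need to monitor how each xg category performs. The engineering team therefore wants to build a reusable capability that:
- gives fast and accurate feedback on anomalies and their causes,
- summarizes them, and
- predicts whether the metrics will stay stable.

This work uses sustained abnormal declines in orders for hot-selling items as the entry point. It designs a production-ready order anomaly detection algorithm.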
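Engineering goals for anomaly detection
Trend change detection: growth slowing down, decline speeding up
Trend turning-point detection: the trend changes at a specific point in time
Time series anomaly detection: find anomalous points in time series data
Trend divergence detection: for example, items in some categories trend clearly differently from the others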
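Survey of related products and algorithms in the industry
With a fairly complete metric system in place, we describe, track, and drive the business by interpreting metrics and their fluctuations. When a metric fluctuates, we first judge from a business perspective whether the fluctuation is abnormal (anomaly detection). We then determine what caused it (anomaly attribution).
Survey of related internal products
(Redacted)
Widely used algorithms in the industry
Commonly used time-series algorithms include:
- classic statistical time-series models such as ARIMA;
- the anomaly detection model Isolation Forest;
- the deep learning model AutoEncoder;
- industrial tools: Greykite/Luminol (LinkedIn), Prophet/Kats (Meta), Twitter-AD (Twitter), and SR-CNN (Microsoft).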
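Comparison of strengths and weaknesses
The algorithms are compared along seven dimensions: data requirements, applicable scope, precision, computation speed, accuracy, interpretability, and flexibility.
Algorithm | Data requirements | Applicable scope | Precision | Computation speed | Accuracy | Interpretability | Flexibility |
---|---|---|---|---|---|---|---|
ARIMA | High | Small-scale data | Medium | Medium | Medium | Low | Medium |
Isolation Forest | Medium | Large-scale data | Medium | High | High | Low | Medium |
AutoEncoder | High | Many data types | High | Medium | High | Low | Medium |
Greykite/Luminol (LinkedIn) | High | Large-scale data | Medium | Low | Medium | Low | High |
Prophet/Kats (Meta) | High | Large-scale data | Medium | Medium | Medium | High | High |
Twitter-AD (Twitter) | Medium | Large-scale data | Medium | Medium | Medium | Low | High |
SR-CNN (Microsoft) | High | Large-scale data | High | Low | High | Low | Medium |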
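Data requirements: ARIMA, AutoEncoder, Greykite/Luminol, Prophet/Kats, and SR-CNN have high data requirements. Isolation Forest and Twitter-AD have moderate ones.
Applicable scope: ARIMA suits small-scale data, and AutoEncoder handles many types of data. Isolation Forest, Greykite/Luminol, Prophet/Kats, Twitter-AD, and SR-CNN all suit large-scale data.
Precision: AutoEncoder and SR-CNN are rated high. All other algorithms are rated medium.
Computation speed: Isolation Forest is the fastest. ARIMA, AutoEncoder, Prophet/Kats, and Twitter-AD are medium. Greykite/Luminol and SR-CNN are the slowest.
Accuracy: Isolation Forest, AutoEncoder, and SR-CNN are rated high. ARIMA, Greykite/Luminol, Prophet/Kats, and Twitter-AD are medium.
Interpretability: Prophet/Kats is the only algorithm rated high. All the others are rated low.
Flexibility: Greykite/Luminol, Prophet/Kats, and Twitter-AD are the most flexible. The others are medium.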
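Validation on datasets
Conclusion
Kats + STL decomposition: for order metrics with pronounced seasonality, use the STL decomposition in Kats for anomaly detection. It splits a time series into three components: seasonal, trend, and residual. This makes anomalies easier to isolate.
Brief introduction to the algorithms used
Prophet is an open-source time-series forecasting library from Meta. It is built on an additive regression model:
- the trend is a piecewise linear or logistic growth curve;
- yearly seasonality is a Fourier series;
- weekly seasonality uses dummy variables;
- holiday and event effects are modeled explicitly.

It suits time series with periodicity and holiday effects. It is robust to missing data and trend shifts, and usually handles outliers well. Prophet outputs uncertainty intervals around its forecasts, so an actual value outside the interval can be flagged as an anomaly. This lets Prophet also serve as an anomaly detection tool.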
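The interval rule described above fits in a few lines. This sketch assumes forecast rows that already carry Prophet-style columns (`ds`, `y`, `yhat_lower`, `yhat_upper`). The helper `flag_outside_interval` and the sample numbers are hypothetical, and fitting the Prophet model itself is not shown.

```python
def flag_outside_interval(rows):
    """Return (ds, side) for every row whose actual y falls outside [yhat_lower, yhat_upper]."""
    flagged = []
    for row in rows:
        if row["y"] < row["yhat_lower"]:
            flagged.append((row["ds"], "below"))  # e.g. an unexpected drop in orders
        elif row["y"] > row["yhat_upper"]:
            flagged.append((row["ds"], "above"))  # e.g. an unexpected spike
    return flagged


# Hypothetical forecast rows for three days
forecast = [
    {"ds": "2023-07-01", "y": 100, "yhat_lower": 80, "yhat_upper": 120},
    {"ds": "2023-07-02", "y": 40, "yhat_lower": 78, "yhat_upper": 118},
    {"ds": "2023-07-03", "y": 150, "yhat_lower": 82, "yhat_upper": 125},
]
print(flag_outside_interval(forecast))  # → [('2023-07-02', 'below'), ('2023-07-03', 'above')]
```

This project focuses on sustained declines, so only the `below` side would feed the alerting logic.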
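Meta's data science team later built Kats, a Python toolkit that packages a range of time-series models into a reusable framework. It provides one-stop time-series analysis:
- change point, outlier, and trend detection;
- forecasting;
- feature extraction and embedding;
- multivariate analysis.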
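Overall solution design
Solution
(Redacted)
Algorithm report
Dataset
Algorithm input: the hot-selling items of the self-operated business as of the most recent day, with their daily order counts over the past 30 days.
Input: xxxx.item_ord_detection_kats_algo_input (about 3,500 hot-selling items)
Output: xxxx.item_ord_detection_kats_algo_res_opt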
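Each input row pairs a start date with an array of daily order counts. The production script turns these into a `{'YYYYMMDD': orders}` lookup. It keeps a change point only if orders on the following day are lower than on the change day.

Below is a minimal stdlib sketch of that bookkeeping. It assumes `ord_arr` arrives as a list of daily counts in date order. The function names are hypothetical, and the sample values come from the script's own test call.

```python
from datetime import datetime, timedelta


def to_daily_series(start_ds, ord_arr):
    """Map start_ds ('YYYYMMDD') plus a list of daily counts to {'YYYYMMDD': count}."""
    start = datetime.strptime(start_ds, "%Y%m%d")
    return {(start + timedelta(days=i)).strftime("%Y%m%d"): v for i, v in enumerate(ord_arr)}


def dropped_next_day(series, ds):
    """True if the day after ds is in the series and has fewer orders than ds."""
    nxt = (datetime.strptime(ds, "%Y%m%d") + timedelta(days=1)).strftime("%Y%m%d")
    return nxt in series and series[nxt] < series[ds]


series = to_daily_series("20230602", [301, 444, 245, 216])
print(series)  # → {'20230602': 301, '20230603': 444, '20230604': 245, '20230605': 216}
print(dropped_next_day(series, "20230603"))  # → True (444 -> 245)
```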
This report covers the following anomaly detectors supported by Kats:
Detectors and scripts
Official documentation: https://github.com/facebookresearch/Kats/blob/v0.2.0/tutorials/kats_202_detection.ipynb
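CUSUMDetector: change point detection
CUSUM detects an increase or decrease in the mean of a time series. The implementation has two main steps:
Step 1, locate the change point: this is an iterative process.
- Initialize a change point in the middle of the series and compute the CUSUM series.
- Move the change point to where the previous CUSUM series reaches its maximum (or minimum).
- Repeat until the change point is stable or the iteration limit is reached.

Step 2, test the change point's statistical significance: a log-likelihood ratio test checks whether the change point from step 1 marks a change in the series mean. The null hypothesis is that the mean does not change. By default, a change point is reported only if step 2 rejects the null hypothesis.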
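The two steps above can be sketched from scratch in plain Python. This is a textbook version under stated assumptions, not Kats's code:
- The helper names are hypothetical.
- The test statistic is the Gaussian likelihood ratio for "one mean" versus "two means split at cp", with the variance estimated under the two-mean model.
- Its p-value uses a chi-square distribution with 1 degree of freedom.
- Kats's exact statistic and its extra checks (delta_std_ratio, min_abs_change, magnitude comparison) are not reproduced.

```python
import math
from statistics import mean


def locate_change_point(y, direction="decrease", max_iter=10):
    """Step 1: move the candidate change point to the CUSUM extremum until it is stable.

    Returns cp such that y[:cp] is the segment before the change and y[cp:] the one after.
    """
    n = len(y)
    cp = n // 2  # start in the middle of the series
    for _ in range(max_iter):
        mid = (mean(y[:cp]) + mean(y[cp:])) / 2
        cusum, s = [], 0.0
        for v in y:
            s += v - mid
            cusum.append(s)
        # An upward shift makes the CUSUM bottom out at the change; a downward shift makes it peak.
        pick = min if direction == "increase" else max
        new_cp = pick(range(n - 1), key=cusum.__getitem__) + 1
        if new_cp == cp:  # stable change point found
            break
        cp = new_cp
    return cp


def llr_test(y, cp):
    """Step 2: likelihood-ratio test of one common mean vs. two means split at cp."""
    a, b = y[:cp], y[cp:]
    mu, mu0, mu1 = mean(y), mean(a), mean(b)
    rss_h0 = sum((v - mu) ** 2 for v in y)
    rss_h1 = sum((v - mu0) ** 2 for v in a) + sum((v - mu1) ** 2 for v in b)
    if rss_h1 == 0:
        return (math.inf, 0.0) if rss_h0 > 0 else (0.0, 1.0)
    llr = max(0.0, (rss_h0 - rss_h1) / (rss_h1 / len(y)))  # variance estimated under H1
    p_value = math.erfc(math.sqrt(llr / 2))  # chi-square (1 dof) tail probability
    return llr, p_value


def cusum_detect(y, direction="decrease", threshold=0.01):
    """Return (cp, p_value) for a significant shift in `direction`, else None."""
    cp = locate_change_point(y, direction)
    before, after = mean(y[:cp]), mean(y[cp:])
    if (after < before) != (direction == "decrease"):
        return None  # the located shift goes the other way
    _, p_value = llr_test(y, cp)
    return (cp, p_value) if p_value < threshold else None


drop = [20, 21, 19, 20, 21, 19, 20, 20, 21, 19, 10, 11, 9, 10, 11, 9, 10, 10, 11, 9]
print(cusum_detect(drop, "decrease"))
```

On this synthetic drop, the located change point is index 10, the first day at the lower level. Running with `"increase"` returns `None`, which mirrors the role of `change_directions`.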
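CUSUMDetector tunable parameters:
Parameter | Meaning | Default/type |
---|---|---|
threshold | Significance level (p-value threshold) of the likelihood ratio test | 0.01/float |
max_iter | Maximum number of iterations when searching for the change point | 10/int |
delta_std_ratio | The difference between the means must exceed the data's standard deviation times this ratio to count as a change | 1.0/float |
min_abs_change | Minimum absolute difference between mu0 and mu1 (mu0: mean before the change point; mu1: mean after the change point) | 0/int |
start_point | Index at which the change point search starts | None/Optional[int] |
change_directions | List containing "increase" and/or "decrease", specifying which kinds of change to detect | None/Optional[List[str]] |
interest_window | Start and end positions of the window in which to look for change points | None/Optional[Tuple[int, int]] |
magnitude_quantile | Quantile used for the magnitude comparison | None/Optional[float] |
magnitude_ratio | Ratio used for the magnitude comparison | 1.3/float |
magnitude_comparable_day | Maximum fraction of days allowed to have comparable magnitude for the change to still count as a regression | 0.5/float |
return_all_changepoints | Return every change point found, including non-significant ones | False/bool |
Single change point detection script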
```python
import pandas as pd
from kats.consts import TimeSeriesData
from kats.detectors.cusum_detection import CUSUMDetector


# changePToDict is defined in the RobustStatDetector script below
def CUSUMDetectorMean(startDs, data):
    # data: daily order counts starting at startDs, for example
    # [89, 30, 426, 53, 645, 542, 938, 171, 952, 780, 952, 1006, 914, 512, 596, 655,
    #  610, 786, 952, 1198, 142, 128, 66, 87, 100, 74, 64, 106, 85, 86, 106]
    series = pd.Series(data)
    df_increase_decrease = pd.DataFrame(
        {
            'time': pd.date_range(startDs, periods=len(series)),
            'increase': series,
            'decrease': series,
        }
    )
    print('TimeSeriesData data:', df_increase_decrease)
    # Change point detection 1: upward shifts (only printed)
    tsd = TimeSeriesData(df_increase_decrease.loc[:, ['time', 'increase']])
    detector = CUSUMDetector(tsd)
    change_points = detector.detector(change_directions=["increase"], return_all_changepoints=True)
    print('CUSUMDetector change points (increase):', change_points)
    # Change point detection 2: downward shifts (returned)
    tsd = TimeSeriesData(df_increase_decrease.loc[:, ['time', 'decrease']])
    detector = CUSUMDetector(tsd)
    change_points = detector.detector(change_directions=["decrease"])
    print('CUSUMDetector change points (decrease):', change_points)
    detector.plot(change_points)  # needs matplotlib
    return changePToDict(change_points)
```
Multiple change point detection script
```python
import pandas as pd
from kats.consts import TimeSeriesData
from kats.detectors.cusum_detection import CUSUMDetector


def CUSUMDetectorWithMultipleChangeP(startDs, data):
    series = pd.Series(data)
    df_multi_cps = pd.DataFrame(
        {
            'time': pd.date_range(startDs, periods=len(series)),
            'y': series,
        }
    )
    multi_cp_ts = TimeSeriesData(df_multi_cps)
    historical_window = 0
    scan_window = 10
    step = 5
    changepoints = []
    n = len(df_multi_cps)
    # Slide a window of (historical_window + scan_window) points forward by `step`
    # and run CUSUM (decrease only) inside each window
    for end_idx in range(historical_window + scan_window, n, step):
        tsd = multi_cp_ts[end_idx - (historical_window + scan_window) : end_idx]
        print('Sliding-window input:', tsd)
        changepoints += CUSUMDetector(tsd).detector(
            change_directions=["decrease"],
            interest_window=[historical_window, historical_window + scan_window],
            max_iter=15,
            return_all_changepoints=True,
        )
    print('CUSUMDetector multiple change points:', changepoints)
    return changePToDict(changepoints)
```
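As written, `range(historical_window + scan_window, n, step)` stops before `n`, and each slice excludes `end_idx`. Depending on `n`, the most recent days may therefore never fall inside any window. That matters for a detector meant to catch recent drops.

The helpers below (hypothetical names, plain Python) replay the loop's window boundaries so the coverage can be checked. The `_with_tail` variant is one possible adjustment, not part of the original script.

```python
def scan_windows(n, historical_window=0, scan_window=10, step=5):
    """(start, end) slices visited by the sliding-window loop, end exclusive."""
    width = historical_window + scan_window
    return [(end - width, end) for end in range(width, n, step)]


def uncovered_tail(n, **kwargs):
    """Indices at the end of the series that no window includes."""
    windows = scan_windows(n, **kwargs)
    last_end = windows[-1][1] if windows else 0
    return list(range(last_end, n))


def scan_windows_with_tail(n, historical_window=0, scan_window=10, step=5):
    """Variant that appends one final window ending exactly at n."""
    windows = scan_windows(n, historical_window, scan_window, step)
    width = historical_window + scan_window
    if n >= width and (not windows or windows[-1][1] < n):
        windows.append((n - width, n))
    return windows


print(scan_windows(30))    # → [(0, 10), (5, 15), (10, 20), (15, 25)]
print(uncovered_tail(30))  # → [25, 26, 27, 28, 29]
```

With 30 days of input, the last five days are never scanned by the original loop. The variant adds the window `(20, 30)`.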
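BOCPDetector: Bayesian online change point detection
Bayesian Online Changepoint Detection (BOCPD) detects persistent abrupt changes in a time series. The implementation is based on the paper "Bayesian Online Changepoint Detection" (Adams & MacKay, 2007).
Several properties set BOCPD apart from the other change point detection methods in Kats:
Online model: detection does not need the whole series in advance. It only looks ahead a few steps to make a prediction (set by the lag parameter of the detector method), and it revises the prediction as new data arrives.
Bayesian model: you can set a prior belief about the probability of a change point (changepoint_prior parameter of the detector method). You can also set the parameters of the underlying probabilistic model that generates the series (model_parameters parameter). Three types of underlying model are currently supported, selected with the model parameter.
The basic idea of BOCPD is to use Bayesian inference to decide whether the next point is a change point. You specify (or accept the defaults for) two things: the change point probability, and the underlying predictive model (UPM) that generates the incoming data points. The algorithm currently supports three underlying models:
Normal distribution (unknown mean, known variance)
Trend change model
Poisson process model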
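The normal-model variant above can be illustrated with a from-scratch sketch of the Adams & MacKay recursion. This is not Kats's code, and the following are assumptions:
- Each segment is Normal with an unknown mean (Normal prior) and a known variance `var_x`.
- The hazard is constant.
- The input is z-scored first.
- The helper names are hypothetical.

With a constant hazard, the probability of run length 0 always equals the hazard, so it carries no signal. The sketch therefore reads the change probability for day `t` only after `lag` more points have arrived, which is the idea behind the `lag` parameter.

```python
import math
from statistics import mean, pstdev


def normal_pdf(x, m, v):
    return math.exp(-(x - m) ** 2 / (2 * v)) / math.sqrt(2 * math.pi * v)


def bocpd_change_probs(data, hazard=1 / 30, var_x=0.1, mu0=0.0, var0=1.0, lag=1):
    """P(a new segment starts at data[t]), read off `lag` steps after t."""
    m, sd = mean(data), pstdev(data) or 1.0
    xs = [(v - m) / sd for v in data]  # z-score so var_x and var0 are on a unit scale
    R = [1.0]                    # run-length distribution before any data
    mus, vars_ = [mu0], [var0]   # posterior of the segment mean for each run length
    history = []
    for x in xs:
        # predictive density of x under each current run length, weighted by its probability
        joint = [r * normal_pdf(x, mu, v + var_x) for r, mu, v in zip(R, mus, vars_)]
        evidence = sum(joint)
        # run length resets to 0 with prob `hazard`, otherwise every run grows by one
        R = [hazard] + [(1 - hazard) * j / evidence for j in joint]
        # conjugate Normal update of each run's mean; run length 0 restarts from the prior
        post_vars = [1 / (1 / v + 1 / var_x) for v in vars_]
        post_mus = [pv * (mu / v + x / var_x) for mu, v, pv in zip(mus, vars_, post_vars)]
        mus, vars_ = [mu0] + post_mus, [var0] + post_vars
        history.append(R)
    probs = [0.0] * len(xs)
    for t in range(1, len(xs) - lag + 1):
        # run length == lag at time t+lag-1 means the current segment began at t
        probs[t] = history[t + lag - 1][lag]
    return probs


def bocpd_detect(data, threshold=0.5, **kwargs):
    return [t for t, p in enumerate(bocpd_change_probs(data, **kwargs)) if p > threshold]


orders = [0.0] * 15 + [10.0] * 15
print(bocpd_detect(orders))  # → [15]
```

A larger `lag` lets more post-change evidence accumulate before the probability is read, at the cost of reporting later. This is the same trade-off described for Kats's `lag` parameter.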
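BOCPDetector tunable parameters:
Parameter | Meaning | Default/type |
---|---|---|
model | Underlying probabilistic model that generates the data within each segment. NORMAL_KNOWN_MODEL: Normal model with known variance; use it to find level shifts in normally distributed data. TREND_CHANGE_MODEL: assumes each segment is generated by ordinary linear regression; use it to find changes in the slope or trend of the series. POISSON_PROCESS_MODEL: assumes a Poisson generative model; use it for count data where most values are close to zero. | BOCPDModelType |
model_parameters | Parameters specific to the chosen model | Parameter object matching the model: NormalKnownParameters, TrendChangeParameters, PoissonModelParameters |
lag | Lag for reporting change points: a change point is reported after `lag` more data points have been seen. A higher lag gives more certainty that it really is a change point; a lower lag detects change points sooner. This is a trade-off. | 10/int |
changepoint_prior | Prior probability that a given point is a change point (this is a Bayesian algorithm) | 0.01/float |
threshold | The detector reports, at each time step, the probability that a change point was observed; points whose probability exceeds this threshold are marked as change points | 0.5/float |
debug | Show debugging information explaining why a change point was not detected correctly | False/bool |
Detection script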
```python
import pandas as pd
from kats.consts import TimeSeriesData
from kats.detectors.bocpd import BOCPDetector


def BOCPDetectorAM(startDs, data):
    # Synthetic alternative input:
    # from kats.utils.simulator import Simulator
    # sim = Simulator(n=450, start='2020-01-01', freq='H')
    # ts_bocpd = sim.level_shift_sim(noise=0.05, seasonal_period=1)
    series = pd.Series(data)
    df = pd.DataFrame(
        {
            'time': pd.date_range(startDs, periods=len(series)),
            'value': series,
        }
    )
    ts_bocpd = TimeSeriesData(df)
    print('BOCPDetector input:', ts_bocpd)
    # Initialize the detector and run it with default settings
    detector = BOCPDetector(ts_bocpd)
    changepoints = detector.detector()
    print('BOCPDetector output:', changepoints)
    return changepoints
```
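RobustStatDetector: multiple change point detection
RobustStatDetector is a change point detection algorithm that, like CUSUMDetector, looks for changes in the mean. It works as follows:
Smooth the time series with a moving average.
Compute the differences between points a fixed number of steps apart in the smoothed series (set by the comparison_window parameter of the detector method).
Compute z-scores and p-values of these differences, and return the points whose p-value is below a preset threshold (set by the p_value_cutoff parameter of the detector method).
Unlike CUSUMDetector, RobustStatDetector can detect multiple change points in a single run.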
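The three steps above map onto a short plain-Python sketch. This is a simplified re-implementation, not Kats's code:
- The smoothing is a trailing moving average.
- The difference looks back `comparison_window` points. Kats's default of -2 compares in the other direction.
- The p-values are two-sided normal tail probabilities.
- The function name is hypothetical.

```python
import math
from statistics import mean, pstdev


def robust_stat_changepoints(y, smoothing_window_size=5, comparison_window=2, p_value_cutoff=1e-2):
    """Indices t whose smoothed difference smooth[t] - smooth[t - k] is a significant outlier."""
    # 1. Trailing moving average (shorter windows at the start of the series)
    smooth = []
    for t in range(len(y)):
        window = y[max(0, t - smoothing_window_size + 1): t + 1]
        smooth.append(sum(window) / len(window))
    # 2. Differences between smoothed points k steps apart
    k = comparison_window
    diffs = [smooth[t] - smooth[t - k] for t in range(k, len(y))]
    mu, sd = mean(diffs), pstdev(diffs)
    if sd == 0:
        return []
    # 3. z-scores and two-sided normal p-values; keep points below the cutoff
    flagged = []
    for i, d in enumerate(diffs):
        z = (d - mu) / sd
        if math.erfc(abs(z) / math.sqrt(2)) < p_value_cutoff:
            flagged.append(i + k)
    return flagged


orders = [10] * 30 + [30] * 30
print(robust_stat_changepoints(orders))  # → [31, 32, 33, 34]
```

In this sketch, a single level shift is reported as several adjacent days while the moving average catches up. Results would usually need merging before they are treated as distinct change points.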
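RobustStatDetector tunable parameters:
Parameter | Meaning | Default/type |
---|---|---|
p_value_cutoff | p-value threshold for flagging a change point | 1e-2/float |
smoothing_window_size | Length of the smoothing window | 5/int |
comparison_window | Step size of the diff, i.e. how many data points apart the compared values are | -2/int |
Detection script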
```python
import json
from datetime import timedelta

import pandas as pd
from kats.consts import TimeSeriesData
from kats.detectors.robust_stat_detection import RobustStatDetector


def RobustStatDetectorMean(startDs, data):
    series = pd.Series(data)
    df = pd.DataFrame(
        {
            'time': pd.date_range(startDs, periods=len(series)),
            'value': series,
        }
    )
    print('RobustStatDetector input:', df)
    tsd = TimeSeriesData(df)
    detector = RobustStatDetector(tsd)
    change_points = detector.detector(p_value_cutoff=5e-3, comparison_window=2)
    print('RobustStatDetector change points:', change_points)
    return changePToDict(change_points, 'TimeSeriesChangePoint')


# Convert detector results into a JSON list of dicts
def changePToDict(changepoints, dictType=None):
    cp_dict_list = []
    if dictType is None or dictType == '':
        # CUSUMChangePoint objects returned by CUSUMDetector
        for cp in changepoints:
            happenTime = cp.start_time + timedelta(days=1)
            cp_dict_list.append({
                # Timestamps are formatted as strings so that json.dumps can serialize them
                'start_time': cp.start_time.strftime('%Y%m%d'),
                'end_time': cp.end_time.strftime('%Y%m%d'),
                'happen_time': happenTime.strftime('%Y%m%d'),
                'confidence': cp.confidence,
                'direction': cp.direction,
                'delta': cp.delta,
                'regression_detected': str(cp.regression_detected),
                'stable_changepoint': str(cp.stable_changepoint),
                'mu0': cp.mu0,
                'mu1': cp.mu1,
                'llr': cp.llr,
                'llr_int': cp.llr_int,
                'p_value': cp.p_value,
                'p_value_int': cp.p_value_int,
            })
    elif dictType == 'TimeSeriesChangePoint':
        for cp in changepoints:
            # start_time/end_time come back as timestamps
            happenTime = pd.to_datetime(cp.start_time) + timedelta(days=1)
            cp_dict_list.append({
                'start_time': pd.to_datetime(cp.start_time).strftime('%Y%m%d'),
                'end_time': pd.to_datetime(cp.end_time).strftime('%Y%m%d'),
                'happen_time': happenTime.strftime('%Y%m%d'),
                'confidence': cp.confidence,
            })
    elif dictType == 'time_series':
        # Plain timestamps, e.g. OutlierDetector.outliers[0]
        for t in changepoints:
            happenTime = pd.to_datetime(t) + timedelta(days=1)
            cp_dict_list.append({'happen_time': happenTime.strftime('%Y%m%d')})
    print('changePToDict:', cp_dict_list)
    return json.dumps(cp_dict_list)
```
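OutlierDetector: outlier detection
Kats provides the OutlierDetector module to find outliers in a time series. Outliers can cause many problems in downstream processing, so detecting them matters. OutlierDetector can also handle or remove the outliers it finds.
The outlier detection algorithm is:
Decompose the input series seasonally, using the specified additive or multiplicative decomposition (additive by default).
Build a residual series by removing only the trend, or the trend and the seasonality if the seasonality is strong.
Flag residual points below Q1 - 3*IQR or above Q3 + 3*IQR, where IQR is the interquartile range. The multiplier can be changed with the iqr_mult parameter of OutlierDetector.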
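The decompose-then-IQR procedure above can be sketched in plain Python. This uses a classical additive decomposition as a simplified stand-in:
- The trend is a centered moving average over an odd period (7 for daily data with weekly seasonality).
- The seasonal component is the per-phase mean of the detrended values.
- The IQR rule is applied to the residuals.

The function name and the spike data are hypothetical. Kats's own decomposition details (multiplicative mode, deciding when to remove seasonality) are not reproduced. The sketch needs at least two full periods of data.

```python
from statistics import mean, quantiles


def iqr_outliers(y, period=7, iqr_mult=3.0):
    """Classical additive decomposition + IQR rule on the residuals; returns outlier indices."""
    if period % 2 == 0:
        raise ValueError("this sketch only handles odd periods")
    n, half = len(y), period // 2
    # Trend: centered moving average (undefined for the first/last `half` points)
    trend = {t: sum(y[t - half:t + half + 1]) / period for t in range(half, n - half)}
    detrended = {t: y[t] - trend[t] for t in trend}
    # Seasonal component: mean detrended value per phase, centered to average zero
    phase_means = [mean(v for t, v in detrended.items() if t % period == p) for p in range(period)]
    offset = mean(phase_means)
    seasonal = [m - offset for m in phase_means]
    resid = {t: d - seasonal[t % period] for t, d in detrended.items()}
    # IQR rule on the residuals
    q1, _, q3 = quantiles(resid.values(), n=4)
    iqr = q3 - q1
    lo, hi = q1 - iqr_mult * iqr, q3 + iqr_mult * iqr
    return [t for t, r in sorted(resid.items()) if r < lo or r > hi]


weekly = [100, 120, 110, 90, 80, 130, 140]
orders = weekly * 4
orders[17] += 300  # hypothetical one-day spike
print(iqr_outliers(orders))  # → [17]
```

The spike also leaks into the moving-average trend and the seasonal estimate for its weekday, which shifts nearby residuals. A wide multiplier such as the default 3.0 keeps those neighbours from being flagged here.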
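OutlierDetector tunable parameters:
Parameter | Meaning | Default/type |
---|---|---|
decomp | Decomposition type: additive or multiplicative | 'additive'/str |
iqr_mult | Multiplier applied to the IQR when classifying outliers | 3.0/float |
Detection script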
```python
import pandas as pd
from kats.consts import TimeSeriesData
from kats.detectors.outlier import OutlierDetector


def OutlierDetectorDecomp(startDs, data):
    series = pd.Series(data)
    df = pd.DataFrame(
        {
            'time': pd.date_range(startDs, periods=len(series)),
            'value': series,
        }
    )
    print('TimeSeriesData data:', df)
    tsd = TimeSeriesData(df)
    # Additive decomposition with the default iqr_mult=3.0
    ts_outlierDetection = OutlierDetector(tsd, 'additive')
    ts_outlierDetection.detector()
    # outliers holds one list of timestamps per value column; there is only one column here
    print('OutlierDetectorDecomp outliers:', ts_outlierDetection.outliers[0])
    return changePToDict(ts_outlierDetection.outliers[0], 'time_series')
```
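MultivariateAnomalyDetector: cross-series detection
MultivariateAnomalyDetector is well suited to detecting anomalies across several time series. Anomalies are detected as deviations from the predicted steady-state behavior of the metric system. That behavior is forecast by modeling the linear interdependencies among the series with a vector autoregression (VAR) model. The approach is especially useful for multivariate anomalies, i.e. small anomalies that persist across several series at once.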
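To make the VAR idea concrete, here is a plain-Python sketch for two series. It is a minimal stand-in, not Kats's scoring:
- Fit a VAR(1) by least squares on a training prefix.
- Score each later day by the Mahalanobis distance of its one-step-ahead residual.
- Convert the distance to a chi-square (2 degrees of freedom) tail probability, which for 2 dof is `exp(-d2 / 2)`.

The helper names, the simulated data, and the injected anomaly are hypothetical.

```python
import math
import random
from statistics import mean


def solve(a, b):
    """Solve the small linear system a @ x = b by Gauss-Jordan elimination with partial pivoting."""
    n = len(b)
    m = [row[:] + [rhs] for row, rhs in zip(a, b)]
    for c in range(n):
        p = max(range(c, n), key=lambda r: abs(m[r][c]))
        m[c], m[p] = m[p], m[c]
        for r in range(n):
            if r != c:
                f = m[r][c] / m[c][c]
                m[r] = [x - f * y for x, y in zip(m[r], m[c])]
    return [m[i][n] / m[i][i] for i in range(n)]


def fit_var1(obs):
    """OLS fit of a bivariate VAR(1): obs[t][k] ~ c0 + c1*obs[t-1][0] + c2*obs[t-1][1]."""
    X = [[1.0, *obs[t - 1]] for t in range(1, len(obs))]
    xtx = [[sum(r[i] * r[j] for r in X) for j in range(3)] for i in range(3)]
    coefs = []
    for k in range(2):
        y = [obs[t][k] for t in range(1, len(obs))]
        coefs.append(solve(xtx, [sum(r[i] * v for r, v in zip(X, y)) for i in range(3)]))
    return coefs


def residual(coefs, prev, cur):
    return [cur[k] - (c[0] + c[1] * prev[0] + c[2] * prev[1]) for k, c in enumerate(coefs)]


def var_anomaly_pvalues(obs, train):
    """Fit on obs[:train], then give each later day t a p-value from its residual's Mahalanobis distance."""
    coefs = fit_var1(obs[:train])
    res = [residual(coefs, obs[t - 1], obs[t]) for t in range(1, train)]
    s00, s11 = mean(e[0] ** 2 for e in res), mean(e[1] ** 2 for e in res)
    s01 = mean(e[0] * e[1] for e in res)
    det = s00 * s11 - s01 * s01
    pvals = {}
    for t in range(train, len(obs)):
        e0, e1 = residual(coefs, obs[t - 1], obs[t])
        d2 = (s11 * e0 * e0 - 2 * s01 * e0 * e1 + s00 * e1 * e1) / det  # e^T S^-1 e
        pvals[t] = math.exp(-d2 / 2)
    return pvals


rng = random.Random(7)
obs = [(0.0, 0.0)]
for _ in range(99):
    a, b = obs[-1]
    obs.append((0.4 * a + 0.1 * b + rng.gauss(0, 1), 0.1 * a + 0.3 * b + rng.gauss(0, 1)))
obs[80] = (obs[80][0] + 25.0, obs[80][1])  # injected anomaly in the first series
pvals = var_anomaly_pvalues(obs, train=60)
anomalies = [t for t, p in pvals.items() if p < 0.05]
print(min(pvals, key=pvals.get), 80 in anomalies)
```

The sketch only makes sense with at least two series and a training prefix longer than the model needs. The same applies to the real detector's `training_days`.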
Detection script
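```python
import pandas as pd
from kats.consts import TimeSeriesData
from kats.detectors.outlier import MultivariateAnomalyDetector
from kats.models.var import VARParams


def MultivariateAnomalyDetectorCrossMultiSeries(startDs, data):
    series = pd.Series(data)
    df = pd.DataFrame(
        {
            'time': pd.date_range(startDs, periods=len(series)),
            'value': series,
        }
    )
    print('TimeSeriesData data:', df)
    # Note: the detector models several series jointly with a VAR model, so the input should
    # contain multiple value columns and more history than training_days; a single 30-day
    # 'value' column, as built here, is not enough.
    multi_anomaly_ts = TimeSeriesData(df)
    params = VARParams(maxlags=2)
    d = MultivariateAnomalyDetector(multi_anomaly_ts, params, training_days=60)
    anomaly_score_df = d.detector()
    print('MultivariateAnomalyDetector anomaly_score_df:', anomaly_score_df)
    # Time points whose anomaly score is significant at level alpha
    anomalies = d.get_anomaly_timepoints(alpha=0.05)
    print('MultivariateAnomalyDetector anomalies:', anomalies)
    return anomalies
```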
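Comparison of detection results
Algorithm parameters
See the script kats2_1.py for details (scope: self-operated hot-selling items, last 30 days).
Strategies
Base strategy: sort the detection results by confidence in descending order and keep those with confidence greater than 0.7
Strategy 1: intersection or union of the results of all detectors
Strategy 2: union of some detectors' results, combined with the intersection of other detectors' results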
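The strategies above operate on the per-detector JSON lists that the scripts write out. Below is a plain-Python sketch of the three strategies. Assumptions:
- The detector keys and confidence values are hypothetical.
- The dates come from the first row of the results table below.
- Which detectors go into the union and which into the intersection for Strategy 2 is an illustrative choice.
- Records without a `confidence` field (e.g. outlier results) pass the base filter.

```python
import json


def passed_base(detector_json, min_conf=0.7):
    """Base strategy: keep records with confidence > min_conf, highest confidence first.
    Records without a confidence field are kept (an assumption of this sketch)."""
    records = json.loads(detector_json)
    kept = [r for r in records if r.get("confidence", 1.0) > min_conf]
    kept.sort(key=lambda r: r.get("confidence", 1.0), reverse=True)
    return [r["happen_time"] for r in kept]


def union_all(results):
    """Strategy 1 (union): dates flagged by any detector."""
    return sorted(set().union(*results.values()))


def intersect_all(results):
    """Strategy 1 (intersection): dates flagged by every detector."""
    sets = [set(v) for v in results.values()]
    return sorted(set.intersection(*sets)) if sets else []


def union_then_intersect(results, union_of, intersect_with):
    """Strategy 2: union of some detectors, intersected with each of the others."""
    dates = set().union(*(results[k] for k in union_of))
    for k in intersect_with:
        dates &= set(results[k])
    return sorted(dates)


raw = {
    "cusum": json.dumps([{"happen_time": "0611", "confidence": 0.93}]),
    "cusum_window": json.dumps([{"happen_time": "0611", "confidence": 0.75},
                                {"happen_time": "0621", "confidence": 0.81}]),
    "outlier": json.dumps([{"happen_time": "0609"}, {"happen_time": "0611"}]),
}
results = {name: passed_base(js) for name, js in raw.items()}
print(union_all(results))      # → ['0609', '0611', '0621']
print(intersect_all(results))  # → ['0611']
print(union_then_intersect(results, ["cusum", "cusum_window"], ["outlier"]))  # → ['0611']
```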
Data and analysis
Item ID | Detection result | Analysis |
---|---|---|
628032051803 | (chart omitted) | CUSUMDetector single change point: 0611; CUSUMDetector multiple change points: 0611, 0621; OutlierDetector outliers: 0609, 0611 |
610295940401 | (chart omitted) | CUSUMDetector single change point: 0703; CUSUMDetector multiple change points: 0619, 0621; RobustStatDetector: 0628 |
620189523811 | (chart omitted) | CUSUMDetector multiple change points: 0630; RobustStatDetector: 0630, 0702, 0703 |
Production detection pipeline
Core script
```python
# Once compressed, the libraries Kats depends on exceed the 100 MB resource limit. Exclude the
# base packages PyODPS already ships; if that is still not enough, add the remaining packages
# through sys.path.
# `o` (ODPS entry object) and `args` (scheduling parameters such as args['bizdate']) are
# provided by the PyODPS node environment.
load_resource_package("kats-deps_01.tar.gz")
load_resource_package("kats-deps_02_exclude.tar.gz")
load_resource_package("kats-deps_03_exclude.tar.gz")

import sys
import os
import platform

print("Python version:", platform.python_version())

# The statsmodels package must be unzipped, otherwise its .so shared libraries cannot be used
Pillow_archive_file = 'http://xxx.com/scheduler/res?id=265642211'
Pillow_archive_file_dir = os.path.dirname(Pillow_archive_file)
os.system('unzip ' + Pillow_archive_file + ' -d ' + Pillow_archive_file_dir)
sys.path.append(Pillow_archive_file_dir)
# The statsmodels package must be unzipped, otherwise its .so shared libraries cannot be used
Pillow_archive_file2 = 'http://xxx.com/scheduler/res?id=264591628'
Pillow_archive_file_dir = os.path.dirname(Pillow_archive_file2)
os.system('unzip ' + Pillow_archive_file2 + ' -d ' + Pillow_archive_file_dir)
sys.path.append(Pillow_archive_file_dir)

libs = [
    'http://xxx.com/scheduler/res?id=265937401',  # kats
    'http://xxx.com/scheduler/res?id=266111495',  # dateutil
    # ax-platform
    'http://xxx.com/scheduler/res?id=264843190',  # typeguard
    'http://xxx.com/scheduler/res?id=264840378',  # botorch
]
lib_path = [os.path.abspath(f) for f in libs]
print('lib_path:', lib_path)
# Zip archives without shared libraries can be added to sys.path directly
for f in lib_path:
    sys.path.append(f)

import numpy as np
import pandas as pd
import pyarrow as pa

# PyODPS ships dateutil 2.5.3, but the uploaded matplotlib needs dateutil >= 2.7;
# reloading the module works around this
import importlib
import dateutil
importlib.reload(dateutil)

from odps.udf import annotate
from odps.distcache import get_cache_archive
from odps.distcache import get_cache_file
from odps import options
from odps import ODPS


# Copy libta into /usr/lib64 (adding it to sys.path alone did not work in testing)
def include_file(file_name):
    so_file = get_cache_file(file_name)
    with open(so_file.name, 'rb') as fp:
        content = fp.read()
    with open("/usr/lib64/libta_lib.so.0", "wb") as so:
        so.write(content)


# Read an ODPS table into {col: [values...]}. The input is fully preprocessed upstream,
# so this module does no validation.
def read_table(table_name, cols, max_num=1e10, ptn=None):
    if ptn is not None:
        # Note: the get_table() approach is limited to 10,000 records
        record_iterator = o.read_table(table_name, partition=ptn, timeout=6000)
    else:
        record_iterator = o.read_table(table_name, timeout=6000)
    datas = {}
    for cur_record in record_iterator:
        for col in cols:
            tmp = datas.get(col, [])
            if col in ("item_id", "periods_cnt"):
                tmp.append(int(cur_record[col]))
            else:
                tmp.append(cur_record[col])
            datas[col] = tmp
    return datas


# Write detection results to an ODPS table
def write_table(records, out_table, cols, ptn=None):
    t = o.get_table(out_table)
    if len(records) < 1:
        return
    print('write_table onecase:', len(records), records[0])
    if ptn is not None:
        t.delete_partition('ds=' + ptn, if_exists=True)  # delete only if the partition exists
        with t.open_writer(partition='ds=' + ptn, create_partition=True, arrow=True) as writer:
            df = pd.DataFrame(records, columns=cols)
            # Write as an Arrow RecordBatch
            batch = pa.RecordBatch.from_pandas(df)
            writer.write(batch)
    else:
        t.truncate()
        with t.open_writer() as writer:
            writer.write(records)


# include_file('_ets_smooth.cpython-37m-x86_64-linux-gnu.so')
from kats.consts import TimeSeriesData
from kats.detectors.cusum_detection import CUSUMDetector
from kats.detectors.robust_stat_detection import RobustStatDetector
# Outlier detection
from kats.detectors.outlier import OutlierDetector
from kats.detectors.outlier import MultivariateAnomalyDetector, MultivariateAnomalyDetectorType
from kats.detectors.bocpd import BOCPDetector, BOCPDModelType, TrendChangeParameters
from kats.models.var import VARParams
import json
from datetime import datetime, timedelta


def toJson():
    data = {'name': 'Alice', 'age': 18, 'gender': 'female'}
    # ensure_ascii=False keeps non-ASCII characters readable in the output
    json_str = json.dumps(data, ensure_ascii=False)
    print('toJson', json_str)


def testDetection():
    print('testDetectionDs', args['bizdate'])
    # Simulate a time series whose mean increases halfway through
    np.random.seed(10)
    df_increase = pd.DataFrame(
        {
            'time': pd.date_range('2019-01-01', '2019-03-01'),
            'increase': np.concatenate([np.random.normal(1, 0.2, 30), np.random.normal(2, 0.2, 30)]),
        }
    )
    print('df_increase', df_increase)
    # Convert to TimeSeriesData and run the detector
    timeseries = TimeSeriesData(df_increase)
    change_points = CUSUMDetector(timeseries).detector()
    print('change_points', change_points)


def testOrderDetection():
    # orders: daily order counts
    orders = [10, 15, 14, 17, 12, 1, 25, 28, 30, 14, 18, 11, 15, 16, 19, 22, 23, 25, 21, 20, 30, 35, 40, 50]
    series = pd.Series(orders)
    data = TimeSeriesData(time=pd.date_range(start='20230701', periods=len(series), freq='D'), value=series)
    print('TimeSeriesData data:', data)
    # CUSUMDetector with default settings
    detector = CUSUMDetector(data)
    change_points = detector.detector()
    print('testOrderDetection', change_points)


def CUSUMDetectorMean(startDs, data):
    series = pd.Series(data)
    df_increase_decrease = pd.DataFrame(
        {
            'time': pd.date_range(startDs, periods=len(series)),
            'increase': series,
            'decrease': series,
        }
    )
    print('TimeSeriesData data:', df_increase_decrease)
    # {'YYYYMMDD': orders} lookup used by changePToDict to confirm an actual drop
    kv_multi_cp_ts = df_increase_decrease.to_dict(orient='records')
    for d in kv_multi_cp_ts:
        d['time'] = d['time'].strftime('%Y%m%d')
    new_kv_multi_cp_ts = {d['time']: d['decrease'] for d in kv_multi_cp_ts}
    # Detect downward shifts only
    tsd = TimeSeriesData(df_increase_decrease.loc[:, ['time', 'decrease']])
    detector = CUSUMDetector(tsd)
    change_points = detector.detector(change_directions=["decrease"], max_iter=10, return_all_changepoints=True)
    print('CUSUMDetector change points (decrease):', change_points)
    return changePToDict(new_kv_multi_cp_ts, change_points)


def CUSUMDetectorWithMultipleChangeP(startDs, data):
    series = pd.Series(data)
    df_multi_cps = pd.DataFrame(
        {
            'time': pd.date_range(startDs, periods=len(series)),
            'y': series,
        }
    )
    multi_cp_ts = TimeSeriesData(df_multi_cps)
    kv_multi_cp_ts = df_multi_cps.to_dict(orient='records')
    for d in kv_multi_cp_ts:
        d['time'] = d['time'].strftime('%Y%m%d')
    new_kv_multi_cp_ts = {d['time']: d['y'] for d in kv_multi_cp_ts}
    print('CUSUMDetectorWithMultipleChangeP input:', new_kv_multi_cp_ts)
    n = len(df_multi_cps)
    historical_window = 0
    # Shrink the scan window for series shorter than 10 points
    scan_window = n - 1 if n < 10 else 10
    step = 5
    changepoints = []
    print('scan_window:', scan_window)
    for end_idx in range(historical_window + scan_window, n, step):
        tsd = multi_cp_ts[end_idx - (historical_window + scan_window) : end_idx]
        print('Sliding-window input:', end_idx, tsd)
        changepoints += CUSUMDetector(tsd).detector(
            change_directions=["decrease"],
            interest_window=[historical_window, historical_window + scan_window],
            max_iter=10,
            return_all_changepoints=True,
        )
    print('CUSUMDetector multiple change points:', changepoints)
    return changePToDict(new_kv_multi_cp_ts, changepoints)


# Convert detector results to a JSON list, keeping only confirmed drops up to bizdate
def changePToDict(series_kv, changepoints, dictType=None):
    cp_dict_list = []
    if dictType is None or dictType == 'CUSUMChangePoint':
        for cp in changepoints:
            happenTime = cp.start_time + timedelta(days=1)
            startDs = cp.start_time.strftime('%Y%m%d')
            happenDs = happenTime.strftime('%Y%m%d')
            # Skip days after bizdate or outside the series before looking them up
            if happenDs > args['bizdate'] or happenDs not in series_kv:
                continue
            print('changePToDict:', series_kv[startDs], series_kv[happenDs])
            cp_dict = {
                'happen_time': happenDs,
                'confidence': cp.confidence,
                'direction': cp.direction,
                'delta': cp.delta,
                'regression_detected': str(cp.regression_detected),
                'stable_changepoint': str(cp.stable_changepoint),
                'mu0': cp.mu0,
                'mu1': cp.mu1,
                'llr': cp.llr,
                'llr_int': cp.llr_int,
                'p_value': cp.p_value,
                'p_value_int': cp.p_value_int,
            }
            # Keep confident change points where orders actually dropped on happen_time
            if cp.confidence > 0.7 and series_kv[happenDs] < series_kv[startDs]:
                cp_dict_list.append(cp_dict)
    elif dictType == 'TimeSeriesChangePoint':
        for cp in changepoints:
            # start_time comes back as a timestamp
            startDs = pd.to_datetime(cp.start_time).strftime('%Y%m%d')
            happenDs = (pd.to_datetime(cp.start_time) + timedelta(days=1)).strftime('%Y%m%d')
            if happenDs > args['bizdate'] or happenDs not in series_kv:
                continue
            print('TimeSeriesChangePoint:', series_kv[startDs], series_kv[happenDs])
            if series_kv[happenDs] < series_kv[startDs]:
                cp_dict_list.append({'happen_time': happenDs, 'confidence': cp.confidence})
    elif dictType == 'time_series':
        for cp in changepoints:
            startDs = pd.to_datetime(cp).strftime('%Y%m%d')
            happenDs = (pd.to_datetime(cp) + timedelta(days=1)).strftime('%Y%m%d')
            if happenDs > args['bizdate'] or happenDs not in series_kv:
                continue
            print('time_series:', series_kv[startDs], series_kv[happenDs])
            if series_kv[happenDs] < series_kv[startDs]:
                cp_dict_list.append({'happen_time': happenDs})
    print('changePToDict:', cp_dict_list)
    return json.dumps(cp_dict_list)


def BOCPDetectorAM(startDs, data):
    series = pd.Series(data)
    df = pd.DataFrame(
        {
            'time': pd.date_range(startDs, periods=len(series)),
            'value': series,
        }
    )
    ts_bocpd = TimeSeriesData(df)
    print('BOCPDetector input:', ts_bocpd)
    # Initialize the detector and run it with default settings
    detector = BOCPDetector(ts_bocpd)
    changepoints = detector.detector()
    print('BOCPDetector output:', changepoints)
    return changepoints


def RobustStatDetectorMean(startDs, data):
    series = pd.Series(data)
    df = pd.DataFrame(
        {
            'time': pd.date_range(startDs, periods=len(series)),
            'value': series,
        }
    )
    print('RobustStatDetector input:', df)
    kv_multi_cp_ts = df.to_dict(orient='records')
    for d in kv_multi_cp_ts:
        d['time'] = d['time'].strftime('%Y%m%d')
    new_kv_multi_cp_ts = {d['time']: d['value'] for d in kv_multi_cp_ts}
    tsd = TimeSeriesData(df)
    detector = RobustStatDetector(tsd)
    change_points = detector.detector(p_value_cutoff=5e-3, comparison_window=2)
    print('RobustStatDetector change points:', change_points)
    return changePToDict(new_kv_multi_cp_ts, change_points, 'TimeSeriesChangePoint')


def OutlierDetectorDecomp(startDs, data):
    series = pd.Series(data)
    df = pd.DataFrame(
        {
            'time': pd.date_range(startDs, periods=len(series)),
            'value': series,
        }
    )
    print('TimeSeriesData data:', df)
    kv_multi_cp_ts = df.to_dict(orient='records')
    for d in kv_multi_cp_ts:
        d['time'] = d['time'].strftime('%Y%m%d')
    new_kv_multi_cp_ts = {d['time']: d['value'] for d in kv_multi_cp_ts}
    tsd = TimeSeriesData(df)
    ts_outlierDetection = OutlierDetector(tsd, 'additive')
    ts_outlierDetection.detector()
    print('OutlierDetectorDecomp outliers:', ts_outlierDetection.outliers[0])
    return changePToDict(new_kv_multi_cp_ts, ts_outlierDetection.outliers[0], 'time_series')


# Detect anomalies across multiple time series
def MultivariateAnomalyDetectorCrossMultiSeries(startDs, data):
    series = pd.Series(data)
    df = pd.DataFrame(
        {
            'time': pd.date_range(startDs, periods=len(series)),
            'value': series,
        }
    )
    print('TimeSeriesData data:', df)
    multi_anomaly_ts = TimeSeriesData(df)
    params = VARParams(maxlags=2)
    d = MultivariateAnomalyDetector(multi_anomaly_ts, params, training_days=60)
    anomaly_score_df = d.detector()
    print('MultivariateAnomalyDetector anomaly_score_df:', anomaly_score_df)
    # Time points whose anomaly score is significant at level alpha
    anomalies = d.get_anomaly_timepoints(alpha=0.05)
    print('MultivariateAnomalyDetector anomalies:', anomalies)
    return anomalies


if __name__ == '__main__':
    # Smoke tests (uncomment as needed)
    # testDetection()
    # testOrderDetection()
    CUSUMDetectorMean('20230602', [301, 444, 245, 216, 231, 204, 180, 181, 201, 182, 210, 213, 240, 530, 499, 506, 938, 527, 694, 242, 393, 454, 506, 406, 304, 316, 277, 256, 236, 238, 247])
    # res1 = CUSUMDetectorWithMultipleChangeP('20230604', [258, 206, 252, 194, 10])
    # print('CUSUMDetectorWithMultipleChangeP', res1)
    # RobustStatDetectorMean('20230602', [301, 444, 245, 216, 231, 204, 180, 181, 201, 182, 210, 213, 240, 530, 499, 506, 938, 527, 694, 242, 393, 454, 506, 406, 304, 316, 277, 256, 236, 238, 247])
    # OutlierDetectorDecomp('20230604', [709, 679, 651, 560, 492, 435, 579, 789, 644, 579, 449, 560, 624, 544, 683, 346, 300, 267, 367, 381, 371, 346, 321, 336, 312, 4815, 454, 309, 405, 356])
    # MultivariateAnomalyDetectorCrossMultiSeries('20230602', [301, 444, 245, 216, 231, 204, 180, 0, 0, 0, 210, 213, 240, 530, 499, 506, 938, 527, 694, 242, 393, 454, 506, 406, 304, 316, 277, 256, 236, 238, 247])
    # toJson()
    # BOCPDetectorAM('20230602', [301, 444, 245, 216, 231, 204, 180, 181, 201, 182, 210, 213, 240, 530, 499, 506, 938, 527, 694, 242, 393, 454, 506, 406, 304, 316, 277, 256, 236, 238, 247])

    # Data source definitions
    input_tables = {
        "input_total": "huopin_data.item_ord_detection_kats_algo_input"
    }
    output_tables = {
        "detection_result": "huopin_data.item_ord_detection_kats_algo_res_opt"
    }
    # Read the input; returns a dict like {col1: [val1_1, val1_2, ...], col2: [val2_1, val2_2, ...], ...}
    ini_input_dict = read_table(
        input_tables["input_total"],
        cols=["item_id", "start_ds", "ord_arr", "extend_json"],
        ptn='ds=' + args['bizdate'],
    )
    print('ini_input_dict len:', len(ini_input_dict['item_id']))
    detection_res = []
    for i in range(len(ini_input_dict['item_id'])):
        itemId = ini_input_dict['item_id'][i]
        startDs = ini_input_dict['start_ds'][i]
        ordArr = ini_input_dict['ord_arr'][i]
        res_0 = CUSUMDetectorMean(startDs, ordArr)
        res_1 = CUSUMDetectorWithMultipleChangeP(startDs, ordArr)
        res_3 = RobustStatDetectorMean(startDs, ordArr)
        res_4 = OutlierDetectorDecomp(startDs, ordArr)
        # BOCPD and VAR are not run in production, so their columns are written as '[]'
        detection_res.append([itemId, res_0, res_1, '[]', res_3, res_4, '[]'])
    # Write the results
    write_table(
        detection_res,
        output_tables['detection_result'],
        ['item_id', 'detection_res_of_cusum', 'detection_res_of_cusum_window', 'detection_res_of_bocp',
         'detection_res_of_robuststat', 'detection_res_of_outlier', 'detection_res_of_var'],
        args['bizdate'],
    )
```