Kyligence AI 服务 - 让大模型完成准确、可靠的数值计算和回答! 立即了解更多
AI 数智助理
Kyligence Zen Kyligence Zen
Kyligence Enterprise Kyligence Enterprise
Kyligence Turbo Kyligence Turbo
指标平台解决方案
OLAP 解决方案
行业解决方案
客户总览
金融
零售
制造
医药
其他
云平台
BI
寻求合作
资源
Kyligence Enterprise
Kyligence Zen
培训
Apache Kylin
Byzer
Gluten
博客
关于
市场活动
本文讲解了时间序列数据异常值检测的基本概念和在 Kylin 中开发使用异常值检测 UDF 的方法,可以作为其他 UDF 开发的参考。
通过在 Kylin 中移植 Hivemall 的 UDF,我们可以充分利用 Kylin 的优势,减少直接使用 Hivemall 过程中的数据加工、存储等繁杂步骤的工作量,提升用户的查询体验。本文使用的验证环境为 Kylin 2.6.3。
时间序列数据是聚合数据中的一种重要类别,数据分析人员经常需要使用各种方法从不同角度对聚合得到的时间序列数据进行挖掘,异常值检测(Anomaly Detection)就是其中的一种常见方法。异常值检测的主要目标是从时间序列数据中区分出与预期的正常值不符的值[1],如离群值(outlier)和突变点(change-point)等,这些值往往具有比较高的关注价值,是分析人员在进行业务分析时需要重点关注的对象。
时间序列数据的异常值检测具有广泛的应用场景,例如:应用在一般的商业领域中,有助于定位生产销售中的异常波动;应用在运维中,有助于迅速发现故障;应用在医学上,有助于医生做出诊断,等等。
使用传统方法在大数据集上进行异常值检测存在效率低、不够灵活等问题,因此就有人尝试引入 Hive,通过对 Hive 进行扩展来解决这些问题。例如,Apache 孵化项目 Hivemall (http://hivemall.incubator.apache.org)为 Hive 提供了大量的数据分析 UDF(User-defined Function,用户定义函数)作为扩展,其中就包括一个用于进行异常值检测的函数 changefinder。changefinder 函数实现了 ChangeFinder 算法[2],提供一维和二维数据浮点数据的离群值和突变点检测功能。
Hivemall 在官网中提供了一个使用示例数据(https://github.com/apache/incubator-hivemall/blob/master/core/src/test/resources/hivemall/anomaly/twitter.csv.gz?raw=true,这是一组来自 Twitter 的时间序列数据)进行异常值检测的例子,例子中的查询语句和查询结果片段如下:
SELECT num, changefinder(value, "-outlier_threshold 0.03 -changepoint_threshold 0.0035") AS resultFROM timeseriesORDER BY num ASC;
SELECT
num,
changefinder(value, "-outlier_threshold 0.03 -changepoint_threshold 0.0035") AS result
FROM
timeseries
ORDER BY num ASC
;
Apache Kylin 是一个基于 Hadoop 平台的 OLAP 引擎,它采用预计算的理念,预先对查询中可能使用到的表和维度进行关联、聚合,使得后续的查询可以直接使用预计算的结果。预计算结果被存储下来 ,通过消耗存储空间换取更快速的查询响应,即所谓的空间换时间。Hivemall 基于 Hive 大幅提升了在大数据集上进行异常值检测的效率和灵活性,但在分析聚合数据的场景下,以其查询效率仍然无法实现实时交互式分析,因此可以考虑通过引入 Kylin 来解决这个问题。
与 Hive 等类似,Kylin 也支持 UDF,允许用户对查询中需要使用的函数进行一定的扩展,下面我们就尝试通过创建 UDF 的方式,在 Kylin 中引入 Hivemall 的一维 changefinder 函数。
创建 Kylin 的 UDF 主要有两个步骤:
在动手之前,我们先简要考察一下 ChangeFinder 算法的原理和它在 Hivemall 中的实现。ChangeFinder 是一种基于自回归模型的算法,包括分别检测离群值和突变点的两大阶段,每个阶段又可以细分为更新自回归模型和计算异常分值两个步骤。这其中两个阶段的计算过程是几乎相同的,输出分别是离群分值和突变分值,主要区别在于输入不同,突变点检测阶段以离群值检测阶段的输出作为输入。
ChangeFinder 算法定义了时间序列上的自回归模型,并设计了用于估计模型参数的算法,称为 Sequentially Discounting AR Model Learning 算法,简称 SDAR 算法。SDAR 算法有两个参数,分别是模型的阶数和衰减系数,序列数据保存在长度为的 RingBuffer 中,经 SDAR 算法计算就可以得到该序列对应自回归模型的参数,这部分在一维数据上的实现在 hivemall.anomaly.SDAR1D 类中。
有了这些参数,就可以使用模型的概率密度函数进行分值的计算,Hivemall 中提供了海林格距离和对数两种计算方式,其中默认使用的是海林格距离。此外,ChangeFinder 算法在突变点检测阶段的开始和结束时会分别对输入的离群分值和输出的突变分值进行窗口平滑,这就需要额外的两个输入参数,即窗口大小T1和 T2。
static double evalScoreY(DoubleRingBuffer xRing, SDAR1D sdar1, DoubleRingBuffer yRing, SDAR1D sdar2, DoubleRingBuffer outlierScores, DoubleRingBuffer changepointScores, double x, int k) { //计算离群分值 double scoreX = evalScoreX(xRing, sdar1, x, k); //第一次平滑 double y = smoothing(outlierScores.add(scoreX)); //计算突变分值 double[] ySeries = new double[k + 1]; yRing.add(y).toArray(ySeries, false); int k2 = yRing.size() - 1; double y_hat = sdar2.update(ySeries, k2); double lossY = (k2 == 0D) ? 0D : hellingerLoss(sdar2); //第二次平滑 double scoreY = smoothing(changepointScores.add(lossY)); return scoreY;}
static double evalScoreY(DoubleRingBuffer xRing,
SDAR1D sdar1,
DoubleRingBuffer yRing,
SDAR1D sdar2,
DoubleRingBuffer outlierScores, DoubleRingBuffer changepointScores,
double x, int k) {
//计算离群分值
double scoreX = evalScoreX(xRing, sdar1, x, k);
//第一次平滑
double y = smoothing(outlierScores.add(scoreX));
//计算突变分值
double[] ySeries = new double[k + 1];
yRing.add(y).toArray(ySeries, false);
int k2 = yRing.size() - 1;
double y_hat = sdar2.update(ySeries, k2);
double lossY = (k2 == 0D) ? 0D : hellingerLoss(sdar2);
//第二次平滑
double scoreY = smoothing(changepointScores.add(lossY));
return scoreY;
}
了解了上面这些信息,就可以开始动手移植了,动手过程中需要解决几个问题:
首先是 UDF 的类型问题。Kylin 的 SQL 支持来自另一个 Apache 开源项目 Calcite(http://calcite.apache.org),Calcite支持的 UDF 除最基本的单行映射的函数外,还有聚合函数和窗口聚合函数两种,即 UDAF 和 Window UDAF。
不同类型的 UDF,其类定义和在 SQL 中的使用方法也不尽相同,聚合函数将整列数据聚合成一个,如 count、sum 等,窗口聚合函数则是与 over 子句一起使用的聚合函数。从上面考察的 changefinder 算法的原理来看,这里我们应该选择窗口聚合函数,这样就需要在类中定义 5 个方法:init、add、merge、remove result (http://calcite.apache.org/docs/adapter.html,Extensibility 小节),并定义一个类,用于储存异常分值计算的中间结果。
其次,要解决函数拆分的问题。Hivemall 中的 changefinder 函数将离群值和突变点的检测放在了一起,以 json 的形式输出计算结果,我们可以将它拆分成两个 UDF,即定义两个类 OutlierWindowUDF 和 ChangePointWindowUDF,分别用于计算离群分值和突变分值,以方便进一步的分析。以 changepoint 函数为例,拆分后的主要计算过程如下:
static double evalScoreY(DoubleRingBuffer xRing, SDAR1D sdar1, DoubleRingBuffer yRing, SDAR1D sdar2, DoubleRingBuffer outlierScores, DoubleRingBuffer changepointScores, double x, int k) { //计算离群分值 double scoreX = evalScoreX(xRing, sdar1, x, k); //第一次平滑 double y = smoothing(outlierScores.add(scoreX)); //计算突变分值 double[] ySeries = new double[k + 1]; yRing.add(y).toArray(ySeries, false); int k2 = yRing.size() - 1; double y_hat = sdar2.update(ySeries, k2); double lossY = (k2 == 0D) ? 0D : hellingerLoss(sdar2); //第二次平滑 double scoreY = smoothing(changepointScores.add(lossY)); return scoreY;
接下来要解决数据结构初始化的问题。Hive 允许 UDF 在进行计算之前使用传入的参数进行初始化,但 Calcite 的 UDF 并不支持带参数的初始化,因此需要在计算第一行时使用、等参数进行初始化,OutlierWindowUDF 需要初始化 1 个 DoubleRingBuffer 变量和 1 个 SDAR1D 变量,而 ChangePointWindowUDF 需要初始化 4 个 DoubleRingBuffer 变量和 2 个 SDAR1D 变量。Calcite 聚合函数类型的 UDF 需要使用一个用户定义的类作为 Accumulator,因此我们可以定义一个内部类,用于存储计算过程中所需的各种变量,以 outlier 函数为例:
class OutlierWindowStatus { DoubleRingBuffer xRing; SDAR1D sdar1; double result; boolean initialized; OutlierWindowStatus() { xRing = null; sdar1 = null; result = 0D; initialized = false; } void initialize(int k, double r) { xRing = new DoubleRingBuffer(k + 1); sdar1 = new SDAR1D(r, k); initialized = true; } void setResult(double result) { this.result = result; }}public OutlierWindowStatus init() { return new OutlierWindowStatus();}public OutlierWindowStatus add(OutlierWindowStatus ows, BigDecimal x, int k, BigDecimal r1) { if (!ows.initialized) { ows.initialize(k, r1.doubleValue()); } ows.setResult(evalScoreX(ows.xRing, ows.sdar1, x.doubleValue(), k)); return ows;}public OutlierWindowStatus merge(OutlierWindowStatus ows1, OutlierWindowStatus ows2) { return ows1;}public OutlierWindowStatus remove(OutlierWindowStatus ows, double x) { return ows;}public double result(OutlierWindowStatus ows) { return ows.result;}
class OutlierWindowStatus {
DoubleRingBuffer xRing;
SDAR1D sdar1;
double result;
boolean initialized;
OutlierWindowStatus() {
xRing = null;
sdar1 = null;
result = 0D;
initialized = false;
void initialize(int k, double r) {
xRing = new DoubleRingBuffer(k + 1);
sdar1 = new SDAR1D(r, k);
initialized = true;
void setResult(double result) {
this.result = result;
public OutlierWindowStatus init() {
return new OutlierWindowStatus();
public OutlierWindowStatus add(OutlierWindowStatus ows, BigDecimal x, int k, BigDecimal r1) {
if (!ows.initialized) {
ows.initialize(k, r1.doubleValue());
ows.setResult(evalScoreX(ows.xRing, ows.sdar1, x.doubleValue(), k));
return ows;
public OutlierWindowStatus merge(OutlierWindowStatus ows1, OutlierWindowStatus ows2) {
return ows1;
public OutlierWindowStatus remove(OutlierWindowStatus ows, double x) {
public double result(OutlierWindowStatus ows) {
return ows.result;
还要注意的是,由于 Calcite 会把窗口聚合函数输入的非整数作为 BigDecimal 类型来处理,这里也应该把相应的参数定义为 BigDecimal 类型。
最后,使用 maven 的 shade 插件进行打包,并排除其中的 SF、DSA、RSA 等签名文件,就得到了可以放入 Kylin 的 lib 目录的 jar 文件,在 kylin.properties 文件中加入以下两行并重启 Kylin,就可以使用 UDF 了:
kylin.query.udf.outlier_window=org.apache.kylin.query.udf.OutlierWindowUDFkylin.query.udf.changepoint_window=org.apache.kylin.query.udf.ChangePointWindowUDF
kylin.query.udf.outlier_window=org.apache.kylin.query.udf.OutlierWindowUDF
kylin.query.udf.changepoint_window=org.apache.kylin.query.udf.ChangePointWindowUDF
我们可以将 Hivemall 提供的示例数据导入到 Hive 中,使用 Kylin 建立 Cube 并进行简单的查询验证,以 outlier 函数为例:
如上图,我们使用一条简单的 SQL 语句验证 outlier 函数,返回的结果就是在示例数据上计算得到的离群分值。为了使结果更加直观一些,可以将查询结果导出,从中截取一个 100 行的片段绘图,在图中加入一条表示阈值的虚线:
图中的横轴表示数据的行号,纵轴表示数据的离群分值,取阈值为 0.03,可以看到,函数成功检测出了这 100 条数据中的 3 个比较明显的离群值。
作者介绍:郑嵘,Kyligence 研发工程师,主要负责 Kyligence MDX 研发。
参考文献
[1] Chandola, Varun, Arindam Banerjee, and Vipin Kumar. Anomaly detection: A survey. ACM computing surveys (CSUR) 41.3 (2009): 15.
[2] K. Yamanishi and J. Takeuchi. A Unifying Framework for Detecting Outliers and Change Points from Non-Stationary Time Series Data. KDD 2002.
近年来,随着商业环境的竞争日益激烈,企业对于实时数据服务的需求急剧增加。Kyligence 在服务众多客户的过
数据要素在银行各业务领域和流程中发挥着至关重要的作用,面对激烈的市场竞争和客户需求,银行越来越注重从数据管理中
作为一名消费者,炎热的夏天我们会走进一家便利店,从冰柜中选出一瓶汽水;下午工作有点累了,我们会在公司的自动贩卖
2024 年伊始,Kyligence 联合创始人兼 CEO 韩卿(Luke)分享了对 AI 与数据行业的一些战
房地产行业是我国国民经济中的重要支柱产业之一,在房地产市场供求关系发生重大变化的当下,房企面临多重挑战。Kyl
今年年初,Kyligence 高级副总裁兼合伙人葛双寅(Silas Ge)受邀在阿斯利康“跃行致远三十周年年会
2024 年伊始,Kyligence 联合创始人兼 CEO 韩卿在公司内部的飞书订阅号发表了多篇 Rethin
400 8658 757
工作日:10:00 - 18:00
已有账号? 点此登陆
预约演示,您将获得
完整的产品体验
从数据导入、建模到分析的全流程操作演示。
行业专家解惑
与资深行业专家的交流机会,解答您的个性化问题。
请填写真实信息,我们会在 1-2 个工作日内电话与您联系。
全行业落地场景演示
涵盖金融、零售、餐饮、医药、制造等多个行业,最贴合您的业务需求与场景。
Data + AI 应用落地咨询
与资深技术专家深入交流,助您的企业快速落地 AI 场景应用。
立即预约,您将获得
精准数据计算能力:
接入高精度数值计算大模型服务,为您的企业级AI应用提供强大支持。
个性化业务场景解决方案:
量身定制的计算模型和数据分析服务,切实贴合您的业务需求和应用场景。
Data + AI 落地应用咨询:
与资深专家深入探讨数据和 AI 如何帮助您的企业加速实现应用落地,构建更智能的数据驱动未来。
申请体验,您将获得
体验数据处理性能 2x 加速
同等规模资源、同等量级数据、同一套数据处理逻辑,处理耗时下降一半
专家支持
试用部署、生成数据、性能对比各操作环节在线支持