Kyligence Copilot - AI 数智助理,以 AI 变革企业经营与管理! 立即了解更多

【开发实践】Kylin v2.5.0 出现 TopN 查询错误问题的解决

史少锋
2018年 11月 29日

作者:史少锋

为了让广大开发者了解大数据开发的实践,Apache Kylin 推出了 “开发实践” 系列文章,邀请来自 Apache Kylin 社区的开发者和维护者,为大家介绍在 Apache Kylin 开发过程中遇到的各种挑战与解决办法,包括架构与设计,bug 的解决,性能调优等多方面内容。

背景

我们知道,Apache Kylin v2.0 版本开始引入 Spark 来做 Cube 的加工,这显著提升了 Cube 生成的速度,但还不是所有步骤都使用 Spark 来完成。终于,在今年 9 月发布的 Kylin v2.5.0 里,所有构建任务都可以使用 Spark 了。

发布后不久,Apache Kylin 团队的开发人员在某次调试代码的时候发现,TopN 的查询在 Spark 引擎上构建后查询的结果比较奇怪,但又毫无头绪,起初这个问题没有引起我的注意,主要有三个原因:


- Top N 预计算是近似的预计算,有一定误差属于正常;

- Spark 的编程模型和 MR 引擎有一定差别,对于 TopN 这种算子的合并,更适合在精度比 Spark 更高的 MR 上计算,因此使用 Spark 构建后误差变大,属于预期行为;

- Kylin 的集成测试中有覆盖各个算子的正确性,会跟 H2 的查询结果进行比对,如果不正确则会报错;Kylin 改进后的 Spark 引擎也通过了集成测试,因此,当时我们主观上认为应该没有问题。

过了一段时间后,在一次讨论中有人提到,Kylin 集成测试里的 SQL 验证测试,有部分查询只覆盖了“left join”的情况,没有覆盖“inner join”的情况;而“inner join”的 Cube 是用 Spark 引擎构建的。那么也就是说,Spark 引擎构建的 Cube,可能没有完全被测试覆盖到,结合之前开发人员提出的疑惑,我意识到这个问题可能不简单。

打开 Kylin 代码,定位到,这里是查询测试的入口:

 org.apache.kylin.query.ITKylinQueryTest.java


确实看到有一些 test 方法,只在"left join" 的时候执行,例如:

为什么这些测试案例要排除 inner join 的情况呢?这里有一些历史原因:


- 在开始的时候,Spark 引擎还不支持一些高级度量的构建,例如 bitmap,percentile 等;主要原因是它们的序列化比较复杂,所以 inner join 的 Cube 不包含这几个度量,因此不能跑某些查询;但这些问题在后续进化中都被社区开发者修复了;现在的 Spark 引擎能支持所有度量。

- "Inner join" 会过滤数据,因此一些不带 “join” 的 SQL 语句,基于 Cube 的结果和基于原始数据的查询结果匹配不上。例如 “select count(*) from fact_table”,如果用 inner join 的 Cube 来回答的话,实际上等同于“select count(*) from fact_table inner join lookup_table on xxx”,会少数据;而 "left join" 则不会有这样的问题。

随着时间变迁,一些过去的限制条件已经不存在了,我们需要打开限制,让测试覆盖到尽可能多的情况。

重现问题


于是我去掉了这里对 join 条件的限制,在 IDE 里执行ITKylinQueryTest#testTopNQuery 这个测试,得到了失败的测试结果:

失败的查询 SQL 如下:

select test_kylin_fact.cal_dt, seller_id
 FROM test_kylin_fact
 inner JOIN edw.test_cal_dt as test_cal_dt
 ON test_kylin_fact.cal_dt = test_cal_dt.cal_dt
 inner JOIN test_category_groupings
 ON test_kylin_fact.leaf_categ_id = test_category_groupings.leaf_categ_id
 AND test_kylin_fact.lstg_site_id = test_category_groupings.site_id
 group by
 test_kylin_fact.cal_dt, test_kylin_fact.seller_id order by sum(test_kylin_fact.price) desc limit 20

可以看到这条 SQL 里面写足了 inner join 条件,所以上面的第二种 inner join 不适应的情况也被排除了。略微修改,将 sum(price) 输出出来,Kylin 的输出结果如下:

而 Hive 的如下:

可以看到,误差相当大,考虑到集成测试的数据量比较小,TopN 算子应该不会有如此大误差,所以也排除了是误差率导致的情况,从这条 SQL 是很难判断出具体的问题在哪里,因为它没有过滤条件、时间跨度大,且有不少的后聚合发生。

要想解决问题,我们需要要更准确地定位问题,还需要缩小数据查询的范围。

于是,在上面的 SQL 基础上,增加时间条件,让它缩小到某一天:

 select test_kylin_fact.cal_dt, seller_id, sum(test_kylin_fact.price) as xx
 FROM test_kylin_fact
 inner JOIN edw.test_cal_dt as test_cal_dt
 ON test_kylin_fact.cal_dt = test_cal_dt.cal_dt
 inner JOIN test_category_groupings
 ON test_kylin_fact.leaf_categ_id = test_category_groupings.leaf_categ_id
 AND test_kylin_fact.lstg_site_id = test_category_groupings.site_id
 where test_kylin_fact.cal_dt = date'2012-04-01'
 group by
 test_kylin_fact.cal_dt, test_kylin_fact.seller_id order by xx desc limit 20


因为 CAL_DT 是一个独立的维度,seller_id 和 sum(price) 是定义在 TopN 度量里的,因此这条 SQL,会查询只包含 CAL_DT 的 Cuboid,并且只扫描其中一行,没有后聚合发生,从而可以排除后聚合以及 calcite 产生问题的可能性。Kylin 返回的结果如何呢?请看下图:

我们看到,每个的 seller_id 对应的销售值,都成了完全一样的-42.987。

同样的 SQL,在 Hive 里的运行结果如下:

Hive 的结果比较正常的,-42.99是最小的值;seller_id 信息跟 Kylin 对得上的。我又更改了日期条件,得出类似的结果。至此,我们可以确定的是,一定是 Kylin 的某个地方有问题了。

定位问题


这个问题让人比较迷惑的几点是:


1)对于 MapReduce 构建的 Cube,不存在此问题;
2)使用 Spark 引擎构建,在 Kylin v2.5 之前,不存在此问题;
3)Kylin 的 Spark 引擎与 MR 引擎共用核心代码。

所以,什么 bug 是会导致 MapReduce正确,而Spark里计算不正确呢?

首先,从 IDE 里运行 DebugTomcat,在本地调试 SQL 的执行,断点打在反序列化TopN 度量的地方(TopNCounterSerializer# deserialize),Kylin 从 HBase 读取数据后会做反序列化,可以清楚地看到,反序列化出的 TopN 对象,里面的 counter 数组,都是相同的数值。

基于这个我们确定问题出现在查询之前,也就是Cube构建的阶段,后面的逻辑不用 debug 了。

随后,我仔细排查了 Cube 构建的代码,特别是多线程安全相关的代码,因为 Spark 是多线程计算,MR 一般是单线程。对一些可疑的多线程地方做了巩固和完善,随后又进行了测试,但是此问题依旧存在,继续迷惑。

无奈之下,修改代码加入更多日志的输出,看看数据在哪个环节发生了错误;日志加在了 TopNCounterSerializer# serialize 和 TopNCounterSerializer# deserialize 方法上,分别看看进出的地方是否正确。

重新运行集成测试,开启 Spark history server,然后查看 Spark executor 的 System.out 输入;在 Spark cubing by layer 这一步 log 输出如下:

可以看到,序列化进去的时候的 counters,是各自不同且递增的,符合预期。

但是,在 Convert to HFile 步骤的 Spark log 中却是这样的:

可以看出,上一步序列化写入的 [233.49, 572.95],在这一步反序列化出来,却成了 [233.49, 233.49]。所以问题出在了使用 Spark 将 Cube 转成 HFile 的这一步,因为在 Kylin v2.5 之前,这一步都是用 MR 运行的,这解释了为什么之前的版本没有此问题。

那么回到问题本身,难道这些 byte 数组,在写入 Hadoop 和读出 Hadoop 之间被篡改了吗?

于是,加入更多 log,打印出在Spark cubing 过程中写进 ByteBuffer 的 bytes,以及在下一步读出的 bytes,虽然有一点暴力,但为了排查问题也是值得的。

Spark cubing 步骤写入的 bytes:

Convert to hfile 步骤读到的bytes:

我们可以看到,前后读写的 bytes 是完全一样的,说明 Hadoop Sequence 文件是稳定的,没有数据改变(要有就奇怪了)。

把这个的 byte 数组拿到 IDE 里来调试,代码也很简单:

 DoubleDeltaSerializer dds = new DoubleDeltaSerializer(3);
 String value = "698000020000000000039012a5c0800000000000";
 byte[] bytes = Bytes.fromHex(value);
 double[] counters = dds.deserialize(ByteBuffer.wrap(bytes));
 System.out.println(Arrays.toString(counters));

输出结果是:

 [233.49, 572.95]

符合预期!这就奇怪了,同样的 bytes,在本地解析是正确的,在Spark里解析是错误的。可喜的是,我们已经排除了Hadoop输入输出问题的可能性,排除了Spark cubing 步骤出错的可能;问题的范围被缩小到 Convert to HFile in Spark 这一步,更进一步缩小到以下方法中:

org.apache.kylin.measure.topn.DoubleDeltaSerializer#deserialize(java.nio.ByteBuffer)

虽然此类和此方法有自己的单元测试,并且在过去两年多时间里未曾做过修改,但眼前的事实不得不让我们对它产生怀疑。

分析问题

DoubleDeltaSerializer 这个类,它的功能是把一组排序的 double 数值以 delta 的方式序列化,这样比起存储每个原始值来可以更加节省空间(Kylin里有很多类似压缩算法)。乍一看这个类没有什么特殊之处,是较纯粹的 Utility class,静态变量都是 final 的,私有成员变量 “precision” 和 “multiplier” 也是 immutable 的,看起来不可能有问题。它的头部声明和构造器如下:

一个可疑点是它使用了一个 ThreadLocal 对象 “deltasThreadLocal”,缓存了一个double[] 数组,目的是减少重复在堆内请求内存,但是这个 ThreadLocal 只在 serialize()方法中有使用,在 deserialize()方法中没有用过,所以可以排除。

接下来我们看一下 deserialize()方法:

这段方法以一个 ByteBuffer 为输入,然后进行一系列的与或 [SF(F1] 操作,先取到第一个数字的值,然后解析后续的 delta 值,并依次累加,还原出原本的数值,放入到 result 数组中。

现在我们看到表象是所有值都相同(为最小值),说明 current 值在后来的循环中一直没有得到累加,那么这里的 delta 值一直是 0。delta 的值在前面计算中,都会跟一个叫 MASKS 的数组中的值进行与操作,而 MASKS 是 final static 对象,不会发生修改。这段逻辑在本地 Java 调试中是毫无问题的,只有在 Spark 里运行的时候才出了问题,这也让我们甚至曾怀疑 JVM 的版本是否一致,不过这点也很快就排除了,毕竟我们本地开发环境和测试集群用的都是 JDK 8 版本。

后来,我们突然想到了静态代码检查!本地的 IDE 里就有 SonarLint 插件,运行一下看静态代码检查是否能报告出什么疑点没有,Sonar 报告的第一个问题如下:

位置指向了 MASKS 变量声明下面的一行:

这个问题的具体描述是这样的:

Only static class initializers should be used
Non-static initializers are rarely used, and can be confusing for most developers because they only run when new class instances are created. When possible, non-static initializers should be refactored into standard constructors or field initializers.

看到这里估计大家都恍然大悟了: MASKS 变量是静态变量,但是它这里的初始化方法却没有声明为静态的;非静态的初始化代码块,会在调用构造函数的时候被执行,基于 MR 的引擎因为每次的对象都是新出来的,所以没出现问题。

而 Spark 中,这个对象是通过序列化分发到各个 executor 中去的,并不是在 executor 的 JVM 中 new 出来的。了解 Java 序列化的同学会知道,Java 序列化一个对象的时候,其静态变量不会被序列化,因为它不属于某个对象。因此,“MASKS” 变量在被序列化并分发到 Spark executor 中的时候,它的值恢复成了声明的值 new long[64],于是其中每个值都成了 0,导致了后续的与操作的结果都为 0,所以产生了所有数值都一样的错误。

解决问题

到这里问题的 root cause 已经找到,要修复它很简单,只需要把下面的代码块也一起声明称 static:

再运行一次 Spark convert to HFile 的任务,在日志中看到,这次的结果正确了:

SQL 查询结果也恢复了正确,跟 Hive 结果保持了一致:

至此,这个困扰我们多日的查询错误问题得到了解决。

总结

这次问题排查和解决后,我们也总结了一些自己的收获:


1. 测试覆盖很重要,否则出了 bug 不能发现,是件危险的事情;
2. 重视问题反馈,对于问题要有敏感性,而不是想当然地忽视;
3. 静态代码检查能帮助发现很多潜在的问题;有些问题可能不会立刻发生,但等它发生的时候,排查起来成本很高。
4. Spark 和 MapReduce 的差异,提醒我们在写代码和解决问题的时候务必要小心对象的序列化和多线程安全。


为这次问题我创建了 KYLIN-3693 的 JIRA,相关 commit 已经进入到了 Kylin 主分支,并且将在 Kylin v2.5.2 发布。目前集成测试已经覆盖了 TopN 查询的结果,并且我们会进一步重视静态代码检查的报告。

后续我们还会总结一些开发和排错经验,跟社区用户和开发者一起分享和交流,也非常欢迎大家一起参与进来进行写作,有兴趣同学可以直接邮件联系我:

shaofengshi@apache.org

"Apache and Apache Kylin are either registered trademarks or trademarks of The Apache Software Foundation in the US and/or other countries. No endorsement by The Apache Software Foundation is implied by the use of these marks."

添加企微

kyligence
关注我们

kyligence