想做大数据实时分析?且看 Kylin 如何解决

俞霄翔
Kyligence 大数据研发工程师
2019年 4月 23日

4 月 13 日,Apache Kylin Meetup 北京站顺利落幕,吸引了众多对大数据分析技术感兴趣的伙伴们到场参与,现场座无虚席。Kyligence 大数据研发工程师俞霄翔在现场与大家分享了 Kylin Real-time OLAP 功能的设计和实现,利用该功能实现的新浪微博实时热点分析 Demo 掀起了现场的小高潮。

△ 俞霄翔讲解 Kylin Real-time OLAP

      

Kylin Real-time OLAP 功能的推出是以历史数据分析见长的 Kylin 涉足实时数据分析领域的重要里程碑。除了在 Meetup 现场的分享,俞霄翔还写作了《向 Kylin 添加实时 OLAP 能力》一文,文中详细讲解了 Real-time OLAP 的原理与使用步骤,同时列出了使用中的常见问题与答案。下文节选了该篇文章中《什么是 Real-time OLAP for Kylin》章节的内容。您可以点击此处下载完整版的文章。

背景

Apache Kylin 在诞生之初,主要目的在于解决海量数据上进行交互式数据分析的需求,数据源主要来自于数据仓库(Hive),数据大都是历史的而非实时的。流式数据处理是一个大数据开发的新兴领域,它要求数据一旦进入系统即刻可被查询。直到 v2.6,Apache Kylin 的主要能力仍然发挥在历史数据分析的领域,即便是 v1.6 引入的准实时数据摄入,依然有数分钟的延迟,难以满足对流式数据的实时查询需求。

为了紧跟大数据开发的发展步伐,eBay 的 Kylin 开发团队(GitHub ID: allenma,mingmwang,sanjulian,wangshisan 等)基于 Kylin 开发了 Real-time OLAP 的特性,实现了 Kylin 对 Kafka 流式数据的实时查询。此功能在 eBay 内部已经用于生产,并稳定运行超过一年时间,于 2018 年下半年开源,与社区共同改进和完善。

流式数据处理和实时 OLAP

对于很多商业公司,用户消息被分析后可用于商业决策和制定更好的市场计划,若消息更早地进入数据分析平台,那么决策者可以更快地做出响应,从而减少时间和资金的浪费。利用流式数据处理,意味着更快的信息反馈,决策者因此可以进行更加频繁和灵活的计划调整。

企业数据源类型多样,包括服务器、手机等移动设备、物联网设备等,来自不同源头的消息往往通过不同的主题加以区分,汇聚到消息队列(Message Queue/Message Bus)以供数据分析使用。传统的数据分析工具使用批处理工具如 MapReduce 来进行数据分析,其数据延迟较大,通常为数小时到数天。从下图我们可以看出,主要的数据延迟来自于两个过程:从消息队列通过 ETL 流程抽取到数据仓库,和从数据仓库抽取数据进行预计算将结果加载进 OLAP 系统供分析系统消费。由于这两部分都是使用批处理程序进行计算,计算耗时较长,无法满足实时查询需求,于是我们想到要解决问题只能绕过这些过程,通过在数据收集和 OLAP 平台间架起一道桥梁,让数据直接进入 OLAP 平台。

目前已经有一些成熟的实时 OLAP 方案,如 Druid,通过合并实时和历史部分的查询结果提供较低的数据延迟。目前 Kylin 在分析海量历史数据的方面达到了一定的水平,为了向实时 OLAP 领域迈出一步,Kylin 开发者们开发了 Real-time OLAP。

Real-time OLAP 的简介

在新的架构下,数据查询请求根据时间分区列(Timestamp Partition Column)分为两部分,历史数据的查询请求仍将发送给 HBase Region Server,最新时间段的查询请求将发送到实时计算节点,Query Server 需要将两者的结果整合后返回给查询客户端。

与此同时,实时计算节点会不断将本地数据上传到 HDFS,在满足一定条件时会通过 Hadoop 任务来构建完整的 segment,从而完成实时数据向历史数据的累进,并且实现了降低实时计算节点压力的目的。

Real-time OLAP 的概念和角色

为实现 Real-time OLAP, Kylin 引入了一些新的概念,这里跟大家做一个初步的介绍。

1. Streaming Receiver

Streaming Receiver 的角色是 worker,每个 receiver 是一个 Java 进程,受 Coordinator 的管理,它的主要职责包含:

  • 实时摄入数据;
  • 在内存构建 cuboid,定时将保存在内存的 cuboid 数据 flush 到磁盘,形成 Fragment 文件;
  • 定时 checkpoint 和合并 Fragment 文件;
  • 接受对它所负责的 Partition 的查询请求;
  • 当 segment 变为不可变后,上传到 HDFS 或者从本地删除(依据配置);

2. Receiver Cluster
Streaming Receiver 组成的集合称为 Receiver 集群。

3. Streaming Coordinator
Streaming Coordinator 作为 Receiver 集群的 Master 节点,主要职责是管理 Receiver,包括将 Kafka topic 的 partition 分配/解除分配到指定的 Replica Set,暂停或者恢复消费,收集和展示各项统计指标(例如message per second)。当 kylin.server.mode 被设置为 all 或者 stream_coordinator,这个进程就成为一个 Streaming Coordinator。Coordinator 只处理元数据和集群调度,不摄入消息。

4. Coordinator Cluster
多个 Coordinator 可以同时存在,组成一个 Coordinator 集群。在多个 Coordinator中,同一时刻只存在一个 Leader,只有 Leader 才可以响应请求,其余进程作为 standby/backup。

5. Replica Set
Replica Set 是一组 Streaming Receiver,它们动作一致。Replica Set 作为任务分配的最小单位,Replica Set 下的所有 Receiver 做相同的工作(即摄入相同的一组 partition),互为 backup。当集群中存在一些 Receiver 进程无法访问,但能保证每一个 Replica Set 至少存在一个健康的 Receiver,那么集群仍能正常工作并且返回合理的查询结果。

在一个 Replica Set 中,将存在一个 Leader Receiver 来做额外的工作,其余的 Receiver 作为 Follower。

6. Streaming Segment
当 Receiver 摄取一个新的消息,并且这个消息的时间分区列的时间戳不包含于现有的任意 Segment,那么 Receiver 会创建一个新的 Segment,这个 Segment 的初始状态为 Active,并且这个 Segment 的开始时间和结束时间的间隔等于 Segment Window,所有时间分区列的时间戳包含在这个Segment的开始时间和结束时间之间的消息,将由这个 Segment 负责。当时间达到结束时间,这个 Segment 并不会立即关闭,这是为了等待那些延迟的消息,但是等待时间并不是永久的,一旦满足以下条件后之一后,Segment 的状态将转为 IMMUTABLE,一旦处于 IMMUTABLE,延迟的消息默认将被丢弃:

  • 在 Segment Duration 长度的时间段一直未收到属于本 Segment 时间段的消息
  • 等待时间的总和超过了某一个固定的阈值

7. Retention Policy
当 Segment 转变为 IMMUTABLE 后,该 Segment 的本地数据如何处理将由这个配置项决定,这个配置项目前有两个选项:

  • FULL_BUILD 当前 Receiver 进程是所属 Replica Set 的 Leader 时,会上传 Segment 的本地数据文件到 HDFS,并且当上传成功后,将 Segment 状态置为 REMOTE_PERSISTED;若是所属 Replica Set 的 Follower 时,不做处理。
  • PURGE 等待一定时间然后删除本地数据文件若用户只对最近一段时间的数据分析结果感兴趣,可以考虑使用 PURGE 选项

8. Assignment & Assigner
Assignment 是一种 Map 型的数据结构,其 key 是 Replica Set 的 ID,value 是分配给该 Replica Set 的 Partition 列表(用 Java 语言来表示的话是:Map<Integer, List<Partition>>),下图是一个简单的例子。

Assignment 十分重要,它表示了一个 Kafka Topic 的 Partition 分别是由哪些 Replica Set 去负责的,了解它对如何使用 Rebalance 相关的 API 和学习 Streaming 元数据十分重要。

基于当前集群资源和 Topic Partition 数量,如何进行合适的 partition 分配,有不同的策略,这些策略由Assigner负责。Assigner目前有两种具体实现,分别是CubePartitionRoundRobinAssignerDefaultAssigner

9. Checkpoint
Checkpoint 能够让 Receiver 重新启动后,从上次结束的安全点继续消费。Checkpoint 使得 receiver 重启后,数据能够不丢失,同时尽可能减少数据的重复消费。Checkpoint 主要分为两种,一种是 local checkpoint,一种是 remote checkpoint。其中 remote checkpoint 发生在将本地segment 数据文件上传到 HDFS 时,将 offset 信息记录在元数据里;local checkpoint 是由 receiver 定时调度或者由事件触发,会将数据 flush 到本地,并在在本地文件记录 offset。


当 Receiver 启动 Kafka Consumer API 时,会尝试检查 local checkpoint 和 remote checkpoint,寻找最新的 offset 开始消费。

Real-time OLAP的架构

数据流向方面,我们能看到数据的流向是从 Kafka 到 Receiver,再由 Receiver 上传到HDFS,最后由MapReduce 程序合并和重新加工 Segment 进入 HBase。

查询方面,查询请求由 Query Server 发出,根据查询条件中出现的时间分区列,分发请求到 Receiver 和HBase Region Server 两端。

Topic Partition 的分配和 Rebalance、Segment 状态管理和作业提交由 Coordinator 负责。

Real-time OLAP 的特性

  1. 数据一旦进入,将立刻在内存计算 cuboid,即刻可被查询(毫秒级数据延迟);
  2. 自动化的数据状态管理和作业调度;
  3. 根据查询条件,查询结果可包含实时结果和历史结果;
  4. 实时部分使用列式存储和倒排索引加速查询,降低查询延迟;
  5. 一个新的分布式计算和存储集群被引入到 Apache Kylin;
  6. Coordinator 和 Receiver 具有高可用性。

Real-time OLAP 的 Local Segment Cache

Receiver 端的 Segment 数据由两部分组成,MemoryStore 和 Fragment File。在 MemoryStore 内部是用 Map<String[], MeasureAggregator[]>> 存储聚合后的数据,其中 key 是维度值组成的字符串数组,value 是 MeasureAggregator 数组。

随着 Receiver 不断摄入消息,当 MemoryStore 的数据行数达到阈值会触发 flush 操作,将整个MemoryStore flush 到磁盘形成一个 Fragment File。对 Fragment File 的读取使用了 memory-mapped file 来加速读取速度,可以参考源代码

另外,Fragment File 为了优化扫描和过滤性能也使用了多种方式来加速扫描,包括:

  • 压缩
  • 倒排索引
  • 列式存储

最后,Fragment File 可以进行 Merge 来减少数据冗余,提升扫描性能。

Real-time OLAP 的元数据结构

Real-time OLAP 增加了集群管理和 Topic Partition 的分配关系、Replica Set 等新的元数据类型,这些元数据目前由保存在 Zookeeper 中,主要包括:

  1. 当前的 Coordinator Leader 节点
  2. Receiver 节点信息
  3. Replica Set 信息和 Replica Set Leader 节点
  4. Cube 的 Assignment 信息
  5. Cube 下 Segment 的构建状态和上传完整度

...

戳此处获取完整版《向 Kylin 添加实时 OLAP 能力》

申请试用
关注我们