Kyligence AI 服务 - 让大模型完成准确、可靠的数值计算和回答! 立即了解更多
AI 数智助理
Kyligence Zen Kyligence Zen
Kyligence Enterprise Kyligence Enterprise
Kyligence Turbo Kyligence Turbo
指标平台解决方案
OLAP 解决方案
行业解决方案
客户总览
金融
零售
制造
医药
其他
云平台
BI
寻求合作
资源
Kyligence Enterprise
Kyligence Zen
培训
Apache Kylin
Byzer
Gluten
博客
关于
市场活动
本月在上海举行 Apache Kylin Innovation Meetup 活动中,eBay 大数据平台团队的马刚老师,为大家分享了 Kylin 的实时流式 OLAP 分析的新功能;特别提醒,该功能目前已经开源,会在未来的版本中发布给社区试用!
演讲完整视频
在以往的交流中,我们发现许多企业的大数据分析场景对数据的实时性要求很高,例如网站流量监测、安全告警、用户推荐等等,传统的批处理模式往往有数小时甚至数天的延迟,不能满足业务需要。eBay 内部也有一些实时 OLAP 的需求,社区其实在 v1.6 版本之后已经有提供近实时(Near real-time,简称 NRT) 解决方案,通过微批次去消费 Kafka 的数据,然后利用 Hadoop 任务加工数据。为什么我们还要继续开发 Real-time Streaming 呢, 主要有三点考量:
第一,分钟级的数据准备时间比较长,因为它需要定时触发,比如说每5分钟构建一个 Cube segment,构建 Cube 的过程,比如说需要5分钟的话,最长就有10分钟准备延迟了。即使通过进一步的改进,准备时间也不大可能低于5分钟。
第二,需要一个 Lambda 的架构, 实时的数据不断流过来,上面的程序写数据出错了,历史的数据需要修改怎么办?所以我们希望实时 Cube 可以去更新。所谓的更新,因为实时流是不可能随时更新的,必须要从另外的数据源去刷 Cube,把原来的错误数据更改回来,这是一个 Lambda 的 Cube 概念。
第三, 更少的Hadoop 任务,以及创建更少的 HBase 表位批次的方式。比如说每5分钟去提交一个请求,去构建一个 Segment,Kylin 里面每一个 Segment 都是一个 HBase Table,这样操作的结果会造成 MR job 比较多,HBase Table 的数量也会增加,对于 Hadoop 集群和 HBase 集群会造成比较大的压力,需要不断的去做merge, 但merge job 也是需要消耗相当大的资源,这些都是当时使用下来发现的一些问题。
eBay 在整体架构中增加了一个 Real-time Streaming 的组件。下图中间这一段就是增加的实时集群,它包括一个 Streaming Coordinator,和若干个 Streaming Receiver。它的主要任务是去消费实时数据源的数据,并且存在我们实时集群里面去;Receiver 会定期的去调用 Build Engine,把这些实时的数据构建到历史数据里。
当查询进来之后,如果击中了一个实时的 Cube 的话,不仅会去查 HBase,还会去查实时集群里面的数据,这样结合两者的结果,可以保证最终数据的实时性,实时数据都能查到。
下图是整个数据流的过程,消息从数据源出来之后,会到我们的内存里面做聚合,内存的数据到达阈值或者是等到一定时间之后,会 flush 到我们实时集群里面磁盘上;再过一段时间之后,我们可以上传磁盘的数据,通过 MapReduce,将 Cuboid 数据构建到 HBase。整体而言,数据存在在以上三个部分。
需要注意的是,这三个部分的数据都是可以查询的,这样就保证了查询的实时性,数据一旦消费进来就可以被查到,就可以做到毫秒级的延迟。
实时集群,包括Query Engine,Coordinator, Receiver,Metadata Store。
Query Engine 会先找负责消费数据源的数据 Receiver,根据查询去拉取数据。Receiver Cluster 是一个集群,所以需要有一个协调者,Streaming Coordinator 去协调哪些 Receiver 来负责消费 Kafka 里面的 Partition,待查询需求指令下达时,知道需要通过 Coordinator 来获取 Cube 的数据是在哪些 Receiver 里面的。另外,Metadata Store,主要是用来存分配方面的信息,哪个 Topic 的 Partition 被哪些 Receiver 承担摄入和查询任务;Metadata Store 还保存有一些高可用(HighAvailability,简称HA)的信息。
我们也修改了查询引擎和构建引擎。如果查询的时候需要查实时数据部分和历史数据部分,构建引擎可以把实时数据构建到 HBase 里面。后面我将详细得介绍整个查询跟构建的过程。
Streaming Cube 请求进来之后,我们通过 Kafka API 获取 Cube 涉及的 Topic,这个 Topic 有多少个 Partition。Steaming Coordinator 会做一些分配工作,根据现有的一些集群的情况做分配,下面哪几个 Receiver 负责消费 Topic、哪几个 partition 的数据。把这个分配好之后,Steaming Receiver 就可以消费这些 Kafka 的数据了,我这里面标的是 Replica Set,Replica Set 是一个 HA 的概念,其实 Replica Set 里面是一个或者多个的 Steaming Receiver。
消费数据之后,实时数据会消费到 Steaming Receiver 那边 ,Receiver 会做一些Cuboid 的构建,另外也可以增加查询常用的 Cuboid,这样利于提高查询性能。过一段时间之后,它会把本地的磁盘上的数据写到 HDFS 上,并通知 Coordinator,等到全部 Replica Set 把 Cube Segment 的所有实时节点数据都被传到 HDFS 后,Coordinator 触发 MapReduce Job 进行一个批量的构建。之后就是 MapReduce 从 HDFS 去拉这些实时数据做构建,做一些合并工作并将 Cuboid 构建到HBase。MapReduce Job 结束时实时数据就被构建到 HBase 的 Segment 里面去之后,Coordinator 会通知实时集群去把实时数据删掉,以上是完整的实时 Cube 的构建过程。
查询的过程比较简单,当 QueryServer 接受新查询后,会请求 Coordinator 查询的Cube 是不是实时 Cube。如果是的话,会看这个查询包括实时数据和历史数据都要,就发 RPC 请求到 HBase,并且同时发查询请求到我们的实时集群,将结果汇总到查询引擎做一个聚合,再返回给用户。
设计细节
1) Segment & storage
实时集群里面,Segment 的时间窗口长度是可以配置的, 在 Cube 设计的时候去配置,默认是一个小时。数据过来的时候,实时流数据都会有时间戳的字段,Receiver会根据时间戳字段来判断它落在哪个 Segment 里,然后就会把数据落到 Segment 的Memory Store里面。进入 Memory Store 的时候,需要做 Cuboid 的聚合。过一段时间之后,如果 Memory Store 达到某个阈值了,会把它放在磁盘上,增加一个 Fragment File。Memory Store 和 Fragment File 的概念其实跟 HBase 其实是蛮像的,HBase 写数据也是开始写的 Memory Store,Memory Store 满足一定条件下会写到 Fragment File 。
开始状态的 Segment 是不断往里面写数据的,但是这个 Segment 什么时候变成Immutable 呢?一个 Immutable Segment,我们现在定义的策略是, 这个 Segment 持续一段时间都没有新的数据进入,就将它标志为 Immutable,然后它就可以传到 HDFS 上面去了,以上就是一个 Segment 状态转化的过程。
在实时节点上 Segment 的 Fragment 文件结构是这样的,最上面是一个 Cube 的名字,接下来是一个 Segment 的名字,是起始时间和结束时间。接下来是一个 Fragment 的名称,每一次增加 Fragment 文件都会生成一个 Fragment ID,这是一个递增的值。
刚才提及的 Fragment 文件结构是一个列式结构,包括两个文件,Fragment 的数据文件,和 Metadata 文件。数据文件可以包含多个 Cuboid 的数据,默认只会构建一个 Base cuboid,如果有配置其它 mandatory cuboid 的话,会根据配置生成多个Cuboid;这里的数据是一个 Cuboid 一个 Cuboid 依次来保存的,每一个 Cuboid 内是以列式存储,相同列的数据存在一起。基本上现在的 OLAP 存储为了性能通常都是列式存储。每一个维度的数据包括这三部分:
Metadata 文件里面存有重要的元数据,例如一些 Offset,包含这个维度的数据是从哪个位置开始是这个数据,数据长度是多少,Index,也就是反向索引的长度是多少等等,方便以后查询的时候比较快的定位到。元数据还包含一些压缩信息,指定了数据文件是用什么样的方法进行压缩的。
反向索引使用 Roaring Bitmap 来保存索引,出于性能方面的考量分两种方式存储。
实时存储方面也做了一些压缩,现在是支持两种压缩方式。
2)高可用(HA)
现在 eBay 的 HA 方式比较简单,通过引入 Replica Set 概念来实现。一个 Replica Set 可以包含多个 Receiver,一个 Replica Set 的所有的 Receiver 是共享 Assignment 数据的,Replica Set 下面的 Receiver 消费相同的数据。一个 Replica Set 中存在一个 Leader 做额外的工作,这个工作,是指把这些实时的数据存到HDFS,Leader 选举是用 Zookeeper 来做的。以上是实时集群如何实现 HA 的,可以防止宕掉了对查询和构建造成影响。
3) Check point
接下来介绍一下是如何处理错误恢复的,Receiver 重启怎么保证数据不丢呢?我们是通过Check Point的方式去实现。现在是每5分钟在本地做一个Check Point,把消费的信息存在一个文件里,包含哪些信息呢?一个是kafka topic消费的offset信息,还有一个是本地磁盘信息,例如最大的Fragments ID是多少;重新启动的时候根据这个去恢复。首先会从这里消费,看一下磁盘状态,ID最大的 Fragments 存在的话,会把这个删掉,因为是没有做出方案的,就继续消费,可以保证它的数据是不丢的。
这个是Local Check Point,有一个问题是本地的,数据都存在本地的磁盘,就跟本地磁盘数据是一样的,是跟本地的Segment数据是存在一起的,一个问题是,当整个机器宕掉之后,如果从另外一个起来,只能够从 Kafka 最开始的地方去消费,这样的话,如果数据量非常多,可能要等到很久才能追上最新的,所以说我们引入了一个 Remote Check Point。
Remote Check Point 把一些消费状态信息存在 HBase 的 Segment 里面,保存历史的 Segment 信息的时候,会把这些消费信息存在 Segment 的元数据里面,构建这个 Segment 的时候,最早是消费到哪个数据的,信息存在那里。
性能
Real-time 的存储性能,之前测下来 36 Million 行数据的话,做一个 Count 查询,大概是耗时 800ms 左右,每秒钟每个 Receiver 可以消费大约 44,000条 Event,每个 Event 包括 11个维度和 1个Metric。
在 eBay 的使用情况
eBay的生产环境部署是20个 Streaming Receiver 的集群,每一台机器是86G内存和16个vCore 。前面的性能测试数据也是在这个规格的 Receiver 上测试的。现在主要的 Use case 为 Site Speed,即分析 eBay APP 上访问 eBay 站点的性能,现在大概是16个维度,50个 Metrics。
下面介绍我们下一步的计划。
此外,我们还会进一步的提升实时节点、实时存储的查询性能。最后我们会把实时集群放在 Kubernetes 上,一些资源的分配、管理工作都让 Kubernetes 完成,因为增加了 Receiver 集群的 Kylin 维护的成本还是比较高的。
Q&A
Q:有三个问题想问一下,求一个精确的UV,Count Distinct这样的途径会不会出问题?因为我一部分要查HBase,一部分要查实时内存,怎么处理的?
A:这是个好问题,现在实时分析的精确去重只能支持到int类型的,这样不需要全局做字典转换。
Q:想问一下,replica set中的这两个receiver是怎么获取数据的?消费同一份还是再复制一份?
A:各管各的。每一个都是有自己Consumer就可以了
Q:这个架构感觉跟 Druid 架构非常相似,你们做的这个结构是不是借鉴他们的?
A:蛮像的,因为大部分这种实时的架构都是类似的。
01 现象 社区小伙伴最近在为 Kylin 4 开发 Soft Affinity + Local Cache
01 背景 随着顺丰末端物流(末端物流主要分为对小哥、柜机、区域等的资源的管理和分批;对路径、排班、改派等信息
Apache Kylin 的今天 目前,Apache Kylin 的最新发布版本是 4.0.1。Apache
Kylin 入选《上海市重点领域(金融类)“十四五”紧缺人才开发目录》 数字经济已成为全球增长新动
在 Kyligence 主办的 Data & Cloud Summit 2021 行业峰会的「数字化转
近日由 Kyligence 主办的 Data & Cloud Summit 2021 行业峰会在上海成
近五年来,Kyligence 服务了金融、制造、零售、互联网等各个行业的龙头企业,我们在服务这些企业的过程中,
2021年1月14日,Kyligence 产品经理陈思捷开启了我们在 2021 年的首场线上分享,为大家介绍了
400 8658 757
工作日:10:00 - 18:00
已有账号? 点此登陆
预约演示,您将获得
完整的产品体验
从数据导入、建模到分析的全流程操作演示。
行业专家解惑
与资深行业专家的交流机会,解答您的个性化问题。
请填写真实信息,我们会在 1-2 个工作日内电话与您联系。
全行业落地场景演示
涵盖金融、零售、餐饮、医药、制造等多个行业,最贴合您的业务需求与场景。
Data + AI 应用落地咨询
与资深技术专家深入交流,助您的企业快速落地 AI 场景应用。
立即预约,您将获得
精准数据计算能力:
接入高精度数值计算大模型服务,为您的企业级AI应用提供强大支持。
个性化业务场景解决方案:
量身定制的计算模型和数据分析服务,切实贴合您的业务需求和应用场景。
Data + AI 落地应用咨询:
与资深专家深入探讨数据和 AI 如何帮助您的企业加速实现应用落地,构建更智能的数据驱动未来。
申请体验,您将获得
体验数据处理性能 2x 加速
同等规模资源、同等量级数据、同一套数据处理逻辑,处理耗时下降一半
专家支持
试用部署、生成数据、性能对比各操作环节在线支持