Kyligence AI 服务 - 让大模型完成准确、可靠的数值计算和回答! 立即了解更多
AI 数智助理
Kyligence Zen Kyligence Zen
Kyligence Enterprise Kyligence Enterprise
Kyligence Turbo Kyligence Turbo
指标平台解决方案
OLAP 解决方案
行业解决方案
客户总览
金融
零售
制造
医药
其他
云平台
BI
寻求合作
资源
Kyligence Enterprise
Kyligence Zen
培训
Apache Kylin
Byzer
Gluten
博客
关于
市场活动
Kylin 用户在使用 Spark的过程中,经常会遇到任务提交缓慢、构建节点不稳定的问题。为了更方便地向 Spark 提交、管理和监控任务,有些用户会使用 Livy 作为 Spark 的交互接口。在最新的 Apache Kylin 3.0 版本中,Kylin 加入了通过 Apache Livy 递交 Spark 任务的新功能[KYLIN-3795],特此感谢滴滴靳国卫同学对此功能的贡献。
Apache Livy 是一个基于 Spark 的开源 REST 服务,是 Apache 基金会的一个孵化项目,它能够通过 REST 的方式将代码片段或是序列化的二进制代码提交到 Spark 集群中去执行。它提供了如下基本功能:
1. 当前 Spark 存在的问题
Spark 当前支持两种交互方式:
两种方式都需要用户登录到 Gateway 节点上通过脚本启动 Spark 进程,但是会出现以下问题:
2. Livy 优势
一方面,接受并解析用户的 REST 请求,转换成相应的操作;另一方面,它管理着用户所启动的所有的 Spark 集群。
Livy 具有如下功能:
1. 引入 Livy 之前 Kylin 是如何使用 Spark 的
Spark 是在 Kylin v2.0 引入的,主要应用于 Cube 构建,构建过程介绍可以查看:https://kylin.apache.org/blog/2017/02/23/by-layer-spark-cubing/
下面是 SparkExecutable 类的 doWork 方法关于提交 Spark job 的一段代码,我们可以看到 Kylin 会从配置中获取 Spark job 包的路径(默认为 $KYLIN_HOME/lib),通过本地指令的形式提交 Spark job,然后循环获取 Spark job 的执行状态和结果。我们可以看到 Kylin 单独开了一个线程在本地向 Spark 客户端发送来 job 请求并且循环获取结果,额外增加了节点系统压力。
@Override protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { //略... String jobJar = config.getKylinJobJarPath(); //获取job jar的路径 //略... final String cmd = String.format(Locale.ROOT, stringBuilder.toString(), hadoopConf,KylinConfig.getSparkHome(), jars, jobJar, formatArgs()); //构建本地command //略... //创建指令执行线程 Callable callable = new Callable<Pair<Integer, String>>() { @Override public Pair<Integer, String> call() throws Exception { Pair<Integer, String> result; try { result = exec.execute(cmd, patternedLogger); } catch (Exception e) { logger.error("error run spark job:", e); result = new Pair<>(-1, e.getMessage()); } return result; } }; //略... try { Future<Pair<Integer, String>> future = executorService.submit(callable); Pair<Integer, String> result = null; while (!isDiscarded() && !isPaused()) { if (future.isDone()) { result = future.get(); //循环获取指令执行结果 break; } else { Thread.sleep(5000); //每隔5秒检查一次job执行状态 } } //略... } catch (Exception e) { logger.error("Error run spark job:", e); return ExecuteResult.createError(e); } //略... }
2. Livy for Kylin 详细解析
Livy 向 Spark 提交 job 一共有两种,分别是 Session 和 Batch,Kylin 是通过 Batch 的方式提交 job 的,需要提前构建好 Spark job 对应的 jar 包并上传到 HDFS 中,并且将配置项 kylin.engine.livy-conf.livy-key.file=hdfs:///path-to-kylin-job-jar 加入到 kyiln.properties 中。
Batch 一共具有如下九种状态:
public enum LivyStateEnum { starting, running, success, dead, error, not_started, idle, busy, shutting_down; }
下面是 SparkExecutableLivy 类的 doWork 方法和 LivyRestExecutor 类的 execute 方法关于提交 Spark job 的一段代码,Kylin 通过 livyRestBuilder 读取配置文件获取 Spark job 的包路径,然后通过 restClient 向 Livy 发送 Http 请求。在提交 job 之后会每隔 10 秒查询一次 job 执行的结果,直到 job 的状态变为 shutting_down, error, dead, success 中的一种。每一次都是通过 Http 的方式发送请求,相比较于通过本地 Spark 客户端提交任务,更加稳定而且减少了 Kylin 节点系统压力。
@Override protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { //略... livyRestBuilder.setLivyTypeEnum(LivyTypeEnum.job); executor.execute(livyRestBuilder, patternedLogger); //调用LivyRestExecutor类的execute方法 if (isDiscarded()) { return new ExecuteResult(ExecuteResult.State.DISCARDED, "Discarded"); } if (isPaused()) { return new ExecuteResult(ExecuteResult.State.STOPPED, "Stopped"); } //略... } public void execute(LivyRestBuilder livyRestBuilder, Logger logAppender) { LivyRestClient restClient = new LivyRestClient(); String result = restClient.livySubmitJobBatches(dataJson); //向Livy发送http请求 JSONObject resultJson = new JSONObject(result); String state = resultJson.getString("state"); //得到Livy请求结果 final String livyTaskId = resultJson.getString("id"); while (!LivyStateEnum.shutting_down.toString().equalsIgnoreCase(state) && !LivyStateEnum.error.toString().equalsIgnoreCase(state) && !LivyStateEnum.dead.toString().equalsIgnoreCase(state) && !LivyStateEnum.success.toString().equalsIgnoreCase(state)) { String statusResult = restClient.livyGetJobStatusBatches(livyTaskId); //获取Spark job执行状态 JSONObject stateJson = new JSONObject(statusResult); if (!state.equalsIgnoreCase(stateJson.getString("state"))) { logAppender.log("Livy status Result: " + stateJson.getString("state")); } state = stateJson.getString("state"); Thread.sleep(10*1000); //每10秒检查一次结果 } }
3. Livy 在 Kylin 中的应用
构建 Intermediate Flat Hive Table 和 Redistribute Flat Hive Table 原本都是通过 Hive 客户端(Cli 或 Beeline)进行构建的,引入 Livy 之后,Kylin 通过 Livy 来调用 SparkSQL 进行构建,提高了平表的构建速度。在引入 Livy 之后,Cube 的构建主要改变的是以下几个步骤,对应的任务日志输出如下:
1)构建 Cube
2)转换 Cuboid 为 HFile
4. 引入 Livy 对 Kylin 的好处
5. 如何在 Kylin 中启用 Livy
在 Kylin 启用 Livy 前,请先确保 Livy 能够正常工作
1)在 Kylin.properties 中,加入如下配置,并重启使之生效。
//此处为CDH5.7环境下的配置 kylin.engine.livy-conf.livy-enabled=true kylin.engine.livy-conf.livy-url=http://cdh-client:8998 kylin.engine.livy-conf.livy-key.file=hdfs:///path/kylin-job-3.0.0-SNAPSHOT.jar //请根据个人环境替换对应版本的包 kylin.engine.livy-conf.livy-arr.jars=hdfs:///path/hbase-client-1.2.0-cdh5.7.5.jar,hdfs:///path/hbase-common-1.2.0-cdh5.7.5.jar,hdfs:///path/hbase-hadoop-compat-1.2.0-cdh5.7.5.jar,hdfs:///path/hbase-hadoop2-compat-1.2.0-cdh5.7.5.jar,hdfs:///path/hbase-server-1.2.0-cdh5.7.5.jar,hdfs:///path/htrace-core-3.2.0-incubating.jar,hdfs:///path/metrics-core-2.2.0.jar
其中 livy-key.file 和 livy-arr.jars 地址之间不要有空格,否则可能会出不可预知的错误。
2)Cube 构建引擎选用 Spark。
以下问题往往为使用不当和配置错误的原因,非 Kylin 本身存在的问题,此处仅为友情提示。
1. Table or view not found
输出日志:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Table or view not found: `DEFAULT`.`KYLIN_SALES`; line 21 pos 6;
解决方法:
//将hive-site.xml拷贝到spark的配置文件目录中 ln -s /etc/hive/conf/hive-site.xml $SPARK_CONF_DIR
2. livy request 400 error解决方法:
//kylin.properties Livy配置项jar包地址之间不要留空格 //此处为CDH5.7环境下的依赖包,请根据个人环境替换对应版本的包 kylin.engine.livy-conf.livy-arr.jars=hdfs:///path/hbase-client-1.2.0-cdh5.7.5.jar,hdfs:///path/hbase-common-1.2.0-cdh5.7.5.jar,hdfs:///path/hbase-hadoop-compat-1.2.0-cdh5.7.5.jar,hdfs:///path/hbase-hadoop2-compat-1.2.0-cdh5.7.5.jar,hdfs:///path/
3. NoClassDefFoundError输出日志:
NoClassDefFoundError: org/apache/hadoop/hbase/protobuf/generated/HFileProtos
find /opt -type f -name "hbase-protocol*.jar" cp /path/to/hbase-protocol-1.2.0-cdh5.7.5.jar $SPARK_HOME/jars
4. livy sql 执行错误解决方法:
//kylin.properties中添加如下配置 kylin.source.hive.quote-enabled=false
Livy 本质上是在 Spark 上的 REST 服务,对于 Kylin cube 的构建没有本质上的性能提升,但是通过引入 Livy,Kylin 能够直接通过 Spark SQL 代替 Hive 构建 Flat Table,而且管理 Spark job 也更加方便。但是,Livy 当前也存在一些问题,比如使用较低或较高版本的 Spark 无法正常工作以及单点故障等问题,用户可以考虑自身的实际场景选择是否需要在 Kylin 中使用 Livy。
作者简介:王汝鹏,Kyligence 大数据研发工程师,主要负责 Apache Kylin 社区维护和开发。GitHub:https://github.com/rupengwang。
01 现象 社区小伙伴最近在为 Kylin 4 开发 Soft Affinity + Local Cache
01 背景 随着顺丰末端物流(末端物流主要分为对小哥、柜机、区域等的资源的管理和分批;对路径、排班、改派等信息
Apache Kylin 的今天 目前,Apache Kylin 的最新发布版本是 4.0.1。Apache
Kylin 入选《上海市重点领域(金融类)“十四五”紧缺人才开发目录》 数字经济已成为全球增长新动
在 Kyligence 主办的 Data & Cloud Summit 2021 行业峰会的「数字化转
近日由 Kyligence 主办的 Data & Cloud Summit 2021 行业峰会在上海成
近五年来,Kyligence 服务了金融、制造、零售、互联网等各个行业的龙头企业,我们在服务这些企业的过程中,
2021年1月14日,Kyligence 产品经理陈思捷开启了我们在 2021 年的首场线上分享,为大家介绍了
400 8658 757
工作日:10:00 - 18:00
已有账号? 点此登陆
预约演示,您将获得
完整的产品体验
从数据导入、建模到分析的全流程操作演示。
行业专家解惑
与资深行业专家的交流机会,解答您的个性化问题。
请填写真实信息,我们会在 1-2 个工作日内电话与您联系。
全行业落地场景演示
涵盖金融、零售、餐饮、医药、制造等多个行业,最贴合您的业务需求与场景。
Data + AI 应用落地咨询
与资深技术专家深入交流,助您的企业快速落地 AI 场景应用。
立即预约,您将获得
精准数据计算能力:
接入高精度数值计算大模型服务,为您的企业级AI应用提供强大支持。
个性化业务场景解决方案:
量身定制的计算模型和数据分析服务,切实贴合您的业务需求和应用场景。
Data + AI 落地应用咨询:
与资深专家深入探讨数据和 AI 如何帮助您的企业加速实现应用落地,构建更智能的数据驱动未来。
申请体验,您将获得
体验数据处理性能 2x 加速
同等规模资源、同等量级数据、同一套数据处理逻辑,处理耗时下降一半
专家支持
试用部署、生成数据、性能对比各操作环节在线支持