Apache Sqoop 终止发展,今后数据迁移怎么做?

Author
史少锋
Kyligence 合伙人 & 首席软件架构师,Apache Kylin 核心开发者和项目管理委员会主席 (PMC Chair),专注于大数据分析和云计算技术。曾任eBay全球分析基础架构部大数据高级工程师,IBM云计算软件架构师。
2021年 7月 15日

在今年 6月16日的 Apache 软件基金会董事例行会议上,董事会投票决定:Terminate the Apache Sqoop project(终止 Apache Sqoop 项目)!对大数据生态略有了解的同学,对 Apache Sqoop 项目一定不会陌生;在 Hadoop 的技术架构版图中,Sqoop 和 Kafka、Flume 等项目都属于 “数据导入/集成” 模块,起着为大数据平台引流的重要作用,在传统数据库 RDBMS 跟 Hadoop 之间建立一个桥梁。

如此重要的一个项目,怎么突然就终止了呢?用户们今后该怎么办?今天我们就聊一聊这个话题。

01. Apache Sqoop 介绍

开始之前我们先了解一下 Sqoop。Sqoop 项目出现在2009年,它的名字取自 SQL–to–Hadoop,最初是 Hadoop 内部一个模块,后来成为独立项目;它是 Apache 旗下一款“在 Hadoop 和关系数据库(RDBMS)之间传送数据”的工具。Sqoop 的核心的功能有两个:

  • 导入:将 MySQL,Oracle 等的数据导入到 Hadoop 的 HDFS、Hive、HBase 等系统中;
  • 导出:从 Hadoop 系统中的数据,导出到关系数据库如 MySQL 等。

Sqoop 本质是批量迁移数据,迁移的方式就是把 Sqoop 命令转换成 MapReduce 程序,进而能够分布式并行执行,提高数据导入导出的效率。

Sqoop 的架构比较简单,是客户端 + Hadoop 的架构(如下图所示);当客户端使用 Sqoop 命令的时候,Sqoop 会将任务翻译成一个或多个 Mapper 任务;由 Mapper 向 RDBMS 服务器发出 SQL 查询(通常是通过JDBC协议),随后将获取到的数据写入到 HDFS 、HBase 等存储系统:

Sqoop 有比较丰富的功能,例如可以把整张表导入、指定特定列导入、按 SQL 查询导入、可增量导入等等;Sqoop 出现后获得大量使用。

到了2012年的时候,社区发出了下一代 Sqoop 架构的设计讨论,这就是 Sqoop 2。Sqoop 2 主要解决 Sqoop 的一些安全缺陷,例如客户端需要安装配置 RDBMS connector、需要以 root 身份运行、需要配置连接密码等,这些问题对于重视数据安全的企业来说非常重要。

Sqoop 2 引入了 Sqoop Server 的组件(如下图),并将用户角色区分成 Admin 和 Operator。Admin 在 Server 端配置各个数据库的 connector 以及连接密码等重要信息;Operator 通过 CLI 或 API 触发 Sqoop 任务的执行;这样 Operator 及客户端的信息安全得到大幅度提升。

Sqoop 2 的设想是好的,但它也增加了用户部署和使用的复杂度;同时它跟 Sqoop 1 不兼容,不能无缝升级,一定程度上阻碍了它的普及。到 Sqoop 项目终止的时候,Sqoop 2 仍然难产,2017 年的时候 Cloudera 宣布 Sqoop 2 过时(不再发展),建议用户继续使用 Sqoop 1。这在大数据技术快速发展的黄金时期,是非常少见的事情。

02. Sqoop 项目停止发展之猜想

Apache 董事会宣布终止 Sqoop 项目,并不是心血来潮;回看 Sqoop 项目的发展历史,你会发现 Sqoop 最近一次版本发布是在两年多以前,最近30个月没有任何新的 PMC 和 committer 加入到这个项目,这意味着  Sqoop 项目的活跃程度已经非常低,面临无人维护的尴尬局面。最终由其项目管理委员会(PMC)成员发起讨论,决定终止项目发展,将其归档至 Apache Attic(阁楼)项目中,以告知广大用户相应的风险。

什么使得 Sqoop 项目停止了活跃?做为局外人,对于项目内部(特别是最大的贡献者 Cloudera)到底发生了什么不得而知,但根据我们对大数据技术的演进和发展,总结出有这么几个方面的促成因素,供大家参考:

  • Sqoop 功能趋于稳定和完整,功能比较完备了;前不久农行研发中心还做了一次从 Oracle 到 Hadoop 的数据迁移工具对比,Sqoop 性能出色,拔得头筹([2]);
  • Sqoop 1 存在安全缺陷,而 Sqoop 2 架构复杂,用户左右为难;
  • Sqoop 依赖 MapReduce;在越来越多用户迁移到云上、新的分布式计算框架(如 Spark、Flink、Kubernetes)越来越流行的今天,它已经不符合潮流;虽然社区有一些开发者将 Sqoop 改成可运行在 Spark 上,但并没有被合并进成为官方特性;
  • Hive、Spark、Presto 等大数据组件都增加了对 JDBC 数据源的直接支持,可直接访问 RDBMS 数据,从而导致了用户对 Sqoop 的依赖度降低;
  • 通过 CDC/binlog 的方式相比基于 batch 的 Sqoop 同步来讲实时性更好、对源系统的压力更小;
  • Sqoop 背后的商业公司 Cloudera 近几年业绩不佳,对 Sqoop 这种工具类的项目的支持力度不够。

综合以上因素,我们认为 Sqoop 退出历史舞台是迟早的事情。当然,Sqoop 不会消失,只是用的人会越来越少,这个过程可能持续数年时间。

03. Sqoop 之后大数据迁移怎么做

Apache Sqoop 项目虽然终止了,数据迁移的事情还是每天都要做,作为大数据「砖家」的你,一定想知道今后类似的任务该用什么组件来实现。由于 Sqoop 不支持 CDC/binlog 的方式实时同步数据,故 CDC 的方式我们这里不做展开,毕竟 CDC 的复杂度较高,运维等要求高,通常只有在特定场景下才会需要,感兴趣的同学可以自行搜索。

说到批量数据处理,毫无疑问最近这些年最火热、最成功的非 Apache Spark 项目莫属了。Spark 创立之初号称比 MapReduce 能快100倍,能够完全取代 MR,到今天看它确实做到了。Spark 已经发展到了 3.1 版本,生态完善,能够通过多种语言如 Java、Scala、SQL、Python 进行操作,支持数据处理、ETL,机器学习(MLLib)、Graph 图分析等多种场景。

在数据集成这块,Spark 很早就提供了 Data source API,支持多种数据源,如 Hive、Avro、CSV,也包括 JDBC 数据源:

过去需要先通过 Sqoop 导入数据到 Hadoop、再跑任务进行处理的工作,如今可以直接缩减成:直接读取 JDBC 数据进内存成为 dataframe, 然后使用各类 Spark API 进行处理,如下图所示:

让我们看一个使用 Spark 直接读取 MySQL 数据的例子:

val df = spark.read.format(“jdbc”)
.option(“url”,”jdbc:mysql://localhost/customer”)
.option(“driver”,”com.mysql.jdbc.driver”)
.option(“dbtable”,”customerProfile”)
.option(“user”,”user”)
.option(“password”,”xxxxxx”).load()

类似于 Sqoop, Spark 也允许指定 split 或分区方式,进而可以在多个 executor 的 task 中并行抽取数据;“ParitionColumn” 等价于 Sqoop 中的 “split-by” 选项. “LowerBound” 和 “UpperBound” 指定主键的最小和最大值范围,它们可以跟 “numPartitions” 参数一起使用,从而让 Spark 将抽取任务分成多个 task。“NumPartitions” 定义了最大并发的 JDBC 连接数。

val df = spark.read.format(“jdbc”)
.option(“url”,”jdbc:mysql://localhost/customer”)
.option(“driver”,”com.mysql.jdbc.driver”)
.option(“dbtable”,”customerProfile”)
.option(“user”,”user”).option(“password”,”xxxxxx”)
.option(“lowerBound”, 0)
.option(“upperBound”,10000)
.option(“numPartitions”, 4)
.option(“partitionColumn”, customer_id)
.load()

除了可指定 “dbtable”参数外,你还可以使用一个 “query” 参数,来指定一个进入到 dataframe 的查询集合。

val df = spark.read.format(“jdbc”)
.option(“url”,”jdbc:mysql://localhost/customer”)
.option(“driver”,”com.mysql.jdbc.driver”)
.option(“query”, “select * from customer.CustomerProfile where state = ’WA’”)
.option(“user”,”user”)
.option(“password”,”xxxxxx”)
.load()

一旦 dataframe 对象创建好以后,你可以像正常操作一样,使用各种过滤、转换、聚合算子对其进行处理,也可以将处理好的数据保存到文件系统,例如 Hive 数据仓库,或另一个关系型数据库。

保存到 Hive:

df.write.saveAsTable(‘customer.customerprofile’)

保存到数据库:

df.write.format(“jdbc”)
.option(“url”,”jdbc:mysql://localhost/customer”)
.option(“driver”,”com.mysql.jdbc.driver”)
.option(“dbtable”,”customerProfile”)
.option(“user”,”user”)
.option(“password”,”xxxxxx”).save()

这里可以看到,Spark 具备数据抽取、处理、保存的完整能力,完全可以取代 Sqoop;并且用户可以根据实际情况决定中间数据是否需要落盘,可跟其它数据源(hive、csv、parquet 等)进行关联处理,比 Sqoop 灵活简单,并且大大提升了工作效率。

然而,我们认为 Spark 的方案还有提升空间,大家注意上面的 Spark 代码片段,它融合了 Scala、JDBC 配置、SQL 语句于一体,可读性和可维护性都比较差;使用者除了需要了解多种语言以外,还需要打 jar 包并搭配 Spark shell 完成任务执行,门槛较高;今天我们给大家推荐一款国人开发的开源工具 MLSQL,简单易学,掌握后会比直接使用 Spark 提升许多倍效率。

04. 使用 MLSQL 完成 RDBMS 到 Hive/数据湖的迁移

MLSQL (https://www.mlsql.tech/) 是一门面向大数据和 AI 的语言,是一个真正整合数据管理,商业分析,机器学习的统一平台;它的操作完全使用 SQL 语言,后台使用 Spark 做执行引擎;MLSQL 在具备 Spark 高效分布式处理能力的同时,使得学习和使用门槛大大降低。

让我们看下如何使用 MLSQL 代码来实现 MySQL 到 Hive 数据的迁移:

connect jdbc where
 url="jdbc:mysql://127.0.0.1:3306/mlsql_console?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false"
 and driver="com.mysql.jdbc.Driver"
 and user="xxxxx"
 and password="xxxx"
 as db_cool;
 

-- 获取当前的时间 
set date = `select unix_timestamp(current_date(), "yyyy-MM-dd HH:mm:ss") as t ` where type="sql";

-- 加载 JDBC 数据 
load jdbc.`db_cool.script_file`  as script_file;

-- 如果有必要,可以过滤或者清理数据亦或是增量拉取数据,
-- 比如created_at作为增量过滤字段
select * from script_file where created_at>${date}
as new_script_file;

-- 保存数据到hive表
save overwrite new_script_file as hive.`mysql_mlsql_console.script_file`;

上面所有操作,包括 JDBC 配置、数据迁移的查询语句,以及保存到 Hive 的操作,都通过 SQL 来完成;MLSQL 是解释型语言,支持 HTTP 协议,直接把脚本发送给引擎执行就可以了。当然,MLSQL Console 提供了 Web IDE 支持,可以让用户全程在 Web 上调试和使用,拥有诸如脚本管理,代码补全,执行等功能。

除了支持批量的数据迁移以外,MLSQL 还直接支持流式程序,同时也支持 CDC 增量更新,这是 Sqoop 所不具备的。不过目标数据源暂时无法支持 Hive,现阶段只能使用内置的数据湖 Delta Lake。具体示例代码如下:

set streamName="mysql-cdc-example";

-- 加载binlog
load binlog.`` where 
host="127.0.0.1"
and port="3306"
and userName="xxxxx"
and password="xxxxx"
and databaseNamePattern="mlsql_console"
and tableNamePattern="script_file"
as table1;

-- replay binlog到数据湖里
save append table1 as rate.`mysql_{db}.{table}` 
options mode="Append"
and idCols="id"
and duration="5"
and syncType="binlog"
and checkpointLocation="/tmp/cpl-binlog2";

如上,三行 MLSQL 代码就启动了一个流程序,系统会自动消费 binlog,然后 replay 到数据湖 mysql_mlsql_console.script_file 中。

MLSQL 虽然是一门语言,相比传统的语言,他的门槛是足够低的,让分析师、算法、研发、甚至产品经理、运营都可以掌握。通过 MLSQL,大部分信息产业从业者都可以更好地玩转他们的数据。除了数据加载、数据处理之外,MLSQL 还支持 AI,这篇文章就不做展开了。

感兴趣的同学可以扫描下方二维码,欢迎加入 MLSQL 交流群,和大佬们一起讨论问题~

进群积极参与讨论或贡献 MLSQL,更有机会获得 Data & Cloud Summit 价值 1688 元的赠票哦!(*仅限三张,进群了解详情)

*此二维码仅在7月22日前有效,有任何问题请添加 K小助微信(uncertainly5)

想了解更多国内外开源生态在云和数据领域的共识方向吗?快来报名参加 Data & Cloud Summit 吧!除了「开源有道」分论坛,此次大会更有重磅嘉宾云集的一大主论坛,5 大分论坛哦~更多精彩议程请点击下方链接:

Data & Cloud Summit 2021 「云·数据·智能」 大会官方网站


阅读更多精彩文章: