Flink 指南
本指南提供了使用 Flink SQL 操作 Hudi 的文档。阅读本指南,您可以学习如何快速开始使用 Flink 读写 Hudi,同时对配置和任务优化有更深入的了解:
- 快速开始 :通过阅读 快速开始,你可以快速开始使用 Flink sql client 去读写 Hudi
- 配置 :对于 Flink 配置,使用
$FLINK_HOME/conf/flink-conf.yaml
来配置。 对于任意一个作业的配置,通过表参数来设置 - 写功能 :Flink 支持多种写功能用例,例如 离线批量导入,全量接增量,Changelog 模式,Insert 模式 和 离线 Compaction
- 查询功能 :Flink 支持多种查询功能用例,例如 Hive 查询, Presto 查询
- 优化 :针对 Flink 读写 Hudi 的操作,本指南提供了一些优化建议,例如 内存优化 和 写入限流
快速开始
安装
我们推荐使用 Flink Sql Client 来读写 Hudi,因为 Flink sql client 对于 SQL 用户来说更容易上手。
步骤1 下载 Flink jar
我们推荐使用 Flink-1.12.x 来读写 Hudi。 你可以按照 Flink 安装文档 的指导来安装 Flink。 hudi-flink-bundle.jar
使用的是 scala 2.11,所以我们推荐 Flink-1.12.x 配合 scala 2.11 来使用。
步骤2 启动 Flink 集群
在 Hadoop 环境下启动 standalone 的 Flink 集群。 在你启动 Flink 集群前,我们推荐先配置如下参数:
- 在
$FLINK_HOME/conf/flink-conf.yaml
中添加配置:taskmanager.numberOfTaskSlots: 4
- 在
$FLINK_HOME/conf/flink-conf.yaml
中,根据数据量大小和集群大小来添加其他的 Flink 配置 - 在
$FLINK_HOME/conf/workers
中添加4核localhost
来保证我们本地集群中有4个 workers
启动集群:
# HADOOP_HOME 是 Hadoop 的根目录。
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
# 启动 Flink standalone 集群
./bin/start-cluster.sh
步骤3 启动 Flink SQL client
Hudi 将 Flink 板块单独打包为 hudi-flink-bundle.jar
,该 Jar 包需要在启动的时候加载。
你可以在 hudi-source-dir/packaging/hudi-flink-bundle
下手动的打包这个 Jar 包,或者从 Apache Official Repository
中下载。
启动 Flink SQL Client:
# HADOOP_HOME 是 Hadoop 的根目录。
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
./bin/sql-client.sh embedded -j .../hudi-flink-bundle_2.1?-*.*.*.jar shell
小提示:
- 为了兼容大部分的对象存储,我们推荐使用 Hadoop 2.9 x+的 Hadoop 版本
- flink-parquet 和 flink-avro 格式已经打包在 hudi-flink-bundle.jar 中了
根据下面的不同功能来设置表名,存储路径和操作类型。 Flink SQL Client 是逐行执行 SQL 的。
插入数据
先创建一个 Flink Hudi 表,然后在通过下面的 VALUES
语句往该表中插入数据。
-- 为了更直观的显示结果,推荐把 CLI 的输出结果模式设置为 tableau。
set execution.result-mode=tableau;
CREATE TABLE t1(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'schema://base-path',
'table.type' = 'MERGE_ON_READ' -- 创建一个 MERGE_ON_READ类型的表,默认是 COPY_ON_ERITE
);
-- 使用 values 语句插入数据
INSERT INTO t1 VALUES
('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
查询数据
-- 从 Hudi 表中查询
select * from t1;
该查询语句提供的 是 快照读(Snapshot Querying)。 如果想了解更多关于表类型和查询类型的介绍,可以参考文档 表类型和查询类型
更新数据
数据的更新和插入数据类似:
-- 这条语句会更新 key 为 'id1' 的记录
insert into t1 values
('id1','Danny',27,TIMESTAMP '1970-01-01 00:00:01','par1');
需要注意的是:现在使用的存储类型为 Append
。通常我们都是使用 apennd 模式,除非你是第一次创建这个表。 再次 查询数据 就会显示更新后的结果。
每一次的插入操作都会在时间轴上生成一个带时间戳的新的 commit,在元数据字段 _hoodie_commit_time
和同一 _hoodie_record_key
的 age
字段中查看更新。
流式查询
Hudi Flink 也有能力来查询从指定时间戳开始的流式记录集合。 该功能可以使用Hudi的流式查询,只需要提供一个查询开始的时间戳就可以完成查询。如果我们需要的 是指定时间戳后的所有数据,我们就不需要指定结束时间。
CREATE TABLE t1(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'oss://vvr-daily/hudi/t1',
'table.type' = 'MERGE_ON_READ',
'read.streaming.enabled' = 'true', -- 该参数开启流式查询
'read.streaming.start-commit' = '20210316134557' -- 指定开始的时间戳
'read.streaming.check-interval' = '4' -- 指定检查新的commit的周期,默认是60秒
);
-- 开启流式的查询
select * from t1;
上述的查询会查询出 read.streaming.start-commit
时间戳后的所有数据。该功能的特殊在于可以同时在流和批的 pipeline 上执行。
删除数据
在 流式查询 中使用数据时,Hudi Flink 源还可以接受来自底层数据源的更改日志,然后可以按行级别应用更新和删除。所以,你可以在 Hudi 上同步各种 RDBMS 的近实时快照。
Flink 配置
在使用 Flink 前,你需要在 $FLINK_HOME/conf/flink-conf.yaml
中设置一些全局的 Flink 配置。
并行度
名称 | 默认值 | 类型 | 描述 |
---|---|---|---|
taskmanager.numberOfTaskSlots | 1 | Integer | 单个 TaskManager 可以运行的并行 task 数。我们建议将该值设置为 > 4,实际值需根据数据量进行设置 |
parallelism.default | 1 | Integer | 当用户为指定算子并行度时,会使用这个并行度(默认值是1)。例如 write.bucket_assign.tasks 没有设置,就会使用这个默认值 |
内存
名称 | 默认值 |
---|