Docker Demo
A Demo using Docker containers
Let's use a real-world example to see how Hudi works end to end. For this purpose, a self-contained data infrastructure is brought up in a local Docker cluster on your computer.
The steps below have been tested on a Mac laptop.
Prerequisites
- Docker Setup : For Mac, please follow the steps defined in [https://docs.docker.com/v17.12/docker-for-mac/install/]. To run the Spark-SQL queries, please ensure that at least 6 GB of memory and 4 CPUs are allocated to Docker (see Docker -> Preferences -> Advanced). Otherwise, Spark-SQL queries could get killed because of memory issues; a quick verification sketch follows at the end of this section.
- kafkacat : A command-line utility to publish/consume from Kafka topics. Use
brew install kafkacat
to install kafkacat.
- /etc/hosts : The demo references many services running in containers by their hostname. Add the following entries to /etc/hosts :
127.0.0.1 adhoc-1
127.0.0.1 adhoc-2
127.0.0.1 namenode
127.0.0.1 datanode1
127.0.0.1 hiveserver
127.0.0.1 hivemetastore
127.0.0.1 kafkabroker
127.0.0.1 sparkmaster
127.0.0.1 zookeeper
Also, this has not been tested on some other environments, such as Docker on Windows.
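Before moving on, a quick sanity check of the prerequisites can save time later. A minimal sketch, assuming a typical Docker Desktop for Mac install:
# Confirm the CPU / memory allocated to Docker (look for "CPUs" and "Total Memory" in the output)
docker info | grep -E 'CPUs|Total Memory'
# Confirm the demo hostnames resolve to localhost after editing /etc/hosts
ping -c 1 namenode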
Setting up the Docker Cluster
Build Hudi
The first step is to build Hudi:
cd <HUDI_WORKSPACE>
mvn package -DskipTests
Bringing up the Demo Cluster
The next step is to run the Docker setup script and set up the configs for bringing up the cluster. This should pull the Docker images from the Docker registry and set up the Docker cluster.
cd docker
./setup_demo.sh
....
....
....
Stopping spark-worker-1 ... done
Stopping hiveserver ... done
Stopping hivemetastore ... done
Stopping historyserver ... done
.......
......
Creating network "hudi_demo" with the default driver
Creating hive-metastore-postgresql ... done
Creating namenode ... done
Creating zookeeper ... done
Creating kafkabroker ... done
Creating hivemetastore ... done
Creating historyserver ... done
Creating hiveserver ... done
Creating datanode1 ... done
Creating presto-coordinator-1 ... done
Creating sparkmaster ... done
Creating presto-worker-1 ... done
Creating adhoc-1 ... done
Creating adhoc-2 ... done
Creating spark-worker-1 ... done
Copying spark default config and setting up configs
Copying spark default config and setting up configs
Copying spark default config and setting up configs
$ docker ps
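To confirm that all of the demo containers came up, you can trim the docker ps output down to names and status; a small sketch (the exact container list may vary slightly with your Hudi version):
# All services created above (namenode, datanode1, hiveserver, hivemetastore, kafkabroker,
# zookeeper, sparkmaster, spark-worker-1, presto-coordinator-1, presto-worker-1, adhoc-1, adhoc-2)
# should show a status of "Up"
docker ps --format 'table {{.Names}}\t{{.Status}}'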
At this point, the Docker cluster will be up and running. The demo cluster brings up the following services:
- HDFS Services (NameNode, DataNode)
- Spark Master and Worker
- Hive Services (Metastore, HiveServer2 along with PostgresDB)
- Kafka Broker and a Zookeeper Node (Kafka will be used as the upstream source for the demo)
- Adhoc containers to run Hudi/Hive CLI commands
Demo
Stock Tracker data will be used to showcase the different Hudi views and the effects of compaction.
Take a look at the docker/demo/data directory. There are 2 batches of stock data, both at 1-minute granularity.
The first batch contains ticker data for some stock symbols during the first hour of the trading window (9:30 a.m. to 10:30 a.m.). The second batch contains trading data for the next 30 minutes (10:30 to 11 a.m.). Hudi will be used to ingest both batches into a dataset that will contain the latest hourly stock ticker data.
The two batches are intentionally split by time window, so that the second batch contains updates to some of the entries ingested in the first batch.
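Before publishing anything, you can peek at the raw batches to see the fields the later queries rely on (symbol, ts, volume, open, close). A minimal sketch, assuming the files are newline-delimited JSON records as they are fed to kafkacat below:
# Pretty-print the first record of each batch
head -n 1 docker/demo/data/batch_1.json | jq .
head -n 1 docker/demo/data/batch_2.json | jq .
# Count the records in each batch
wc -l docker/demo/data/batch_1.json docker/demo/data/batch_2.json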
Step 1 : Publish the first batch to Kafka
Upload the first batch to the Kafka topic "stock ticks"
cat docker/demo/data/batch_1.json | kafkacat -b kafkabroker -t stock_ticks -P
To check if the new topic shows up, use
kafkacat -b kafkabroker -L -J | jq .
{
"originating_broker": {
"id": 1001,
"name": "kafkabroker:9092/1001"
},
"query": {
"topic": "*"
},
"brokers": [
{
"id": 1001,
"name": "kafkabroker:9092"
}
],
"topics": [
{
"topic": "stock_ticks",
"partitions": [
{
"partition": 0,
"leader": 1001,
"replicas": [
{
"id": 1001
}
],
"isrs": [
{
"id": 1001
}
]
}
]
}
]
}
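Optionally, you can also read a few messages back from the topic to make sure the publish worked; a small sketch using standard kafkacat consumer flags:
# Consume the first 3 messages from the beginning of the topic and exit
kafkacat -b kafkabroker -t stock_ticks -C -o beginning -c 3 -e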
Step 2: Incrementally ingest data from the Kafka topic
Hudi comes with a tool named DeltaStreamer. This tool can connect to a variety of data sources (including Kafka) to pull changes and apply them to a Hudi dataset using upsert/insert primitives. Here, we will use the tool to pull JSON data from the Kafka topic and ingest it into the COW and MOR datasets initialized in the earlier steps. If the datasets do not already exist, the tool automatically initializes them in the file system.
docker exec -it adhoc-2 /bin/bash
# Run the following spark-submit command to execute the delta-streamer and ingest to stock_ticks_cow dataset in HDFS
spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_BUNDLE --storage-type COPY_ON_WRITE --source-class org.apache.hudi.utilities.sources.JsonKafkaSource --source-ordering-field ts --target-base-path /user/hive/warehouse/stock_ticks_cow --target-table stock_ticks_cow --props /var/demo/config/kafka-source.properties --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider
# Run the following spark-submit command to execute the delta-streamer and ingest to stock_ticks_mor dataset in HDFS
spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_BUNDLE --storage-type MERGE_ON_READ --source-class org.apache.hudi.utilities.sources.JsonKafkaSource --source-ordering-field ts --target-base-path /user/hive/warehouse/stock_ticks_mor --target-table stock_ticks_mor --props /var/demo/config/kafka-source.properties --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider --disable-compaction
# As part of the setup (look at setup_demo.sh), the configs needed for DeltaStreamer are uploaded to HDFS. The configs
# contain mostly Kafka connectivity settings and the Avro schema to be used for ingestion, along with the key and partitioning fields.
exit
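The properties file and Avro schema referenced by the spark-submit commands above are uploaded to HDFS by setup_demo.sh. If you want to see what DeltaStreamer is actually configured with, one way to inspect them is sketched below (the paths come from the commands above; exact contents vary by Hudi version):
# View the DeltaStreamer properties (Kafka connectivity, schema files, record key / partition fields)
docker exec -it adhoc-2 /bin/bash -c "hadoop fs -cat /var/demo/config/kafka-source.properties"
# List the rest of the config directory (e.g. the Avro schema used for ingestion)
docker exec -it adhoc-2 /bin/bash -c "hadoop fs -ls /var/demo/config"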
You can use the HDFS web browser to look at the datasets
http://namenode:50070/explorer#/user/hive/warehouse/stock_ticks_cow
You can explore the newly created partition folders in the dataset, along with a deltacommit file under the .hoodie directory.
There is a similar setup for the MOR dataset
http://namenode:50070/explorer#/user/hive/warehouse/stock_ticks_mor
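If you prefer the command line over the HDFS web UI, the same layout can be inspected from one of the adhoc containers; a sketch:
# Recursively list the COW dataset: partition folders plus the .hoodie metadata directory
docker exec -it adhoc-1 /bin/bash -c "hadoop fs -ls -R /user/hive/warehouse/stock_ticks_cow"
# Same for the MOR dataset
docker exec -it adhoc-1 /bin/bash -c "hadoop fs -ls -R /user/hive/warehouse/stock_ticks_mor"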
Step 3: Sync with Hive
At this step, the datasets are available in HDFS. We need to sync with Hive to create new Hive tables and add partitions, so that Hive queries can be run against those datasets.
docker exec -it adhoc-2 /bin/bash
# This command takes in the HiveServer URL and the COW Hudi dataset location in HDFS and syncs the HDFS state to Hive
/var/hoodie/ws/hudi-sync/hudi-hive-sync/run_sync_tool.sh --jdbc-url jdbc:hive2://hiveserver:10000 --user hive --pass hive --partitioned-by dt --base-path /user/hive/warehouse/stock_ticks_cow --database default --table stock_ticks_cow
.....
2018-09-24 22:22:45,568 INFO [main] hive.HiveSyncTool (HiveSyncTool.java:syncHoodieTable(112)) - Sync complete for stock_ticks_cow
.....
# Now run hive-sync for the second data-set in HDFS using Merge-On-Read (MOR storage)
/var/hoodie/ws/hudi-sync/hudi-hive-sync/run_sync_tool.sh --jdbc-url jdbc:hive2://hiveserver:10000 --user hive --pass hive --partitioned-by dt --base-path /user/hive/warehouse/stock_ticks_mor --database default --table stock_ticks_mor
...
2018-09-24 22:23:09,171 INFO [main] hive.HiveSyncTool (HiveSyncTool.java:syncHoodieTable(112)) - Sync complete for stock_ticks_mor
...
2018-09-24 22:23:09,559 INFO [main] hive.HiveSyncTool (HiveSyncTool.java:syncHoodieTable(112)) - Sync complete for stock_ticks_mor_rt
....
exit
After executing the above commands, you will notice that:
- A Hive table named stock_ticks_cow is created, which provides the Read-Optimized view of the Copy-On-Write dataset.
- Two new tables, stock_ticks_mor and stock_ticks_mor_rt, are created for the Merge-On-Read dataset. The former provides the Read-Optimized view of the Hudi dataset, while the latter provides the Real-time view of the dataset.
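If you want to confirm how these tables were registered, you can look at their metadata from beeline; a minimal sketch (it reuses the same HiveServer2 URL as the queries in Step 4 (a) below):
# Show table metadata, including the input format Hudi registered for this view
docker exec -it adhoc-2 /bin/bash -c "beeline -u jdbc:hive2://hiveserver:10000 -e 'describe formatted stock_ticks_mor_rt;'"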
Step 4 (a): Run Hive Queries
Run a Hive query to find the latest timestamp ingested for the stock symbol 'GOOG'. You will notice that both the Read-Optimized view (for the COW and MOR datasets alike) and the Real-time view (only for the MOR dataset) return the same value, "10:29 a.m.", since Hudi created a Parquet file for this first batch of data.
docker exec -it adhoc-2 /bin/bash
beeline -u jdbc:hive2://hiveserver:10000 --hiveconf hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat --hiveconf hive.stats.autogather=false
# List Tables
0: jdbc:hive2://hiveserver:10000> show tables;
+---------------------+--+
| tab_name |
+---------------------+--+
| stock_ticks_cow |
| stock_ticks_mor |
| stock_ticks_mor_rt |
+---------------------+--+
3 rows selected (0.801 seconds)
0: jdbc:hive2://hiveserver:10000>
# Look at partitions that were added
0: jdbc:hive2://hiveserver:10000> show partitions stock_ticks_mor_rt;
+----------------+--+
| partition |
+----------------+--+
| dt=2018-08-31 |
+----------------+--+
1 row selected (0.24 seconds)
# COPY-ON-WRITE Queries:
=========================
0: jdbc:hive2://hiveserver:10000> select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
+---------+----------------------+--+
| symbol | _c1 |
+---------+----------------------+--+
| GOOG | 2018-08-31 10:29:00 |
+---------+----------------------+--+
Now, run a projection query:
0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';
+----------------------+---------+----------------------+---------+------------+-----------+--+
| _hoodie_commit_time | symbol | ts | volume | open | close |
+----------------------+---------+----------------------+---------+------------+-----------+--+
| 20180924221953 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 |
| 20180924221953 | GOOG | 2018-08-31 10:29:00 | 3391 | 1230.1899 | 1230.085 |
+----------------------+---------+----------------------+---------+------------+-----------+--+
# Merge-On-Read Queries:
==========================
Lets run similar queries against M-O-R dataset. Lets look at both
ReadOptimized and Realtime views supported by M-O-R dataset
# Run against ReadOptimized View. Notice that the latest timestamp is 10:29
0: jdbc:hive2://hiveserver:10000> select symbol, max(ts) from stock_ticks_mor group by symbol HAVING symbol = 'GOOG';
WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
+---------+----------------------+--+
| symbol | _c1 |
+---------+----------------------+--+
| GOOG | 2018-08-31 10:29:00 |
+---------+----------------------+--+
1 row selected (6.326 seconds)
# Run against Realtime View. Notice that the latest timestamp is again 10:29
0: jdbc:hive2://hiveserver:10000> select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG';
WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
+---------+----------------------+--+
| symbol | _c1 |
+---------+----------------------+--+
| GOOG | 2018-08-31 10:29:00 |
+---------+----------------------+--+
1 row selected (1.606 seconds)
# Run projection query against Read Optimized and Realtime tables
0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor where symbol = 'GOOG';
+----------------------+---------+----------------------+---------+------------+-----------+--+
| _hoodie_commit_time | symbol | ts | volume | open | close |
+----------------------+---------+----------------------+---------+------------+-----------+--+
| 20180924222155 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 |
| 20180924222155 | GOOG | 2018-08-31 10:29:00 | 3391 | 1230.1899 | 1230.085 |
+----------------------+---------+----------------------+---------+------------+-----------+--+
0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG';
+----------------------+---------+----------------------+---------+------------+-----------+--+
| _hoodie_commit_time | symbol | ts | volume | open | close |
+----------------------+---------+----------------------+---------+------------+-----------+--+
| 20180924222155 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 |
| 20180924222155 | GOOG | 2018-08-31 10:29:00 | 3391 | 1230.1899 | 1230.085 |
+----------------------+---------+----------------------+---------+------------+-----------+--+
exit
exit
Step 4 (b): Run Spark-SQL Queries
Hudi supports Spark as a query engine, just like Hive. Here are the same Hive queries running in Spark-SQL:
docker exec -it adhoc-1 /bin/bash
$SPARK_INSTALL/bin/spark-shell --jars $HUDI_SPARK_BUNDLE --master local[2] --driver-class-path $HADOOP_CONF_DIR --conf spark.sql.hive.convertMetastoreParquet=false --deploy-mode client --driver-memory 1G --executor-memory 3G --num-executors 1 --packages com.databricks:spark-avro_2.11:4.0.0
...
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.3.1
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
scala> spark.sql("show tables").show(100, false)
+--------+------------------+-----------+
|database|tableName |isTemporary|
+--------+------------------+-----------+
|default |stock_ticks_cow |false |
|default |stock_ticks_mor |false |
|default |stock_ticks_mor_rt|false |
+--------+------------------+-----------+
# Copy-On-Write Table
## Run max timestamp query against COW table
scala> spark.sql("select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG'").show(100, false)
[Stage 0:> (0 + 1) / 1]SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes#StaticLoggerBinder for further details.
+------+-------------------+
|symbol|max(ts) |
+------+-------------------+
|GOOG |2018-08-31 10:29:00|
+------+-------------------+
## Projection Query
scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG'").show(100, false)
+-------------------+------+-------------------+------+---------+--------+
|_hoodie_commit_time|symbol|ts |volume|open |close |
+-------------------+------+-------------------+------+---------+--------+
|20180924221953 |GOOG |2018-08-31 09:59:00|6330 |1230.5 |1230.02 |
|20180924221953 |GOOG |2018-08-31 10:29:00|3391 |1230.1899|1230.085|
+-------------------+------+-------------------+------+---------+--------+
# Merge-On-Read Queries:
==========================
Lets run similar queries against M-O-R dataset. Lets look at both
ReadOptimized and Realtime views supported by M-O-R dataset
# Run against ReadOptimized View. Notice that the latest timestamp is 10:29
scala> spark.sql("select symbol, max(ts) from stock_ticks_mor group by symbol HAVING symbol = 'GOOG'").show(100, false)
+------+-------------------+
|symbol|max(ts) |
+------+-------------------+
|GOOG |2018-08-31 10:29:00|
+------+-------------------+
# Run against Realtime View. Notice that the latest timestamp is again 10:29
scala> spark.sql("select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG'").show(100, false)
+------+-------------------+
|symbol|max(ts) |
+------+-------------------+
|GOOG |2018-08-31 10:29:00|
+------+-------------------+
# Run projection query against Read Optimized and Realtime tables
scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor where symbol = 'GOOG'").show(100, false)
+-------------------+------+-------------------+------+---------+--------+
|_hoodie_commit_time|symbol|ts |volume|open |close |
+-------------------+------+-------------------+------+---------+--------+
|20180924222155 |GOOG |2018-08-31 09:59:00|6330 |1230.5 |1230.02 |
|20180924222155 |GOOG |2018-08-31 10:29:00|3391 |1230.1899|1230.085|
+-------------------+------+-------------------+------+---------+--------+
scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG'").show(100, false)
+-------------------+------+-------------------+------+---------+--------+
|_hoodie_commit_time|symbol|ts |volume|open |close |
+-------------------+------+-------------------+------+---------+--------+
|20180924222155 |GOOG |2018-08-31 09:59:00|6330 |1230.5 |1230.02 |
|20180924222155 |GOOG |2018-08-31 10:29:00|3391 |1230.1899|1230.085|
+-------------------+------+-------------------+------+---------+--------+
Step 4 (c): Run Presto Queries
Here are Presto queries similar to the Hive and Spark queries above. Currently, the Hudi Real-time view is not supported in Presto.
docker exec -it presto-worker-1 presto --server presto-coordinator-1:8090
presto> show catalogs;
Catalog
-----------
hive
jmx
localfile
system
(4 rows)
Query 20190817_134851_00000_j8rcz, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
0:04 [0 rows, 0B] [0 rows/s, 0B/s]
presto> use hive.default;
USE
presto:default> show tables;
Table
--------------------
stock_ticks_cow
stock_ticks_mor
stock_ticks_mor_rt
(3 rows)
Query 20190822_181000_00001_segyw, FINISHED, 2 nodes
Splits: 19 total, 19 done (100.00%)
0:05 [3 rows, 99B] [0 rows/s, 18B/s]
# COPY-ON-WRITE Queries:
=========================
presto:default> select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
symbol | _col1
--------+---------------------
GOOG | 2018-08-31 10:29:00
(1 row)
Query 20190822_181011_00002_segyw, FINISHED, 1 node
Splits: 49 total, 49 done (100.00%)
0:12 [197 rows, 613B] [16 rows/s, 50B/s]
presto:default> select "_hoodie_commit_time", symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';
_hoodie_commit_time | symbol | ts | volume | open | close
---------------------+--------+---------------------+--------+-----------+----------
20190822180221 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02
20190822180221 | GOOG | 2018-08-31 10:29:00 | 3391 | 1230.1899 | 1230.085
(2 rows)
Query 20190822_181141_00003_segyw, FINISHED, 1 node
Splits: 17 total, 17 done (100.00%)
0:02 [197 rows, 613B] [109 rows/s, 341B/s]
# Merge-On-Read Queries:
==========================
Lets run similar queries against M-O-R dataset.
# Run against ReadOptimized View. Notice that the latest timestamp is 10:29
presto:default> select symbol, max(ts) from stock_ticks_mor group by symbol HAVING symbol = 'GOOG';
symbol | _col1
--------+---------------------
GOOG | 2018-08-31 10:29:00
(1 row)
Query 20190822_181158_00004_segyw, FINISHED, 1 node
Splits: 49 total, 49 done (100.00%)
0:02 [197 rows, 613B] [110 rows/s, 343B/s]
presto:default> select "_hoodie_commit_time", symbol, ts, volume, open, close from stock_ticks_mor where symbol = 'GOOG';
_hoodie_commit_time | symbol | ts | volume | open | close
---------------------+--------+---------------------+--------+-----------+----------
20190822180250 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02
20190822180250 | GOOG | 2018-08-31 10:29:00 | 3391 | 1230.1899 | 1230.085
(2 rows)
Query 20190822_181256_00006_segyw, FINISHED, 1 node
Splits: 17 total, 17 done (100.00%)
0:02 [197 rows, 613B] [92 rows/s, 286B/s]
presto:default> exit