Docker Demo
A Demo using docker containers
Let's use a real-world example to see how Hudi works end to end. For this purpose, a self-contained data infrastructure is brought up in a local Docker cluster on your computer.
The steps have been tested on a Mac laptop.
Prerequisites
- Docker Setup : For Mac, please follow the steps as defined in [https://docs.docker.com/v17.12/docker-for-mac/install/]. For running Spark-SQL queries, please ensure at least 6 GB of memory and 4 CPUs are allocated to Docker (see Docker -> Preferences -> Advanced). Otherwise, Spark-SQL queries could be killed because of memory issues.
- kcat : A command-line utility to publish/consume from Kafka topics. Run brew install kcat to install it.
- /etc/hosts : The demo references many services running in containers by hostname. Add the following settings to /etc/hosts
127.0.0.1 adhoc-1
127.0.0.1 adhoc-2
127.0.0.1 namenode
127.0.0.1 datanode1
127.0.0.1 hiveserver
127.0.0.1 hivemetastore
127.0.0.1 kafkabroker
127.0.0.1 sparkmaster
127.0.0.1 zookeeper
- Java : Java SE Development Kit 8.
- Maven : A build automation tool for Java projects.
- jq : A lightweight and flexible command-line JSON processor. Run brew install jq to install it.
Also, this has not been tested on some environments like Docker on Windows.
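Before moving on, it can help to confirm the prerequisites in one pass. The following is a minimal sketch, not part of the demo scripts: the function names missing_tools and missing_hosts are made up for illustration, and the hostname list is copied from the /etc/hosts entries above.

```shell
#!/bin/sh
# Hypothetical helpers (not shipped with Hudi) for checking the prerequisites.

# Print each named tool that is not found on PATH, one per line.
missing_tools() {
  for tool in "$@"; do
    command -v "$tool" >/dev/null 2>&1 || echo "$tool"
  done
}

# Print each demo hostname that has no entry in the given hosts file.
# Usage: missing_hosts /etc/hosts
missing_hosts() {
  hosts_file=$1
  for host in adhoc-1 adhoc-2 namenode datanode1 hiveserver hivemetastore \
              kafkabroker sparkmaster zookeeper; do
    # Look for the hostname as a whole field on any line that is not a comment.
    awk -v h="$host" '$1 !~ /^#/ { for (i = 2; i <= NF; i++) if ($i == h) found = 1 }
                      END { exit !found }' "$hosts_file" || echo "$host"
  done
}
```

Running missing_tools docker kcat jq java mvn followed by missing_hosts /etc/hosts should print nothing when everything is in place; any name printed is something still to install or add.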
Setting up Docker Cluster
Build Hudi
The first step is to build Hudi. Note: this step builds Hudi on the default supported Scala version, 2.11.
cd <HUDI_WORKSPACE>
mvn package -DskipTests
Bringing up Demo Cluster
The next step is to run the Docker Compose script and set up the configs for bringing up the cluster. This should pull the Docker images from Docker Hub and set up the Docker cluster.
cd docker
./setup_demo.sh
....
....
....
[+] Running 10/13
⠿ Container zookeeper Removed 8.6s
⠿ Container datanode1 Removed 18.3s
⠿ Container trino-worker-1 Removed 50.7s
⠿ Container spark-worker-1 Removed 16.7s
⠿ Container adhoc-2 Removed 16.9s
⠿ Container graphite Removed 16.9s
⠿ Container kafkabroker Removed 14.1s
⠿ Container adhoc-1 Removed 14.1s
⠿ Container presto-worker-1 Removed 11.9s
⠿ Container presto-coordinator-1 Removed 34.6s
.......
......
[+] Running 17/17
⠿ adhoc-1 Pulled 2.9s
⠿ graphite Pulled 2.8s
⠿ spark-worker-1 Pulled 3.0s
⠿ kafka Pulled 2.9s
⠿ datanode1 Pulled 2.9s
⠿ hivemetastore Pulled 2.9s
⠿ hiveserver Pulled 3.0s
⠿ hive-metastore-postgresql Pulled 2.8s
⠿ presto-coordinator-1 Pulled 2.9s
⠿ namenode Pulled 2.9s
⠿ trino-worker-1 Pulled 2.9s
⠿ sparkmaster Pulled 2.9s
⠿ presto-worker-1 Pulled 2.9s
⠿ zookeeper Pulled 2.8s
⠿ adhoc-2 Pulled 2.9s
⠿ historyserver Pulled 2.9s
⠿ trino-coordinator-1 Pulled 2.9s
[+] Running 17/17
⠿ Container zookeeper Started 41.0s
⠿ Container kafkabroker Started 41.7s
⠿ Container graphite Started 41.5s
⠿ Container hive-metastore-postgresql Running 0.0s
⠿ Container namenode Running 0.0s
⠿ Container hivemetastore Running 0.0s
⠿ Container trino-coordinator-1 Runni... 0.0s
⠿ Container presto-coordinator-1 Star... 42.1s
⠿ Container historyserver Started 41.0s
⠿ Container datanode1 Started 49.9s
⠿ Container hiveserver Running 0.0s
⠿ Container trino-worker-1 Started 42.1s
⠿ Container sparkmaster Started 41.9s
⠿ Container spark-worker-1 Started 50.2s
⠿ Container adhoc-2 Started 38.5s
⠿ Container adhoc-1 Started 38.5s
⠿ Container presto-worker-1 Started 38.4s
Copying spark default config and setting up configs
Copying spark default config and setting up configs
$ docker ps
At this point, the Docker cluster will be up and running. The demo cluster brings up the following services:
- HDFS Services (NameNode, DataNode)
- Spark Master and Worker
- Hive Services (Metastore, HiveServer2 along with PostgresDB)
- Kafka Broker and a Zookeeper Node (Kafka will be used as upstream source for the demo)
- Containers for Presto setup (Presto coordinator and worker)
- Containers for Trino setup (Trino coordinator and worker)
- Adhoc containers to run Hudi/Hive CLI commands
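To confirm that all of these services actually came up, the container names from docker ps can be compared against the names that appear in the setup output above. This is a sketch under that assumption; missing_containers is a hypothetical helper, and the name list may need adjusting if your compose file differs.

```shell
#!/bin/sh
# Hypothetical helper: read running container names on stdin (one per line)
# and print every expected demo container that is absent.
missing_containers() {
  running=$(cat)
  for name in zookeeper kafkabroker graphite hive-metastore-postgresql \
              namenode hivemetastore historyserver datanode1 hiveserver \
              sparkmaster spark-worker-1 adhoc-1 adhoc-2 \
              presto-coordinator-1 presto-worker-1 \
              trino-coordinator-1 trino-worker-1; do
    # -x matches the whole line, -F treats the name as a fixed string.
    printf '%s\n' "$running" | grep -qxF -- "$name" || echo "$name"
  done
}
```

A typical invocation would be docker ps --format '{{.Names}}' | missing_containers; empty output means every expected container is running.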
Demo
Stock Tracker data will be used to showcase different Hudi query types and the effects of Compaction.
Take a look at the directory docker/demo/data. There are 2 batches of stock data, each at 1-minute granularity.
The first batch contains stock tracker data for some stock symbols during the first hour of the trading window
(9:30 a.m. to 10:30 a.m.). The second batch contains tracker data for the next 30 minutes (10:30 a.m. to 11:00 a.m.). Hudi will
be used to ingest these batches into a table which will contain the latest stock tracker data at hour-level granularity.
The batches are windowed intentionally so that the second batch contains updates to some of the rows in the first batch.
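The idea of "latest data at hour-level granularity" can be illustrated with a toy model. The sketch below is not how Hudi stores or merges records; it assumes, purely for illustration, that a row is identified by its symbol plus the hour of its timestamp, and that the row with the greatest timestamp wins. The function name latest_per_hour and the tab-separated input layout are made up.

```shell
#!/bin/sh
# Toy model only: this is not how Hudi stores or merges data.
# Input: lines of "symbol<TAB>timestamp"; output: the latest line per (symbol, hour).
latest_per_hour() {
  LC_ALL=C sort | awk -F '\t' '
    { key = $1 FS substr($2, 1, 13)   # assumed key: symbol + "YYYY-MM-DD HH"
      latest[key] = $0 }              # input is sorted, so later rows overwrite earlier ones
    END { for (k in latest) print latest[k] }
  ' | LC_ALL=C sort
}
```

Feeding it made-up GOOG rows at 09:59, 10:29 and 10:59 on 2018-08-31 keeps the 09:59 row and replaces the 10:29 row with the 10:59 one, which is the kind of update the second batch is designed to trigger.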
Step 1: Publish the first batch to Kafka
Upload the first batch to the Kafka topic 'stock_ticks':
cat docker/demo/data/batch_1.json | kcat -b kafkabroker -t stock_ticks -P
To check if the new topic shows up, use
kcat -b kafkabroker -L -J | jq .
{
  "originating_broker": {
    "id": 1001,
    "name": "kafkabroker:9092/1001"
  },
  "query": {
    "topic": "*"
  },
  "brokers": [
    {
      "id": 1001,
      "name": "kafkabroker:9092"
    }
  ],
  "topics": [
    {
      "topic": "stock_ticks",
      "partitions": [
        {
          "partition": 0,
          "leader": 1001,
          "replicas": [
            {
              "id": 1001
            }
          ],
          "isrs": [
            {
              "id": 1001
            }
          ]
        }
      ]
    }
  ]
}
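When only the topic names matter, the metadata JSON can be reduced with a jq filter instead of being read by eye. This is a small sketch that relies on the "topics" / "topic" layout shown in the output above; list_topics and has_topic are hypothetical helper names.

```shell
#!/bin/sh
# Hypothetical helpers built on the metadata layout shown above.

# Read `kcat -L -J` metadata JSON on stdin and print one topic name per line.
list_topics() {
  jq -r '.topics[].topic'
}

# Succeed (exit status 0) if the named topic appears in the metadata JSON on stdin.
has_topic() {
  list_topics | grep -qxF -- "$1"
}
```

For example, kcat -b kafkabroker -L -J | has_topic stock_ticks && echo "topic exists" reports whether the upload created the topic.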
Step 2: Incrementally ingest data from Kafka topic
Hudi comes with a tool named DeltaStreamer. This tool can connect to a variety of data sources (including Kafka) to pull changes and apply them to a Hudi table using upsert/insert primitives. Here, we will use the tool to pull JSON data from the Kafka topic and ingest it into both a COW and a MOR table. The tool automatically initializes the tables in the file system if they do not exist yet.
docker exec -it adhoc-2 /bin/bash
# Run the following spark-submit command to execute the delta-streamer and ingest to stock_ticks_cow table in HDFS
spark-submit \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_BUNDLE \
--table-type COPY_ON_WRITE \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
--source-ordering-field ts \
--target-base-path /user/hive/warehouse/stock_ticks_cow \
--target-table stock_ticks_cow --props /var/demo/config/kafka-source.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider
# Run the following spark-submit command to execute the delta-streamer and ingest to stock_ticks_mor table in HDFS
spark-submit \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_BUNDLE \
--table-type MERGE_ON_READ \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
--source-ordering-field ts \
--target-base-path /user/hive/warehouse/stock_ticks_mor \
--target-table stock_ticks_mor \
--props /var/demo/config/kafka-source.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
--disable-compaction
# As part of the setup (look at setup_demo.sh), the configs needed for DeltaStreamer are uploaded to HDFS. The configs
# contain mostly Kafka connectivity settings, the Avro schema to be used for ingesting, and the key and partitioning fields.
exit
You can use the HDFS web browser to look at the tables:
http://namenode:50070/explorer.html#/user/hive/warehouse/stock_ticks_cow
You can explore the new partition folder created in the table along with a "commit" / "deltacommit" file under .hoodie which signals a successful commit.
There will be a similar setup when you browse the MOR table:
http://namenode:50070/explorer.html#/user/hive/warehouse/stock_ticks_mor
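The same check can be done from a terminal by listing the .hoodie folder and keeping only the files that mark a completed commit. The sketch below assumes the hdfs CLI is available inside the adhoc container and that completed instants end in .commit or .deltacommit as described above; completed_instants is a hypothetical helper name.

```shell
#!/bin/sh
# Hypothetical helper: read `hdfs dfs -ls <table>/.hoodie` output on stdin and
# print the names of files that mark a completed commit or delta commit.
completed_instants() {
  awk '{
    n = split($NF, parts, "/")          # the last field is the full HDFS path
    if (parts[n] ~ /\.(commit|deltacommit)$/) print parts[n]
  }'
}
```

Inside adhoc-2, hdfs dfs -ls /user/hive/warehouse/stock_ticks_cow/.hoodie | completed_instants would then print one line per successful commit; in-flight or requested instants are filtered out because they carry a different suffix.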
Step 3: Sync with Hive
At this step, the tables are available in HDFS. We need to sync with Hive to create new Hive tables and add partitions in order to run Hive queries against those tables.
docker exec -it adhoc-2 /bin/bash
# This command takes in the HiveServer URL and the COW Hudi table location in HDFS and syncs the HDFS state to Hive
/var/hoodie/ws/hudi-sync/hudi-hive-sync/run_sync_tool.sh \
--jdbc-url jdbc:hive2://hiveserver:10000 \
--user hive \
--pass hive \
--partitioned-by dt \
--base-path /user/hive/warehouse/stock_ticks_cow \
--database default \
--table stock_ticks_cow
.....
2020-01-25 19:51:28,953 INFO [main] hive.HiveSyncTool (HiveSyncTool.java:syncHoodieTable(129)) - Sync complete for stock_ticks_cow
.....
# Now run hive-sync for the second data-set in HDFS using Merge-On-Read (MOR table type)
/var/hoodie/ws/hudi-sync/hudi-hive-sync/run_sync_tool.sh \
--jdbc-url jdbc:hive2://hiveserver:10000 \
--user hive \
--pass hive \
--partitioned-by dt \
--base-path /user/hive/warehouse/stock_ticks_mor \
--database default \
--table stock_ticks_mor
...
2020-01-25 19:51:51,066 INFO [main] hive.HiveSyncTool (HiveSyncTool.java:syncHoodieTable(129)) - Sync complete for stock_ticks_mor_ro
...
2020-01-25 19:51:51,569 INFO [main] hive.HiveSyncTool (HiveSyncTool.java:syncHoodieTable(129)) - Sync complete for stock_ticks_mor_rt
....
exit
After executing the above commands, you will notice:
- A Hive table named stock_ticks_cow is created, which supports Snapshot and Incremental queries on the Copy On Write table.
- Two new tables, stock_ticks_mor_rt and stock_ticks_mor_ro, are created for the Merge On Read table. The former supports Snapshot and Incremental queries (providing near-real time data) while the latter supports ReadOptimized queries.
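The two sync invocations in this step differ only in the table name, so they can be wrapped in a small function. This is a sketch that reuses exactly the flags shown above; the sync_table name and the DRY_RUN switch are additions made for illustration and are not part of run_sync_tool.sh.

```shell
#!/bin/sh
# Hypothetical wrapper around the sync command shown above.
# Set DRY_RUN=1 to print the command instead of executing it.
sync_table() {
  table=$1
  # Rebuild the positional parameters as the full command line for this table.
  set -- /var/hoodie/ws/hudi-sync/hudi-hive-sync/run_sync_tool.sh \
    --jdbc-url jdbc:hive2://hiveserver:10000 \
    --user hive --pass hive \
    --partitioned-by dt \
    --base-path "/user/hive/warehouse/$table" \
    --database default \
    --table "$table"
  if [ "${DRY_RUN:-0}" = "1" ]; then
    echo "$*"
  else
    "$@"
  fi
}
```

Inside adhoc-2, a loop such as for t in stock_ticks_cow stock_ticks_mor; do sync_table "$t"; done would run both syncs; setting DRY_RUN=1 first shows the commands without touching Hive.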
Step 4 (a): Run Hive Queries
Run a Hive query to find the latest timestamp ingested for stock symbol 'GOOG'. You will notice that both snapshot queries (on the COW table and the MOR _rt table) and read-optimized queries (on the MOR _ro table) give the same value, "10:29 a.m", because Hudi creates a parquet file for the first batch of data.
docker exec -it adhoc-2 /bin/bash
beeline -u jdbc:hive2://hiveserver:10000 \
--hiveconf hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat \
--hiveconf hive.stats.autogather=false
# List Tables
0: jdbc:hive2://hiveserver:10000> show tables;
+---------------------+--+
| tab_name |
+---------------------+--+
| stock_ticks_cow |
| stock_ticks_mor_ro |
| stock_ticks_mor_rt |
+---------------------+--+
3 rows selected (1.199 seconds)
0: jdbc:hive2://hiveserver:10000>
# Look at partitions that were added
0: jdbc:hive2://hiveserver:10000> show partitions stock_ticks_mor_rt;
+----------------+--+
| partition |
+----------------+--+
| dt=2018-08-31 |
+----------------+--+
1 row selected (0.24 seconds)
# COPY-ON-WRITE Queries:
=========================
0: jdbc:hive2://hiveserver:10000> select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
+---------+----------------------+--+
| symbol | _c1 |
+---------+----------------------+--+
| GOOG | 2018-08-31 10:29:00 |
+---------+----------------------+--+
Now, run a projection query:
0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';
+----------------------+---------+----------------------+---------+------------+-----------+--+
| _hoodie_commit_time | symbol | ts | volume | open | close |
+----------------------+---------+----------------------+---------+------------+-----------+--+
| 20180924221953 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 |
| 20180924221953 | GOOG | 2018-08-31 10:29:00 | 3391 | 1230.1899 | 1230.085 |
+----------------------+---------+----------------------+---------+------------+-----------+--+
# Merge-On-Read Queries:
==========================
Let's run similar queries against the M-O-R table. Let's look at both
ReadOptimized and Snapshot (realtime data) queries supported by the M-O-R table
# Run ReadOptimized Query. Notice that the latest timestamp is 10:29
0: jdbc:hive2://hiveserver:10000> select symbol, max(ts) from stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG';
WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
+---------+----------------------+--+
| symbol | _c1 |
+---------+----------------------+--+
| GOOG | 2018-08-31 10:29:00 |
+---------+----------------------+--+
1 row selected (6.326 seconds)
# Run Snapshot Query. Notice that the latest timestamp is again 10:29
0: jdbc:hive2://hiveserver:10000> select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG';
WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
+---------+----------------------+--+
| symbol | _c1 |
+---------+----------------------+--+
| GOOG | 2018-08-31 10:29:00 |
+---------+----------------------+--+
1 row selected (1.606 seconds)
# Run Read Optimized and Snapshot projection queries
0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG';
+----------------------+---------+----------------------+---------+------------+-----------+--+
| _hoodie_commit_time | symbol | ts | volume | open | close |
+----------------------+---------+----------------------+---------+------------+-----------+--+
| 20180924222155 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 |
| 20180924222155 | GOOG | 2018-08-31 10:29:00 | 3391 | 1230.1899 | 1230.085 |
+----------------------+---------+----------------------+---------+------------+-----------+--+
0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG';
+----------------------+---------+----------------------+---------+------------+-----------+--+
| _hoodie_commit_time | symbol | ts | volume | open | close |
+----------------------+---------+----------------------+---------+------------+-----------+--+
| 20180924222155 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 |
| 20180924222155 | GOOG | 2018-08-31 10:29:00 | 3391 | 1230.1899 | 1230.085 |
+----------------------+---------+----------------------+---------+------------+-----------+--+
exit
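The same comparison can be scripted instead of typed into an interactive session, using beeline's -e option to pass a query on the command line. The sketch below reuses the connection string, --hiveconf settings and query text from the session above; latest_ts_query and compare_latest_ts are hypothetical helper names.

```shell
#!/bin/sh
# Hypothetical helpers for running the "latest timestamp" query non-interactively.

# Print the "latest timestamp for a symbol" query for the given table.
latest_ts_query() {
  printf "select symbol, max(ts) from %s group by symbol HAVING symbol = '%s';\n" "$1" "$2"
}

# Run that query for one symbol against each of the three synced tables.
compare_latest_ts() {
  for t in stock_ticks_cow stock_ticks_mor_ro stock_ticks_mor_rt; do
    beeline -u jdbc:hive2://hiveserver:10000 \
      --hiveconf hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat \
      --hiveconf hive.stats.autogather=false \
      -e "$(latest_ts_query "$t" "$1")"
  done
}
```

Inside adhoc-2, compare_latest_ts GOOG would issue the three max(ts) queries in turn, which makes it easy to re-run the comparison after later ingestion steps.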