Apache Hive Metastore
Hive Metastore is an RDBMS-backed service from Apache Hive that acts as a catalog for your data warehouse or data lake. It stores metadata about tables, such as partitions, columns, and column types. Hudi table metadata can be synced to the Hive Metastore as well, which unlocks querying Hudi tables not only through Hive but also through interactive query engines such as Presto and Trino. In this document, we will go through the different ways to sync a Hudi table to the Hive Metastore.
Spark Data Source example
Prerequisites: set up the Hive Metastore properly and configure your Spark installation to point to it by placing hive-site.xml under $SPARK_HOME/conf.
Assume that
- hiveserver2 is running at port 10000
- metastore is running at port 9083
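Given the ports above, a minimal hive-site.xml on the Spark side might look like the following sketch. The host name `hive-metastore` is an assumption (it matches the thrift URI used later in this example); replace it with your metastore host.

```xml
<configuration>
  <!-- Point Spark's Hive client at the remote metastore (host name assumed) -->
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://hive-metastore:9083</value>
  </property>
</configuration>
```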
Then start a spark-shell with the Hudi Spark bundle jar as a dependency (refer to the Quickstart example).
We can run the following script to create a sample Hudi table and sync it to Hive.
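For example, one way to launch the shell is shown below; the bundle coordinates and versions are assumptions, so match them to your Spark/Scala/Hudi installation.

```shell
# Launch spark-shell with the Hudi bundle (versions are assumptions; adjust to your setup)
spark-shell \
  --packages org.apache.hudi:hudi-spark3.5-bundle_2.12:1.0.0 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
```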
```scala
// spark-shell
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

val databaseName = "my_db"
val tableName = "hudi_cow"
val basePath = "/user/hive/warehouse/hudi_cow"

val schema = StructType(Array(
  StructField("rowId", StringType, true),
  StructField("partitionId", StringType, true),
  StructField("orderingField", LongType, true),
  StructField("name", StringType, true),
  StructField("versionId", StringType, true),
  StructField("toBeDeletedStr", StringType, true),
  StructField("intToLong", IntegerType, true),
  StructField("longToInt", LongType, true)
))

val data0 = Seq(
  Row("row_1", "2021/01/01", 0L, "bob", "v_0", "toBeDel0", 0, 1000000L),
  Row("row_2", "2021/01/01", 0L, "john", "v_0", "toBeDel0", 0, 1000000L),
  Row("row_3", "2021/01/02", 0L, "tom", "v_0", "toBeDel0", 0, 1000000L))

val dfFromData0 = spark.createDataFrame(data0, schema)
dfFromData0.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option("hoodie.table.ordering.fields", "orderingField").
  option("hoodie.datasource.write.recordkey.field", "rowId").
  option("hoodie.datasource.write.partitionpath.field", "partitionId").
  option("hoodie.database.name", databaseName).
  option("hoodie.table.name", tableName).
  option("hoodie.datasource.write.table.type", "COPY_ON_WRITE").
  option("hoodie.datasource.write.operation", "upsert").
  option("hoodie.datasource.write.hive_style_partitioning", "true").
  option("hoodie.datasource.meta.sync.enable", "true").
  option("hoodie.datasource.hive_sync.mode", "hms").
  option("hoodie.datasource.hive_sync.metastore.uris", "thrift://hive-metastore:9083").
  mode(Overwrite).
  save(basePath)
```
If you prefer to use JDBC instead of the HMS sync mode, omit hoodie.datasource.hive_sync.metastore.uris and configure these options instead:

```properties
hoodie.datasource.hive_sync.mode=jdbc
hoodie.datasource.hive_sync.jdbcurl=<e.g., jdbc:hive2://hiveserver:10000>
hoodie.datasource.hive_sync.username=<username>
hoodie.datasource.hive_sync.password=<password>
```
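Once the write finishes, the sync can be verified from the same spark-shell session, assuming Spark is configured against the metastore as described in the prerequisites:

```scala
// The synced table should now be visible through the metastore-backed catalog
spark.sql("show tables in my_db").show()

// And it can be read back by name rather than by base path
spark.sql("select rowId, name, partitionId from my_db.hudi_cow").show(false)
```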
Query using HiveQL
```shell
beeline -u jdbc:hive2://hiveserver:10000/my_db \
  --hiveconf hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat \
  --hiveconf hive.stats.autogather=false

Beeline version 1.2.1.spark2 by Apache Hive
0: jdbc:hive2://hiveserver:10000> show tables;
+-----------+--+
| tab_name  |
+-----------+--+
| hudi_cow  |
+-----------+--+
1 row selected (0.531 seconds)
0: jdbc:hive2://hiveserver:10000> select * from hudi_cow limit 1;
+-------------------------------+--------------------------------+------------------------------+----------------------------------+----------------------------------------------------------------------------+-----------------+-------------------+----------------+---------------------+--------------------------+---------------------+---------------------+-----------------------+--+
| hudi_cow._hoodie_commit_time | hudi_cow._hoodie_commit_seqno | hudi_cow._hoodie_record_key | hudi_cow._hoodie_partition_path | hudi_cow._hoodie_file_name | hudi_cow.rowid | hudi_cow.orderingfield | hudi_cow.name | hudi_cow.versionid | hudi_cow.tobedeletedstr | hudi_cow.inttolong | hudi_cow.longtoint | hudi_cow.partitionid |
+-------------------------------+--------------------------------+------------------------------+----------------------------------+----------------------------------------------------------------------------+-----------------+-------------------+----------------+---------------------+--------------------------+---------------------+---------------------+-----------------------+--+
| 20220120090023631 | 20220120090023631_1_2 | row_1 | partitionId=2021/01/01 | 0bf9b822-928f-4a57-950a-6a5450319c83-0_1-24-314_20220120090023631.parquet | row_1 | 0 | bob | v_0 | toBeDel0 | 0 | 1000000 | 2021/01/01 |
+-------------------------------+--------------------------------+------------------------------+----------------------------------+----------------------------------------------------------------------------+-----------------+-------------------+----------------+---------------------+--------------------------+---------------------+---------------------+-----------------------+--+
1 row selected (5.475 seconds)
0: jdbc:hive2://hiveserver:10000>
```
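With the table registered in the metastore, interactive engines such as Trino can query it through a Hive connector. A session might look like the following sketch; the coordinator address and the catalog name `hive` are assumptions about your Trino deployment.

```shell
# Connect to Trino pointing at the Hive connector catalog (server and catalog name assumed)
trino --server trino-coordinator:8080 --catalog hive --schema my_db

trino:my_db> select rowId, name from hudi_cow;
```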