Skip to main content
Version: 0.12.2

Spark Guide

This guide provides a quick peek at Hudi's capabilities using spark-shell. Using Spark datasources, we will walk through code snippets that allows you to insert and update a Hudi table of default table type: Copy on Write. After each write operation we will also show how to read the data both snapshot and incrementally.

Setup

Hudi works with Spark-2.4.3+ & Spark 3.x versions. You can follow instructions here for setting up Spark.

Spark 3 Support Matrix

HudiSupported Spark 3 version
0.12.x3.3.x (default build), 3.2.x, 3.1.x
0.11.x3.2.x (default build, Spark bundle only), 3.1.x
0.10.x3.1.x (default build), 3.0.x
0.7.0 - 0.9.03.0.x
0.6.0 and priornot supported

The default build Spark version indicates that it is used to build the hudi-spark3-bundle.

note

In 0.12.0, we introduce the experimental support for Spark 3.3.0. In 0.11.0, there are changes on using Spark bundles, please refer to 0.11.0 release notes for detailed instructions.

From the extracted directory run pyspark with Hudi:

# Spark 3.3
export PYSPARK_PYTHON=$(which python3)
pyspark \
--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.2 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
# Spark 3.2
export PYSPARK_PYTHON=$(which python3)
pyspark \
--packages org.apache.hudi:hudi-spark3.2-bundle_2.12:0.12.2 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
# Spark 3.1
export PYSPARK_PYTHON=$(which python3)
pyspark \
--packages org.apache.hudi:hudi-spark3.1-bundle_2.12:0.12.2 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
# Spark 2.4
export PYSPARK_PYTHON=$(which python3)
pyspark \
--packages org.apache.hudi:hudi-spark2.4-bundle_2.11:0.12.2 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
Please note the following
  • For Spark 3.2 and above, the additional spark_catalog config is required: --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
  • We have used hudi-spark-bundle built for scala 2.12 since the spark-avro module used can also depend on 2.12.

Setup table name, base path and a data generator to generate records for this guide.

# pyspark
tableName = "hudi_trips_cow"
basePath = "file:///tmp/hudi_trips_cow"
dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
tip

The DataGenerator can generate sample inserts and updates based on the the sample trip schema here

Create Table

# pyspark
# No separate create table command required in spark. First batch of write to a table will create the table if not exists.

Insert data

Generate some new trips, load them into a DataFrame and write the DataFrame into the Hudi table as below.
# pyspark
inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))

hudi_options = {
'hoodie.table.name': tableName,
'hoodie.datasource.write.recordkey.field': 'uuid',
'hoodie.datasource.write.partitionpath.field': 'partitionpath',
'hoodie.datasource.write.table.name': tableName,
'hoodie.datasource.write.operation': 'upsert',
'hoodie.datasource.write.precombine.field': 'ts',
'hoodie.upsert.shuffle.parallelism': 2,
'hoodie.insert.shuffle.parallelism': 2
}

df.write.format("hudi"). \
options(**hudi_options). \
mode("overwrite"). \
save(basePath)
info

mode(Overwrite) overwrites and recreates the table if it already exists. You can check the data generated under /tmp/hudi_trips_cow/<region>/<country>/<city>/. We provided a record key (uuid in schema), partition field (region/country/city) and combine logic (ts in schema) to ensure trip records are unique within each partition. For more info, refer to Modeling data stored in Hudi and for info on ways to ingest data into Hudi, refer to Writing Hudi Tables. Here we are using the default write operation : upsert. If you have a workload without updates, you can also issue insert or bulk_insert operations which could be faster. To know more, refer to Write operations

Checkout https://hudi.apache.org/blog/2021/02/13/hudi-key-generators for various key generator options, like Timestamp based, complex, custom, NonPartitioned Key gen, etc.

tip

With externalized config file, instead of directly passing configuration settings to every Hudi job, you can also centrally set them in a configuration file hudi-default.conf.

Query data

Load the data files into a DataFrame.

# pyspark
tripsSnapshotDF = spark. \
read. \
format("hudi"). \
load(basePath)
# load(basePath) use "/partitionKey=partitionValue" folder structure for Spark auto partition discovery

tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")

spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show()
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show()
info

Since 0.9.0 hudi has support a hudi built-in FileIndex: HoodieFileIndex to query hudi table, which supports partition pruning and metatable for query. This will help improve query performance. It also supports non-global query path which means users can query the table by the base path without specifing the "*" in the query path. This feature has enabled by default for the non-global query path. For the global query path, hudi uses the old query path. Refer to Table types and queries for more info on all table types and query types supported.

Time Travel Query

Hudi supports time travel query since 0.9.0. Currently three query time formats are supported as given below.

#pyspark
spark.read. \
format("hudi"). \
option("as.of.instant", "20210728141108"). \
load(basePath)

spark.read. \
format("hudi"). \
option("as.of.instant", "2021-07-28 14:11:08.000"). \
load(basePath)

# It is equal to "as.of.instant = 2021-07-28 00:00:00"
spark.read. \
format("hudi"). \
option("as.of.instant", "2021-07-28"). \
load(basePath)

Update data

This is similar to inserting new data. Generate updates to existing trips using the data generator, load into a DataFrame and write DataFrame into the hudi table.

# pyspark
updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(10))
df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.write.format("hudi"). \
options(**hudi_options). \
mode("append"). \
save(basePath)
note

Notice that the save mode is now Append. In general, always use append mode unless you are trying to create the table for the first time. Querying the data again will now show updated trips. Each write operation generates a new commit denoted by the timestamp. Look for changes in _hoodie_commit_time, rider, driver fields for the same _hoodie_record_keys in previous commit.

Incremental query

Hudi also provides capability to obtain a stream of records that changed since given commit timestamp. This can be achieved using Hudi's incremental querying and providing a begin time from which changes need to be streamed. We do not need to specify endTime, if we want all changes after the given commit (as is the common case).

# pyspark
# reload data
spark. \
read. \
format("hudi"). \
load(basePath). \
createOrReplaceTempView("hudi_trips_snapshot")

commits = list(map(lambda row: row[0], spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").limit(50).collect()))
beginTime = commits[len(commits) - 2] # commit time we are interested in

# incrementally query data
incremental_read_options = {
'hoodie.datasource.query.type': 'incremental',
'hoodie.datasource.read.begin.instanttime': beginTime,
}

tripsIncrementalDF = spark.read.format("hudi"). \
options(**incremental_read_options). \
load(basePath)
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")

spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show()
info

This will give all changes that happened after the beginTime commit with the filter of fare > 20.0. The unique thing about this feature is that it now lets you author streaming pipelines on batch data.

Structured Streaming

Hudi supports Spark Structured Streaming reads and writes. Structured Streaming reads are based on Hudi Incremental Query feature, therefore streaming read can return data for which commits and base files were not yet removed by the cleaner. You can control commits retention time.

Streaming Read

# pyspark
# reload data
inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(
dataGen.generateInserts(10))
df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))

hudi_options = {
'hoodie.table.name': tableName,
'hoodie.datasource.write.recordkey.field': 'uuid',
'hoodie.datasource.write.partitionpath.field': 'partitionpath',
'hoodie.datasource.write.table.name': tableName,
'hoodie.datasource.write.operation': 'upsert',
'hoodie.datasource.write.precombine.field': 'ts',
'hoodie.upsert.shuffle.parallelism': 2,
'hoodie.insert.shuffle.parallelism': 2
}

df.write.format("hudi"). \
options(**hudi_options). \
mode("overwrite"). \
save(basePath)

# read stream to streaming df
df = spark.readStream \
.format("hudi") \
.load(basePath)

# ead stream and output results to console
spark.readStream \
.format("hudi") \
.load(basePath) \
.writeStream \
.format("console") \
.start()

Streaming Write

# pyspark
# prepare to stream write to new table
streamingTableName = "hudi_trips_cow_streaming"
baseStreamingPath = "file:///tmp/hudi_trips_cow_streaming"
checkpointLocation = "file:///tmp/checkpoints/hudi_trips_cow_streaming"

hudi_streaming_options = {
'hoodie.table.name': streamingTableName,
'hoodie.datasource.write.recordkey.field': 'uuid',
'hoodie.datasource.write.partitionpath.field': 'partitionpath',
'hoodie.datasource.write.table.name': streamingTableName,
'hoodie.datasource.write.operation': 'upsert',
'hoodie.datasource.write.precombine.field': 'ts',
'hoodie.upsert.shuffle.parallelism': 2,
'hoodie.insert.shuffle.parallelism': 2
}

# create streaming df
df = spark.readStream \
.format("hudi") \
.load(basePath)

# write stream to new hudi table
df.writeStream.format("hudi") \
.options(**hudi_streaming_options) \
.outputMode("append") \
.option("path", baseStreamingPath) \
.option("checkpointLocation", checkpointLocation) \
.trigger(once=True) \
.start()

info

Spark SQL can be used within ForeachBatch sink to do INSERT, UPDATE, DELETE and MERGE INTO. Target table must exist before write.

Table maintenance

Hudi can run async or inline table services while running Structured Streaming query and takes care of cleaning, compaction and clustering. There's no operational overhead for the user.
For CoW tables, table services work in inline mode by default.
For MoR tables, some async services are enabled by default.

note

Since Hudi 0.11 Metadata Table is enabled by default. When using async table services with Metadata Table enabled you must use Optimistic Concurrency Control to avoid the risk of data loss (even in single writer scenario). See Metadata Table deployment considerations for detailed instructions.

If you're using Foreach or ForeachBatch streaming sink you must use inline table services, async table services are not supported.

Hive Sync works with Structured Streaming, it will create table if not exists and synchronize table to metastore aftear each streaming write.

Point in time query

Lets look at how to query data as of a specific time. The specific time can be represented by pointing endTime to a specific commit time and beginTime to "000" (denoting earliest possible commit time).

# pyspark
beginTime = "000" # Represents all commits > this time.
endTime = commits[len(commits) - 2]

# query point in time data
point_in_time_read_options = {
'hoodie.datasource.query.type': 'incremental',
'hoodie.datasource.read.end.instanttime': endTime,
'hoodie.datasource.read.begin.instanttime': beginTime
}

tripsPointInTimeDF = spark.read.format("hudi"). \
options(**point_in_time_read_options). \
load(basePath)

tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")
spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()

Delete data

Apache Hudi supports two types of deletes:

  1. Soft Deletes: This retains the record key and just nulls out the values for all the other fields. The records with nulls in soft deletes are always persisted in storage and never removed.
  2. Hard Deletes: This physically removes any trace of the record from the table. Check out the deletion section for more details.

Soft Deletes

Soft deletes retain the record key and null out the values for all the other fields. For example, records with nulls in soft deletes are always persisted in storage and never removed.

note

Notice that the save mode is Append.

# pyspark
from pyspark.sql.functions import lit
from functools import reduce

spark.read.format("hudi"). \
load(basePath). \
createOrReplaceTempView("hudi_trips_snapshot")

# fetch total records count
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()

# fetch two records for soft deletes
soft_delete_ds = spark.sql("select * from hudi_trips_snapshot").limit(2)

# prepare the soft deletes by ensuring the appropriate fields are nullified
meta_columns = ["_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key", \
"_hoodie_partition_path", "_hoodie_file_name"]
excluded_columns = meta_columns + ["ts", "uuid", "partitionpath"]
nullify_columns = list(filter(lambda field: field[0] not in excluded_columns, \
list(map(lambda field: (field.name, field.dataType), soft_delete_ds.schema.fields))))

hudi_soft_delete_options = {
'hoodie.table.name': tableName,
'hoodie.datasource.write.recordkey.field': 'uuid',
'hoodie.datasource.write.partitionpath.field': 'partitionpath',
'hoodie.datasource.write.table.name': tableName,
'hoodie.datasource.write.operation': 'upsert',
'hoodie.datasource.write.precombine.field': 'ts',
'hoodie.upsert.shuffle.parallelism': 2,
'hoodie.insert.shuffle.parallelism': 2
}

soft_delete_df = reduce(lambda df,col: df.withColumn(col[0], lit(None).cast(col[1])), \
nullify_columns, reduce(lambda df,col: df.drop(col[0]), meta_columns, soft_delete_ds))

# simply upsert the table after setting these fields to null
soft_delete_df.write.format("hudi"). \
options(**hudi_soft_delete_options). \
mode("append"). \
save(basePath)

# reload data
spark.read.format("hudi"). \
load(basePath). \
createOrReplaceTempView("hudi_trips_snapshot")

# This should return the same total count as before
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
# This should return (total - 2) count as two records are updated with nulls
spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()

Hard Deletes

Hard deletes physically remove any trace of the record from the table. For example, this deletes records for the HoodieKeys passed in.

note

Only Append mode is supported for delete operation.

# pyspark
# fetch total records count
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
# fetch two records to be deleted
ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)

# issue deletes
hudi_hard_delete_options = {
'hoodie.table.name': tableName,
'hoodie.datasource.write.recordkey.field': 'uuid',
'hoodie.datasource.write.partitionpath.field': 'partitionpath',
'hoodie.datasource.write.table.name': tableName,
'hoodie.datasource.write.operation': 'delete',
'hoodie.datasource.write.precombine.field': 'ts',
'hoodie.upsert.shuffle.parallelism': 2,
'hoodie.insert.shuffle.parallelism': 2
}

from pyspark.sql.functions import lit
deletes = list(map(lambda row: (row[0], row[1]), ds.collect()))
hard_delete_df = spark.sparkContext.parallelize(deletes).toDF(['uuid', 'partitionpath']).withColumn('ts', lit(0.0))
hard_delete_df.write.format("hudi"). \
options(**hudi_hard_delete_options). \
mode("append"). \
save(basePath)

# run the same read query as above.
roAfterDeleteViewDF = spark. \
read. \
format("hudi"). \
load(basePath)
roAfterDeleteViewDF.createOrReplaceTempView("hudi_trips_snapshot") 

# fetch should return (total - 2) records
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()

Insert Overwrite

Generate some new trips, overwrite the all the partitions that are present in the input. This operation can be faster than upsert for batch ETL jobs, that are recomputing entire target partitions at once (as opposed to incrementally updating the target tables). This is because, we are able to bypass indexing, precombining and other repartitioning steps in the upsert write path completely.

# pyspark
spark.read.format("hudi"). \
load(basePath). \
select(["uuid", "partitionpath"]). \
sort(["partitionpath", "uuid"]). \
show(n=100, truncate=False)

inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
df = spark.read.json(spark.sparkContext.parallelize(inserts, 2)). \
filter("partitionpath = 'americas/united_states/san_francisco'")
hudi_insert_overwrite_options = {
'hoodie.table.name': tableName,
'hoodie.datasource.write.recordkey.field': 'uuid',
'hoodie.datasource.write.partitionpath.field': 'partitionpath',
'hoodie.datasource.write.table.name': tableName,
'hoodie.datasource.write.operation': 'insert_overwrite',
'hoodie.datasource.write.precombine.field': 'ts',
'hoodie.upsert.shuffle.parallelism': 2,
'hoodie.insert.shuffle.parallelism': 2
}
df.write.format("hudi").options(**hudi_insert_overwrite_options).mode("append").save(basePath)
spark.read.format("hudi"). \
load(basePath). \
select(["uuid", "partitionpath"]). \
sort(["partitionpath", "uuid"]). \
show(n=100, truncate=False)

More Spark SQL Commands

Alter Table

Schema evolution can be achieved via ALTER TABLE commands. Below shows some basic examples.

note

For more detailed examples, please prefer to schema evolution

Syntax

-- Alter table name
ALTER TABLE oldTableName RENAME TO newTableName

-- Alter table add columns
ALTER TABLE tableIdentifier ADD COLUMNS(colAndType (,colAndType)*)

-- Alter table column type
ALTER TABLE tableIdentifier CHANGE COLUMN colName colName colType

-- Alter table properties
ALTER TABLE tableIdentifier SET TBLPROPERTIES (key = 'value')

Examples

--rename to:
ALTER TABLE hudi_cow_nonpcf_tbl RENAME TO hudi_cow_nonpcf_tbl2;

--add column:
ALTER TABLE hudi_cow_nonpcf_tbl2 add columns(remark string);

--change column:
ALTER TABLE hudi_cow_nonpcf_tbl2 change column uuid uuid bigint;

--set properties;
alter table hudi_cow_nonpcf_tbl2 set tblproperties (hoodie.keep.max.commits = '10');

Partition SQL Command

Syntax

-- Drop Partition
ALTER TABLE tableIdentifier DROP PARTITION ( partition_col_name = partition_col_val [ , ... ] )

-- Show Partitions
SHOW PARTITIONS tableIdentifier

Examples

--show partition:
show partitions hudi_cow_pt_tbl;

--drop partition:
alter table hudi_cow_pt_tbl drop partition (dt='2021-12-09', hh='10');
note

Currently, the result of show partitions is based on the filesystem table path. It's not precise when delete the whole partition data or drop certain partition directly.

Procedures

Syntax

--Call procedure by positional arguments
CALL system.procedure_name(arg_1, arg_2, ... arg_n)

--Call procedure by named arguments
CALL system.procedure_name(arg_name_2 => arg_2, arg_name_1 => arg_1, ... arg_name_n => arg_n)

Examples

--show commit's info
call show_commits(table => 'test_hudi_table', limit => 10);

Call command has already support some commit procedures and table optimization procedures, more details please refer to procedures.

Where to go from here?

You can also do the quickstart by building hudi yourself, and using --jars <path to hudi_code>/packaging/hudi-spark-bundle/target/hudi-spark3.2-bundle_2.1?-*.*.*-SNAPSHOT.jar in the spark-shell command above instead of --packages org.apache.hudi:hudi-spark3.2-bundle_2.12:0.12.2. Hudi also supports scala 2.12. Refer build with scala 2.12 for more info.

Also, we used Spark here to show case the capabilities of Hudi. However, Hudi can support multiple table types/query types and Hudi tables can be queried from query engines like Hive, Spark, Presto and much more. We have put together a demo video that show cases all of this on a docker based setup with all dependent systems running locally. We recommend you replicate the same setup and run the demo yourself, by following steps here to get a taste for it. Also, if you are looking for ways to migrate your existing data to Hudi, refer to migration guide.