Migration Guide for this release
- If migrating from an older release, please also check the upgrade instructions for each subsequent release below.
- With 0.9.0, Hudi is adding more table properties to aid in using an existing hudi table with spark-sql.
To smoothly aid this transition these properties added to
hoodie.propertiesfile. Whenever Hudi is launched with newer table version i.e 2 (or moving from pre 0.9.0 to 0.9.0), an upgrade step will be executed automatically. This automatic upgrade step will happen just once per Hudi table as the
hoodie.table.versionwill be updated in property file after upgrade is completed.
- Similarly, a command line tool for Downgrading (command -
downgrade) is added if in case some users want to downgrade Hudi from table version
1or move from Hudi 0.9.0 to pre 0.9.0. This needs to be executed from a 0.9.0
- With this release, we added a new framework to track config properties in code, moving away from string variables that
hold property names and values. This move helps us automate configuration doc generation and much more. While we still
support the older configs string variables, users are encouraged to use the new
ConfigPropertyequivalents, as noted in the deprecation notices. In most cases, it is as simple as calling
.defaultValue()on the corresponding alternative. e.g
RECORDKEY_FIELD_OPT_KEYcan be replaced by
Spark SQL DML and DDL Support
0.9.0 adds experimental support for DDL/DMLs using Spark SQL, taking a huge step towards making Hudi more easily accessible and
operable by all personas (non-engineers, analysts etc). Users can now use
CREATE TABLE....USING HUDI and
CREATE TABLE .. AS SELECT
statements to directly create and manage tables in catalogs like Hive. Users can then use
MERGE INTO and
sql statements to manipulate data. In addition,
INSERT OVERWRITE statement can be used to overwrite existing data in the table or partition
for existing batch ETL pipelines. For more information, checkout our docs here clicking on
Please see RFC-25
for more implementation details.
Query side improvements
Hudi tables are now registered with Hive as spark datasource tables, meaning Spark SQL on these tables now uses the datasource as well, instead of relying on the Hive fallbacks within Spark, which are ill-maintained/cumbersome. This unlocks many optimizations such as the use of Hudi's own FileIndex implementation for optimized caching and the use of the Hudi metadata table, for faster listing of large tables. We have also added support for timetravel query, for spark datasource.
Writer side improvements
Virtual keys support has been added where users can avoid adding meta fields to hudi table and leverage existing fields to populate record keys and partition paths. One needs to disable this config to enable virtual keys.
Bulk Insert operations using row writer enabled now supports pre-combining, sort modes and user defined partitioners and now turned on by default for fast inserts.
Hudi performs automatic cleanup of uncommitted data, which has now been enhanced to be performant over cloud storage, even for extremely large tables. Specifically, a new marker mechanism has been implemented leveraging the timeline server to perform centrally co-ordinated batched read/write of file markers to underlying storage. You can turn this using this config and learn more about it on this blog.
Async Clustering support has been added to both DeltaStreamer and Spark Structured Streaming Sink. More on this can be found in this blog post. In addition, we have added a new utility class HoodieClusteringJob to assist in building and executing a clustering plan together as a standalone spark job.
Users can choose to drop fields used to generate partition paths, using
hoodie.datasource.write.drop.partition.columns=true, to support
querying of Hudi snapshots using systems like BigQuery, which cannot handle this.
Hudi uses different types of spillable maps, for internally handling merges (compaction, updates or even MOR snapshot queries). In 0.9.0, we have added support for compression for the bitcask style default option and introduced a new spillable map backed by rocksDB, which can be more performant for large bulk updates or working with large base file sizes.
Added a new write operation
delete_partition operation, with support in spark. Users can leverage this to delete older partitions in bulk, in addition to
record level deletes. Deletion of specific partitions can be done using this config.
Support for Huawei Cloud Object Storage, BAIDU AFS storage format, Baidu BOS storage in Hudi.
A pre commit validator framework has been added for spark engine, which can used for DeltaStreamer and Spark Datasource writers. Users can leverage this to add any validations to be executed before committing writes to Hudi. Three validators come out-of-box
- org.apache.hudi.client.validator.SqlQueryEqualityPreCommitValidator can be used to validate for equality of rows before and after the commit.
- org.apache.hudi.client.validator.SqlQueryInequalityPreCommitValidator can be used to validate for inequality of rows before and after the commit.
- org.apache.hudi.client.validator.SqlQuerySingleResultPreCommitValidator can be used to validate that a query on the table results in a specific value.
These can be configured by setting
hoodie.precommit.validators=<comma separated list of validator class names>. Users can also provide their own implementations by extending the abstract class SparkPreCommitValidator
and overriding this method
void validateRecordsBeforeAndAfter(Dataset<Row> before,
Flink Integration Improvements
The Flink writer now supports propagation of CDC format for MOR table, by turning on the option
changelog.enabled=true. Hudi would then persist all change flags of each record,
using the streaming reader of Flink, user can do stateful computation based on these change logs. Note that when the table is compacted with async compaction service, all the
intermediate changes are merged into one(last record), to only have UPSERT semantics.
Flink writing now also has most feature parity with spark writing, with addition of write operations like
insert_overwrite, support for non-partitioned tables,
automatic cleanup of uncommitted data, global indexing support, hive style partitioning and handling of partition path updates. Writing also supports a new log append mode, where
no records are de-duplicated and base files are directly written for each flush. To use this mode, set
Flink readers now support streaming reads from COW/MOR tables. Deletions are emitted by default in streaming read mode, the downstream receives the DELETE message as a Hoodie record with empty payload.
Hive sync has been greatly improved by support different Hive versions(1.x, 2.x, 3.x). Hive sync can also now be done asynchronously.
Flink Streamer tool now supports transformers.
We have enhanced Deltastreamer utility with 3 new sources.
JDBC source can take a extraction SQL statement and incrementally fetch data out of sources supporting JDBC. This can be useful for e.g when reading data from RDBMS sources. Note that, this approach may need periodic re-bootstrapping to ensure data consistency, although being much simpler to operate over CDC based approaches.
SQLSource takes a Spark SQL statement to fetch data out of existing tables and can be very useful for easy SQL based backfills use-cases e.g: backfilling just one column for the past N months.
S3EventsHoodieIncrSource and S3EventsSource
assist in reading data from S3 reliably and efficiently ingesting that to Hudi. Existing approach using
*DFSSource source classes uses last modification time of files as checkpoint to pull in new files.
But, if large number of files have the same modification time, this might miss some files to be read from the source. These two sources (S3EventsHoodieIncrSource and S3EventsSource) together ensures data
is reliably ingested from S3 into Hudi by leveraging AWS SNS and SQS services that subscribes to file events from the source bucket. This blog post presents a model for
scalable, reliable incremental ingestion by using these two sources in tandem.
In addition to pulling events from kafka using regular offset format, we also added support for timestamp based fetches, that can help with initial backfill/bootstrap scenarios. We have also added support for passing in basic auth credentials in schema registry provider url with schema provider.
Raw Release Notes
The raw release notes are available here