Skip to main content

Release 0.14.0

Release 0.14.0 (docs)

Apache Hudi 0.14.0 marks a significant milestone with a range of new functionalities and enhancements. These include the introduction of Record Level Index, automatic generation of record keys, the hudi_table_changes function for incremental reads, and more. Notably, this release also incorporates support for Spark 3.4. On the Flink front, version 0.14.0 brings several exciting features such as consistent hashing index support, Flink 1.17 support, and Update and Delete statement support. Additionally, this release upgrades the Hudi table version, prompting users to consult the Migration Guide provided below. We encourage users to review the release highlights, breaking changes, and behavior changes before adopting the 0.14.0 release.

Migration Guide

In version 0.14.0, we've made changes such as the removal of compaction plans from the ".aux" folder and the introduction of a new log block version. As part of this release, the table version is updated to version 6. When running a Hudi job with version 0.14.0 on a table with an older table version, an automatic upgrade process is triggered to bring the table up to version 6. This upgrade is a one-time occurrence for each Hudi table, as the hoodie.table.version is updated in the property file upon completion of the upgrade. Additionally, a command-line tool for downgrading has been included, allowing users to move from table version 6 to 5, or revert from Hudi 0.14.0 to a version prior to 0.14.0. To use this tool, execute it from a 0.14.0 environment. For more details, refer to the hudi-cli.

caution

If migrating from an older release (pre 0.14.0), please also check the upgrade instructions from each older release in sequence.

Bundle Updates

New Spark Bundles

In this release, we've expanded our support to include bundles for both Spark 3.4 (hudi-spark3.4-bundle_2.12) and Spark 3.0 (hudi-spark3.0-bundle_2.12). Please note that, the support for Spark 3.0 had been discontinued after Hudi version 0.10.1, but due to strong community interest, it has been reinstated in this release.

Breaking Changes

INSERT INTO behavior with Spark SQL

Before version 0.14.0, data ingested through INSERT INTO in Spark SQL followed the upsert flow, where multiple versions of records would be merged into one version. However, starting from 0.14.0, we've altered the default behavior of INSERT INTO to utilize the insert flow internally. This change significantly enhances write performance as it bypasses index lookups.

If a table is created with a preCombine key, the default operation for INSERT INTO remains as upsert. Conversely, if no preCombine key is set, the underlying write operation for INSERT INTO defaults to insert. Users have the flexibility to override this behavior by explicitly setting values for the config hoodie.spark.sql.insert.into.operation as per their requirements. Possible values for this config include insert, bulk_insert, and upsert.

Additionally, in version 0.14.0, we have deprecated two related older configs:

  • hoodie.sql.insert.mode
  • hoodie.sql.bulk.insert.enable.

Behavior changes

Simplified duplicates handling with Inserts in Spark SQL

In cases where the operation type is configured as insert for the Spark SQL INSERT INTO flow, users now have the option to enforce a duplicate policy using the configuration setting hoodie.datasource.insert.dup.policy. This policy determines the action taken when incoming records being ingested already exist in storage. The available values for this configuration are as follows:

  • none: No specific action is taken, allowing duplicates to exist in the Hudi table if the incoming records contain duplicates.
  • drop: Matching records from the incoming writes will be dropped, and the remaining ones will be ingested.
  • fail: The write operation will fail if the same records are re-ingested. In essence, a given record, as determined by the key generation policy, can only be ingested once into the target table.

With this addition, an older related configuration setting, hoodie.datasource.write.insert.drop.duplicates, will be deprecated. The newer configuration will take precedence over the old one when both are specified. If no specific configurations are provided, the default value for the newer configuration will be assumed. Users are strongly encouraged to migrate to the use of these newer configurations when using Spark SQL.

caution

This is only applicable to Spark SQL writing.

Compaction with MOR table

For Spark batch writers (both the Spark datasource and Spark SQL), compaction is automatically enabled by default for MOR (Merge On Read) tables, unless users explicitly override this behavior. Users have the option to disable compaction explicitly by setting hoodie.compact.inline to false. In case users do not override this configuration, compaction may be triggered for MOR tables approximately once every 5 delta commits (the default value for hoodie.compact.inline.max.delta.commits).

HoodieDeltaStreamer renamed to HoodieStreamer

Starting from version 0.14.0, we have renamed HoodieDeltaStreamer to HoodieStreamer. We have ensured backward compatibility so that existing user jobs remain unaffected. However, in upcoming releases, support for Deltastreamer might be discontinued. Hence, we strongly advise users to transition to using HoodieStreamer instead.

MERGE INTO JOIN condition

Starting from version 0.14.0, Hudi has the capability to automatically generate primary record keys when users do not provide explicit specifications. This enhancement enables the MERGE INTO JOIN clause to reference any data column for the join condition in Hudi tables where the primary keys are generated by Hudi itself. However, in cases where users configure the primary record key, the join condition still expects the primary key fields as specified by the user.

Release Highlights

Record Level Index

Hudi version 0.14.0, introduces a new index implementation -
Record Level Index. The Record level Index significantly enhances write performance for large tables by efficiently storing per-record locations and enabling swift retrieval during index lookup operations. It can effectively replace other Global indices like Global_bloom, Global_Simple, or Hbase, commonly used in Hudi.

Bloom and Simple Indexes exhibit slower performance for large datasets due to the high costs associated with gathering index data from various data files during lookup. Moreover, these indexes do not preserve a one-to-one record-key to record file path mapping; instead, they deduce the mapping through an optimized search at lookup time. The per-file overhead required by these indexes makes them less effective for datasets with a larger number of files or records.

On the other hand, the Hbase Index saves a one-to-one mapping for each record key, resulting in fast performance that scales with the dataset size. However, it necessitates a separate HBase cluster for maintenance, which is operationally challenging and resource-intensive, requiring specialized expertise.

The Record Index combines the speed and scalability of the HBase Index without its limitations and overhead. Being a part of the HUDI Metadata Table, any future performance enhancements in writes and queries will automatically translate into improved performance for the Record Index. Adopting the Record Level Index has the potential to boost index lookup performance by 4 to 10 times, depending on the workload, even for extremely large-scale datasets (e.g., 1TB).

With the Record Level Index, significant performance improvements can be observed for large datasets, as latency is directly proportional to the amount of data being ingested. This is in contrast to other Global indices where index lookup time increases linearly with the table size. The Record Level Index is specifically designed to efficiently handle lookups for such large-scale data without a linear increase in lookup times as the table size grows.

To harness the benefits of this lightning-fast index, users need to enable two configurations:

  • hoodie.metadata.record.index.enable must be enabled to write the Record Level Index to the metadata table.
  • hoodie.index.type needs to be set to RECORD_INDEX for the index lookup to utilize the Record Level Index.

Support for Hudi tables with Autogenerated keys

Since the initial official version of Hudi, the primary key was a mandatory field that users needed to configure for any Hudi table. Starting 0.14.0, we are relaxing this constraint. This enhancement addresses a longstanding need within the community, where certain use-cases didn't naturally possess an intrinsic primary key. Version 0.14.0 now offers the flexibility for users to create a Hudi table without the need to explicitly configure a primary key (by omitting the configuration setting - hoodie.datasource.write.recordkey.field). Hudi will automatically generate the primary keys in such cases. This feature is applicable only for new tables and cannot be altered for existing ones.

This functionality is available in all spark writers with certain limitations. For append only type of use cases, Inserts and bulk_inserts are allowed with all four writers - Spark Datasource, Spark SQL, Spark Streaming, Hoodie Streamer. Updates and Deletes are supported only using spark-sql MERGE INTO , UPDATE and DELETE statements. With Spark Datasource, UPDATE and DELETE are supported only when the source dataframe contains Hudi's meta fields. Please check out our quick start guide for code snippets on Hudi table CRUD operations where keys are autogenerated.

Spark 3.4 version support

Spark 3.4 support is added; users who are on Spark 3.4 can use hudi-spark3.4-bundle. Spark 3.2, Spark 3.1, Spark3.0 and Spark 2.4 will continue to be supported. Please check the migration guide for bundle updates. To quickly get started with Hudi and Spark 3.4, you can explore our quick start guide.

Query side improvements:

Metadata table support with Athena

Users now have the ability to utilize Hudi’s Metadata table seamlessly with Athena. The file listing index removes the need for recursive file system calls like "list files" by retrieving information from an index that maintains a mapping of partitions to files. This approach proves to be highly efficient, particularly when dealing with extensive datasets. With Hudi 0.14.0, users can activate file listing based on the metadata table when performing Glue catalog synchronization for their Hudi tables. To enable this functionality, users can configure hoodie.datasource.meta.sync.glue.metadata_file_listing and set it to true during the Glue sync process.

Leverage Parquet bloom filters w/ read queries

In Hudi 0.14.0, users can now utilize the native Parquet bloom filters, provided their compute engine supports Apache Parquet 1.12.0 or higher. This support covers both the writing and reading of datasets. Hudi facilitates the use of native Parquet bloom filters through Hadoop configuration. Users are required to set a Hadoop configuration with a specific key representing the column for which the bloom filter is to be applied. For example, parquet.bloom.filter.enabled#rider=true creates a bloom filter for the rider column. Whenever a query involves a predicate on the rider column, the bloom filter comes into play, enhancing read performance.

Incremental queries with multi-writers

In multi-writer scenarios, there can be instances of gaps in the timeline (requests or inflight instants that are not the latest instant) due to concurrent writing activities. These gaps may result in inconsistent outcomes when performing incremental queries. To address this issue, Hudi 0.14.0 introduces a new configuration setting, hoodie.read.timeline.holes.resolution.policy, specifically designed for handling these inconsistencies in incremental queries. The configuration provides three possible policies:

  • FAIL: This serves as the default policy and throws an exception when such timeline gaps are identified during an incremental query.
  • BLOCK: In this policy, the results of an incremental query are limited to the time range between the holes in the timeline. For instance, if a gap is detected at instant t1 within the incremental query range from t0 to t2, the query will only display results between t0 and t1 without failing.
  • USE_TRANSITION_TIME: This policy is experimental and involves using the state transition time, which is based on the file modification time of commit metadata files in the timeline, during the incremental query.

Timestamp support with Hive 3.x

For quite some time, Hudi users encountered challenges regarding reading Timestamp type columns written by Spark and subsequently attempting to read them with Hive 3.x. While in Hudi 0.13.x, we introduced a workaround to mitigate this issue, version 0.14.0 now ensures full compatibility of HiveAvroSerializer with Hive 3.x to resolve this.

Google BigQuery sync enhancements

With 0.14.0, the BigQuerySyncTool supports syncing table to BigQuery using manifests. This is expected to have better query performance compared to legacy way. Schema evolution is supported with the manifest approach. Partition column no longer needs to be dropped from the files due to new schema handling improvements. To enable this feature, users can set hoodie.gcp.bigquery.sync.use_bq_manifest_file to true.

Spark read side improvements

Snapshot read support for MOR Bootstrap tables

With 0.14.0, MOR snapshot read support is added for Bootstrapped tables. The default behavior has been changed in several ways to match the behavior of non-bootstrapped MOR tables. Snapshot reads will now be the default reading mode. Use hoodie.datasource.query.type=read_optimized for read optimized queries which was previously the default behavior. Hive sync for such tables will result in both _ro and _rt suffixed to the table name to signify read optimized and snapshot reading respectively.

Table-valued function named hudi_table_changes designed for incremental reading through Spark SQL

Hudi offers the functionality to fetch a stream of records changed since a specified commit timestamp through the incremental query type. With the release of Hudi 0.14.0, we've introduced a more straightforward method to access the most recent state or change streams of Hudi datasets. This is achieved using a table-valued function named hudi_table_changes in Spark SQL. Here's the syntax and several examples of how to utilize this function:

SYNTAX
hudi_table_changes(table, queryType, beginTime [, endTime]);
-- table: table identifier, example: db.tableName, tableName, or path for the table, example: hdfs://path/to/hudiTable.
-- queryType: incremental query mode, valid values: latest_state, cdc
(for cdc query, first enable cdc for the table by setting cdc.enabled=true),
-- beginTime: instantTime to begin query from, example: earliest, 202305150000,
-- endTime: optional instantTime to end query at, example: 202305160000,

EXAMPLES
-- incrementally query data by table name
-- start from earliest available commit, end at latest available commit.
SELECT * FROM hudi_table_changes('db.table', 'latest_state', 'earliest');

-- start from earliest, end at 202305160000.
SELECT * FROM hudi_table_changes('table', 'latest_state', 'earliest', '202305160000');

-- incrementally query data by path
-- start from earliest available commit, end at latest available commit.
SELECT * FROM hudi_table_changes('path/to/table', 'cdc', 'earliest');

Checkout the quickstart for more examples.

New MOR file format reader in Spark:

Based on a proposal from RFC-72 aimed at redesigning Hudi-Spark integration, we are introducing an experimental file format reader for MOR (Merge On Read) tables. This reader is expected to significantly reduce read latencies by 20 to 40% when compared to the older file format, particularly for snapshot and bootstrap queries. The goal is to bring the latencies closer to those of the COW (Copy On Write) file format. To utilize this new file format, users need to set hoodie.datasource.read.use.new.parquet.file.format=true. It's important to note that this feature is still experimental and comes with a few limitations. For more details and if you're interested in contributing, please refer to HUDI-6568.

Spark write side improvements

Bulk_Insert and row writer enhancements

The 0.14.0 release provides support for using bulk insert operation while performing SQL operations like INSERT OVERWRITE TABLE and INSERT OVERWRITE PARTITION. To enable bulk insert, set config hoodie.spark.sql.insert.into.operation to value bulk_insert. Bulk insert has better write performance compared to insert operation. Row writer support is also added for Simple bucket index.

Hoodie Streamer enhancements

Dynamic configuration updates

When Hoodie Streamer is run in continuous mode, the properties can be refreshed/updated before each sync calls. Interested users can implement org.apache.hudi.utilities.deltastreamer.ConfigurationHotUpdateStrategy to leverage this.

SQL File based source for HoodieStreamer

A new source - SqlFileBasedSource, has been added to HoodieStreamer designed to facilitate one-time backfill scenarios.

Below are the Flink Engine based enhancements in the 0.14.0 release.

Consistent hashing index support

In comparison to the static hashing index (BUCKET index), the consistent hashing index offers dynamic scalability of data buckets for the writer. To utilize this feature, configure the option index.type as BUCKET and set hoodie.index.bucket.engine to CONSISTENT_HASHING.

When enabling the consistent hashing index, it's important to activate clustering scheduling within the writer. The clustering plan should be executed through an offline Spark job. During this process, the writer will perform dual writes for both the old and new data buckets while the clustering is pending. Although the dual write does not impact correctness, it is strongly recommended to execute clustering as quickly as possible.

Dynamic partition pruning for streaming read

Before 0.14.0, the Flink streaming reader can not prune the datetime partitions correctly when the queries have predicates with constant datetime filtering. Since this release, the Flink streaming queries have been fixed to support any pattern of filtering predicates, including but not limited to the datetime filtering.

Simple bucket index table query speed up (with index fields)

For a simple bucket index table, if the query takes equality filtering predicates on index key fields, Flink engine would optimize the planning to only include the source data files from a very specific data bucket; such queries expect to have nearly hoodie.bucket.index.num.buckets times performance improvement in average.

Flink 1.17 is supported with a new compile maven profile flink1.17, adding profile -Pflink1.17 in the compile cmd of Flink Hudi bundle jar to enable the integration with Flink 1.17.

UPDATE and DELETE statements have been integrated since this release for batch queries. Current only table that defines primary keys can handle the statement correctly.

UPDATE hudi_table SET ... WHERE ...
DELETE FROM hudi_table WHERE ...

EXAMPLES
-- update the specific records with constant age
UPDATE hudi_table SET age=19 WHERE UUID in ('id1', 'id2');
-- delete all the records that with age greater than 23
DELETE FROM hudi_table WHERE age > 23;

Java Enhancements

Lot of write operations have been extended to support Java engine to bring it to parity with other engines. For eg, compaction, clustering, and metadata table support has been added to Java Engine with 0.14.0.

Known Regressions

In Hudi 0.14.0, when querying a table that uses ComplexKeyGenerator or CustomKeyGenerator, partition values are returned as string. Note that there is no type change on the storage i.e. partition fields are written in the user-defined type on storage. However, this is a breaking change for the aforementioned key generators and will be fixed in 0.14.1 - HUDI-6914

Raw Release Notes

The raw release notes are available here.