It may be helpful to understand the different write operations of Hudi and how best to leverage them. These operations can be chosen/changed across each commit/deltacommit issued against the table. See the How To docs on Writing Data for more examples.
Upsert is the default operation. Input records are first tagged as inserts or updates by looking up the index; the records are ultimately written after heuristics are run to determine how best to pack them on storage, optimizing for things like file sizing. This operation is recommended for use-cases like database change capture, where the input almost certainly contains updates. The target table will never show duplicates.
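For example, writing a batch of changes as an upsert through the Spark DataSource might look like the following minimal sketch (the DataFrame `updatesDF`, the field names, table name and path are all hypothetical):

```scala
// updatesDF: an existing DataFrame of incoming change records (hypothetical).
updatesDF.write.format("hudi").
  option("hoodie.datasource.write.operation", "upsert"). // the default operation
  option("hoodie.datasource.write.recordkey.field", "uuid").
  option("hoodie.datasource.write.precombine.field", "ts").
  option("hoodie.table.name", "my_table").
  mode("append").
  save("/tmp/hudi/my_table")
```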
The insert operation is very similar to upsert in terms of heuristics/file sizing, but it completely skips the index lookup step. It can therefore be a lot faster than upsert for use-cases like log de-duplication (in conjunction with the options to filter duplicates mentioned below). It also suits use-cases where the table can tolerate duplicates, but needs only the transactional writes/incremental pull/storage management capabilities of Hudi.
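The same write issued as an insert only changes the operation value, as in this hypothetical sketch:

```scala
// logsDF: an existing DataFrame of log records (hypothetical).
// No index lookup happens, so duplicates already in the table are not detected.
logsDF.write.format("hudi").
  option("hoodie.datasource.write.operation", "insert").
  option("hoodie.datasource.write.recordkey.field", "uuid").
  option("hoodie.datasource.write.precombine.field", "ts").
  option("hoodie.table.name", "my_table").
  mode("append").
  save("/tmp/hudi/my_table")
```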
Both upsert and insert operations keep input records in memory to speed up storage heuristics computations (among other things), which can make them cumbersome for initially loading/bootstrapping a Hudi table. Bulk insert provides the same semantics as insert, while implementing a sort-based data writing algorithm that can scale very well to several hundred TBs of initial load. However, it only does a best-effort job at sizing files, as opposed to guaranteeing file sizes the way inserts/upserts do.
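A hypothetical initial-load sketch using bulk_insert (the sort mode shown is one of the BulkInsertSortMode values referenced in the config table below):

```scala
// initialDF: a large DataFrame holding the initial load (hypothetical).
initialDF.write.format("hudi").
  option("hoodie.datasource.write.operation", "bulk_insert").
  option("hoodie.bulkinsert.sort.mode", "GLOBAL_SORT"). // sort-based write path
  option("hoodie.datasource.write.recordkey.field", "uuid").
  option("hoodie.table.name", "my_table").
  mode("overwrite"). // first write creates the table
  save("/tmp/hudi/my_table")
```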
Hudi supports two types of deletes on data stored in Hudi tables, by letting the user specify a different record payload implementation.
- Soft Deletes: Retain the record key and just null out the values for all other fields. This can be achieved by ensuring the appropriate fields are nullable in the table schema, and simply upserting the table after setting these fields to null.
- Hard Deletes: A stronger form of deletion is to physically remove any trace of the record from the table. This can be achieved in 3 different ways (a minimal sketch follows this list).
  - Using DataSource, set the write operation to `DELETE_OPERATION_OPT_VAL`. This will remove all the records in the DataSet being submitted.
  - Using DataSource, set the payload class to `"org.apache.hudi.EmptyHoodieRecordPayload"`. This will remove all the records in the DataSet being submitted.
  - Using DataSource or Hudi Streamer, add a column named `_hoodie_is_deleted` to the DataSet. The value of this column must be set to `true` for all the records to be deleted, and either `false` or left null for any records which are to be upserted.
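As a minimal sketch of the first approach through the Spark DataSource (the DataFrame, field and table names are hypothetical):

```scala
// deletesDF: a DataFrame carrying the keys of the records to remove (hypothetical).
deletesDF.write.format("hudi").
  option("hoodie.datasource.write.operation", "delete"). // i.e. DELETE_OPERATION_OPT_VAL
  option("hoodie.datasource.write.recordkey.field", "uuid").
  option("hoodie.table.name", "my_table").
  mode("append").
  save("/tmp/hudi/my_table")
```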
Hudi supports migrating your existing large tables into a Hudi table using the `bootstrap` operation. There are a couple of ways to approach this. Please refer to the bootstrapping page for more details.
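As a rough sketch, a metadata-only bootstrap of an existing dataset can be triggered through the Spark DataSource along these lines (the source path, key field and table name are hypothetical; the bootstrapping page is the authoritative reference):

```scala
// An empty DataFrame suffices: the data is picked up from the bootstrap base path.
spark.emptyDataFrame.write.format("hudi").
  option("hoodie.datasource.write.operation", "bootstrap").
  option("hoodie.bootstrap.base.path", "/data/existing_parquet_table"). // hypothetical source
  option("hoodie.datasource.write.recordkey.field", "uuid").
  option("hoodie.table.name", "my_table").
  mode("overwrite").
  save("/tmp/hudi/my_table")
```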
The insert_overwrite operation is used to rewrite all of the partitions that are present in the input. It can be faster than upsert for batch ETL jobs that recompute entire target partitions at once (as opposed to incrementally updating the target tables), because we are able to completely bypass the indexing, precombining and other repartitioning steps in the upsert write path. This comes in handy for backfills and similar use-cases.
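A hypothetical backfill sketch; only the partitions present in the input DataFrame are replaced:

```scala
// backfillDF: recomputed records for the affected partitions (hypothetical).
backfillDF.write.format("hudi").
  option("hoodie.datasource.write.operation", "insert_overwrite").
  option("hoodie.datasource.write.recordkey.field", "uuid").
  option("hoodie.datasource.write.partitionpath.field", "date").
  option("hoodie.table.name", "my_table").
  mode("append").
  save("/tmp/hudi/my_table")
```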
The insert_overwrite_table operation can be used to overwrite the entire table for whatever reason. The Hudi cleaner will eventually clean up the previous table snapshot's file groups asynchronously, based on the configured cleaning policy. This operation is much faster than issuing explicit deletes.
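The sketch is the same as for insert_overwrite, with only the operation value changed (names remain hypothetical):

```scala
// fullDF: a recomputed snapshot of the entire table (hypothetical).
fullDF.write.format("hudi").
  option("hoodie.datasource.write.operation", "insert_overwrite_table").
  option("hoodie.datasource.write.recordkey.field", "uuid").
  option("hoodie.table.name", "my_table").
  mode("append").
  save("/tmp/hudi/my_table")
```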
In addition to deleting individual records, Hudi supports deleting entire partitions in bulk using the delete_partition operation. Deletion of specific partitions can be done using the config `hoodie.datasource.write.partitions.to.delete`.
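A minimal sketch of dropping specific partitions (the partition values, table name and path are hypothetical):

```scala
// No input records are needed; the partitions to drop are named via config.
spark.emptyDataFrame.write.format("hudi").
  option("hoodie.datasource.write.operation", "delete_partition").
  option("hoodie.datasource.write.partitions.to.delete", "2023/01/01,2023/01/02").
  option("hoodie.table.name", "my_table").
  mode("append").
  save("/tmp/hudi/my_table")
```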
Spark based configs:
| Config Name | Default | Description |
| ----------- | ------- | ----------- |
| hoodie.datasource.write.operation | upsert (Optional) | Whether to do upsert, insert or bulk_insert for the write operation. Use bulk_insert to load new data into a table, and thereafter use upsert/insert. Bulk insert uses a disk-based write path to scale to large inputs without needing to cache them. |
| hoodie.datasource.write.precombine.field | ts (Optional) | Field used in preCombining before the actual write. When two records have the same key value, we will pick the one with the largest value for the precombine field, determined by Object.compareTo(..) |
| hoodie.combine.before.insert | false (Optional) | When inserted records share the same key, controls whether they should first be combined (i.e. de-duplicated) before writing to storage. |
| hoodie.datasource.write.insert.drop.duplicates | false (Optional) | If set to true, records from the incoming dataframe will not overwrite existing records with the same key during the write operation. This config is deprecated as of 0.14.0. Please use hoodie.datasource.insert.dup.policy instead. |
| hoodie.bulkinsert.sort.mode | NONE (Optional) | org.apache.hudi.execution.bulkinsert.BulkInsertSortMode: modes for sorting records during bulk insert. |
| hoodie.bootstrap.base.path | N/A (Required) | Applicable only when operation type is bootstrap. Base path of the source dataset that needs to be bootstrapped as a Hudi table. |
| hoodie.bootstrap.mode.selector | org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector (Optional) | Selects the mode in which each file/partition in the bootstrapped dataset gets bootstrapped. |
| hoodie.datasource.write.partitions.to.delete | N/A (Required) | Applicable only when operation type is delete_partition. Comma-separated list of partitions to delete. |
Flink based configs:
| Config Name | Default | Description |
| ----------- | ------- | ----------- |
| write.operation | upsert (Optional) | The write operation that this write should do |
| precombine.field | ts (Optional) | Field used in preCombining before the actual write. When two records have the same key value, we will pick the one with the largest value for the precombine field, determined by Object.compareTo(..) |
| write.precombine | false (Optional) | Flag to indicate whether to drop duplicates before insert/upsert. By default these cases will accept duplicates to gain extra performance: 1) insert operation; 2) upsert for MOR table, since the MOR table de-duplicates on reading |
| write.bulk_insert.sort_input | true (Optional) | Whether to sort the inputs by specific fields for bulk insert tasks, default true |
| write.bulk_insert.sort_input.by_record_key | false (Optional) | Whether to sort the inputs by record keys for bulk insert tasks, default false |
The following is an inside look at the Hudi write path and the sequence of events that occur during a write.
- Deduping
  - First, your input records may have duplicate keys within the same batch, and duplicates need to be combined or reduced by key.
- Index Lookup
  - Next, an index lookup is performed to try and match the input records to identify which file groups they belong to.
- File Sizing
  - Then, based on the average size of previous commits, Hudi will make a plan to add enough records to a small file to get it close to the configured maximum limit.
- Partitioning
  - We now arrive at partitioning, where we decide what file groups certain updates and inserts will be placed in, or whether new file groups will be created.
- Write I/O
  - Now we actually do the write operation, which is either creating a new base file, appending to the log file, or versioning an existing base file.
- Update Index
  - Now that the write is performed, we will go back and update the index.
- Commit
  - Finally, we commit all of these changes atomically. (A callback notification is exposed.)
- Clean (if needed)
  - Following the commit, cleaning is invoked if needed.
- Compaction
  - If you are using MOR tables, compaction will either run inline or be scheduled asynchronously.
- Archive
  - Lastly, we perform an archival step which moves old timeline items to an archive folder.