Configurations
This page covers the different ways of configuring your job to write/read Hudi tables. At a high level, you can control behaviour at a few levels.
- Spark Datasource Configs : These configs control the Hudi Spark Datasource, providing the ability to define keys/partitioning, pick the write operation, specify how to merge records or choose the query type to read.
- Flink SQL Configs : These configs control the Hudi Flink SQL source/sink connectors, providing the ability to define record keys, pick the write operation, specify how to merge records, enable/disable asynchronous compaction or choose the query type to read.
- WriteClient Configs : Internally, the Hudi datasource uses an RDD based HoodieWriteClient API to actually perform writes to storage. These configs provide deep control over lower level aspects like file sizing, compression, parallelism, compaction, write schema, cleaning etc. Although Hudi provides sane defaults, from time to time these configs may need to be tweaked to optimize for specific workloads.
- RecordPayload Config : This is the lowest level of customization offered by Hudi. Record payloads define how to produce new values to upsert, based on the incoming new record and the stored old record. Hudi provides default implementations such as OverwriteWithLatestAvroPayload, which simply updates the table with the latest/last-written record. This can be overridden with a custom class extending HoodieRecordPayload, on both datasource and WriteClient levels (see the sketch after this list).
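For illustration, here is a minimal sketch of such a custom payload, assuming the HoodieRecordPayload/OverwriteWithLatestAvroPayload API shape of recent 0.x releases; the class name and the merge rule (keep the stored value whenever the incoming field is null) are illustrative, not a Hudi built-in.
// Hypothetical payload: fall back to the stored value for fields the incoming record left null.
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.util.Option;

public class KeepExistingIfNullPayload extends OverwriteWithLatestAvroPayload {

  public KeepExistingIfNullPayload(GenericRecord record, Comparable orderingVal) {
    super(record, orderingVal);
  }

  public KeepExistingIfNullPayload(Option<GenericRecord> record) {
    super(record);
  }

  @Override
  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException {
    // Start from the default "latest record wins" merge ...
    Option<IndexedRecord> merged = super.combineAndGetUpdateValue(currentValue, schema);
    if (!merged.isPresent()) {
      return merged; // e.g. the incoming record represents a delete
    }
    GenericRecord result = (GenericRecord) merged.get();
    // ... then keep the previously stored value for any field the new record left null.
    for (Schema.Field field : schema.getFields()) {
      if (result.get(field.pos()) == null) {
        result.put(field.pos(), currentValue.get(field.pos()));
      }
    }
    return Option.of((IndexedRecord) result);
  }
}
Such a class would be plugged in via hoodie.datasource.write.payload.class (see PAYLOAD_CLASS_OPT_KEY below).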
Spark Datasource Configs
Spark jobs using the datasource can be configured by passing the below options into the option(k,v) method as usual.
The actual datasource level configs are listed below.
Write Options
Additionally, you can pass down any of the WriteClient level configs directly using the options() or option(k,v) methods.
inputDF.write()
.format("org.apache.hudi")
.options(clientOpts) // any of the Hudi client opts can be passed in as well
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition")
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp")
.option(HoodieWriteConfig.TABLE_NAME, tableName)
.mode(SaveMode.Append)
.save(basePath);
Options useful for writing tables via write.format.option(...)
TABLE_NAME_OPT_KEY
Property: hoodie.datasource.write.table.name [Required]
Hive table name to register the table into.
OPERATION_OPT_KEY
Property: hoodie.datasource.write.operation, Default: upsert
Whether to do an upsert, insert or bulkinsert for the write operation. Use bulkinsert to load new data into a table, and thereafter use upsert/insert. Bulk insert uses a disk based write path to scale to loading large inputs without the need to cache them.
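For instance, an initial load could use bulk insert, with later writes switching back to the default upsert. A sketch, reusing the style of the snippet above (initialDF/incrementalDF are placeholder dataframes, key/partition/precombine options are omitted for brevity, and the exact operation value string for bulk insert may be spelled bulk_insert in your release):
// Initial, large import: scale out using the bulk insert write path.
initialDF.write().format("org.apache.hudi")
    .option("hoodie.datasource.write.operation", "bulk_insert")
    .option(HoodieWriteConfig.TABLE_NAME, tableName)
    .mode(SaveMode.Overwrite)
    .save(basePath);

// Later incremental writes: merge changes into existing files with upsert (the default).
incrementalDF.write().format("org.apache.hudi")
    .option("hoodie.datasource.write.operation", "upsert")
    .option(HoodieWriteConfig.TABLE_NAME, tableName)
    .mode(SaveMode.Append)
    .save(basePath);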
TABLE_TYPE_OPT_KEY
Property: hoodie.datasource.write.table.type, Default: COPY_ON_WRITE
The table type for the underlying data, for this write. This can't change between writes.
PRECOMBINE_FIELD_OPT_KEY
Property: hoodie.datasource.write.precombine.field, Default: ts
Field used in preCombining before actual write. When two records have the same key value, we will pick the one with the largest value for the precombine field, determined by Object.compareTo(..)
PAYLOAD_CLASS_OPT_KEY
Property: hoodie.datasource.write.payload.class, Default: org.apache.hudi.OverwriteWithLatestAvroPayload
Payload class used. Override this if you want to roll your own merge logic when upserting/inserting. This will render any value set for PRECOMBINE_FIELD_OPT_VAL ineffective
RECORDKEY_FIELD_OPT_KEY
Property: hoodie.datasource.write.recordkey.field, Default: uuid
Record key field. Value to be used as the recordKey component of HoodieKey. Actual value will be obtained by invoking .toString() on the field value. Nested fields can be specified using the dot notation eg: a.b.c
PARTITIONPATH_FIELD_OPT_KEY
Property: hoodie.datasource.write.partitionpath.field, Default: partitionpath
Partition path field. Value to be used as the partitionPath component of HoodieKey. Actual value obtained by invoking .toString()
HIVE_STYLE_PARTITIONING_OPT_KEY
Property: hoodie.datasource.write.hive_style_partitioning, Default: false
When set to true, partition folder names follow the format of Hive partitions: [partition_column_name]=[partition_value]
KEYGENERATOR_CLASS_OPT_KEY
Property: hoodie.datasource.write.keygenerator.class, Default: org.apache.hudi.keygen.SimpleKeyGenerator
Key generator class that will extract the key out of the incoming Row object
COMMIT_METADATA_KEYPREFIX_OPT_KEY
Property: hoodie.datasource.write.commitmeta.key.prefix, Default: _
Option keys beginning with this prefix are automatically added to the commit/deltacommit metadata. This is useful to store checkpointing information, in a way consistent with the hudi timeline
INSERT_DROP_DUPS_OPT_KEY
Property: hoodie.datasource.write.insert.drop.duplicates, Default: false
If set to true, filters out all duplicate records from the incoming dataframe during insert operations.
ENABLE_ROW_WRITER_OPT_KEY
Property: hoodie.datasource.write.row.writer.enable, Default: false
When set to true, write operations are performed directly using the Spark native Row representation. With this config set, bulk_insert is expected to be 20 to 30% faster than regular bulk_insert.
HIVE_SYNC_ENABLED_OPT_KEY
Property: hoodie.datasource.hive_sync.enable, Default: false
When set to true, register/sync the table to the Apache Hive metastore
HIVE_DATABASE_OPT_KEY
Property: hoodie.datasource.hive_sync.database, Default: default
database to sync to
HIVE_TABLE_OPT_KEY
Property: hoodie.datasource.hive_sync.table [Required]
table to sync to
HIVE_USER_OPT_KEY
Property: hoodie.datasource.hive_sync.username, Default: hive
hive user name to use
HIVE_PASS_OPT_KEY
Property: hoodie.datasource.hive_sync.password, Default: hive
hive password to use
HIVE_URL_OPT_KEY
Property: hoodie.datasource.hive_sync.jdbcurl, Default: jdbc:hive2://localhost:10000
Hive metastore url
HIVE_PARTITION_FIELDS_OPT_KEY
Property: hoodie.datasource.hive_sync.partition_fields, Default: (empty)
Field in the table to use for determining hive partition columns.
HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY
Property: hoodie.datasource.hive_sync.partition_extractor_class, Default: org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor
Class used to extract partition field values into hive partition columns.
HIVE_ASSUME_DATE_PARTITION_OPT_KEY
Property: hoodie.datasource.hive_sync.assume_date_partitioning, Default: false
Assume partitioning is yyyy/mm/dd
HIVE_USE_JDBC_OPT_KEY
Property: hoodie.datasource.hive_sync.use_jdbc, Default: true
Use JDBC when hive synchronization is enabled
HIVE_AUTO_CREATE_DATABASE_OPT_KEY
Property: hoodie.datasource.hive_sync.auto_create_database, Default: true
Automatically create the hive database if it does not exist
HIVE_SKIP_RO_SUFFIX
Property: hoodie.datasource.hive_sync.skip_ro_suffix, Default: false
Skip the _ro suffix for the Read Optimized table, when registering
HIVE_SUPPORT_TIMESTAMP
Property: hoodie.datasource.hive_sync.support_timestamp, Default: false
'INT64' with original type TIMESTAMP_MICROS is converted to hive 'timestamp' type. Disabled by default for backward compatibility.
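As an example, the Hive sync options can be passed alongside the write options shown earlier. A sketch (the JDBC url is a placeholder, and MultiPartKeysValueExtractor is used instead of the documented default extractor on the assumption that the partition field is a plain column rather than a slash-encoded date):
// Sketch: enable Hive metastore sync as part of the datasource write.
inputDF.write().format("org.apache.hudi")
    .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key")
    .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition")
    .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp")
    .option(HoodieWriteConfig.TABLE_NAME, tableName)
    // Hive sync options
    .option("hoodie.datasource.hive_sync.enable", "true")
    .option("hoodie.datasource.hive_sync.database", "default")
    .option("hoodie.datasource.hive_sync.table", tableName)
    .option("hoodie.datasource.hive_sync.jdbcurl", "jdbc:hive2://hiveserver:10000") // placeholder host
    .option("hoodie.datasource.hive_sync.partition_fields", "partition")
    .option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor")
    .mode(SaveMode.Append)
    .save(basePath);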
Read Options
Options useful for reading tables via read.format.option(...)
QUERY_TYPE_OPT_KEY
Property: hoodie.datasource.query.type, Default: snapshot
Whether data needs to be read in incremental mode (new data since an instantTime), Read Optimized mode (obtain latest view, based on columnar data), or Snapshot mode (obtain latest view, based on row & columnar data)
BEGIN_INSTANTTIME_OPT_KEY
Property: hoodie.datasource.read.begin.instanttime, [Required in incremental mode]
Instant time to start incrementally pulling data from. The instanttime here need not necessarily correspond to an instant on the timeline. New data written with an instant_time > BEGIN_INSTANTTIME are fetched out. For e.g: '20170901080000' will get all new data written after Sep 1, 2017 08:00AM.
END_INSTANTTIME_OPT_KEY
Property: hoodie.datasource.read.end.instanttime, Default: latest instant (i.e fetches all new data since begin instant time)
Instant time to limit incrementally fetched data to. New data written with an instant_time <= END_INSTANTTIME are fetched out.
INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME_OPT_KEY
Property: hoodie.datasource.read.schema.use.end.instanttime, Default: false
Uses the end instant's schema when incrementally fetching data. By default, the latest instant's schema is used.
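Putting the read options together, an incremental pull could look like the following sketch (spark is an existing SparkSession, basePath the table base path, and the option keys are the properties listed above):
// Incremental query: fetch records written after a given instant time.
Dataset<Row> incrementalDF = spark.read().format("org.apache.hudi")
    .option("hoodie.datasource.query.type", "incremental")
    .option("hoodie.datasource.read.begin.instanttime", "20170901080000")
    // Optional upper bound; when omitted, everything up to the latest instant is fetched.
    .option("hoodie.datasource.read.end.instanttime", "20180901080000")
    .load(basePath);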
Flink SQL Config Options
Flink jobs using SQL can be configured through the options in the WITH clause.
The actual datasource level configs are listed below.
Write Options
Option Name | Required | Default | Remarks |
---|---|---|---|
path | Y | N/A | Base path for the target hoodie table. The path would be created if it does not exist, otherwise a hudi table is expected to have been initialized successfully |
table.type | N | COPY_ON_WRITE | Type of table to write. COPY_ON_WRITE (or) MERGE_ON_READ |
write.operation | N | upsert | The write operation, that this write should do (insert or upsert is supported) |
write.precombine.field | N | ts | Field used in preCombining before actual write. When two records have the same key value, we will pick the one with the largest value for the precombine field, determined by Object.compareTo(..) |
write.payload.class | N | OverwriteWithLatestAvroPayload.class | Payload class used. Override this if you want to roll your own merge logic when upserting/inserting. This will render any value set for write.precombine.field ineffective |
write.insert.drop.duplicates | N | false | Flag to indicate whether to drop duplicates upon insert. By default insert will accept duplicates, to gain extra performance |
write.ignore.failed | N | true | Flag to indicate whether to ignore any non-exception error (e.g. writestatus error) within a checkpoint batch. By default true (in favor of streaming progressing over data integrity) |
hoodie.datasource.write.recordkey.field | N | uuid | Record key field. Value to be used as the recordKey component of HoodieKey . Actual value will be obtained by invoking .toString() on the field value. Nested fields can be specified using the dot notation eg: a.b.c |
hoodie.datasource.write.keygenerator.class | N | SimpleAvroKeyGenerator.class | Key generator class that will extract the key out of the incoming record |
write.tasks | N | 4 | Parallelism of tasks that do actual write, default is 4 |
write.batch.size.MB | N | 128 | Batch buffer size in MB to flush data into the underneath filesystem |
If the table type is MERGE_ON_READ, you can also specify the asynchronous compaction strategy through options:
Option Name | Required | Default | Remarks |
---|---|---|---|
compaction.async.enabled | N | true | Async Compaction, enabled by default for MOR |
compaction.trigger.strategy | N | num_commits | Strategy to trigger compaction, options are 'num_commits': trigger compaction when reach N delta commits; 'time_elapsed': trigger compaction when time elapsed > N seconds since last compaction; 'num_and_time': trigger compaction when both NUM_COMMITS and TIME_ELAPSED are satisfied; 'num_or_time': trigger compaction when NUM_COMMITS or TIME_ELAPSED is satisfied. Default is 'num_commits' |
compaction.delta_commits | N | 5 | Max delta commits needed to trigger compaction, default 5 commits |
compaction.delta_seconds | N | 3600 | Max delta seconds time needed to trigger compaction, default 1 hour |
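For illustration, a MERGE_ON_READ sink table combining some of the write and compaction options above could be declared through the Flink Table API as in the sketch below (table name, schema and path are made up; the 'hudi' connector identifier assumes the Hudi Flink bundle is on the classpath):
// Sketch: register a Hudi MOR sink table and set a few write/compaction options.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkHudiWriteExample {
  public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

    tEnv.executeSql(
        "CREATE TABLE hudi_sink ("
            + "  uuid VARCHAR(20), name VARCHAR(10), ts TIMESTAMP(3), `partition` VARCHAR(20)"
            + ") PARTITIONED BY (`partition`) WITH ("
            + "  'connector' = 'hudi',"
            + "  'path' = 'file:///tmp/hudi_sink',"   // base path for the target table
            + "  'table.type' = 'MERGE_ON_READ',"
            + "  'write.operation' = 'upsert',"
            + "  'write.precombine.field' = 'ts',"
            + "  'hoodie.datasource.write.recordkey.field' = 'uuid',"
            + "  'compaction.trigger.strategy' = 'num_commits',"
            + "  'compaction.delta_commits' = '5'"
            + ")");

    // Write into the sink from some existing source table (assumed to exist):
    // tEnv.executeSql("INSERT INTO hudi_sink SELECT uuid, name, ts, `partition` FROM source_table");
  }
}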
Read Options
Option Name | Required | Default | Remarks |
---|---|---|---|
path | Y | N/A | Base path for the target hoodie table. The path would be created if it does not exist, otherwise a hudi table expects to be initialized successfully |
table.type | N | COPY_ON_WRITE | Type of table to write. COPY_ON_WRITE (or) MERGE_ON_READ |
read.tasks | N | 4 | Parallelism of tasks that do actual read, default is 4 |
read.avro-schema.path | N | N/A | Avro schema file path, the parsed schema is used for deserialization, if not specified, the avro schema is inferred from the table DDL |
read.avro-schema | N | N/A | Avro schema string, the parsed schema is used for deserialization, if not specified, the avro schema is inferred from the table DDL |
hoodie.datasource.query.type | N | snapshot | Decides how data files need to be read, in 1) Snapshot mode (obtain latest view, based on row & columnar data); 2) incremental mode (new data since an instantTime), not supported yet; 3) Read Optimized mode (obtain latest view, based on columnar data). Default: snapshot |
hoodie.datasource.merge.type | N | payload_combine | For Snapshot query on merge on read table. Use this key to define how the payloads are merged, in 1) skip_merge: read the base file records plus the log file records; 2) payload_combine: read the base file records first, for each record in base file, checks whether the key is in the log file records(combines the two records with same key for base and log file records), then read the left log file records |
hoodie.datasource.hive_style_partition | N | false | Whether the partition path is with Hive style, e.g. '{partition key}={partition value}', default false |
read.utc-timezone | N | true | Whether to use the UTC timezone or the local timezone for the conversion between epoch time and LocalDateTime. Hive 0.x/1.x/2.x use the local timezone, but Hive 3.x uses the UTC timezone. Default is true |
If the table type is MERGE_ON_READ, streaming read is supported through options:
Option Name | Required | Default | Remarks |
---|---|---|---|
read.streaming.enabled | N | false | Whether to read as streaming source, default false |
read.streaming.check-interval | N | 60 | Check interval for streaming read, in seconds; default 1 minute |
read.streaming.start-commit | N | N/A | Start commit instant for streaming read, the commit time format should be 'yyyyMMddHHmmss', by default reading from the latest instant |
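A streaming read can be declared the same way, as in the sketch below (reusing the tEnv from the write sketch above; the start commit is an illustrative instant):
// Sketch: streaming read from a MERGE_ON_READ table using the options listed above.
tEnv.executeSql(
    "CREATE TABLE hudi_source ("
        + "  uuid VARCHAR(20), name VARCHAR(10), ts TIMESTAMP(3), `partition` VARCHAR(20)"
        + ") PARTITIONED BY (`partition`) WITH ("
        + "  'connector' = 'hudi',"
        + "  'path' = 'file:///tmp/hudi_sink',"
        + "  'table.type' = 'MERGE_ON_READ',"
        + "  'read.streaming.enabled' = 'true',"        // treat the table as a streaming source
        + "  'read.streaming.check-interval' = '60',"   // poll for new commits every 60 seconds
        + "  'read.streaming.start-commit' = '20210101000000'"
        + ")");

// Continuously query the new data:
// tEnv.executeSql("SELECT * FROM hudi_source").print();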
WriteClient Configs
Jobs programming directly against the RDD level APIs can build a HoodieWriteConfig object and pass it in to the HoodieWriteClient constructor.
HoodieWriteConfig can be built using a builder pattern as below.
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
.withPath(basePath)
.forTable(tableName)
.withSchema(schemaStr)
.withProps(props) // pass raw k,v pairs from a property file.
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withXXX(...).build())
.withIndexConfig(HoodieIndexConfig.newBuilder().withXXX(...).build())
...
.build();
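The resulting config is then handed to the write client. A sketch, assuming the RDD based HoodieWriteClient described above (newer releases expose an equivalent client, e.g. SparkRDDWriteClient, with a slightly different constructor):
// Sketch: drive writes through the client directly (verify class/constructor against your Hudi version).
JavaSparkContext jsc = ...;                  // existing Spark context
HoodieWriteClient<HoodieRecordPayload> client = new HoodieWriteClient<>(jsc, cfg);

String instantTime = client.startCommit();   // begin a new commit on the timeline
JavaRDD<WriteStatus> result = client.upsert(inputRecords, instantTime); // inputRecords: a JavaRDD of HoodieRecords (placeholder)
client.commit(instantTime, result);          // needed when auto-commit is turned off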
The following subsections go over different aspects of write configs, explaining the most important configs with their property names and default values.
withPath(hoodie_base_path)
Property: hoodie.base.path
[Required]
Base DFS path under which all the data partitions are created. Always prefix it explicitly with the storage scheme (e.g hdfs://, s3:// etc). Hudi stores all the main meta-data about commits, savepoints, cleaning audit logs etc in .hoodie directory under the base directory.
withSchema(schema_str)
Property: hoodie.avro.schema
[Required]
This is the current reader avro schema for the table. This is a string of the entire schema. HoodieWriteClient uses this schema to pass on to implementations of HoodieRecordPayload to convert from the source format to avro record. This is also used when re-writing records during an update.
forTable(table_name)
Property: hoodie.table.name
[Required]
Table name that will be used for registering with Hive. Needs to be same across runs.
withBulkInsertParallelism(bulk_insert_parallelism = 1500)
Property: hoodie.bulkinsert.shuffle.parallelism
Bulk insert is meant to be used for large initial imports and this parallelism determines the initial number of files in your table. Tune this to achieve a desired optimal size during initial import.
withUserDefinedBulkInsertPartitionerClass(className = x.y.z.UserDefinedPartitionerClass)
Property: hoodie.bulkinsert.user.defined.partitioner.class
If specified, this class will be used to re-partition input records before they are inserted.
withBulkInsertSortMode(mode = BulkInsertSortMode.GLOBAL_SORT)
Property: hoodie.bulkinsert.sort.mode
Sorting modes to use for sorting records for bulk insert. This is leveraged when user defined partitioner is not configured. Default is GLOBAL_SORT.
Available values are:
- GLOBAL_SORT: this ensures best file sizes, with lowest memory overhead, at the cost of sorting.
- PARTITION_SORT: strikes a balance by only sorting within a partition, still keeping the memory overhead of writing low, with best effort file sizing.
- NONE: no sorting. Fastest and matches spark.write.parquet() in terms of number of files and overheads.
withParallelism(insert_shuffle_parallelism = 1500, upsert_shuffle_parallelism = 1500)
Property: hoodie.insert.shuffle.parallelism, hoodie.upsert.shuffle.parallelism
Once data has been initially imported, this parallelism controls the initial parallelism for reading input records. Ensure this value is high enough, say: 1 partition for 1 GB of input data.
withDeleteParallelism(parallelism = 1500)
Property: hoodie.delete.shuffle.parallelism
This parallelism is used for the "delete" operation while deduping or repartitioning.
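Tying the parallelism knobs above together, a sketch (values are placeholders to be tuned per workload; builder method names are the ones listed in this section):
// Sketch: tuning write parallelism via the builder methods described above.
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
    .withPath(basePath)
    .forTable(tableName)
    .withSchema(schemaStr)
    .withBulkInsertParallelism(1500)    // controls the initial number of files on bulk insert
    .withParallelism(1500, 1500)        // insert, upsert shuffle parallelism
    .withDeleteParallelism(1500)        // parallelism for delete operations
    .build();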
combineInput(on_insert = false, on_update=true)
Property: hoodie.combine.before.insert, hoodie.combine.before.upsert
Flag which first combines the input RDD and merges multiple partial records into a single record before inserting or updating in DFS
combineDeleteInput(on_Delete = true)
Property: hoodie.combine.before.delete
Flag which first combines the input RDD and merges multiple partial records into a single record before deleting in DFS
withMergeAllowDuplicateOnInserts(mergeAllowDuplicateOnInserts = false)
Property: hoodie.merge.allow.duplicate.on.inserts
When enabled, will route new records as inserts and will not merge with existing records.
Result could contain duplicate entries.
withWriteStatusStorageLevel(level = MEMORY_AND_DISK_SER)
Property: hoodie.write.status.storage.level
HoodieWriteClient.insert and HoodieWriteClient.upsert return a persisted RDD[WriteStatus], because the client can choose to inspect the WriteStatus and decide whether to commit or not based on the failures. This is a configuration for the storage level of this RDD
withAutoCommit(autoCommit = true)
Property: hoodie.auto.commit
Should HoodieWriteClient autoCommit after insert and upsert. The client can choose to turn off auto-commit and commit on a "defined success condition"
withAssumeDatePartitioning(assumeDatePartitioning = false)
Property: hoodie.assume.date.partitioning
Should HoodieWriteClient assume the data is partitioned by dates, i.e three levels from base path. This is a stop-gap to support tables created by versions < 0.3.1. Will be removed eventually
withConsistencyCheckEnabled(enabled = false)
Property: hoodie.consistency.check.enabled
Should HoodieWriteClient perform additional checks to ensure written files are listable on the underlying filesystem/storage. Set this to true to work around S3's eventual consistency model and ensure all data written as part of a commit is faithfully available for queries.
withRollbackParallelism(rollbackParallelism = 100)
Property: hoodie.rollback.parallelism
Determines the parallelism for rollback of commits.
withRollbackUsingMarkers(rollbackUsingMarkers = false)
Property: hoodie.rollback.using.markers
Enables a more efficient mechanism for rollbacks based on the marker files generated during the writes. Turned off by default.
withMarkersDeleteParallelism(parallelism = 100)
Property: hoodie.markers.delete.parallelism
Determines the parallelism for deleting marker files.
Index configs
Following configs control indexing behavior, which tags incoming records as either inserts or updates to older records.
withIndexConfig (HoodieIndexConfig)
This is pluggable, to either have an external index (HBase) or use the default bloom filter stored in the Parquet files
withIndexClass(indexClass = "x.y.z.UserDefinedIndex")
Property: hoodie.index.class
Full path of a user-defined index class; must be a subclass of the HoodieIndex class. It will take precedence over the hoodie.index.type configuration if specified
withIndexType(indexType = BLOOM)
Property: hoodie.index.type
Type of index to use. Default is Bloom filter. Possible options are [BLOOM | GLOBAL_BLOOM | SIMPLE | GLOBAL_SIMPLE | INMEMORY | HBASE]. Bloom filters remove the dependency on an external system and are stored in the footer of the Parquet data files
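A sketch of picking the index type through the builder (method and enum names as listed in this section; verify against your Hudi version):
// Sketch: choose the index type through HoodieIndexConfig.
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
    .withPath(basePath)
    .forTable(tableName)
    .withSchema(schemaStr)
    .withIndexConfig(HoodieIndexConfig.newBuilder()
        .withIndexType(HoodieIndex.IndexType.BLOOM)  // BLOOM | GLOBAL_BLOOM | SIMPLE | GLOBAL_SIMPLE | INMEMORY | HBASE
        .build())
    .build();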
Bloom Index configs
bloomIndexFilterType(filterType = BloomFilterTypeCode.SIMPLE)
Property: hoodie.bloom.index.filter.type
Filter type used. Default is BloomFilterTypeCode.SIMPLE. Available values are [BloomFilterTypeCode.SIMPLE , BloomFilterTypeCode.DYNAMIC_V0]. Dynamic bloom filters auto size themselves based on number of keys.
bloomFilterNumEntries(numEntries = 60000)
Property: hoodie.index.bloom.num_entries
Only applies if index type is BLOOM.
This is the number of entries to be stored in the bloom filter. We assume the maxParquetFileSize is 128MB and averageRecordSize is 1024B, and hence approximate a total of 130K records in a file. The default (60000) is roughly half of this approximation. HUDI-56 tracks computing this dynamically. Warning: setting this very low will generate a lot of false positives and index lookup will have to scan a lot more files than it has to; setting this to a very high number will increase the size of every data file linearly (roughly 4KB for every 50000 entries). This config is also used with the DYNAMIC bloom filter, where it determines the initial size of the bloom filter.
bloomFilterFPP(fpp = 0.000000001)
Property: hoodie.index.bloom.fpp
Only applies if index type is BLOOM.
Error rate allowed given the number of entries. This is used to calculate how many bits should be assigned for the bloom filter and the number of hash functions. This is usually set very low (default: 0.000000001); we prefer to trade off disk space for lower false positives. If the number of entries added to the bloom filter exceeds the configured value (hoodie.index.bloom.num_entries), then this fpp may not be honored.
bloomIndexParallelism(0)
Property: hoodie.bloom.index.parallelism
Only applies if index type is BLOOM.
This is the amount of parallelism for index lookup, which involves a Spark Shuffle. By default, this is auto computed based on input workload characteristics
bloomIndexPruneByRanges(pruneRanges = true)
Property: hoodie.bloom.index.prune.by.ranges
Only applies if index type is BLOOM.
When true, range information from files is leveraged to speed up index lookups. Particularly helpful if the key has a monotonically increasing prefix, such as a timestamp. If the record key is completely random, it is better to turn this off.
bloomIndexUseCaching(useCaching = true)
Property: hoodie.bloom.index.use.caching
Only applies if index type is BLOOM.
When true, the input RDD will be cached to speed up index lookup by reducing IO for computing parallelism or affected partitions
bloomIndexTreebasedFilter(useTreeFilter = true)
Property: hoodie.bloom.index.use.treebased.filter
Only applies if index type is BLOOM.
When true, interval tree based file pruning optimization is enabled. This mode speeds-up file-pruning based on key ranges when compared with the brute-force mode
bloomIndexBucketizedChecking(bucketizedChecking = true)
Property: hoodie.bloom.index.bucketized.checking
Only applies if index type is BLOOM.
When true, bucketized bloom filtering is enabled. This reduces skew seen in sort based bloom index lookup
bloomIndexFilterDynamicMaxEntries(maxNumberOfEntries = 100000)
Property: hoodie.bloom.index.filter.dynamic.max.entries
The threshold for the maximum number of keys to record in a dynamic Bloom filter row. Only applies if filter type is BloomFilterTypeCode.DYNAMIC_V0.
bloomIndexKeysPerBucket(keysPerBucket = 10000000)
Property: hoodie.bloom.index.keys.per.bucket
Only applies if bloomIndexBucketizedChecking is enabled and index type is bloom.
This configuration controls the "bucket" size which tracks the number of record-key checks made against a single file and is the unit of work allocated to each partition performing bloom filter lookup. A higher value would amortize the fixed cost of reading a bloom filter to memory.
withBloomIndexInputStorageLevel(level = MEMORY_AND_DISK_SER)
Property: hoodie.bloom.index.input.storage.level
Only applies when #bloomIndexUseCaching is set. Determine what level of persistence is used to cache input RDDs.
Refer to org.apache.spark.storage.StorageLevel for different values
bloomIndexUpdatePartitionPath(updatePartitionPath = false)
Property: hoodie.bloom.index.update.partition.path
Only applies if index type is GLOBAL_BLOOM.
When set to true, an update including the partition path of a record that already exists will result in inserting the incoming record into the new partition and deleting the original record in the old partition. When set to false, the original record will only be updated in the old partition.
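Pulling a few of the bloom index knobs together, a sketch (builder method names are taken from the headings in this section; exact signatures may vary by release):
// Sketch: tuning the bloom index via HoodieIndexConfig.
HoodieIndexConfig indexConfig = HoodieIndexConfig.newBuilder()
    .withIndexType(HoodieIndex.IndexType.BLOOM)
    .bloomFilterNumEntries(60000)         // sized for ~128MB files with ~1KB records
    .bloomFilterFPP(0.000000001)          // trade disk space for fewer false positives
    .bloomIndexPruneByRanges(true)        // helpful when keys have a monotonically increasing prefix
    .bloomIndexBucketizedChecking(true)   // reduces skew in the lookup stage
    .build();

HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
    .withPath(basePath)
    .withIndexConfig(indexConfig)
    .build();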
HBase Index configs
hbaseZkQuorum(zkString) [Required]
Property: hoodie.index.hbase.zkquorum
Only applies if index type is HBASE. HBase ZK Quorum url to connect to.
hbaseZkPort(port) [Required]
Property: hoodie.index.hbase.zkport
Only applies if index type is HBASE. HBase ZK Quorum port to connect to.