配置
该页面介绍了几种配置写入或读取Hudi数据集的作业的方法。 简而言之,您可以在几个级别上控制行为。
- Spark数据源配置 : 这些配置控制Hudi Spark数据源,提供如下功能: 定义键和分区、选择写操作、指定如何合并记录或选择要读取的视图类型。
- WriteClient 配置 : 在内部,Hudi数据源使用基于RDD的
HoodieWriteClient
API 真正执行对存储的写入。 这些配置可对文件大小、压缩(compression)、并行度、压缩(compaction)、写入模式、清理等底层方面进行完全控制。 尽管Hudi提供了合理的默认设置,但在不同情形下,可能需要对这些配置进行调整以针对特定的工作负载进行优化。 - RecordPayload 配置 : 这是Hudi提供的最底层的定制。
RecordPayload定义了如何根据传入的新记录和存储的旧记录来产生新值以进行插入更新。
Hudi提供了诸如
OverwriteWithLatestAvroPayload
的默认实现,该实现仅使用最新或最后写入的记录来更新存储。 在数据源和WriteClient级别,都可以将其重写为扩展HoodieRecordPayload
类的自定义类。
与云存储连接
无论使用RDD/WriteClient API还是数据源,以下信息都有助于配置对云存储的访问。
- AWS S3
S3和Hudi协同工作所需的配置。 - Google Cloud Storage
GCS和Hudi协同工作所需的配置。
Spark数据源配置
可以通过将以下选项传递到option(k,v)
方法中来配置使用数据源的Spark作业。
实际的数据源级别配置在下面列出。
写选项
另外,您可以使用options()
或option(k,v)
方法直接传递任何WriteClient级别的配置。
inputDF.write()
.format("org.apache.hudi")
.options(clientOpts) // 任何Hudi客户端选项都可以传入
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition")
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp")
.option(HoodieWriteConfig.TABLE_NAME, tableName)
.mode(SaveMode.Append)
.save(basePath);
用于通过write.format.option(...)
写入数据集的选项
TABLE_NAME_OPT_KEY
属性:hoodie.datasource.write.table.name
[必须]
Hive表名,用于将数据集注册到其中。
OPERATION_OPT_KEY
属性:hoodie.datasource.write.operation
, 默认值:upsert
是否为写操作进行插入更新、插入或批量插入。使用bulkinsert
将新数据加载到表中,之后使用upsert
或insert
。
批量插入使用基于磁盘的写入路径来扩展以加载大量输入,而无需对其进行缓存。
STORAGE_TYPE_OPT_KEY
属性:hoodie.datasource.write.storage.type
, 默认值:COPY_ON_WRITE
此写入的基础数据的存储类型。两次写入之间不能改变。
PRECOMBINE_FIELD_OPT_KEY
属性:hoodie.datasource.write.precombine.field
, 默认值:ts
实际写入之前在preCombining中使用的字段。
当两个记录具有相同 的键值时,我们将使用Object.compareTo(..)从precombine字段中选择一个值最大的记录。
PAYLOAD_CLASS_OPT_KEY
属性:hoodie.datasource.write.payload.class
, 默认值:org.apache.hudi.OverwriteWithLatestAvroPayload
使用的有效载荷类。如果您想在插入更新或插入时使用自己的合并逻辑,请重写此方法。
这将使得PRECOMBINE_FIELD_OPT_VAL
设置的任何值无效
RECORDKEY_FIELD_OPT_KEY
属性:hoodie.datasource.write.recordkey.field
, 默认值:uuid
记录键字段。用作HoodieKey
中recordKey
部分的值。
实际值将通过在字段值上调用.toString()来获得。可以使用点符号指定嵌套字段,例如:a.b.c
PARTITIONPATH_FIELD_OPT_KEY
属性:hoodie.datasource.write.partitionpath.field
, 默认值:partitionpath
分区路径字段。用作HoodieKey
中partitionPath
部分的值。
通过调用.toString()获得实际的值
KEYGENERATOR_CLASS_OPT_KEY
属性:hoodie.datasource.write.keygenerator.class
, 默认值:org.apache.hudi.SimpleKeyGenerator
键生成器类,实现从输入的Row
对象中提取键
COMMIT_METADATA_KEYPREFIX_OPT_KEY
属性:hoodie.datasource.write.commitmeta.key.prefix
, 默认值:_
以该前缀开头的选项键会自动添加到提交/增量提交的元数据中。
这对于与hudi时间轴一致的方式存储检查点信息很有用
INSERT_DROP_DUPS_OPT_KEY
属性:hoodie.datasource.write.insert.drop.duplicates
, 默认值:false
如果设置为true,则在插入操作期间从传入DataFrame中过滤掉所有重复记录。
HIVE_SYNC_ENABLED_OPT_KEY
属性:hoodie.datasource.hive_sync.enable
, 默认值:false
设置为true时,将数据集注册并同步到Apache Hive Metastore
HIVE_DATABASE_OPT_KEY
属性:hoodie.datasource.hive_sync.database
, 默认值:default
要同步到的数据库
HIVE_TABLE_OPT_KEY
属性:hoodie.datasource.hive_sync.table
, [Required]
要同步到的表
HIVE_USER_OPT_KEY
属性:hoodie.datasource.hive_sync.username
, 默认值:hive
要使用的Hive用户名
HIVE_PASS_OPT_KEY
属性:hoodie.datasource.hive_sync.password
, 默认值:hive
要使用的Hive密码
HIVE_URL_OPT_KEY
属性:hoodie.datasource.hive_sync.jdbcurl
, 默认值:jdbc:hive2://localhost:10000
Hive metastore url
HIVE_PARTITION_FIELDS_OPT_KEY
属性:hoodie.datasource.hive_sync.partition_fields
, 默认值:
数据集中用于确定Hive分区的字段。
HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY
属性:hoodie.datasource.hive_sync.partition_extractor_class
, 默认值:org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor
用于将分区字段值提取到Hive分区列中的类。
HIVE_ASSUME_DATE_PARTITION_OPT_KEY
属性:hoodie.datasource.hive_sync.assume_date_partitioning
, 默认值:false
假设分区格式是yyyy/mm/dd
读选项
用于通过read.format.option(...)
读取数据集的选项
VIEW_TYPE_OPT_KEY
属性:hoodie.datasource.view.type
, 默认值:read_optimized
是否需要以某种模式读取数据,增量模式(自InstantTime以来的新数据)
(或)读优化模式(基于列数据获取最新视图)
(或)实时模式(基于行和列数据获取最新视图)
BEGIN_INSTANTTIME_OPT_KEY
属性:hoodie.datasource.read.begin.instanttime
, [在增量模式下必须]
开始增量提取数据的即时时间。这里的instanttime不必一定与时间轴上的即时相对应。
取出以instant_time > BEGIN_INSTANTTIME
写入的新数据。
例如:'20170901080000'将获取2017年9月1日08:00 AM之后写入的所有新数据。
END_INSTANTTIME_OPT_KEY
属性:hoodie.datasource.read.end.instanttime
, 默认值:最新即时(即从开始即时获取所有新数据)
限制增量提取的数据的即时时间。取出以instant_time <= END_INSTANTTIME
写入的新数据。
WriteClient 配置
直接使用RDD级别api进行编程的Jobs可以构建一个HoodieWriteConfig
对象,并将其传递给HoodieWriteClient
构造函数。
HoodieWriteConfig可以使用以下构建器模式构建。
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
.withPath(basePath)
.forTable(tableName)
.withSchema(schemaStr)
.withProps(props) // 从属性文件传递原始k、v对。
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withXXX(...).build())
.withIndexConfig(HoodieIndexConfig.newBuilder().withXXX(...).build())
...
.build();
以下各节介绍了写配置的不同方面,并解释了最重要的配置及其属性名称和默认值。
withPath(hoodie_base_path)
属性:hoodie.base.path
[必须]
创建所有数据分区所依据的基本DFS路径。
始终在前缀中明确指明存储方式(例如hdfs://,s3://等)。
Hudi将有关提交、保存点、清理审核日志等的所有主要元数据存储在基本目录下的.hoodie目录中。
withSchema(schema_str)
属性:hoodie.avro.schema
[必须]
这是数据集的当前读取器的avro模式(schema)。
这是整个模式的字符串。HoodieWriteClient使用此模式传递到HoodieRecordPayload的实现,以从源格式转换为avro记录。
在更新过程中重写记录时也使用此模式。
forTable(table_name)
属性:hoodie.table.name
[必须]
数据集的表名,将用于在Hive中注册。每次运行需要相同。
withBulkInsertParallelism(bulk_insert_parallelism = 1500)
属性:hoodie.bulkinsert.shuffle.parallelism
批量插入旨在用于较大的初始导入,而此处的并行度决定了数据集中文件的初始数量。
调整此值以达到在初始导入期间所需的最佳尺寸。
withParallelism(insert_shuffle_parallelism = 1500, upsert_shuffle_parallelism = 1500)
属性:hoodie.insert.shuffle.parallelism
, hoodie.upsert.shuffle.parallelism
最初导入数据后,此并行度将控制用于读取输入记录的初始并行度。
确保此值足够高,例如:1个分区用于1 GB的输入数据
combineInput(on_insert = false, on_update=true)
属性:hoodie.combine.before.insert
, hoodie.combine.before.upsert
在DFS中插入或更新之前先组合输入RDD并将多个部分记录合并为单个记录的标志
withWriteStatusStorageLevel(level = MEMORY_AND_DISK_SER)
属性:hoodie.write.status.storage.level
HoodieWriteClient.insert和HoodieWriteClient.upsert返回一个持久的RDD[WriteStatus],
这是因为客户端可以选择检查WriteStatus并根据失败选择是否提交。这是此RDD的存储级别的配置
withAutoCommit(autoCommit = true)
属性:hoodie.auto.commit
插入和插入更新后,HoodieWriteClient是否应该自动提交。
客户端可以选择关闭自动提交,并在"定义的成功条件"下提交
withAssumeDatePartitioning(assumeDatePartitioning = false)
属性:hoodie.assume.date.partitioning
HoodieWriteClient是否应该假设数据按日 期划分,即从基本路径划分为三个级别。
这是支持 < 0.3.1版本创建的表的一个补丁。最终将被删除
withConsistencyCheckEnabled(enabled = false)
属性:hoodie.consistency.check.enabled
HoodieWriteClient是否应该执行其他检查,以确保写入的文件在基础文件系统/存储上可列出。
将其设置为true可以解决S3的最终一致性模型,并确保作为提交的一部分写入的所有数据均能准确地用于查询。
索引配置
以下配置控制索引行为,该行为将传入记录标记为对较旧记录的插入或更新。
withIndexConfig (HoodieIndexConfig)
可插入以具有外部索引(HBase)或使用存储在Parquet文件中的默认布隆过滤器(bloom filter)
withIndexType(indexType = BLOOM)
属性:hoodie.index.type
要使用的索引类型。默认为布隆过滤器。可能的选项是[BLOOM | HBASE | INMEMORY]。
布隆过滤器消除了对外部系统的依赖,并存储在Parquet数据文件的页脚中
bloomFilterNumEntries(numEntries = 60000)
属性:hoodie.index.bloom.num_entries
仅在索引类型为BLOOM时适用。
这是要存储在布隆过滤器中的条目数。
我们假设maxParquetFileSize为128MB,averageRecordSize为1024B,因此,一个文件中的记录总数约为130K。
默认值(60000)大约是此近似值的一半。HUDI-56
描述了如何动态地对此进行计算。
警告:将此值设置得太低,将产生很多误报,并且索引查找将必须扫描比其所需的更多的文件;如果将其设置得非常高,将线性增加每个数据文件的大小(每50000个条目大约4KB)。
bloomFilterFPP(fpp = 0.000000001)
属性:hoodie.index.bloom.fpp
仅在索引类型为BLOOM时适用。
根据条目数允许的错误率。
这用于计算应为布隆过滤器分配多少位以及哈希函数的数量。通常将此值设置得很低(默认值:0.000000001),我们希望在磁盘空间上进行权衡以降低误报率
bloomIndexPruneByRanges(pruneRanges = true)
属性:hoodie.bloom.index.prune.by.ranges
仅在索引类型为BLOOM时适用。
为true时,从文件框定信息,可以加快索引查找的速度。 如果键具有单调递增的前缀,例如时间戳,则特别有用。
bloomIndexUseCaching(useCaching = true)
属性:hoodie.bloom.index.use.caching
仅在索引类型为BLOOM时适用。
为true时,将通过减少用于计算并行度或受影响分区的IO来缓存输入的RDD以加快索引查找
bloomIndexTreebasedFilter(useTreeFilter = true)
属性:hoodie.bloom.index.use.treebased.filter
仅在索引类型为BLOOM时适用。
为true时,启用基于间隔树的文件过滤优化。与暴力模式相比,此模式可根据键范围加快文件过滤速度
bloomIndexBucketizedChecking(bucketizedChecking = true)
属性:hoodie.bloom.index.bucketized.checking
仅在索引类型为BLOOM时适用。
为true时,启用了桶式布隆过滤。这减少了在基于排序的布隆索引查找中看到的偏差
bloomIndexKeysPerBucket(keysPerBucket = 10000000)
属性:hoodie.bloom.index.keys.per.bucket
仅在启用bloomIndexBucketizedChecking并且索引类型为bloom的情况下适用。
此配置控制“存储桶”的 大小,该大小可跟踪对单个文件进行的记录键检查的次数,并且是分配给执行布隆过滤器查找的每个分区的工作单位。
较高的值将分摊将布隆过滤器读取到内存的固定成本。
bloomIndexParallelism(0)
属性:hoodie.bloom.index.parallelism
仅在索引类型为BLOOM时适用。
这是索引查找的并行度,其中涉及Spark Shuffle。 默认情况下,这是根据输入的工作负载特征自动计算的
hbaseZkQuorum(zkString) [必须]
属性:hoodie.index.hbase.zkquorum
仅在索引类型为HBASE时适用。要连接的HBase ZK Quorum URL。
hbaseZkPort(port) [必须]
属性:hoodie.index.hbase.zkport
仅在索引类型为HBASE时适用。要连接的HBase ZK Quorum端口。
hbaseZkZnodeParent(zkZnodeParent) [必须]
属性:hoodie.index.hbase.zknode.path
仅在索引类型为HBASE时适用。这是根znode,它将包含HBase创建及使用的所有znode。
hbaseTableName(tableName) [必须]
属性:hoodie.index.hbase.table
仅在索引类型为HBASE时适用。HBase表名称,用作索引。Hudi将row_key和[partition_path, fileID, commitTime]映射存储在表中。
存储选项
控制有关调整parquet和日志文件大小的方面。
withStorageConfig (HoodieStorageConfig)
limitFileSize (size = 120MB)
属性:hoodie.parquet.max.file.size
Hudi写阶段生成的parquet文件的目标大小。对于DFS,这需要与基础文件系统块大小保持一致,以实现最佳性能。
parquetBlockSize(rowgroupsize = 120MB)
属性:hoodie.parquet.block.size
Parquet行组大小。最好与文件大小相同,以便将文件中的单个列连续存储在磁盘上
parquetPageSize(pagesize = 1MB)
属性:hoodie.parquet.page.size
Parquet页面大小。页面是parquet文件中的读取单位。 在一个块内,页面被分别压缩。
parquetCompressionRatio(parquetCompressionRatio = 0.1)
属性:hoodie.parquet.compression.ratio
当Hudi尝试调整新parquet文件的大小时,预期对parquet数据进行压缩的比例。
如果bulk_insert生成的文件小于预期大小,请增加此值
parquetCompressionCodec(parquetCompressionCodec = gzip)
属性:hoodie.parquet.compression.codec
Parquet压缩编解码方式名称。默认值为gzip。可能的选项是[gzip | snappy | uncompressed | lzo]
logFileMaxSize(logFileSize = 1GB)
属性:hoodie.logfile.max.size
LogFile的最大大小。这是在将日志文件移到下一个版本之前允许的最大大小。
logFileDataBlockMaxSize(dataBlockSize = 256MB)
属性:hoodie.logfile.data.block.max.size
LogFile数据块的最大大小。这是允许将单个数据块附加到日志文件的最大大小。
这有助于确保附加到日志文件的数据被分解为可调整大小的块,以防止发生OOM错误。此大小应大于JVM内存。
logFileToParquetCompressionRatio(logFileToParquetCompressionRatio = 0.35)
属性:hoodie.logfile.to.parquet.compression.ratio
随着记录从日志文件移动到parquet,预期会进行额外压缩的比例。
用于merge_on_read存储,以将插入内容发送到日志文件中并控制压缩parquet文件的大小。
parquetCompressionCodec(parquetCompressionCodec = gzip)
属性:hoodie.parquet.compression.codec
Parquet文件的压缩编解码方式
压缩配置
压缩配置用于控制压缩(将日志文件合并到新的parquet基本文件中)、清理(回收较旧及未使用的文件组)。
withCompactionConfig (HoodieCompactionConfig)
withCleanerPolicy(policy = KEEP_LATEST_COMMITS)
属性:hoodie.cleaner.policy