Skip to main content
Version: 0.6.0

配置

该页面介绍了几种配置写入或读取Hudi数据集的作业的方法。 简而言之,您可以在几个级别上控制行为。

  • Spark数据源配置 : 这些配置控制Hudi Spark数据源,提供如下功能: 定义键和分区、选择写操作、指定如何合并记录或选择要读取的视图类型。
  • WriteClient 配置 : 在内部,Hudi数据源使用基于RDD的HoodieWriteClient API 真正执行对存储的写入。 这些配置可对文件大小、压缩(compression)、并行度、压缩(compaction)、写入模式、清理等底层方面进行完全控制。 尽管Hudi提供了合理的默认设置,但在不同情形下,可能需要对这些配置进行调整以针对特定的工作负载进行优化。
  • RecordPayload 配置 : 这是Hudi提供的最底层的定制。 RecordPayload定义了如何根据传入的新记录和存储的旧记录来产生新值以进行插入更新。 Hudi提供了诸如OverwriteWithLatestAvroPayload的默认实现,该实现仅使用最新或最后写入的记录来更新存储。 在数据源和WriteClient级别,都可以将其重写为扩展HoodieRecordPayload类的自定义类。

Spark数据源配置

可以通过将以下选项传递到option(k,v)方法中来配置使用数据源的Spark作业。 实际的数据源级别配置在下面列出。

写选项

另外,您可以使用options()option(k,v)方法直接传递任何WriteClient级别的配置。

inputDF.write()
.format("org.apache.hudi")
.options(clientOpts) // 任何Hudi客户端选项都可以传入
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition")
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp")
.option(HoodieWriteConfig.TABLE_NAME, tableName)
.mode(SaveMode.Append)
.save(basePath);

用于通过write.format.option(...)写入数据集的选项

TABLE_NAME_OPT_KEY

属性:hoodie.datasource.write.table.name [必须]

Hive表名,用于将数据集注册到其中。#### OPERATION_OPT_KEY 属性:`hoodie.datasource.write.operation`, 默认值:`upsert`
是否为写操作进行插入更新、插入或批量插入。使用`bulkinsert`将新数据加载到表中,之后使用`upsert`或`insert`。 批量插入使用基于磁盘的写入路径来扩展以加载大量输入,而无需对其进行缓存。#### STORAGE_TYPE_OPT_KEY 属性:`hoodie.datasource.write.storage.type`, 默认值:`COPY_ON_WRITE`
此写入的基础数据的存储类型。两次写入之间不能改变。#### PRECOMBINE_FIELD_OPT_KEY 属性:`hoodie.datasource.write.precombine.field`, 默认值:`ts`
实际写入之前在preCombining中使用的字段。 当两个记录具有相同的键值时,我们将使用Object.compareTo(..)从precombine字段中选择一个值最大的记录。

PAYLOAD_CLASS_OPT_KEY

属性:hoodie.datasource.write.payload.class, 默认值:org.apache.hudi.OverwriteWithLatestAvroPayload

使用的有效载荷类。如果您想在插入更新或插入时使用自己的合并逻辑,请重写此方法。 这将使得`PRECOMBINE_FIELD_OPT_VAL`设置的任何值无效#### RECORDKEY_FIELD_OPT_KEY 属性:`hoodie.datasource.write.recordkey.field`, 默认值:`uuid`
记录键字段。用作`HoodieKey`中`recordKey`部分的值。 实际值将通过在字段值上调用.toString()来获得。可以使用点符号指定嵌套字段,例如:`a.b.c`

PARTITIONPATH_FIELD_OPT_KEY

属性:hoodie.datasource.write.partitionpath.field, 默认值:partitionpath

分区路径字段。用作`HoodieKey`中`partitionPath`部分的值。 通过调用.toString()获得实际的值

HIVE_STYLE_PARTITIONING_OPT_KEY

属性:hoodie.datasource.write.hive_style_partitioning, 默认值:false

如果设置为true,则生成基于Hive格式的partition目录:<= END_INSTANTTIME`写入的新数据。

WriteClient 配置

直接使用RDD级别api进行编程的Jobs可以构建一个HoodieWriteConfig对象,并将其传递给HoodieWriteClient构造函数。 HoodieWriteConfig可以使用以下构建器模式构建。

HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
.withPath(basePath)
.forTable(tableName)
.withSchema(schemaStr)
.withProps(props) // 从属性文件传递原始k、v对。
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withXXX(...).build())
.withIndexConfig(HoodieIndexConfig.newBuilder().withXXX(...).build())
...
.build();

以下各节介绍了写配置的不同方面,并解释了最重要的配置及其属性名称和默认值。

withPath(hoodie_base_path)

属性:hoodie.base.path [必须]

创建所有数据分区所依据的基本DFS路径。 始终在前缀中明确指明存储方式(例如hdfs://,s3://等)。 Hudi将有关提交、保存点、清理审核日志等的所有主要元数据存储在基本目录下的.hoodie目录中。

withSchema(schema_str)

属性:hoodie.avro.schema [必须]

这是数据集的当前读取器的avro模式(schema)。 这是整个模式的字符串。HoodieWriteClient使用此模式传递到HoodieRecordPayload的实现,以从源格式转换为avro记录。 在更新过程中重写记录时也使用此模式。

forTable(table_name)

属性:hoodie.table.name [必须]

数据集的表名,将用于在Hive中注册。每次运行需要相同。

withBulkInsertParallelism(bulk_insert_parallelism = 1500)

属性:hoodie.bulkinsert.shuffle.parallelism

批量插入旨在用于较大的初始导入,而此处的并行度决定了数据集中文件的初始数量。 调整此值以达到在初始导入期间所需的最佳尺寸。

withParallelism(insert_shuffle_parallelism = 1500, upsert_shuffle_parallelism = 1500)

属性:hoodie.insert.shuffle.parallelism, hoodie.upsert.shuffle.parallelism

最初导入数据后,此并行度将控制用于读取输入记录的初始并行度。 确保此值足够高,例如:1个分区用于1 GB的输入数据

combineInput(on_insert = false, on_update=true)

属性:hoodie.combine.before.insert, hoodie.combine.before.upsert

在DFS中插入或更新之前先组合输入RDD并将多个部分记录合并为单个记录的标志

withWriteStatusStorageLevel(level = MEMORY_AND_DISK_SER)

属性:hoodie.write.status.storage.level

HoodieWriteClient.insert和HoodieWriteClient.upsert返回一个持久的RDD[WriteStatus], 这是因为客户端可以选择检查WriteStatus并根据失败选择是否提交。这是此RDD的存储级别的配置

withAutoCommit(autoCommit = true)

属性:hoodie.auto.commit

插入和插入更新后,HoodieWriteClient是否应该自动提交。 客户端可以选择关闭自动提交,并在"定义的成功条件"下提交

withAssumeDatePartitioning(assumeDatePartitioning = false)

属性:hoodie.assume.date.partitioning

HoodieWriteClient是否应该假设数据按日期划分,即从基本路径划分为三个级别。 这是支持 < 0.3.1版本创建的表的一个补丁。最终将被删除

withConsistencyCheckEnabled(enabled = false)

属性:hoodie.consistency.check.enabled

HoodieWriteClient是否应该执行其他检查,以确保写入的文件在基础文件系统/存储上可列出。 将其设置为true可以解决S3的最终一致性模型,并确保作为提交的一部分写入的所有数据均能准确地用于查询。

索引配置

以下配置控制索引行为,该行为将传入记录标记为对较旧记录的插入或更新。

withIndexConfig (HoodieIndexConfig)

可插入以具有外部索引(HBase)或使用存储在Parquet文件中的默认布隆过滤器(bloom filter)

withIndexClass(indexClass = "x.y.z.UserDefinedIndex")

属性:hoodie.index.class

用户自定义索引的全路径名,索引类必须为HoodieIndex的子类,当指定该配置时,其会优先于`hoodie.index.type`配置#### withIndexType(indexType = BLOOM) 属性:`hoodie.index.type`
要使用的索引类型。默认为布隆过滤器。可能的选项是[BLOOM | HBASE | INMEMORY]。 布隆过滤器消除了对外部系统的依赖,并存储在Parquet数据文件的页脚中

bloomFilterNumEntries(numEntries = 60000)

属性:hoodie.index.bloom.num_entries

仅在索引类型为BLOOM时适用。
这是要存储在布隆过滤器中的条目数。 我们假设maxParquetFileSize为128MB,averageRecordSize为1024B,因此,一个文件中的记录总数约为130K。 默认值(60000)大约是此近似值的一半。[HUDI-56](https://issues.apache.org/jira/browse/HUDI-56) 描述了如何动态地对此进行计算。 警告:将此值设置得太低,将产生很多误报,并且索引查找将必须扫描比其所需的更多的文件;如果将其设置得非常高,将线性增加每个数据文件的大小(每50000个条目大约4KB)。

bloomFilterFPP(fpp = 0.000000001)

属性:hoodie.index.bloom.fpp

仅在索引类型为BLOOM时适用。
根据条目数允许的错误率。 这用于计算应为布隆过滤器分配多少位以及哈希函数的数量。通常将此值设置得很低(默认值:0.000000001),我们希望在磁盘空间上进行权衡以降低误报率

bloomIndexPruneByRanges(pruneRanges = true)

属性:hoodie.bloom.index.prune.by.ranges

仅在索引类型为BLOOM时适用。
为true时,从文件框定信息,可以加快索引查找的速度。 如果键具有单调递增的前缀,例如时间戳,则特别有用。

bloomIndexUseCaching(useCaching = true)

属性:hoodie.bloom.index.use.caching

仅在索引类型为BLOOM时适用。
为true时,将通过减少用于计算并行度或受影响分区的IO来缓存输入的RDD以加快索引查找

bloomIndexTreebasedFilter(useTreeFilter = true)

属性:hoodie.bloom.index.use.treebased.filter

仅在索引类型为BLOOM时适用。
为true时,启用基于间隔树的文件过滤优化。与暴力模式相比,此模式可根据键范围加快文件过滤速度

bloomIndexBucketizedChecking(bucketizedChecking = true)

属性:hoodie.bloom.index.bucketized.checking

仅在索引类型为BLOOM时适用。
为true时,启用了桶式布隆过滤。这减少了在基于排序的布隆索引查找中看到的偏差

bloomIndexKeysPerBucket(keysPerBucket = 10000000)

属性:hoodie.bloom.index.keys.per.bucket

仅在启用bloomIndexBucketizedChecking并且索引类型为bloom的情况下适用。
此配置控制“存储桶”的大小,该大小可跟踪对单个文件进行的记录键检查的次数,并且是分配给执行布隆过滤器查找的每个分区的工作单位。 较高的值将分摊将布隆过滤器读取到内存的固定成本。

bloomIndexParallelism(0)

属性:hoodie.bloom.index.parallelism

仅在索引类型为BLOOM时适用。
这是索引查找的并行度,其中涉及Spark Shuffle。 默认情况下,这是根据输入的工作负载特征自动计算的

hbaseZkQuorum(zkString) [必须]

属性:hoodie.index.hbase.zkquorum

仅在索引类型为HBASE时适用。要连接的HBase ZK Quorum URL。

hbaseZkPort(port) [必须]

属性:hoodie.index.hbase.zkport

仅在索引类型为HBASE时适用。要连接的HBase ZK Quorum端口。

hbaseZkZnodeParent(zkZnodeParent) [必须]

属性:hoodie.index.hbase.zknode.path

仅在索引类型为HBASE时适用。这是根znode,它将包含HBase创建及使用的所有znode。

hbaseTableName(tableName) [必须]

属性:hoodie.index.hbase.table

仅在索引类型为HBASE时适用。HBase表名称,用作索引。Hudi将row_key和[partition_path, fileID, commitTime]映射存储在表中。
bloomIndexUpdatePartitionPath(updatePartitionPath = false)

属性:hoodie.bloom.index.update.partition.path

仅在索引类型为GLOBAL_BLOOM时适用。
为true时,当对一个已有记录执行包含分区路径的更新操作时,将会导致把新记录插入到新分区,而把原有记录从旧分区里删除。为false时,只对旧分区的原有记录进行更新。

存储选项

控制有关调整parquet和日志文件大小的方面。

withStorageConfig (HoodieStorageConfig)

limitFileSize (size = 120MB)

属性:hoodie.parquet.max.file.size

Hudi写阶段生成的parquet文件的目标大小。对于DFS,这需要与基础文件系统块大小保持一致,以实现最佳性能。

parquetBlockSize(rowgroupsize = 120MB)

属性:hoodie.parquet.block.size

Parquet行组大小。最好与文件大小相同,以便将文件中的单个列连续存储在磁盘上

parquetPageSize(pagesize = 1MB)

属性:hoodie.parquet.page.size

Parquet页面大小。页面是parquet文件中的读取单位。 在一个块内,页面被分别压缩。

parquetCompressionRatio(parquetCompressionRatio = 0.1)

属性:hoodie.parquet.compression.ratio

当Hudi尝试调整新parquet文件的大小时,预期对parquet数据进行压缩的比例。 如果bulk_insert生成的文件小于预期大小,请增加此值

parquetCompressionCodec(parquetCompressionCodec = gzip)

属性:hoodie.parquet.compression.codec

Parquet压缩编解码方式名称。默认值为gzip。可能的选项是[gzip | snappy | uncompressed | lzo]

logFileMaxSize(logFileSize = 1GB)

属性:hoodie.logfile.max.size

LogFile的最大大小。这是在将日志文件移到下一个版本之前允许的最大大小。

logFileDataBlockMaxSize(dataBlockSize = 256MB)

属性:hoodie.logfile.data.block.max.size

LogFile数据块的最大大小。这是允许将单个数据块附加到日志文件的最大大小。 这有助于确保附加到日志文件的数据被分解为可调整大小的块,以防止发生OOM错误。此大小应大于JVM内存。

logFileToParquetCompressionRatio(logFileToParquetCompressionRatio = 0.35)

属性:hoodie.logfile.to.parquet.compression.ratio

随着记录从日志文件移动到parquet,预期会进行额外压缩的比例。 用于merge_on_read存储,以将插入内容发送到日志文件中并控制压缩parquet文件的大小。#### parquetCompressionCodec(parquetCompressionCodec = gzip) 属性:`hoodie.parquet.compression.codec`
Parquet文件的压缩编解码方式

压缩配置

压缩配置用于控制压缩(将日志文件合并到新的parquet基本文件中)、清理(回收较旧及未使用的文件组)。 withCompactionConfig (HoodieCompactionConfig)

withCleanerPolicy(policy = KEEP_LATEST_COMMITS)

属性:hoodie.cleaner.policy

要使用的清理政策。Hudi将删除旧版本的parquet文件以回收空间。 任何引用此版本文件的查询和计算都将失败。最好确保数据保留的时间超过最大查询执行时间。

retainCommits(no_of_commits_to_retain = 24)

属性:hoodie.cleaner.commits.retained

保留的提交数。因此,数据将保留为num_of_commits * time_between_commits(计划的)。 这也直接转化为您可以逐步提取此数据集的数量

archiveCommitsWith(minCommits = 96, maxCommits = 128)

属性:hoodie.keep.min.commits, hoodie.keep.max.commits

每个提交都是`.hoodie`目录中的一个小文件。由于DFS通常不支持大量小文件,因此Hudi将较早的提交归档到顺序日志中。 提交通过重命名提交文件以原子方式发布。

withCommitsArchivalBatchSize(batch = 10)

属性:hoodie.commits.archival.batch

这控制着批量读取并一起归档的提交即时的数量。

compactionSmallFileSize(size = 0)

属性:hoodie.parquet.small.file.limit

该值应小于maxFileSize,如果将其设置为0,会关闭此功能。 由于批处理中分区中插入记录的数量众多,总会出现小文件。 Hudi提供了一个选项,可以通过将对该分区中的插入作为对现有小文件的更新来解决小文件的问题。 此处的大小是被视为“小文件大小”的最小文件大小。

insertSplitSize(size = 500000)

属性:hoodie.copyonwrite.insert.split.size

插入写入并行度。为单个分区的总共插入次数。 写出100MB的文件,至少1kb大小的记录,意味着每个文件有100K记录。默认值是超额配置为500K。 为了改善插入延迟,请对其进行调整以匹配单个文件中的记录数。 将此值设置为较小的值将导致文件变小(尤其是当compactionSmallFileSize为0时)

autoTuneInsertSplits(true)

属性:hoodie.copyonwrite.insert.auto.split

Hudi是否应该基于最后24个提交的元数据动态计算insertSplitSize。默认关闭。

approxRecordSize(size = 1024)

属性:hoodie.copyonwrite.record.size.estimate

平均记录大小。如果指定,hudi将使用它,并且不会基于最后24个提交的元数据动态地计算。 没有默认值设置。这对于计算插入并行度以及将插入打包到小文件中至关重要。如上所述。

withInlineCompaction(inlineCompaction = false)

属性:hoodie.compact.inline

当设置为true时,紧接在插入或插入更新或批量插入的提交或增量提交操作之后由摄取本身触发压缩

withMaxNumDeltaCommitsBeforeCompaction(maxNumDeltaCommitsBeforeCompaction = 10)

属性:hoodie.compact.inline.max.delta.commits

触发内联压缩之前要保留的最大增量提交数

withCompactionLazyBlockReadEnabled(true)

属性:hoodie.compaction.lazy.block.read

当CompactedLogScanner合并所有日志文件时,此配置有助于选择是否应延迟读取日志块。 选择true以使用I/O密集型延迟块读取(低内存使用),或者为false来使用内存密集型立即块读取(高内存使用)

withCompactionReverseLogReadEnabled(false)

属性:hoodie.compaction.reverse.log.read

HoodieLogFormatReader会从pos=0到pos=file_length向前读取日志文件。 如果此配置设置为true,则Reader会从pos=file_length到pos=0反向读取日志文件

withCleanerParallelism(cleanerParallelism = 200)

属性:hoodie.cleaner.parallelism

如果清理变慢,请增加此值。

withCompactionStrategy(compactionStrategy = org.apache.hudi.io.compact.strategy.LogFileSizeBasedCompactionStrategy)

属性:hoodie.compaction.strategy

用来决定在每次压缩运行期间选择要压缩的文件组的压缩策略。 默认情况下,Hudi选择具有累积最多未合并数据的日志文件

withTargetIOPerCompactionInMB(targetIOPerCompactionInMB = 500000)

属性:hoodie.compaction.target.io

LogFileSizeBasedCompactionStrategy的压缩运行期间要花费的MB量。当压缩以内联模式运行时,此值有助于限制摄取延迟。

withTargetPartitionsPerDayBasedCompaction(targetPartitionsPerCompaction = 10)

属性:hoodie.compaction.daybased.target

由org.apache.hudi.io.compact.strategy.DayBasedCompactionStrategy使用,表示在压缩运行期间要压缩的最新分区数。

withPayloadClass(payloadClassName = org.apache.hudi.common.model.HoodieAvroPayload)

属性:hoodie.compaction.payload.class

这需要与插入/插入更新过程中使用的类相同。 就像写入一样,压缩也使用记录有效负载类将日志中的记录彼此合并,再次与基本文件合并,并生成压缩后要写入的最终记录。

指标配置

配置Hudi指标报告。 withMetricsConfig (HoodieMetricsConfig)

Hudi会发布有关每次提交、清理、回滚等的指标。

GRAPHITE

on(metricsOn = false)

属性:hoodie.metrics.on

打开或关闭发送指标。默认情况下处于关闭状态。
withReporterType(reporterType = GRAPHITE)

属性:hoodie.metrics.reporter.type

指标报告者的类型。默认使用graphite。
toGraphiteHost(host = localhost)

属性:hoodie.metrics.graphite.host

要连接的graphite主机
onGraphitePort(port = 4756)

属性:hoodie.metrics.graphite.port

要连接的graphite端口
usePrefix(prefix = "")

属性:hoodie.metrics.graphite.metric.prefix

适用于所有指标的标准前缀。这有助于添加如数据中心、环境等信息

JMX

on(metricsOn = false)

属性:hoodie.metrics.on

打开或关闭发送指标。默认情况下处于关闭状态。
withReporterType(reporterType = JMX)

属性:hoodie.metrics.reporter.type

指标报告者的类型。
toJmxHost(host = localhost)

属性:hoodie.metrics.jmx.host

要连接的Jmx主机
onJmxPort(port = 1000-5000)

属性:hoodie.metrics.graphite.port

要连接的Jmx端口

DATADOG

on(metricsOn = false)

属性:hoodie.metrics.on

打开或关闭发送指标。默认情况下处于关闭状态。
withReporterType(reporterType = DATADOG)

属性: hoodie.metrics.reporter.type

指标报告者的类型。
withDatadogReportPeriodSeconds(period = 30)

属性: hoodie.metrics.datadog.report.period.seconds

Datadog报告周期,单位为秒,默认30秒。
withDatadogApiSite(apiSite)

属性: hoodie.metrics.datadog.api.site

Datadog API站点:EU 或者 US
withDatadogApiKey(apiKey)

属性: hoodie.metrics.datadog.api.key

Datadog API密匙
withDatadogApiKeySkipValidation(skip = false)

属性: hoodie.metrics.datadog.api.key.skip.validation

在通过Datadog API发送指标前,选择是否跳过验证API密匙。默认不跳过。
withDatadogApiKeySupplier(apiKeySupplier)

属性: hoodie.metrics.datadog.api.key.supplier

Datadog API 密匙提供者,用来在运行时提供密匙。只有当`hoodie.metrics.datadog.api.key`未设定的情况下才有效。
withDatadogApiTimeoutSeconds(timeout = 3)

属性: hoodie.metrics.datadog.metric.prefix

Datadog API超时时长,单位为秒,默认3秒。
withDatadogPrefix(prefix)

属性: hoodie.metrics.datadog.metric.prefix

Datadog指标前缀。将被加在所有指标名称前,以点间隔。例如:如果设成`foo`,`foo.`将被用作实际前缀。
withDatadogHost(host)

属性: hoodie.metrics.datadog.metric.host

Datadog指标主机,将和指标数据一并发送。
withDatadogTags(tags)

属性: hoodie.metrics.datadog.metric.tags

Datadog指标标签(逗号分隔),将和指标数据一并发送。

用户自定义发送器

on(metricsOn = false)

属性: hoodie.metrics.on

打开或关闭发送指标。默认情况下处于关闭状态。
withReporterClass(className = "")

属性: hoodie.metrics.reporter.class

用于处理发送指标的用户自定义类,必须是AbstractUserDefinedMetricsReporter类的子类.

内存配置

控制由Hudi内部执行的压缩和合并的内存使用情况 withMemoryConfig (HoodieMemoryConfig)

内存相关配置

withMaxMemoryFractionPerPartitionMerge(maxMemoryFractionPerPartitionMerge = 0.6)

属性:hoodie.memory.merge.fraction

该比例乘以用户内存比例(1-spark.memory.fraction)以获得合并期间要使用的堆空间的最终比例

withMaxMemorySizePerCompactionInBytes(maxMemorySizePerCompactionInBytes = 1GB)

属性:hoodie.memory.compaction.fraction

HoodieCompactedLogScanner读取日志块,将记录转换为HoodieRecords,然后合并这些日志块和记录。 在任何时候,日志块中的条目数可以小于或等于相应的parquet文件中的条目数。这可能导致Scanner出现OOM。 因此,可溢出的映射有助于减轻内存压力。使用此配置来设置可溢出映射的最大允许inMemory占用空间。

withWriteStatusFailureFraction(failureFraction = 0.1)

属性:hoodie.memory.writestatus.failure.fraction

此属性控制报告给驱动程序的失败记录和异常的比例

写提交回调配置

控制写提交的回调。 如果用户启用了回调并且回调过程发生了错误,则会抛出异常。 当前支持HTTP, Kafka 两种回调方式。 withCallbackConfig (HoodieWriteCommitCallbackConfig)

写提交回调相关配置
writeCommitCallbackOn(callbackOn = false)

Property: hoodie.write.commit.callback.on

打开或关闭回调功能. 默认关闭.
withCallbackClass(callbackClass)

Property: hoodie.write.commit.callback.class

回调类的完全限定名,必须实现HoodieWriteCommitCallback接口。默认 org.apache.hudi.callback.impl.HoodieWriteCommitHttpCallback

HTTP CALLBACK

通过 HTTP 发送写提交回调信息. 这是默认的实现方式,用户不需要显式指定。

withCallbackHttpUrl(url)

Property: hoodie.write.commit.callback.http.url

Http回调主机,回调信息将会发送到该主机
withCallbackHttpTimeoutSeconds(timeoutSeconds = 3)

Property: hoodie.write.commit.callback.http.timeout.seconds

Http回调超时时间(单位秒),默认3秒
withCallbackHttpApiKey(apiKey)

Property: hoodie.write.commit.callback.http.api.key

Http 回调秘钥. 默认 hudi_write_commit_http_callback

KAFKA CALLBACK

使用Kafka发送写提交回调信息, 用户需要配置 hoodie.write.commit.callback.class = org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallback

CALLBACK_KAFKA_BOOTSTRAP_SERVERS

Property: hoodie.write.commit.callback.kafka.bootstrap.servers

Kafka 集群地址
CALLBACK_KAFKA_TOPIC

Property: hoodie.write.commit.callback.kafka.topic

发送回调信息的topic
CALLBACK_KAFKA_PARTITION

Property: hoodie.write.commit.callback.kafka.partition

指定发送的分区, 默认 0
CALLBACK_KAFKA_ACKS

Property: hoodie.write.commit.callback.kafka.acks

Acks 级别, 默认 `all`
CALLBACK_KAFKA_RETRIES

Property: hoodie.write.commit.callback.kafka.retries

Kafka 发送数据失败重试次数. 默认 3 次