Apache Hudi Technical Specification
Field | Value |
---|---|
Last Updated | Dec 2024 |
Table Version | 8 |
Please check the specification for versions prior to 1.0 here.
Hudi brings database capabilities (tables, transactions, mutability, indexes, storage layouts) along with an incremental stream processing model (incremental merges, change streams, out-of-order data) over very large collections of files/objects, turning immutable cloud/file storage systems into transactional data lakes.
Overview
This document is a specification of Hudi's storage format, along with protocols for correctly implementing readers and writers, to accomplish the following goals.
- Unified Computation Model - a unified way to combine large batch style operations and frequent near real time incremental operations over large datasets.
- Self-Optimizing Storage - automatically handles all table maintenance and optimizations such as compaction, clustering and cleaning asynchronously, and in most cases without blocking actual data changes.
- Cloud Native Database - abstracts Table/Schema from actual storage and ensures up-to-date metadata and indexes unlocking multi-fold read and write performance optimizations.
- Cross-Engine Compatibility - designed to be neutral and compatible with different computation engines. Hudi will manage metadata, and provide common abstractions and pluggable interfaces to most/all common compute/query engines.
This document is intended as a reference guide for any compute engine that aims to write/read Hudi tables by interacting with the storage format directly.
Storage Layout
Hudi organizes a table as a collection of files (objects in cloud storage) that can be spread across different paths on storage, which can be a local filesystem, a distributed filesystem or object storage. These different paths that contain a table's files are called partitions. Some common ways of organizing files are as follows:
- Hierarchical folder tree: files are organized under a folder path structure like conventional filesystems, for ease of access and navigation. Hive-style partitioning is a special case of this organization, where the folder path names indicate the field values that partition the data. Note, however, that unlike Hive-style partitioning, partition columns are not removed from data files; partitioning is merely an organizational construct.
- Cloud-optimized with random prefixes: files are distributed across different paths (of even varying depth) across cloud storage, to circumvent the scaling/throttling issues that plague cloud storage systems, at the cost of losing easy navigation of the storage folder tree using standard UI/CLI tools.
- Unpartitioned flat structure: tables can also be totally unpartitioned, where a single folder contains all the files that constitute the table.
Metadata about the table is stored at a location on storage referred to as the basepath. A special reserved .hoodie directory under the base path is used to store transaction logs, metadata and indexes. A special file, hoodie.properties, under the basepath persists table-level configurations shared by writers and readers of the table. These configurations are explained here, and any config without a default value needs to be specified during table creation. An illustrative sketch of loading these configs follows the layout below.
/data/hudi_trips/                              <-- Base Path
├── .hoodie/                                   <-- Meta Path
│   ├── hoodie.properties                      <-- Table Configs
│   ├── metadata/                              <-- Metadata
│   │   ├── files/                             <-- Files that make up the table
│   │   ├── col_stats/                         <-- Statistics on columns for each file
│   │   ├── record_index/                      <-- Unique index on record key
│   │   ├── [index_type]_[index_name]/         <-- secondary indexes of different types
│   │   └── .index_defs/index.json             <-- index definitions
│   └── timeline/                              <-- active timeline
│       └── history/                           <-- timeline history
├── americas/                                  <-- Data stored as folder tree
│   ├── brazil/
│   │   └── sao_paulo/                         <-- Partition Path
│   │       └── [data_files]
│   └── united_states/
│       └── san_francisco/
│           └── [data_files]
└── asia/
    └── india/
        └── chennai/
            └── [data_files]
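For illustration, a reader or writer typically bootstraps itself by loading hoodie.properties from the meta path, as in the minimal sketch below. The keys shown (hoodie.table.name, hoodie.table.type, hoodie.table.version) are common table configs; the full set is defined by the table config reference mentioned above.

```java
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;

// Minimal sketch: load the table-level configs shared by readers and writers.
// Key names printed below are illustrative examples; consult the table config
// reference for the authoritative list and defaults.
public class TableConfigLoader {
  public static Properties load(String basePath) throws Exception {
    Path props = Paths.get(basePath, ".hoodie", "hoodie.properties");
    Properties tableConfig = new Properties();
    try (InputStream in = Files.newInputStream(props)) {
      tableConfig.load(in);
    }
    return tableConfig;
  }

  public static void main(String[] args) throws Exception {
    Properties cfg = load("/data/hudi_trips");
    System.out.println("table name    : " + cfg.getProperty("hoodie.table.name"));
    System.out.println("table type    : " + cfg.getProperty("hoodie.table.type"));    // COPY_ON_WRITE or MERGE_ON_READ
    System.out.println("table version : " + cfg.getProperty("hoodie.table.version")); // e.g. 8
  }
}
```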
Data files can be either base files or log files. Base files contain records stored in columnar or SST-table-like file formats, depending on the use case. Log files store deltas (partial or complete), deletes, change logs and other record-level operations performed on the records in the base file. Data files are organized into logical concepts called file groups, uniquely identified by a file id. Each record in the table is identified by a unique key and mapped to a single file group at any given time. Within a file group, data files are further split into file slices, where each file slice contains an optional base file and a list of log files, which together constitute the state of all records in the file group at a given time. These constructs allow Hudi to efficiently support incremental operations, as will be evident later.
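To make the file group and file slice concepts concrete, the following is a minimal in-memory model; the class and field names are illustrative and not part of any Hudi API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Illustrative model only -- not a Hudi API. A file group is identified by a
// file id; each file slice holds an optional base file plus the log files that
// carry deltas/deletes against it.
class FileSlice {
  final String baseInstantTime;          // time the slice was started on the timeline
  final Optional<String> baseFile;       // absent until a base file is written for the slice
  final List<String> logFiles = new ArrayList<>();

  FileSlice(String baseInstantTime, Optional<String> baseFile) {
    this.baseInstantTime = baseInstantTime;
    this.baseFile = baseFile;
  }
}

class FileGroup {
  final String fileId;                                // unique id of the file group
  final String partitionPath;                         // partition the group lives in
  final List<FileSlice> slices = new ArrayList<>();   // ordered, latest slice last

  FileGroup(String fileId, String partitionPath) {
    this.fileId = fileId;
    this.partitionPath = partitionPath;
  }

  // Snapshot readers merge the latest slice: its base file (if any) plus log files.
  FileSlice latestSlice() {
    return slices.get(slices.size() - 1);
  }
}
```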
Timeline
Hudi stores all actions performed on a table in a Log Structured Merge (LSM) tree structure called the Timeline. Unlike typical LSM implementations, the memory component and the write-ahead log are at once replaced by Avro-serialized files containing individual actions (the active timeline), for high durability and inter-process coordination. Every action on the Hudi table creates a new entry (instant) in the active timeline, and periodically, actions move from the active timeline to the LSM structure. Each new action and state change needs to be atomically published to the timeline (see the Appendix for storage-specific guidelines to achieve that).
Time
Each action on the timeline is stamped with a time at which it began and completed. The notion of time can be based on logical or physical timestamps, but it is required that each timestamp generated by a process is monotonically increasing with respect to the previous time generated by the same or another process. This requirement can be satisfied by implementing a TrueTime generator, either with an external time generator or by relying on system epoch times with assumed bounds on clock skew. See the appendix for more.
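As a sketch of the epoch-time approach with an assumed clock-skew bound, the generator below never hands out a value smaller than or equal to one it has already issued; the names and the skew bound are illustrative, and cross-process monotonicity additionally requires the coordination noted above.

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch, not Hudi's implementation: derive instant times from
// system epoch millis, guaranteeing strictly increasing values within this
// process. The skew bound is an assumption; cross-process guarantees also need
// an external time generator or coordination (e.g. a lock provider).
public class MonotonicTimeGenerator {
  private static final long MAX_CLOCK_SKEW_MS = 200; // assumed bound, illustrative
  private final AtomicLong lastIssued = new AtomicLong(0);

  public long nextInstantMillis() throws InterruptedException {
    long candidate = lastIssued.updateAndGet(prev -> Math.max(prev + 1, System.currentTimeMillis()));
    // Wait out the assumed skew bound before handing the value out, so that by
    // the time it is used, every other process's clock (within the bound) has
    // advanced past it and later timestamps generated anywhere are larger.
    Thread.sleep(MAX_CLOCK_SKEW_MS);
    return candidate;
  }
}
```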
Actions
Actions have an optional plan and completion metadata associated with them, which capture how the action alters the table state. The metadata is serialized as Avro, and the schema for each of these actions is described in Avro here. The following are the key actions on the Hudi timeline.
Action type | Description | Action Metadata |
---|---|---|
commit | A write operation that produces new base files containing either new records or modified values of existing records. | HoodieCommitMetadata |
deltacommit | A write operation that produces new log files containing either new records or deltas/deletes for existing records. Optionally, it can also produce new base files. | HoodieCommitMetadata |
replacecommit | A write operation that replaces a set of file groups with another atomically. It can be used to cluster the data for better query performance or rewrite files to enforce time-to-live. The requirement for this action is that the table state before and after are logically equivalent. | HoodieRequestedReplaceMetadata HoodieReplaceCommitMetadata |
clean | Management activity that cleans up older versions of data files that no longer will be accessed, to keep the storage size of the table in check. | HoodieCleanerPlan HoodieCleanMetadata |
compaction | Management activity that applies deltas/deletes from a set of log files to records on a base file and produces new base files. This amortizes the update costs, re-optimizes the file group to ultimately improve query performance. | HoodieCompactionOperation HoodieCompactionMetadata |
logcompaction | Management activity that consolidates a set of (typically smaller) log files into another log file within the same file group, to improve query performance between compaction actions. | HoodieCompactionOperation HoodieCompactionMetadata |
indexing | Management activity to build a new index from the table state. This action is used to update/build the index asynchronously even as write actions above happen on the table. | HoodieIndexPlan HoodieIndexCommitMetadata |
rollback | Denotes that the changes made by the corresponding commit/delta commit were unsuccessful & hence rolled back, removing any partial files produced during such a write. | HoodieRollbackPlan HoodieRollbackMetadata |
savepoint | Savepoint is a special marker action to ensure a particular commit is not automatically cleaned. It helps restore the table to a point on the timeline, in case of disaster/data recovery scenarios | HoodieSavepointMetadata |
restore | Restore denotes that the table was restored to a particular savepoint, physically removing data written after that savepoint. | HoodieRestorePlan HoodieRestoreMetadata |
States
An action can be in any one of the following states on the active timeline.
State | Description |
---|---|
requested | begin time for the action is generated and the action is requested along with any metadata to "plan" the action. Stored in the active timeline as [begin_instant_time].[action_type].[state] |
inflight | A process has attempted to execute a requested action. Note that this process could be still executing or failed midway. Actions are idempotent, and could fail many times in this state. Stored in the active timeline as [begin_instant_time].[action_type].[state] |
completed | completion time is generated and the action has completed successfully by publishing a file with both begin and completion time on the timeline. |
All the actions in requested/inflight states are stored in the active timeline as files named [begin_instant_time].[action_type].[requested|inflight]. Completed actions are stored along with a time that denotes when the action was completed, in a file named [begin_instant_time]_[completion_instant_time].[action_type].
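These naming rules are enough to reconstruct instants by listing the active timeline directory. The following is a minimal, hypothetical parser (class and method names are illustrative, not Hudi's API) that splits a timeline file name into begin time, optional completion time, action type and state.

```java
// Illustrative parser for active-timeline file names, assuming the conventions above:
//   requested/inflight : [begin_instant_time].[action_type].[requested|inflight]
//   completed          : [begin_instant_time]_[completion_instant_time].[action_type]
public class InstantFileName {
  public final String beginTime;
  public final String completionTime;   // null unless completed
  public final String action;
  public final String state;            // "requested", "inflight" or "completed"

  private InstantFileName(String beginTime, String completionTime, String action, String state) {
    this.beginTime = beginTime;
    this.completionTime = completionTime;
    this.action = action;
    this.state = state;
  }

  public static InstantFileName parse(String fileName) {
    String[] parts = fileName.split("\\.");
    String[] times = parts[0].split("_");
    if (parts.length == 3) {            // e.g. 20241201093000000.commit.requested
      return new InstantFileName(times[0], null, parts[1], parts[2]);
    }
    // e.g. 20241201093000000_20241201093005123.commit
    return new InstantFileName(times[0], times[1], parts[1], "completed");
  }
}
```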
LSM Timeline
Completed actions, their plans and completion metadata are stored in a more scalable LSM-tree-based timeline, organized under the timeline/history storage location within the .hoodie metadata path. It consists of Apache Parquet files with action instant data and bookkeeping metadata files, in the following manner. A reader sketch follows the tables below.
/.hoodie/timeline/history
├── _version_ <-- stores the manifest version that is current
├── manifest_1 <-- manifests store list of files in timeline
├── manifest_2 <-- compactions, cleaning, writes produce new manifest files
├── ...
├── manifest_[N] <-- there can be many manifest files at any given time
├── [min_time]_[max_time]_[level].parquet <-- files storing actual action details
The schema of the individual files is as follows.
File type | File naming | Description |
---|---|---|
version | _version_ | UTF-8 encoded string representing the manifest version to read. Version file should be atomically updated on storage when new manifests are created. |
manifests | manifest_[N] | Contains a json string, with a list of file name and file length for all LSM files part of the timeline. |
LSM files | [min_time]_[max_time]_[level].parquet | min_time is the minimum begin time of all actions in the file; max_time is the maximum completion time of all actions in the file; level is an integer starting from 0, indicating the level in the LSM tree. |
The actual parquet file schema is:
Field name | Type | Description |
---|---|---|
instantTime | string | the begin time of the action |
completionTime | string | the completion time of the action |
action | string | the action type |
metadata | bytes | json string representing the completed metadata of the action |
plan | bytes | Optional, avro serialized plan for the action, same as its requested/plan metadata |
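Putting these pieces together, a reader of the LSM timeline would first read _version_ to locate the current manifest, then prune LSM parquet files whose [min_time, max_time] name range overlaps the query window before scanning them. The sketch below is illustrative only: the manifest JSON parsing and the parquet scan are elided, and for brevity it lists the history directory directly rather than the files named in the manifest.

```java
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

// Illustrative sketch of selecting LSM timeline files for a completion-time range.
public class LsmTimelineReader {
  public static List<Path> filesForRange(String basePath, String startTime, String endTime) throws Exception {
    Path history = Paths.get(basePath, ".hoodie", "timeline", "history");

    // 1. _version_ holds the manifest version to read, as a UTF-8 string.
    String version = new String(Files.readAllBytes(history.resolve("_version_")), StandardCharsets.UTF_8).trim();
    Path manifest = history.resolve("manifest_" + version);
    // 2. The manifest lists the file names/lengths of all LSM files (JSON parsing elided here).
    System.out.println("would parse manifest: " + manifest);

    // 3. Prune LSM files by the [min_time]_[max_time]_[level].parquet naming convention.
    List<Path> selected = new ArrayList<>();
    try (Stream<Path> files = Files.list(history)) {
      files.filter(p -> p.getFileName().toString().endsWith(".parquet")).forEach(p -> {
        String[] parts = p.getFileName().toString().replace(".parquet", "").split("_");
        String minTime = parts[0], maxTime = parts[1];
        if (maxTime.compareTo(startTime) >= 0 && minTime.compareTo(endTime) <= 0) {
          selected.add(p);
        }
      });
    }
    return selected;
  }
}
```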
Table Types
The Hudi storage format supports two table types, offering different trade-offs between write and query performance (see the appendix for more details); records are stored differently based on the chosen table type.
Table Type | Description |
---|---|
Copy-on-Write (CoW) | Data is stored entirely in base files, optimized for read performance and ideal for slow changing datasets. Any updates, inserts, deletes accordingly produce new base files for each write operation. Change data is still stored as log files associated with the base files. |
Merge-on-Read (MoR) | Data is stored in a combination of base and log files, optimized to balance the write and read performance and ideal for frequently changing datasets |
Readers then need to satisfy the following query types on these tables.
- Snapshot queries - query the table for latest committed state of each record.
- Time-Travel queries - snapshot query performed as of a point in timeline or equivalent on an older snapshot of the table.
- Incremental queries - obtain the latest merged state of all records that have been updated/inserted between two points in the timeline.
- Change Data Capture - obtain a change log of all modifications (updates, inserts, deletes) with before/after images for each record, between two points in the timeline.
Data Files
Data files follow naming conventions that make it easy to track the history of files with respect to the timeline, as well as serving practical operational needs. For example, it is quite possible to recover the table to a known good state when operational accidents cause the timeline and other metadata to be deleted.
Base Files
Base files are standard, well-known file formats like Apache Parquet, Apache ORC and Apache HBase's HFile, named as [file_id]_[write_token]_[begin_time].[extension].
where,
- file_id - Id of the file group that the base file belongs to.
- write_token - Monotonically increasing token for every attempt to write the base file within a given transaction. This helps uniquely identify the base file when there are failures and retries. Cleaning can remove partial/uncommitted base files by comparing against the successful write token.
- begin_time - Time at which the write action that produced this base file was requested on the timeline.
- extension - base file extension matching the file format, such as .parquet or .orc.
Note that a single file group can contain base files with different extensions. A sketch parsing these names, together with log file names, follows the log file naming convention below.
Log Files
The log files contain different types of blocks that encode delta updates, deletes or change logs against the base file. They are named with the convention
.[file_id]_[requested_instant_time].log.[version]_[write_token].
- file_id - File id of the base file in the slice.
- requested_instant_time - Time at which the write operation that produced this log file was requested.
- version - Current version of the log file format, to order deltas against the base file.
- write_token - Monotonically increasing token for every attempt to write the log file. This helps uniquely identify the log file when there are failures and retries. The cleaner can clean up partial log files if the write token is not the latest in the file slice.
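Both the base file and log file naming conventions above can be decoded with simple pattern matching. The following is a minimal sketch; the regular expressions, class name and sample file names are illustrative, not Hudi's implementation.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative name parsing for data files, assuming the conventions above:
//   base file : [file_id]_[write_token]_[begin_time].[extension]
//   log file  : .[file_id]_[requested_instant_time].log.[version]_[write_token]
public class DataFileNames {
  private static final Pattern BASE_FILE =
      Pattern.compile("(?<fileId>[^_]+)_(?<writeToken>[^_]+)_(?<beginTime>[^.]+)\\.(?<ext>.+)");
  private static final Pattern LOG_FILE =
      Pattern.compile("\\.(?<fileId>.+)_(?<requestedTime>[^_.]+)\\.log\\.(?<version>\\d+)_(?<writeToken>.+)");

  public static void main(String[] args) {
    // Hypothetical file names, for illustration only.
    Matcher base = BASE_FILE.matcher("f1a2b3c4-0_1-22-33_20241201093000000.parquet");
    if (base.matches()) {
      System.out.println("base file group=" + base.group("fileId") + " beginTime=" + base.group("beginTime"));
    }
    Matcher log = LOG_FILE.matcher(".f1a2b3c4-0_20241201093000000.log.1_1-45-67");
    if (log.matches()) {
      System.out.println("log file group=" + log.group("fileId") + " version=" + log.group("version"));
    }
  }
}
```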
Log Format
The log file format is a Hudi-native format. The actual content bytes are serialized into one of the Apache Avro, Apache Parquet or Apache HFile file formats based on configuration, and the other metadata in the block is serialized using primitive types and byte arrays.
Hudi Log format specification is as follows.
Section | #Bytes | Description |
---|---|---|
magic | 6 | 6 characters '#HUDI#' stored as a byte array. Sanity check for block corruption: the first 6 bytes are asserted to match this magic byte[]. |
LogBlock length | 8 | Length of the block excluding the magic. |
version | 4 | Version of the Log file format, monotonically increasing to support backwards compatibility |
type | 4 | Represents the type of the log block. Id of the type is serialized as an Integer. |
header length | 8 | Length of the header section to follow |
header | variable | Custom-serialized map of header metadata entries: 4 bytes for the map size (number of entries); then, for each entry, 4 bytes for the metadata type, followed by the length and bytes of a variable-length UTF-8 string. |
content length | 8 | Length of the actual content serialized |
content | variable | The content contains the serialized records in one of the supported file formats (Apache Avro, Apache Parquet or Apache HFile) |
footer length | 8 | Length of the footer section to follow |
footer | variable | Similar to Header. Map of footer metadata entries. |
total block length | 8 | Total size of the block including the magic bytes. This is used to determine whether a block is corrupt, by comparing it with the block length recorded at the start of the block. Each log block assumes that the total block length is the last data written in the block; any data written after it is ignored. |
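As a worked example of the layout above, the following sketch walks the framing of each log block (magic, lengths, version, type) and skips over the header, content and footer bytes. It is illustrative only; decoding the content into records depends on the configured content format and is elided, as is corruption recovery.

```java
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustrative log-block scanner following the layout above; not Hudi's reader.
public class LogBlockScanner {
  private static final byte[] MAGIC = "#HUDI#".getBytes(StandardCharsets.UTF_8);

  public static void scan(InputStream in) throws Exception {
    DataInputStream dis = new DataInputStream(in);
    while (true) {
      byte[] magic = new byte[6];
      try {
        dis.readFully(magic);                            // 6 bytes: magic, sanity check
      } catch (EOFException eof) {
        return;                                          // clean end of file
      }
      if (!Arrays.equals(magic, MAGIC)) {
        throw new IllegalStateException("corrupt block: bad magic");
      }
      long blockLength = dis.readLong();                 // 8 bytes: block length excluding magic
      int formatVersion = dis.readInt();                 // 4 bytes: log format version
      int blockType = dis.readInt();                     // 4 bytes: block type id
      long headerLength = dis.readLong();                // 8 bytes: header section length
      dis.readFully(new byte[(int) headerLength]);       // header map bytes (decoding elided)
      long contentLength = dis.readLong();               // 8 bytes: content length
      dis.readFully(new byte[(int) contentLength]);      // serialized records (Avro/Parquet/HFile)
      long footerLength = dis.readLong();                // 8 bytes: footer section length
      dis.readFully(new byte[(int) footerLength]);       // footer map bytes (decoding elided)
      long totalBlockLength = dis.readLong();            // 8 bytes: total block size incl. magic
      System.out.printf("block type=%d version=%d length=%d total=%d%n",
          blockType, formatVersion, blockLength, totalBlockLength);
    }
  }
}
```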