Open table formats are emerging in the rapidly evolving domain of big data management, fundamentally altering the landscape of data storage and analysis. These formats, exemplified by Apache Iceberg, Apache Hudi, and Delta Lake, address persistent challenges in traditional data lake structures by offering an advanced combination of flexibility, performance, and governance capabilities. By providing a standardized framework for data representation, open table formats break down data silos, enhance data quality, and accelerate analytics at scale.
As organizations grapple with exponential data growth and increasingly complex analytical requirements, these formats are transitioning from optional enhancements to essential components of competitive data strategies. Their ability to resolve critical issues such as data consistency, query efficiency, and governance renders them indispensable for data-driven organizations. The adoption of open table formats is a crucial consideration for organizations looking to optimize their data management practices and extract maximum value from their data.
In earlier posts, we discussed AWS Glue 5.0 for Apache Spark. In this post, we highlight notable updates on Iceberg, Hudi, and Delta Lake in AWS Glue 5.0.
Apache Iceberg highlights
AWS Glue 5.0 supports Iceberg 1.6.1. We highlight its notable updates in this section. For more details, refer to Iceberg Release 1.6.1.
Branching
Branches are independent lineages of snapshot history, each with a reference that points to the head of its lineage. They are useful for flexible data lifecycle management. An Iceberg table’s metadata stores a history of snapshots, which is updated with each transaction. Iceberg implements features such as table versioning and concurrency control through the lineage of these snapshots. To expand an Iceberg table’s lifecycle management, you can define branches that stem from other branches. Each branch has an independent snapshot lifecycle, allowing separate referencing and updating.
When an Iceberg table is created, it has only a main branch, which is created implicitly. All transactions are initially written to this branch. You can create additional branches, such as an audit branch, and configure engines to write to them. Changes on one branch can be fast-forwarded to another branch using Spark’s fast_forward procedure.
The following diagram illustrates this setup.
To create a new branch, use the following query:
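As a minimal sketch, the branch can be created with Iceberg's `ALTER TABLE ... CREATE BRANCH` DDL. The table `glue_catalog.db.tbl`, the branch name `audit`, and the `spark` session are assumptions used for illustration only. The snippet builds the statement as a string so it can be inspected before it is run.

```python
# Hypothetical identifiers: replace with your own catalog, database, table, and branch.
TABLE = "glue_catalog.db.tbl"
BRANCH = "audit"


def create_branch_sql(table: str, branch: str) -> str:
    """Return the Iceberg DDL that creates a branch at the table's current snapshot."""
    return f"ALTER TABLE {table} CREATE BRANCH {branch}"


create_branch_stmt = create_branch_sql(TABLE, BRANCH)
print(create_branch_stmt)

# In a Glue job with an Iceberg-enabled SparkSession named `spark` (assumed):
# spark.sql(create_branch_stmt)
```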
After creating a branch, you can run queries on the data in the branch by specifying branch_<branch_name>. To write data to a specific branch, use the following query:
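A sketch of a branch write, assuming the same hypothetical table `glue_catalog.db.tbl`, an `audit` branch, and an illustrative two-column schema. The only Iceberg-specific part is the `branch_<branch_name>` suffix on the table identifier.

```python
# Hypothetical identifiers: replace with your own table and branch.
TABLE = "glue_catalog.db.tbl"
BRANCH = "audit"


def branch_ref(table: str, branch: str) -> str:
    """Return the identifier that addresses one branch of an Iceberg table."""
    return f"{table}.branch_{branch}"


# The row values assume a hypothetical (id INT, name STRING) schema.
insert_stmt = f"INSERT INTO {branch_ref(TABLE, BRANCH)} VALUES (1, 'sample')"
print(insert_stmt)

# With an Iceberg-enabled SparkSession named `spark` (assumed):
# spark.sql(insert_stmt)
```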
To query a specific branch, use the following query:
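Two equivalent ways to read a branch are sketched here, again assuming the hypothetical table `glue_catalog.db.tbl` and branch `audit`: the `branch_<branch_name>` identifier, and Iceberg's `VERSION AS OF` time travel clause with the branch name.

```python
# Hypothetical identifiers: replace with your own table and branch.
TABLE = "glue_catalog.db.tbl"
BRANCH = "audit"

# Option 1: address the branch through the branch_<branch_name> identifier.
select_by_identifier = f"SELECT * FROM {TABLE}.branch_{BRANCH}"

# Option 2: the time travel clause also accepts a branch name.
select_by_version = f"SELECT * FROM {TABLE} VERSION AS OF '{BRANCH}'"

print(select_by_identifier)
print(select_by_version)

# With an Iceberg-enabled SparkSession named `spark` (assumed):
# spark.sql(select_by_identifier).show()
```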
You can run the fast_forward procedure to publish the sample table data from the audit branch into the main branch using the following query:
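A sketch of the call, assuming a catalog named `glue_catalog` and a table `db.tbl` (both hypothetical). The `fast_forward` procedure takes the table, the branch to move (`branch`), and the branch to move it to (`to`).

```python
# Hypothetical identifiers: replace with your own catalog and table.
CATALOG = "glue_catalog"
TABLE = "db.tbl"


def fast_forward_sql(catalog: str, table: str, branch: str, to: str) -> str:
    """Return the CALL statement that fast-forwards `branch` to the head of `to`."""
    return (
        f"CALL {catalog}.system.fast_forward("
        f"table => '{table}', branch => '{branch}', to => '{to}')"
    )


# Publish the audit branch into main.
publish_stmt = fast_forward_sql(CATALOG, TABLE, branch="main", to="audit")
print(publish_stmt)

# With an Iceberg-enabled SparkSession named `spark` (assumed):
# spark.sql(publish_stmt)
```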
Tagging
Tags are logical pointers to specific snapshot IDs, useful for managing important historical snapshots for business purposes. In Iceberg tables, new snapshots are created for each transaction, and you can query historical snapshots using time travel queries by specifying either a snapshot ID or timestamp. However, because snapshots are created for every transaction, it can be challenging to distinguish the important ones. Tags help address this by allowing you to point to specific snapshots with arbitrary names.
For example, you can set an event tag on snapshot 2 with the following code:
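A minimal sketch, assuming the hypothetical table `glue_catalog.db.tbl` and treating `2` as the snapshot ID. Real snapshot IDs are usually long integers that you can look up in the table's `snapshots` metadata table.

```python
# Hypothetical identifiers: replace with your own table, tag name, and snapshot ID.
TABLE = "glue_catalog.db.tbl"


def create_tag_sql(table: str, tag: str, snapshot_id: int) -> str:
    """Return the Iceberg DDL that points a named tag at one snapshot."""
    return f"ALTER TABLE {table} CREATE TAG {tag} AS OF VERSION {snapshot_id}"


create_tag_stmt = create_tag_sql(TABLE, "event", 2)
print(create_tag_stmt)

# With an Iceberg-enabled SparkSession named `spark` (assumed):
# spark.sql(create_tag_stmt)
```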
You can query the tagged snapshot by using the following code:
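A sketch of two ways to read a tagged snapshot, assuming the hypothetical table `glue_catalog.db.tbl` and a tag named `event`: the `tag_<tag_name>` identifier, or the `VERSION AS OF` clause with the tag name.

```python
# Hypothetical identifiers: replace with your own table and tag.
TABLE = "glue_catalog.db.tbl"
TAG = "event"

# Option 1: address the tag through the tag_<tag_name> identifier.
select_tag_by_identifier = f"SELECT * FROM {TABLE}.tag_{TAG}"

# Option 2: the time travel clause also accepts a tag name.
select_tag_by_version = f"SELECT * FROM {TABLE} VERSION AS OF '{TAG}'"

print(select_tag_by_identifier)
print(select_tag_by_version)
```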
Lifecycle management with branching and tagging
Branching and tagging are useful for flexible table maintenance because each can carry its own snapshot lifecycle configuration. When data changes in an Iceberg table, each modification is preserved as a new snapshot. Over time, this creates multiple data files and metadata files as changes accumulate. Although these files are essential for Iceberg features like time travel queries, maintaining too many snapshots can increase storage costs. Additionally, they can impact query performance due to the overhead of handling large amounts of metadata. Therefore, organizations should plan regular deletion of snapshots that are no longer needed.
The AWS Glue Data Catalog addresses these challenges through its managed storage optimization feature. Its optimization job automatically deletes snapshots based on two configurable parameters: the number of snapshots to retain and the maximum days to keep snapshots. Importantly, you can set independent lifecycle policies for both branches and tagged snapshots.
For branches, you can control the maximum days to keep the snapshot and the minimum number of snapshots that must be retained, even if they’re older than the maximum age limit. This setting is independent for each branch.
For example, to keep snapshots for 7 days and retain at least 10 snapshots, run the following query:
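A sketch of the retention clause, assuming the hypothetical table `glue_catalog.db.tbl` and a new branch named `audit`. In Iceberg's `WITH SNAPSHOT RETENTION` clause, the minimum snapshot count comes before the maximum snapshot age; verify the clause order against the Iceberg version you run.

```python
# Hypothetical identifiers: replace with your own table and branch.
TABLE = "glue_catalog.db.tbl"


def create_branch_with_retention_sql(
    table: str, branch: str, min_snapshots: int, max_age_days: int
) -> str:
    """Return DDL that creates a branch with its own snapshot retention policy."""
    return (
        f"ALTER TABLE {table} CREATE BRANCH {branch} "
        f"WITH SNAPSHOT RETENTION {min_snapshots} SNAPSHOTS {max_age_days} DAYS"
    )


retention_stmt = create_branch_with_retention_sql(TABLE, "audit", 10, 7)
print(retention_stmt)
```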
Tags act as permanent references to specific snapshots of your data. Without setting an expiration time, tagged snapshots persist indefinitely and prevent optimization jobs from cleaning up the associated data files. You can set a time limit for how long to keep a reference when you create it.
For example, to keep snapshots tagged with event for 360 days, run the following query:
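A sketch using the `RETAIN` clause of `CREATE TAG`, assuming the hypothetical table `glue_catalog.db.tbl`. Without `AS OF VERSION`, the tag points at the current snapshot.

```python
# Hypothetical identifiers: replace with your own table and tag.
TABLE = "glue_catalog.db.tbl"


def create_tag_with_retention_sql(table: str, tag: str, retain_days: int) -> str:
    """Return DDL that creates a tag whose reference expires after `retain_days`."""
    return f"ALTER TABLE {table} CREATE TAG {tag} RETAIN {retain_days} DAYS"


tag_retention_stmt = create_tag_with_retention_sql(TABLE, "event", 360)
print(tag_retention_stmt)
```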
This combination of branching and tagging capabilities enables flexible snapshot lifecycle management that can accommodate various business requirements and use cases. For more information about the Data Catalog’s automated storage optimization feature, refer to The AWS Glue Data Catalog now supports storage optimization of Apache Iceberg tables.
Change log view
The create_changelog_view Spark procedure helps track table modifications by generating a comprehensive change history view. It captures all data alterations, from inserts to updates and deletes. This makes it simple to analyze how your data has evolved and audit changes over time.
The change log view created by the create_changelog_view procedure contains all the information about changes, including the modified record content, type of operation performed, order of changes, and the snapshot ID where the change was committed. In addition, it can show the original and modified versions of records by passing designated key columns. These selected columns typically serve as distinct identifiers or primary keys that uniquely identify each record. See the following code:
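A sketch of the procedure call, assuming a catalog named `glue_catalog`, a table `db.test`, and a key column `id` (all hypothetical). When no view name is passed, Iceberg names the view `<table>_changes`.

```python
# Hypothetical identifiers: replace with your own catalog, table, and key columns.
CATALOG = "glue_catalog"
TABLE = "db.test"


def create_changelog_view_sql(catalog: str, table: str, identifier_columns: list) -> str:
    """Return the CALL statement that builds a change log view keyed on the given columns."""
    cols = ", ".join(f"'{c}'" for c in identifier_columns)
    return (
        f"CALL {catalog}.system.create_changelog_view("
        f"table => '{table}', identifier_columns => array({cols}))"
    )


call_stmt = create_changelog_view_sql(CATALOG, TABLE, ["id"])
print(call_stmt)

# With an Iceberg-enabled SparkSession named `spark` (assumed):
# spark.sql(call_stmt)
# spark.sql("SELECT * FROM test_changes").show()
# The view adds _change_type, _change_ordinal, and _commit_snapshot_id columns.
```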
By running the procedure, the change log view test_changes is created. When you query the change log view using SELECT * FROM test_changes, you can obtain the following output, which includes the history of record changes in the Iceberg table.
The create_changelog_view procedure helps you monitor and understand data changes. This feature proves valuable for many use cases, including change data capture (CDC), monitoring audit records, and live analysis.
Storage partitioned join
Storage partitioned join is a join optimization technique provided by Iceberg, which enhances both read and write performance. This feature uses the existing storage layout to eliminate expensive data shuffles, and it can significantly improve query performance when joining large datasets that share compatible partitioning schemes. It operates by taking advantage of the physical organization of data on disk. When both datasets are partitioned using a compatible layout, Spark can perform join operations locally by directly reading matching partitions, avoiding the need for data shuffling.
To enable and optimize storage partitioned joins, you need to set the following Spark config properties through SparkConf or an AWS Glue job parameter. The following code lists the properties for the Spark config:
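A sketch that collects the six properties in a dictionary and renders them in the chained `--conf` form that a Glue job parameter expects. The helper name `to_glue_conf_value` is hypothetical, and the SparkConf usage in the comments assumes PySpark is available in the job.

```python
# Spark properties for storage partitioned join, as listed in this post.
spj_conf = {
    "spark.sql.sources.v2.bucketing.enabled": "true",
    "spark.sql.sources.v2.bucketing.pushPartValues.enabled": "true",
    "spark.sql.requireAllClusterKeysForCoPartition": "false",
    "spark.sql.adaptive.enabled": "false",
    "spark.sql.adaptive.autoBroadcastJoinThreshold": "-1",
    "spark.sql.iceberg.planning.preserve-data-grouping": "true",
}


def to_glue_conf_value(conf: dict) -> str:
    """Render properties as the value of a single Glue `--conf` job parameter."""
    return " --conf ".join(f"{key}={value}" for key, value in conf.items())


glue_conf_value = to_glue_conf_value(spj_conf)
print(glue_conf_value)

# Inside a job script, the same properties can be applied through SparkConf (assumed usage):
# from pyspark import SparkConf
# conf = SparkConf().setAll(list(spj_conf.items()))
```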
To use an AWS Glue job parameter, set the following:
- Key: --conf
- Value: spark.sql.sources.v2.bucketing.enabled=true --conf spark.sql.sources.v2.bucketing.pushPartValues.enabled=true --conf spark.sql.requireAllClusterKeysForCoPartition=false --conf spark.sql.adaptive.enabled=false --conf spark.sql.adaptive.autoBroadcastJoinThreshold=-1 --conf spark.sql.iceberg.planning.preserve-data-grouping=true
The following examples compare sample physical plans obtained by the EXPLAIN query, with and without storage partitioned join. In these plans, both tables product_review and customer have the same bucketed partition keys, such as review_year and product_id. When storage partitioned join is enabled, Spark joins the two tables without a shuffle operation.
The following is a physical plan without storage partitioned join:
The following is a physical plan with storage partitioned join:
In this physical plan, we don’t see the Exchange operation that is present in the physical plan without storage partitioned join. This indicates that no shuffle operation will be performed.
Delta Lake highlights
AWS Glue 5.0 supports Delta Lake 3.2.1. We highlight its notable updates in this section. For more details, refer to Delta Lake Release 3.2.1.
Deletion vectors
Deletion vectors are a feature in Delta Lake that implements a merge-on-read (MoR) paradigm, providing an alternative to the traditional copy-on-write (CoW) approach. This feature fundamentally changes how DELETE, UPDATE, and MERGE operations are processed in Delta Lake tables. In the CoW paradigm, modifying even a single row requires rewriting entire Parquet files. With deletion vectors, changes are recorded as soft deletes, allowing the original data files to remain untouched while maintaining logical consistency. This approach results in improved write performance.
When deletion vectors are enabled, changes are recorded as soft deletes in a compressed bitmap format during write operations. During read operations, these changes are merged with the base data. Additionally, changes recorded by deletion vectors can be physically applied by rewriting files to purge soft deleted data using the REORG command.
To enable deletion vectors, set the table parameter to delta.enableDeletionVectors="true".
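A sketch of both steps in Spark SQL, assuming a hypothetical Delta table `db.delta_tbl`: setting the table property, and later purging soft-deleted rows with `REORG TABLE ... APPLY (PURGE)`.

```python
# Hypothetical identifier: replace with your own Delta table.
TABLE = "db.delta_tbl"


def set_table_property_sql(table: str, key: str, value: str) -> str:
    """Return DDL that sets one Delta table property."""
    return f"ALTER TABLE {table} SET TBLPROPERTIES ('{key}' = '{value}')"


enable_dv_stmt = set_table_property_sql(TABLE, "delta.enableDeletionVectors", "true")
# Rewrites the affected files so that soft-deleted rows are physically removed.
purge_stmt = f"REORG TABLE {TABLE} APPLY (PURGE)"

print(enable_dv_stmt)
print(purge_stmt)

# With a Delta-enabled SparkSession named `spark` (assumed):
# spark.sql(enable_dv_stmt)
# spark.sql(purge_stmt)
```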
When deletion vectors are enabled, you can confirm that a deletion vector file is created. The file is highlighted in the following screenshot.
MoR with deletion vectors is especially useful in scenarios requiring efficient write operations to tables with frequent updates and data scattered across multiple files. However, you should consider the read overhead required to merge these files. For more information, refer to What are deletion vectors?
Optimized writes
Delta Lake’s optimized writes feature addresses the small file problem, a common performance challenge in data lakes. This issue typically occurs when numerous small files are created through distributed operations. When reading data, processing many small files creates substantial overhead due to extensive metadata management and file handling.
The optimized writes feature solves this by combining multiple small writes into larger, more efficient files before they are written to disk. The process redistributes data across executors before writing and colocates similar data within the same partition. You can control the target file size using the spark.databricks.delta.optimizeWrite.binSize parameter, which defaults to 512 MB. With optimized writes enabled, the traditional approach of using coalesce(n) or repartition(n) to control output file counts becomes unnecessary, because file size optimization is handled automatically.
To enable optimized writes, set the table parameter to delta.autoOptimize.optimizeWrite="true".
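A sketch of setting this property on an existing table with Spark SQL. The table name `db.delta_tbl` and the `spark` session are assumptions.

```python
# Hypothetical identifier: replace with your own Delta table.
TABLE = "db.delta_tbl"
OPTIMIZE_WRITE_PROPERTY = "delta.autoOptimize.optimizeWrite"


def enable_optimized_writes_sql(table: str) -> str:
    """Return DDL that turns on optimized writes for one Delta table."""
    return (
        f"ALTER TABLE {table} "
        f"SET TBLPROPERTIES ('{OPTIMIZE_WRITE_PROPERTY}' = 'true')"
    )


optimized_writes_stmt = enable_optimized_writes_sql(TABLE)
print(optimized_writes_stmt)

# With a Delta-enabled SparkSession named `spark` (assumed):
# spark.sql(optimized_writes_stmt)
```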
The optimized writes feature isn’t enabled by default, and you should be aware of potentially higher write latency due to data shuffling before files are written to the table. In some cases, combining this with auto compaction can effectively address small file issues. For more information, refer to Optimizations.
UniForm
Delta Lake Universal Format (UniForm) introduces an approach to data lake interoperability by enabling seamless access to Delta Lake tables through Iceberg and Hudi. Although these formats differ primarily in their metadata layer, Delta Lake UniForm bridges this gap by automatically generating compatible metadata for each format alongside Delta Lake, all referencing a single copy of the data. When you write to a Delta Lake table with UniForm enabled, UniForm automatically and asynchronously generates metadata for other formats.
Delta UniForm allows organizations to use the most suitable tool for each data workload while operating on a single Delta Lake-based data source. UniForm is read-only from an Iceberg and Hudi perspective, and some features of each format are not available. For more details about limitations, refer to Limitations. To learn more about how to use UniForm on AWS, visit Expand data access through Apache Iceberg using Delta Lake UniForm on AWS.
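As a rough sketch, UniForm for Iceberg is typically enabled through two table properties when a Delta table is created. The table name and schema here are hypothetical, and the exact prerequisites (such as catalog setup) depend on your environment, so treat this as an illustration rather than a complete recipe.

```python
# Table properties that ask Delta Lake to also generate Iceberg metadata.
uniform_properties = {
    "delta.enableIcebergCompatV2": "true",
    "delta.universalFormat.enabledFormats": "iceberg",
}


def create_uniform_table_sql(table: str, columns: str, properties: dict) -> str:
    """Return DDL that creates a Delta table with the given table properties."""
    props = ", ".join(f"'{key}' = '{value}'" for key, value in properties.items())
    return f"CREATE TABLE {table} ({columns}) USING DELTA TBLPROPERTIES ({props})"


# Hypothetical table name and schema.
uniform_stmt = create_uniform_table_sql(
    "db.uniform_tbl", "id INT, name STRING", uniform_properties
)
print(uniform_stmt)
```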
Apache Hudi highlights
AWS Glue 5.0 supports Hudi 0.15.0. We highlight its notable updates in this section. For more details, refer to Hudi Release 0.15.0.
Record Level Index
Hudi provides indexing mechanisms to map record keys to their corresponding file locations, enabling efficient data operations. To use these indexes, you first need to enable the metadata table, which Hudi maintains as a MoR table, by setting hoodie.metadata.enable=true in your table parameters. Hudi’s multi-modal indexing feature allows it to store various types of indexes. These indexes give you the flexibility to add different index types as your needs evolve.
Record Level Index enhances both write and read operations by maintaining precise mappings between record keys and their corresponding file locations. This mapping enables quick determination of record locations, reducing the number of files that need to be scanned during data retrieval.
During the write workflow, when new records arrive, Record Level Index tags each record with location information if it exists in any file group. This tagging makes update operations more efficient and reduces write latency. For the read workflow, Record Level Index eliminates the need to scan through all files by enabling readers to quickly locate files containing specific data. By tracking which files contain which records, Record Level Index accelerates queries, particularly when performing exact matches on record key columns.
To enable Record Level Index, set the following table parameters:
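A sketch of the relevant Hudi options as a Python dictionary that can be merged into a writer's options. The three keys are standard Hudi configuration names; the DataFrame `df` and the target path in the comments are assumptions.

```python
# Hudi options that turn on the metadata table and Record Level Index.
record_level_index_options = {
    "hoodie.metadata.enable": "true",
    "hoodie.metadata.record.index.enable": "true",
    "hoodie.index.type": "RECORD_INDEX",
}

for key, value in record_level_index_options.items():
    print(f"{key}={value}")

# Merged into a Hudi write (assumed DataFrame `df` and S3 path):
# df.write.format("hudi").options(**record_level_index_options) \
#     .mode("append").save("s3://amzn-s3-demo-bucket/hudi/tbl/")
```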
When Record Level Index is enabled, the record_index partition is created on the metadata table storing indexes, as shown in the following screenshot.
For more information, refer to Record Level Index: Hudi’s blazing fast indexing for large-scale datasets on Hudi’s blog.
Auto generated keys
Traditionally, Hudi required explicit configuration of primary keys for every table. Users needed to specify the record key field using the hoodie.datasource.write.recordkey.field configuration. This requirement sometimes posed challenges for datasets lacking natural unique identifiers, such as in log ingestion scenarios.
With auto generated primary keys, Hudi now offers the flexibility to create tables without explicitly configuring primary keys. When you omit the hoodie.datasource.write.recordkey.field configuration, Hudi automatically generates efficient primary keys that optimize compute, storage, and read operations while maintaining uniqueness requirements. For more details, refer to Key Generation.
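To illustrate, a write configuration that relies on auto generated keys simply leaves the record key option out. The table name, partition column, and helper `uses_auto_generated_keys` in this sketch are hypothetical.

```python
RECORD_KEY_OPTION = "hoodie.datasource.write.recordkey.field"

# Hypothetical write options for an append-only log table.
# No record key field is configured, so Hudi generates keys itself.
hudi_write_options = {
    "hoodie.table.name": "access_logs",
    "hoodie.datasource.write.operation": "insert",
    "hoodie.datasource.write.partitionpath.field": "event_date",
}


def uses_auto_generated_keys(options: dict) -> bool:
    """Return True when no record key field is set in the write options."""
    return RECORD_KEY_OPTION not in options


print(uses_auto_generated_keys(hudi_write_options))

# Assumed DataFrame `df` and S3 path:
# df.write.format("hudi").options(**hudi_write_options) \
#     .mode("append").save("s3://amzn-s3-demo-bucket/hudi/access_logs/")
```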
CDC queries
In some use cases like streaming ingestion, it’s important to track all changes for the records that belong to a single commit. Although Hudi has provided the incremental query that enables you to obtain a set of records that changed between a start and end commit time, it doesn’t contain before and after images of records. In contrast, a CDC query in Hudi allows you to capture and process all mutating operations, including inserts, updates, and deletes, making it possible to track the complete evolution of data over time.
To enable CDC queries, set the table parameter to hoodie.table.cdc.enabled = 'true'.
To perform a CDC query, set the following query option:
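A sketch of the read options for a CDC query. The key setting is `hoodie.datasource.query.incremental.format=cdc`, which is used together with the incremental query type. The begin instant of `0` (read from the earliest commit) and the table path in the comments are assumptions.

```python
# Read options for a Hudi CDC query.
cdc_read_options = {
    "hoodie.datasource.query.type": "incremental",
    "hoodie.datasource.query.incremental.format": "cdc",
    # Assumed starting point: "0" reads changes from the earliest commit.
    "hoodie.datasource.read.begin.instanttime": "0",
}

for key, value in cdc_read_options.items():
    print(f"{key}={value}")

# With a Hudi-enabled SparkSession named `spark` (assumed) and a hypothetical path:
# changes = spark.read.format("hudi").options(**cdc_read_options) \
#     .load("s3://amzn-s3-demo-bucket/hudi/tbl/")
# changes.show()
```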
The following screenshot shows a sample output from a CDC query. In the op column, we can see which operation was performed on each record. The output also displays the before and after images of the modified records.
This feature is currently available for CoW tables; MoR tables are not yet supported at the time of writing. For more information, refer to Change Data Capture Query.
Conclusion
In this post, we discussed the key upgrades to Iceberg, Delta Lake, and Hudi in AWS Glue 5.0. You can take advantage of the new version right away by creating new jobs or migrating your existing ones to use the enhanced features.
About the Authors
Sotaro Hikita is an Analytics Solutions Architect. He supports customers across a wide range of industries in building and operating analytics platforms more effectively. He is particularly passionate about big data technologies and open source software.
Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is based in Tokyo, Japan. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his road bike.