Iceberg
Iceberg with Spark
Apache Iceberg is an open table format for huge analytic datasets. Iceberg adds tables to compute engines, including Spark, Trino, PrestoDB, Flink, Hive, and Impala, using a high-performance table format that works just like an SQL table.
User Experience
Iceberg avoids unpleasant surprises. Schema evolution works and won't inadvertently un-delete data. Users don't need to know about partitioning to get fast queries.
- Schema evolution: Supports add, drop, update, and rename operations without unintended consequences.
- Hidden partitioning: Prevents user errors that can lead to silently incorrect results or dramatically slow queries.
- Partition layout evolution: Updates the layout of a table as data volume or query patterns change.
- Time travel: Enables reproducible queries that use the same table snapshot or lets users easily examine changes.
- Version rollback: Allows users to quickly correct problems by resetting tables to a good state.
The following table shows Iceberg support for each Spark version:
Spark Version | Lifecycle Stage | Initial Iceberg Support | Latest Iceberg Support | Latest Runtime Jar |
---|---|---|---|---|
2.4 | End of Life | 0.7.0-incubating | 1.2.1 | iceberg-spark-runtime-2.4 |
3.0 | End of Life | 0.9.0 | 1.0.0 | iceberg-spark-runtime-3.0_2.12 |
3.1 | End of Life | 0.12.0 | 1.3.1 | iceberg-spark-runtime-3.1_2.12 |
3.2 | End of Life | 0.13.0 | 1.4.3 | iceberg-spark-runtime-3.2_2.12 |
3.3 | Maintained | 0.14.0 | 1.5.0 | iceberg-spark-runtime-3.3_2.12 |
3.4 | Maintained | 1.3.0 | 1.5.0 | iceberg-spark-runtime-3.4_2.12 |
3.5 | Maintained | 1.4.0 | 1.6.0 | iceberg-spark-runtime-3.5_2.12 |
Spark Shell
Start the Spark shell with the configurations needed for Apache Iceberg integration.
bin/spark-shell --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
--conf spark.sql.catalog.spark_catalog.type=hive \
--conf spark.sql.catalog.hive_prod=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.hive_prod.type=hive \
--conf spark.sql.catalog.hive_prod.uri=thrift://<hive_metastore_host>:9083 \
--conf spark.sql.catalog.hive_prod.default-namespace=default
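If the Iceberg Spark runtime jar is not already on the cluster's classpath, it can be pulled in with --packages; a minimal sketch, assuming Spark 3.5 and the matching artifact from the version table above:
bin/spark-shell --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.0 \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions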
Import Statements
Include the necessary Apache Iceberg and Spark libraries to enable data operations.
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
Create a Table, Insert Data, and Query Data
Example code to create an Iceberg table, insert records, and perform a query to retrieve data.
// Define the table schema
val schema = StructType(Array(
  StructField("vendor_id", LongType, true),
  StructField("trip_id", LongType, true),
  StructField("trip_distance", FloatType, true),
  StructField("fare_amount", DoubleType, true),
  StructField("store_and_fwd_flag", StringType, true)
))
// Create the Iceberg table from an empty DataFrame
val df = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)
df.writeTo("hive_prod.default.taxis").create()
// Append rows using the table's schema
val tableSchema = spark.table("hive_prod.default.taxis").schema
val data = Seq(
  Row(1L, 1000371L, 1.8f, 15.32, "N"),
  Row(2L, 1000372L, 2.5f, 22.15, "N"),
  Row(2L, 1000373L, 0.9f, 9.01, "N"),
  Row(1L, 1000374L, 8.4f, 42.13, "Y")
)
val rowsDf = spark.createDataFrame(spark.sparkContext.parallelize(data), tableSchema)
rowsDf.writeTo("hive_prod.default.taxis").append()
spark.table("hive_prod.default.taxis").show()
Update and Read Data
Demonstrate updating records in an Iceberg table and reading the updated data.
spark.sql("update hive_prod.default.taxis set fare_amount=fare_amount*100 where vendor_id=2")
spark.table("hive_prod.default.taxis").show()
Merge and Read Data
Show how to merge data from one Iceberg table into another and read the merged data.
spark.sql("CREATE TABLE fare_adjustment_iceberg (vendor_id BIGINT, trip_id BIGINT, trip_distance DOUBLE, fare_amount DOUBLE, store_and_fwd_flag STRING) USING iceberg")
spark.sql("INSERT INTO fare_adjustment_iceberg VALUES (2,1000375,400,2.70 ,'Y')")
spark.sql("MERGE INTO hive_prod.default.taxis AS target USING fare_adjustment_iceberg AS source ON target.vendor_id = source.vendor_id WHEN MATCHED THEN UPDATE SET target.fare_amount = target.fare_amount + source.fare_amount WHEN NOT MATCHED THEN INSERT * ")
spark.table("hive_prod.default.taxis").show(false)
Delete Data
Example code for deleting specific records from an Iceberg table.
spark.sql("delete from hive_prod.default.taxis where trip_id=1000373")
Read Historical Snapshots
Inspect a table's snapshot history through its history metadata table:
spark.sql("SELECT * FROM hive_prod.default.taxis.history").show(false)
Time Travel Query
Query data from different points in time using Iceberg’s time travel capability to access historical table states.
// time travel to snapshot with ID 10963874102873L
spark.read
.option("snapshot-id", 10963874102873L)
.format("iceberg")
.load("hive_prod.default.taxis")
For more details, see Apache Iceberg Getting Started Guide.
Iceberg with Hive
Feature | Hive 2 / 3 | Hive 4 |
---|---|---|
SQL create table | ✔️ | ✔️ |
SQL create table as select (CTAS) | ✔️ | ✔️ |
SQL create table like table (CTLT) | ✔️ | ✔️ |
SQL drop table | ✔️ | ✔️ |
SQL insert into | ✔️ | ✔️ |
SQL insert overwrite | ✔️ | ✔️ |
SQL delete from | | ✔️ |
SQL update | | ✔️ |
SQL merge into | | ✔️ |
Branches and tags | | ✔️ |
Custom Iceberg catalog
To globally register different catalogs, set the following Hadoop configurations:
Configuration Key | Description |
---|---|
iceberg.catalog.<catalog_name>.type | type of catalog: hive or hadoop (leave unset for a custom catalog) |
iceberg.catalog.<catalog_name>.catalog-impl | catalog implementation; must not be null if type is empty |
iceberg.catalog.<catalog_name>.<key> | any config key-value pairs for the catalog |
Here are some examples using the Hive CLI:
Register a HiveCatalog:
SET iceberg.catalog.him_catalog.type=hive;
SET iceberg.catalog.him_catalog.uri=thrift://adi2.acceldata.ce:9083;
SET iceberg.catalog.him_catalog.clients=10;
SET iceberg.catalog.him_catalog.warehouse=hdfs://adi1.acceldata.ce:8020/warehouse;
Register a HadoopCatalog:
SET iceberg.catalog.hadoop.type=hadoop;
SET iceberg.catalog.hadoop.warehouse=hdfs://adi1.acceldata.ce:8020/warehouse;
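A Hive table can then be bound to one of the registered catalogs through the iceberg.catalog table property; a minimal sketch, where the table name ctlg_taxis is hypothetical and him_catalog is the catalog registered above:
-- Hypothetical table stored in the registered him_catalog
CREATE EXTERNAL TABLE ctlg_taxis (trip_id BIGINT, fare_amount DOUBLE)
STORED BY ICEBERG
TBLPROPERTIES ('iceberg.catalog'='him_catalog');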
Feature Support
The following Hive features are supported for Iceberg tables:
- SQL create table
- SQL create table as select (CTAS)
- SQL create table like table (CTLT)
- SQL drop table
- SQL insert into
- SQL insert overwrite
- SQL delete from
- SQL update
- SQL merge into
To enable Hive support globally for an application, set iceberg.engine.hive.enabled=true in its Hadoop configuration. For example, setting this in the hive-site.xml loaded by Spark enables the storage handler for all tables created by Spark.
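In hive-site.xml this takes the standard Hadoop property form; a minimal sketch:
<property>
  <!-- Enables the Iceberg storage handler for Hive -->
  <name>iceberg.engine.hive.enabled</name>
  <value>true</value>
</property>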
Create Table
Non partitioned tables
The Hive CREATE EXTERNAL TABLE command creates an Iceberg table when you specify the storage handler as follows:
0: jdbc:hive2://adi2.acceldata.ce:2181,adi1.a> CREATE EXTERNAL TABLE x (i int) STORED BY ICEBERG;
No rows affected (0.46 seconds)
You can specify the default file format (Avro, Parquet, or ORC) at table creation time; the default is Parquet:
0: jdbc:hive2://adi2.acceldata.ce:2181,adi1.a> CREATE TABLE j (i int) STORED BY ICEBERG STORED AS AVRO;
No rows affected (13.58 seconds)
Partitioned tables
You can create Iceberg partitioned tables using a command familiar to those who create non-Iceberg tables:
0: jdbc:hive2://adi2.acceldata.ce:2181,adi1.a> CREATE TABLE q (i int) PARTITIONED BY (j int) STORED BY ICEBERG;
No rows affected (9.119 seconds)
The resulting table does not create partitions in HMS, but instead converts partition data into Iceberg identity partitions.
Use the DESCRIBE command to get information about the Iceberg identity partitions:
0: jdbc:hive2://adi2.acceldata.ce:2181,adi1.a> DESCRIBE q;
+------------------------------------+-----------------+--------------------+
| col_name | data_type | comment |
+------------------------------------+-----------------+--------------------+
| i | int | from deserializer |
| j | int | from deserializer |
| | NULL | NULL |
| # Partition Transform Information | NULL | NULL |
| # col_name | transform_type | NULL |
| j | IDENTITY | NULL |
+------------------------------------+-----------------+--------------------+
6 rows selected (28.447 seconds)
Create Table as Select
The CREATE TABLE AS SELECT operation resembles the native Hive operation, with one important difference: the Iceberg table and the corresponding Hive table are created at the beginning of the query execution, but the data is inserted/committed only when the query finishes. So for a transient period the table exists but contains no data.
0: jdbc:hive2://adi2.acceldata.ce:2181,adi1.a> CREATE TABLE target PARTITIONED BY SPEC (year(year_field), identity_field) STORED BY ICEBERG AS
. . . . . . . . . . . . . . . . . . . . . . .> SELECT * FROM source;
No rows affected (26.508 seconds)
Create Table like Table
Hive with the Iceberg storage handler creates a new table named target with the same schema and properties as an existing table named source:
CREATE TABLE target LIKE source STORED BY ICEBERG;
0: jdbc:hive2://adi2.acceldata.ce:2181,adi1.a> CREATE TABLE target LIKE source STORED BY ICEBERG;
0: jdbc:hive2://adi2.acceldata.ce:2181,adi1.a> describe target;
+-----------+------------+--------------------+
| col_name | data_type | comment |
+-----------+------------+--------------------+
| i | int | from deserializer |
+-----------+------------+--------------------+
1 row selected (0.107 seconds)
Drop Table
Tables can be dropped using the DROP TABLE command:
DROP TABLE [IF EXISTS] table_name [PURGE];
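For example, to drop the partitioned table created earlier and purge its data files; a sketch, noting that purge behavior can also depend on table properties such as external.table.purge:
-- Drop the table and remove the underlying data files
DROP TABLE IF EXISTS q PURGE;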
Insert Into
Hive supports the standard single-table INSERT INTO operation:
INSERT INTO table_a
VALUES ('a', 1);
INSERT INTO table_a
SELECT...;
Multi-table insert is also supported, but it is not atomic: commits occur one table at a time, partial changes are visible during the commit process, and a failure can leave partial changes committed. Changes within a single table remain atomic.
Insert-into operations on branches work similarly to table-level insert operations; however, the branch must be specified as follows.
-- Branches should be specified as <database_name>.<table_name>.branch_<branch_name>
INSERT INTO default.test.branch_branch1
VALUES ('a', 1);
INSERT INTO default.test.branch_branch1
SELECT...;
Here is an example of inserting into multiple tables at once in Hive SQL:
FROM customers
INSERT INTO target1 SELECT customer_id, first_name
INSERT INTO target2 SELECT last_name, customer_id;
Insert Into Partition
Hive supports the partition-level INSERT INTO operation:
INSERT INTO table_a PARTITION (customer_id = 1, first_name = 'sourabh')
VALUES (1,2);
INSERT INTO table_a PARTITION (customer_id = 1, first_name = 'sourabh')
SELECT...;
The partition specification supports only identity-partition columns; transform columns in the partition specification are not supported.
Insert Overwrite
INSERT OVERWRITE can replace data in the table with the result of a query. Overwrites are atomic operations for Iceberg tables. For non-partitioned tables, the entire table content is replaced. For partitioned tables, only the partitions that have rows produced by the SELECT query are replaced.
INSERT OVERWRITE TABLE target SELECT * FROM source;
Insert Overwrite Partition
Hive supports the partition-level INSERT OVERWRITE operation:
INSERT OVERWRITE TABLE target PARTITION (customer_id = 1, first_name = 'sourabh') SELECT * FROM source;
The partition specification supports only identity-partition columns; transform columns in the partition specification are not supported.
Delete From
Hive supports DELETE FROM queries to remove data from tables.
Delete queries accept a filter to match rows to delete.
DELETE FROM target WHERE id > 1 AND id < 10;
DELETE FROM target WHERE id IN (SELECT id FROM source);
DELETE FROM target WHERE id IN (SELECT min(customer_id) FROM source);
If the delete filter matches entire partitions of the table, Iceberg will perform a metadata-only delete. If the filter matches individual rows of a table, then Iceberg will rewrite only the affected data files.
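As an illustration; a sketch assuming target is identity-partitioned on a hypothetical region column:
-- Filter covers entire partitions: Iceberg performs a metadata-only delete
DELETE FROM target WHERE region = 'EU';
-- Filter matches individual rows: only the affected data files are rewritten
DELETE FROM target WHERE region = 'EU' AND id = 42;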
Update
Hive supports UPDATE queries which accept a filter to match rows to update.
UPDATE target SET first_name = 'him' WHERE id > 1 AND id < 10;
UPDATE target SET first_name = 'him' WHERE id IN (SELECT id FROM source);
UPDATE target SET first_name = 'him' WHERE id IN (SELECT min(customer_id) FROM source);
For more complex row-level updates based on incoming data, see the section on MERGE INTO.
Merge Into
Hive supports MERGE INTO queries, which can express row-level updates.
MERGE INTO updates a table, called the target table, using a set of updates from another query, called the source. The update for a row in the target table is found using the ON clause, which is like a join condition.
MERGE INTO target AS t -- a target table
USING source s -- the source updates
ON t.id = s.id -- condition to find updates for target rows
WHEN ... -- updates
Updates to rows in the target table are listed using WHEN MATCHED ... THEN .... Multiple MATCHED clauses can be added with conditions that determine when each match should be applied. The first matching expression is used.
WHEN MATCHED AND s.op = 'delete' THEN DELETE
WHEN MATCHED AND t.count IS NULL AND s.op = 'increment' THEN UPDATE SET t.count = 0
WHEN MATCHED AND s.op = 'increment' THEN UPDATE SET t.count = t.count + 1
Source rows (updates) that do not match can be inserted:
WHEN NOT MATCHED THEN INSERT VALUES (s.a, s.b, s.c)
Only one record in the source data can update any given row of the target table, or else an error will be thrown.
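Putting the pieces together, a complete statement might look like this; a minimal sketch assuming a target table with columns (id, count) and a source table with columns (id, op, count), all of which are illustrative:
MERGE INTO target AS t            -- target table: (id, count)
USING source AS s                 -- source updates: (id, op, count)
ON t.id = s.id                    -- condition to find updates for target rows
WHEN MATCHED AND s.op = 'delete' THEN DELETE
WHEN MATCHED THEN UPDATE SET t.count = t.count + 1
WHEN NOT MATCHED THEN INSERT VALUES (s.id, s.count);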
Hive Table Operations using Iceberg
Creating an Iceberg Table
Create a table named iceberg_table in Hive using the Iceberg storage format:
CREATE TABLE iceberg_table (
id INT,
name STRING,
ts TIMESTAMP
) STORED BY ICEBERG;
Inserting Data into the Iceberg Table
Insert data into iceberg_table as follows:
INSERT INTO iceberg_table VALUES
(1, 'Alice', '2024-08-08 10:00:00'),
(2, 'Bob', '2024-08-08 11:00:00');
Describing the Iceberg Table
To view the schema of iceberg_table, use the DESCRIBE command:
DESCRIBE iceberg_table;
Summary of Commands
1. Querying the Table
SELECT * FROM iceberg_table;
Returns all rows in the table.
2. Updating the Table
UPDATE iceberg_table SET name = 'Eve' WHERE id = 1;
Updates the name of the row with id = 1.
3. Deleting from the Table
DELETE FROM iceberg_table WHERE id = 2;
Removes the row with id = 2.
4. Describing the Table Formatted
DESCRIBE FORMATTED iceberg_table;
Describes the table and its metadata in detail.
5. Showing Extended Table Information
SHOW TABLE EXTENDED LIKE 'iceberg_table';
6. Showing Functions
SHOW FUNCTIONS LIKE 'iceberg%';
Time Travel Queries
Since DESCRIBE HISTORY and the snapshots and current_snapshot metadata tables aren't available in this setup, here are alternative steps to check time travel functionality, if supported by your Hive/Iceberg setup:
- Create Snapshot: Insert data and create a new snapshot.
INSERT INTO iceberg_table VALUES (3, 'Bob', '2024-08-08 16:00:00');
- Query Specific Snapshot: If snapshots exist, specify a timestamp or snapshot ID in the query to read historical data (see the sketch after this list).
SELECT * FROM iceberg_table FOR SYSTEM_TIME AS OF '2024-08-08 15:00:00';
- Check Available Tables: Confirm the table exists; note that SHOW TABLES lists tables but does not expose their versions.
SHOW TABLES;
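On Hive 4, a specific snapshot can also be addressed directly by ID; a sketch, where the snapshot ID is illustrative:
-- Read the table as of a given snapshot ID
SELECT * FROM iceberg_table FOR SYSTEM_VERSION AS OF 10963874102873;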
For more details, see Hive - Apache Iceberg.