Modern Open Table Format integration with ODP
Apache Hive popularized the original data lake table format for warehousing in the big data ecosystem, but its design and architecture impose significant limitations. Newer table formats give data engineers and architects far stronger capabilities for data management, compliance, and table maintenance.
These capabilities include ACID transactions, time travel, historical snapshots, branching and tagging, and Change Data Capture (CDC) that simplifies upserts and deletes. On HDFS or object storage, these table formats support full CRUD operations with improved performance and scalability thanks to smarter data organization strategies.
The following sections will offer a preliminary guide on using these open table formats with ODP Spark integration, featuring Spark Scala/SQL. This guide will provide a concise overview of the format, its capabilities, and usage, including code snippets for inserting, updating, deleting, and leveraging other available features.
Hudi
Apache Hudi (pronounced “hoodie”) is a next-generation streaming data lake platform that brings core warehouse and database functionality directly to the data lake. Hudi provides tables, transactions, efficient upserts and deletes, advanced indexes, streaming ingestion services, data clustering and compaction optimizations, and concurrency control, all while keeping your data in open source file formats.
Hudi Spark 3 Support Matrix
Hudi Version | Supported Spark 3 Versions |
---|---|
0.14.x | 3.4.x (default build), 3.3.x, 3.2.x, 3.1.x, 3.0.x |
0.13.x | 3.3.x (default build), 3.2.x, 3.1.x |
0.12.x | 3.3.x (default build), 3.2.x, 3.1.x |
0.11.x | 3.2.x (default build, Spark bundle only), 3.1.x |
0.10.x | 3.1.x (default build), 3.0.x |
0.7.0 - 0.9.0 | 3.0.x |
0.6.0 and prior | not supported |
Working with Apache Hudi and Spark: Data Operations Guide
Spark Shell
This command launches the Spark shell with specific configurations set to use Kryo for serialization and to integrate Apache Hudi as the catalog in Spark SQL, allowing for Hudi's features and tables to be accessed directly within Spark SQL queries.
bin/spark-shell --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
Import Statements
This code block imports necessary libraries for Spark and Hudi operations, and it initializes variables for the table name and base path to be used in subsequent Hudi data operations.
// spark-shell
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.table.HoodieTableConfig._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
import org.apache.hudi.common.model.HoodieRecord
import spark.implicits._
val tableName = "hudi_table"
val basePath = "hdfs:///warehouse/tablespace/external/hive/hudi_table"
Create Table, Insert Data, and Query Data
This code block demonstrates how to create a Hudi table, insert data into it, and then query that data using Spark SQL, showcasing a complete cycle of table creation and data manipulation.
spark.sql("CREATE TABLE if not exists hudi_table ( ts BIGINT, uuid STRING, rider STRING, driver STRING, fare DOUBLE, city STRING ) USING HUDI PARTITIONED BY (city)");
val columns = Seq("ts","uuid","rider","driver","fare","city")
val data =
Seq((1695159649087L,"334e26e9-8355-45cc-97c6-c31daf0df330","rider-A","driver-K",19.10,"san_francisco"),
(1695091554788L,"e96c4396-3fad-413a-a942-4cb36106d721","rider-C","driver-M",27.70 ,"san_francisco"),
(1695046462179L,"9909a8b1-2d15-4d3d-8ec9-efc48c536a00","rider-D","driver-L",33.90 ,"san_francisco"),
(1695516137016L,"e3cf430c-889d-4015-bc98-59bdce1e530c","rider-F","driver-P",34.15,"sao_paulo" ),
(1695115999911L,"c8abbe79-8d89-47ea-b4ce-4d224bae5bfa","rider-J","driver-T",17.85,"chennai"));
var inserts = spark.createDataFrame(data).toDF(columns:_*)
inserts.write.format("hudi").
option(PARTITIONPATH_FIELD_NAME.key(), "city").
option(TABLE_NAME, tableName).
mode(Overwrite).
save(basePath)
val tripsDF = spark.read.format("hudi").load(basePath)
tripsDF.createOrReplaceTempView("trips_table")
spark.sql("SELECT uuid, fare, ts, rider, driver, city FROM trips_table WHERE fare > 20.0").show()
spark.sql("SELECT _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare FROM trips_table").show()
Update Data and Read Data
This code block reads data from a Hudi table, modifies the 'fare' column for a specific rider, and updates the table with the new information.
// Let's read data from the target Hudi table, modify the fare column for rider-D, and upsert the change.
val updatesDf = spark.read.format("hudi").load(basePath).filter($"rider" === "rider-D").withColumn("fare", col("fare") * 10)
updatesDf.write.format("hudi").
option(OPERATION_OPT_KEY, "upsert").
option(PARTITIONPATH_FIELD_NAME.key(), "city").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
spark.sql("SELECT _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare FROM hudi_table").show()
Merging Data and Read Data
This code block demonstrates how to merge data from a source Hudi table into a target Hudi table, illustrating the integration of datasets within the Hudi framework.
// Source table used to test merging into the target Hudi table
spark.sql("CREATE TABLE fare_adjustment (ts BIGINT, uuid STRING, rider STRING, driver STRING, fare DOUBLE, city STRING) USING HUDI")
spark.sql("INSERT INTO fare_adjustment VALUES (1695091554788,'e96c4396-3fad-413a-a942-4cb36106d721','rider-C','driver-M',-2.70 ,'san_francisco'),(1695530237068,'3f3d9565-7261-40e6-9b39-b8aa784f95e2','rider-K','driver-U',64.20 ,'san_francisco'),(1695241330902,'ea4c36ff-2069-4148-9927-ef8c1a5abd24','rider-H','driver-R',66.60 ,'sao_paulo' ), (1695115999911,'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa','rider-J','driver-T',1.85,'chennai'))"
spark.sql("MERGE INTO hudi_table AS target USING fare_adjustment AS source ON target.uuid = source.uuid WHEN MATCHED THEN UPDATE SET target.fare = target.fare + source.fare WHEN NOT MATCHED THEN INSERT * ")
Deleting Data
This code block loads data from a Hudi table, selects the records for a specific rider, and issues a delete operation to remove them from the table.
val deletesDF = spark.read.format("hudi").load(basePath).filter($"rider" === "rider-F")
deletesDF.write.format("hudi").
option(OPERATION_OPT_KEY, "delete").
option(PARTITIONPATH_FIELD_NAME.key(), "city").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
Time Travel Query
Time travel queries in Hudi allow you to view and query data as it appeared at specific points in time, using different timestamp formats to access historical data snapshots.
spark.read.format("hudi").
option("as.of.instant", "20210728141108100").
load(basePath)
spark.read.format("hudi").
option("as.of.instant", "2021-07-28 14:11:08.200").
load(basePath)
// It is equal to "as.of.instant = 2021-07-28 00:00:00"
spark.read.format("hudi").
option("as.of.instant", "2021-07-28").
load(basePath)
// Same example in Spark SQL
-- time travel based on commit time, for eg: `20220307091628793`
SELECT * FROM hudi_table TIMESTAMP AS OF '20220307091628793' WHERE rider = 'rider-A';
-- time travel based on different timestamp formats
SELECT * FROM hudi_table TIMESTAMP AS OF '2022-03-07 09:16:28.100' WHERE rider = 'rider-A';
SELECT * FROM hudi_table TIMESTAMP AS OF '2022-03-08' WHERE rider = 'rider-A';
Capturing Data Change Query
Hudi offers comprehensive support for Change Data Capture (CDC) queries, which are essential for applications requiring a detailed record of changes, including before and after snapshots of records, within a specified commit time range.
// spark-shell
// Let's first insert data into a new table with CDC enabled.
val columns = Seq("ts","uuid","rider","driver","fare","city")
val data =
Seq((1695158649187L,"334e26e9-8355-45cc-97c6-c31daf0df330","rider-A","driver-K",19.10,"san_francisco"),
(1695091544288L,"e96c4396-3fad-413a-a942-4cb36106d721","rider-B","driver-L",27.70 ,"san_paulo"),
(1695046452379L,"9909a8b1-2d15-4d3d-8ec9-efc48c536a00","rider-C","driver-M",33.90 ,"san_francisco"),
(1695332056404L,"1dced545-862b-4ceb-8b43-d2a568f6616b","rider-D","driver-N",93.50,"chennai"));
var df = spark.createDataFrame(data).toDF(columns:_*)
// Insert data
df.write.format("hudi").
option(PARTITIONPATH_FIELD_NAME.key(), "city").
option(CDC_ENABLED.key(), "true").
option(TABLE_NAME, tableName).
mode(Overwrite).
save(basePath)
// Update fare for riders: rider-A and rider-B
val updatesDf = spark.read.format("hudi").load(basePath).filter($"rider" === "rider-A" || $"rider" === "rider-B").withColumn("fare", col("fare") * 10)
updatesDf.write.format("hudi").
option(OPERATION_OPT_KEY, "upsert").
option(PARTITIONPATH_FIELD_NAME.key(), "city").
option(CDC_ENABLED.key(), "true").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
// Query CDC data
spark.read.option(BEGIN_INSTANTTIME.key(), 0).
option(QUERY_TYPE.key(), QUERY_TYPE_INCREMENTAL_OPT_VAL).
option(INCREMENTAL_FORMAT.key(), "cdc").
format("hudi").load(basePath).show(false)
CDC queries are currently only supported on Copy-on-Write tables.
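CDC can also be enabled at table creation time when working through Spark SQL rather than the DataFrame writer. The snippet below is a minimal sketch using the same schema as above; the table name hudi_table_cdc is illustrative, and it assumes Hudi's Spark SQL table properties (type, primaryKey, preCombineField) together with the hoodie.table.cdc.enabled table config.
// Sketch: create a Copy-on-Write table with CDC enabled via table properties
spark.sql("""
  CREATE TABLE IF NOT EXISTS hudi_table_cdc (
    ts BIGINT, uuid STRING, rider STRING, driver STRING, fare DOUBLE, city STRING
  ) USING HUDI
  PARTITIONED BY (city)
  TBLPROPERTIES (
    type = 'cow',
    primaryKey = 'uuid',
    preCombineField = 'ts',
    'hoodie.table.cdc.enabled' = 'true'
  )
""")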
Iceberg
Apache Iceberg is an open table format designed for large-scale analytic datasets. Iceberg integrates with computing engines like Spark, Trino, PrestoDB, Flink, Hive, and Impala, offering a high-performance table format that functions similarly to a SQL table.
User Experience
Iceberg is designed for a smooth, predictable user experience: schema evolution works reliably and never inadvertently un-deletes data, and queries stay fast without users needing to understand partitioning. A short Spark SQL sketch of several of these features follows the list below.
- Schema evolution: Supports adding, dropping, updating, or renaming operations without unintended consequences.
- Hidden partitioning: Prevents user errors that could lead to silently incorrect results or dramatically slow queries.
- Partition layout evolution: Adapts the table's layout as data volumes or query patterns shift.
- Time travel: Facilitates reproducible queries using the exact same table snapshot and allows easy examination of historical changes.
- Version Rollback: Enables users to quickly resolve issues by reverting tables to a stable state.
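Below is a minimal Spark SQL sketch of several of these features, assuming the hive_prod catalog configured in the next section; the events table, its columns, and the snapshot ID are illustrative.
// Hidden partitioning: partition by a transform of a timestamp; queries need no partition column
spark.sql("CREATE TABLE hive_prod.default.events (id BIGINT, ts TIMESTAMP, payload STRING) USING iceberg PARTITIONED BY (days(ts))")
// Schema evolution: add and rename columns without rewriting data
spark.sql("ALTER TABLE hive_prod.default.events ADD COLUMNS (source string)")
spark.sql("ALTER TABLE hive_prod.default.events RENAME COLUMN payload TO body")
// Partition layout evolution: change the partition spec as data volumes or query patterns shift
spark.sql("ALTER TABLE hive_prod.default.events ADD PARTITION FIELD bucket(16, id)")
// Version rollback: revert a table to an earlier snapshot (snapshot ID is illustrative)
spark.sql("CALL hive_prod.system.rollback_to_snapshot('default.events', 10963874102873)")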
Iceberg Spark Support Matrix
Spark Version | Lifecycle Stage | Initial Iceberg Support | Latest Iceberg Support | Latest Runtime Jar |
---|---|---|---|---|
2.4 | End of Life | 0.7.0-incubating | 1.2.1 | iceberg-spark-runtime-2.4 |
3.0 | End of Life | 0.9.0 | 1.0.0 | iceberg-spark-runtime-3.0_2.12 |
3.1 | End of Life | 0.12.0 | 1.3.1 | iceberg-spark-runtime-3.1_2.12 |
3.2 | End of Life | 0.13.0 | 1.4.3 | iceberg-spark-runtime-3.2_2.12 |
3.3 | Maintained | 0.14.0 | 1.5.0 | iceberg-spark-runtime-3.3_2.12 |
3.4 | Maintained | 1.3.0 | 1.5.0 | iceberg-spark-runtime-3.4_2.12 |
3.5 | Maintained | 1.4.0 | 1.5.0 | iceberg-spark-runtime-3.5_2.12 |
Working with Apache Iceberg and Spark: Data Operations Guide
Spark Shell
Start the Spark shell with necessary configurations for Apache Iceberg integration.
bin/spark-shell --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
--conf spark.sql.catalog.spark_catalog.type=hive \
--conf spark.sql.catalog.hive_prod=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.hive_prod.type=hive \
--conf spark.sql.catalog.hive_prod.uri=thrift://<hive_metastore_host>:9083 \
--conf spark.sql.catalog.hive_prod.default-namespace=default
Import Statements
Include the necessary Apache Iceberg and Spark libraries to enable data operations.
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
Create Table, Insert Data, and Query Data
Example code to create an Iceberg table, insert records, and perform a query to retrieve data.
val schema = StructType( Array(
StructField("vendor_id", LongType,true),
StructField("trip_id", LongType,true),
StructField("trip_distance", FloatType,true),
StructField("fare_amount", DoubleType,true),
StructField("store_and_fwd_flag", StringType,true)
))
val df = spark.createDataFrame(spark.sparkContext.emptyRDD[Row],schema)
df.writeTo("hive_prod.default.taxis").create()
val schema = spark.table("hive_prod.default.taxis").schema
val data = Seq(
Row(1: Long, 1000371: Long, 1.8f: Float, 15.32: Double, "N": String),
Row(2: Long, 1000372: Long, 2.5f: Float, 22.15: Double, "N": String),
Row(2: Long, 1000373: Long, 0.9f: Float, 9.01: Double, "N": String),
Row(1: Long, 1000374: Long, 8.4f: Float, 42.13: Double, "Y": String)
)
val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
df.writeTo("hive_prod.default.taxis").append()
spark.table("hive_prod.default.taxis").show()
Update and Read Data
Demonstrate updating records in an Iceberg table and reading the updated data.
spark.sql("update hive_prod.default.taxis set fare_amount=fare_amount*100 where vendor_id=2")
spark.table("hive_prod.default.taxis").show()
Merge and Read Data
Show how to merge data from one Iceberg table into another and read the merged data.
spark.sql("CREATE TABLE fare_adjustment_iceberg (vendor_id BIGINT, trip_id BIGINT, trip_distance DOUBLE, fare_amount DOUBLE, store_and_fwd_flag STRING) USING iceberg")
spark.sql("INSERT INTO fare_adjustment_iceberg VALUES (2,1000375,400,2.70 ,'Y')")
spark.sql("MERGE INTO hive_prod.default.taxis AS target USING fare_adjustment_iceberg AS source ON target.vendor_id = source.vendor_id WHEN MATCHED THEN UPDATE SET target.fare_amount = target.fare_amount + source.fare_amount WHEN NOT MATCHED THEN INSERT * ")
spark.table("hive_prod.default.taxis").show(false)
Delete Data
Example code for deleting specific records from an Iceberg table.
spark.sql("delete from hive_prod.default.taxis where trip_id=1000373")
Read Historical Snapshots
List the snapshot history of an Iceberg table by querying its history metadata table; each entry records a snapshot ID and the time it became the current table state.
spark.sql("select * from hive_prod.default.taxis.history").show(false)
Time Travel Query
Query data from different points in time using Iceberg’s time travel capability to access historical table states.
// time travel to snapshot with ID 10963874102873L
spark.read
.option("snapshot-id", 10963874102873L)
.format("iceberg")
.load("hive_prod.default.taxis") //It is the table name, not path to table
.show(false)
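Time travel can also be expressed by timestamp, or directly in Spark SQL (the TIMESTAMP AS OF / VERSION AS OF syntax requires Spark 3.3 or later); the timestamp values and snapshot ID below are illustrative.
// time travel to a point in time via the DataFrame reader (milliseconds since epoch)
spark.read
  .option("as-of-timestamp", "499162860000")
  .format("iceberg")
  .load("hive_prod.default.taxis")
  .show(false)
// Spark SQL equivalents
spark.sql("SELECT * FROM hive_prod.default.taxis TIMESTAMP AS OF '2024-01-01 00:00:00'").show(false)
spark.sql("SELECT * FROM hive_prod.default.taxis VERSION AS OF 10963874102873").show(false)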
Delta Lake
Delta Lake is an open source project that enables building a Lakehouse architecture on top of data lakes. Delta Lake provides ACID transactions, scalable metadata handling, and unifies streaming and batch data processing on top of existing data lakes, such as S3, ADLS, GCS, and HDFS.
Delta Lake offers the following capabilities; a short streaming sketch follows the list below:
- ACID transactions in Spark: Guarantees that all readers access consistent and accurate data through serializable isolation levels.
- Scalable Metadata Management: Utilizes Spark's distributed computing capabilities to efficiently manage extensive metadata for petabyte-scale tables containing billions of files.
- Unified Streaming and Batch Processing: Delta Lake allows tables to function both as batch tables and as streaming sources and sinks, seamlessly integrating streaming data ingest, batch historic backfill, and interactive queries.
- Schema Enforcement: Automatically manages schema variations to prevent the insertion of incorrect records during data ingestion.
- Time Travel: Enables data versioning for rollbacks, comprehensive historical audits, and reproducible machine learning experiments.
- Advanced Data Operations: Supports merges, updates, and deletes to facilitate complex scenarios such as change-data-capture, slowly-changing-dimension (SCD) operations, and streaming upserts.
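As a small illustration of unified streaming and batch, the sketch below treats one Delta table as a streaming source and another as a streaming sink; the table names and checkpoint location are illustrative, and it assumes the shell configuration shown later in this section.
// Read an existing Delta table as a streaming source
val streamDF = spark.readStream.table("default.delta_tablev2")
// Continuously append the stream into another Delta table acting as the sink
val query = streamDF.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "hdfs:///tmp/checkpoints/delta_tablev2_copy")
  .toTable("default.delta_tablev2_copy")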
Delta Lake Spark Support Matrix
Delta Lake Version | Apache Spark Version |
---|---|
3.1.x | 3.5.x |
3.0.x | 3.5.x |
2.4.x | 3.4.x |
2.3.x | 3.3.x |
2.2.x | 3.3.x |
2.1.x | 3.3.x |
2.0.x | 3.2.x |
1.2.x | 3.2.x |
1.1.x | 3.2.x |
Working with Delta Lake and Spark: Data Operations Guide
Spark Shell
Initialize the Spark shell with configurations optimized for working with Delta Lake.
bin/spark-shell --master local[*] \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
Import Statements
Import the necessary libraries for Delta Lake to enable data manipulation and querying.
import io.delta.tables._
Create Table, Insert Data, and Read Data
Example code to create a Delta Lake table, insert data into it, and perform queries to retrieve the data.
spark.sql("CREATE TABLE if not exists delta_table ( ts BIGINT, uuid STRING, rider STRING, driver STRING, fare DOUBLE, city STRING ) USING DELTA")
val columns = Seq("ts","uuid","rider","driver","fare","city")
val data =
Seq((1695159649087L,"334e26e9-8355-45cc-97c6-c31daf0df330","rider-A","driver-K",19.10,"san_francisco"),
(1695091554788L,"e96c4396-3fad-413a-a942-4cb36106d721","rider-C","driver-M",27.70 ,"san_francisco"),
(1695046462179L,"9909a8b1-2d15-4d3d-8ec9-efc48c536a00","rider-D","driver-L",33.90 ,"san_francisco"),
(1695516137016L,"e3cf430c-889d-4015-bc98-59bdce1e530c","rider-F","driver-P",34.15,"sao_paulo"),
(1695115999911L,"c8abbe79-8d89-47ea-b4ce-4d224bae5bfa","rider-J","driver-T",17.85,"chennai"))
var inserts = spark.createDataFrame(data).toDF(columns:_*)
inserts.write.format("delta").saveAsTable("default.delta_tablev2")
//Describe Table
val deltaTable = DeltaTable.forPath(spark, "<hdfs:///<table_path>>")
val detailDF = deltaTable.detail()
spark.table("default.delta_tablev2").show(false)
Update Data and Read Data
Show how to update records in a Delta Lake table.
spark.sql("delete from delta_tablev2 where rider='rider-A'") // Single quote within the external double quote
spark.table("default.delta_tablev2").show(false)
Merge Data and Read Data
Demonstrate merging data from one Delta Lake table into another and reading the resultant data.
spark.sql("CREATE TABLE fare_adjustment_delta (ts BIGINT, uuid STRING, rider STRING, driver STRING, fare DOUBLE, city STRING) USING delta")
spark.sql("INSERT INTO fare_adjustment_delta VALUES (1695091554788,'e96c4396-3fad-413a-a942-4cb36106d721','rider-C','driver-M',-2.70 ,'san_francisco'),(1695530237068,'3f3d9565-7261-40e6-9b39-b8aa784f95e2','rider-K','driver-U',64.20 ,'san_francisco'),(1695241330902,'ea4c36ff-2069-4148-9927-ef8c1a5abd24','rider-H','driver-R',66.60 ,'sao_paulo' ), (1695115999911,'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa','rider-J','driver-T',1.85,'chennai')")
spark.sql("MERGE INTO delta_tablev2 AS target USING fare_adjustment_delta AS source ON target.uuid = source.uuid WHEN MATCHED THEN UPDATE SET target.fare = target.fare + source.fare WHEN NOT MATCHED THEN INSERT * ")
spark.table("default.delta_tablev2").show(false)
Delete Data
Example code for deleting specific records from a Delta Lake table, demonstrating data management capabilities.
spark.sql("delete from delta_tablev2 where rider='rider-K'")
spark.table("default.delta_tablev2").show(false)
Time Travel Query
Use Delta Lake's time travel feature to query data from different historical versions of a table.
val history = spark.sql("DESCRIBE HISTORY default.delta_tablev2")
val latest_version = history.selectExpr("max(version)").collect()
spark.sql(s"select * from default.delta_tablev2 version as of ${latest_version(0)(0)}").show(false)