Hudi
Hudi with Spark
Apache Hudi (pronounced “hoodie”) is the next-generation streaming data lake platform. Apache Hudi brings core warehouse and database functionality directly to a data lake. Hudi provides tables, transactions, efficient upserts/deletes, advanced indexes, streaming ingestion services, data clustering/compaction optimizations, and concurrency, all while keeping your data in open-source file formats.
Hudi Spark 3 Support Matrix
| Hudi | Supported Spark 3 Version |
|---|---|
| 0.15.x | 3.5.x (default build), 3.4.x, 3.3.x, 3.2.x, 3.1.x, 3.0.x |
| 0.14.x | 3.4.x (default build), 3.3.x, 3.2.x, 3.1.x, 3.0.x |
| 0.13.x | 3.3.x (default build), 3.2.x, 3.1.x |
| 0.12.x | 3.3.x (default build), 3.2.x, 3.1.x |
| 0.11.x | 3.2.x (default build, Spark bundle only), 3.1.x |
| 0.10.x | 3.1.x (default build), 3.0.x |
| 0.7.0 - 0.9.0 | 3.0.x |
| 0.6.0 and prior | not supported |
Spark Shell
This command launches the Spark shell configured to use Kryo serialization and to register Hudi's Spark SQL extensions and catalog, so that Hudi tables can be created and queried directly from Spark SQL.
bin/spark-shell --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
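The command above assumes the Hudi Spark bundle jar is already on the classpath (for example, shipped with the distribution). If it is not, the bundle matching your Spark version from the support matrix above can be added with --packages; the coordinates below are illustrative for Hudi 0.15.x on Spark 3.5:
bin/spark-shell --packages org.apache.hudi:hudi-spark3.5-bundle_2.12:0.15.0 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'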
Import Statements
This code block imports necessary libraries for Spark and Hudi operations, and it initializes variables for the table name and base path to be used in subsequent Hudi data operations.
// spark-shell
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.table.HoodieTableConfig._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
import org.apache.hudi.common.model.HoodieRecord
import org.apache.spark.sql.functions.col // needed for col() in the update and CDC examples below
import spark.implicits._
val tableName = "hudi_table"
val basePath = "hdfs:///warehouse/tablespace/external/hive/hudi_table"
Create Table, Insert Data, and Query Data
This code block demonstrates how to create a Hudi table, insert data into it, and then query that data using Spark SQL, showcasing a complete cycle of table creation and data manipulation.
spark.sql("CREATE TABLE if not exists hudi_table ( ts BIGINT, uuid STRING, rider STRING, driver STRING, fare DOUBLE, city STRING ) USING HUDI PARTITIONED BY (city)")
val columns = Seq("ts","uuid","rider","driver","fare","city")
val data =
Seq((1695159649087L,"334e26e9-8355-45cc-97c6-c31daf0df330","rider-A","driver-K",19.10,"san_francisco"),
(1695091554788L,"e96c4396-3fad-413a-a942-4cb36106d721","rider-C","driver-M",27.70 ,"san_francisco"),
(1695046462179L,"9909a8b1-2d15-4d3d-8ec9-efc48c536a00","rider-D","driver-L",33.90 ,"san_francisco"),
(1695516137016L,"e3cf430c-889d-4015-bc98-59bdce1e530c","rider-F","driver-P",34.15,"sao_paulo" ),
(1695115999911L,"c8abbe79-8d89-47ea-b4ce-4d224bae5bfa","rider-J","driver-T",17.85,"chennai"));
var inserts = spark.createDataFrame(data).toDF(columns:_*)
inserts.write.format("hudi").
option(PARTITIONPATH_FIELD_NAME.key(), "city").
option(TABLE_NAME, tableName).
mode(Overwrite).
save(basePath)
val tripsDF = spark.read.format("hudi").load(basePath)
tripsDF.createOrReplaceTempView("trips_table")
spark.sql("SELECT uuid, fare, ts, rider, driver, city FROM trips_table WHERE fare > 20.0").show()
spark.sql("SELECT _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare FROM trips_table").show()
Update Data and Read Data
This code block reads data from a Hudi table, modifies the 'fare' column for a specific rider, and updates the table with the new information.
// Let's read data from the target Hudi table, modify the fare column for rider-D, and update it.
val updatesDf = spark.read.format("hudi").load(basePath).filter($"rider" === "rider-D").withColumn("fare", col("fare") * 10)
updatesDf.write.format("hudi").
option(OPERATION_OPT_KEY, "upsert").
option(PARTITIONPATH_FIELD_NAME.key(), "city").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
spark.sql("SELECT _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare FROM hudi_table").show()
Merge Data and Read Data
This code block demonstrates how to merge data from a source Hudi table into a target Hudi table, illustrating the integration of datasets within the Hudi framework.
// source table using Hudi for testing merging into target Hudi table
spark.sql("CREATE TABLE fare_adjustment (ts BIGINT, uuid STRING, rider STRING, driver STRING, fare DOUBLE, city STRING) USING HUDI")
spark.sql("INSERT INTO fare_adjustment VALUES (1695091554788,'e96c4396-3fad-413a-a942-4cb36106d721','rider-C','driver-M',-2.70 ,'san_francisco'),(1695530237068,'3f3d9565-7261-40e6-9b39-b8aa784f95e2','rider-K','driver-U',64.20 ,'san_francisco'),(1695241330902,'ea4c36ff-2069-4148-9927-ef8c1a5abd24','rider-H','driver-R',66.60 ,'sao_paulo' ), (1695115999911,'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa','rider-J','driver-T',1.85,'chennai'))"
spark.sql("MERGE INTO hudi_table AS target USING fare_adjustment AS source ON target.uuid = source.uuid WHEN MATCHED THEN UPDATE SET target.fare = target.fare + source.fare WHEN NOT MATCHED THEN INSERT * ")
Delete Data
This code block loads data from the Hudi table, filters it down to the records for a specific rider (rider-F), and then removes those records from the table by writing them back with the delete operation.
val deletesDF = spark.read.format("hudi").load(basePath).filter($"rider" === "rider-F")
deletesDF.write.format("hudi").
option(OPERATION_OPT_KEY, "delete").
option(PARTITIONPATH_FIELD_NAME.key(), "city").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
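To confirm the delete, re-read the table from the base path and filter for the removed rider; the count is expected to be 0:
// Re-read the table; rows for rider-F should no longer be present
spark.read.format("hudi").load(basePath).filter($"rider" === "rider-F").count()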
Time Travel Query
Time travel queries in Hudi allow you to view and query data as it appeared at specific points in time, using different timestamp formats to access historical data snapshots.
spark.read.format("hudi").
option("as.of.instant", "20210728141108100").
load(basePath)
spark.read.format("hudi").
option("as.of.instant", "2021-07-28 14:11:08.200").
load(basePath)
// It is equal to "as.of.instant = 2021-07-28 00:00:00"
spark.read.format("hudi").
option("as.of.instant", "2021-07-28").
load(basePath)
The same time travel queries can be expressed in Spark SQL:
-- time travel based on commit time, e.g. `20220307091628793`
SELECT * FROM hudi_table TIMESTAMP AS OF '20220307091628793' WHERE id = 1;
-- time travel based on different timestamp formats
SELECT * FROM hudi_table TIMESTAMP AS OF '2022-03-07 09:16:28.100' WHERE id = 1;
SELECT * FROM hudi_table TIMESTAMP AS OF '2022-03-08' WHERE id = 1;
Change Data Capture (CDC) Query
Hudi also exposes first-class support for Change Data Capture (CDC) queries. CDC queries are useful for applications that need to obtain all the changes, along with before or after images of records, given a commit time range.
// spark-shell
// Lets first insert data to a new table with cdc enabled.
val columns = Seq("ts","uuid","rider","driver","fare","city")
val data =
Seq((1695158649187L,"334e26e9-8355-45cc-97c6-c31daf0df330","rider-A","driver-K",19.10,"san_francisco"),
(1695091544288L,"e96c4396-3fad-413a-a942-4cb36106d721","rider-B","driver-L",27.70 ,"sao_paulo"),
(1695046452379L,"9909a8b1-2d15-4d3d-8ec9-efc48c536a00","rider-C","driver-M",33.90 ,"san_francisco"),
(1695332056404L,"1dced545-862b-4ceb-8b43-d2a568f6616b","rider-D","driver-N",93.50,"chennai"));
var df = spark.createDataFrame(data).toDF(columns:_*)
// Insert data
df.write.format("hudi").
option(PARTITIONPATH_FIELD_NAME.key(), "city").
option(CDC_ENABLED.key(), "true").
option(TABLE_NAME, tableName).
mode(Overwrite).
save(basePath)
// Update fare for riders: rider-A and rider-B
val updatesDf = spark.read.format("hudi").load(basePath).filter($"rider" === "rider-A" || $"rider" === "rider-B").withColumn("fare", col("fare") * 10)
updatesDf.write.format("hudi").
option(OPERATION_OPT_KEY, "upsert").
option(PARTITIONPATH_FIELD_NAME.key(), "city").
option(CDC_ENABLED.key(), "true").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
// Query CDC data
spark.read.option(BEGIN_INSTANTTIME.key(), 0).
option(QUERY_TYPE.key(), QUERY_TYPE_INCREMENTAL_OPT_VAL).
option(INCREMENTAL_FORMAT.key(), "cdc").
format("hudi").load(basePath).show(false)
CDC queries are currently supported only on Copy-on-Write tables.
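Because CDC queries require a Copy-on-Write table, the table type can also be pinned explicitly when writing (Copy-on-Write is the default); a minimal sketch reusing the df, tableName, and basePath from the insert above:
// Explicitly keep the table as COPY_ON_WRITE so CDC queries remain supported
df.write.format("hudi").
option(TABLE_TYPE.key(), "COPY_ON_WRITE").
option(CDC_ENABLED.key(), "true").
option(PARTITIONPATH_FIELD_NAME.key(), "city").
option(TABLE_NAME, tableName).
mode(Overwrite).
save(basePath)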
For more details, see the Apache Hudi Quick Start Guide.
Hudi with Hive
| Hudi | Supported Hive Version |
|---|---|
| 0.14.x | Hive 4.x.x |
Hive Shell
- Update the Hive Environment Configuration:
  Edit the hive-env.sh file to include the Hudi JARs in the HIVE_AUX_JARS_PATH:
export HIVE_AUX_JARS_PATH=${HIVE_AUX_JARS_PATH}:/usr/odp/3.3.6.2-1/hive/lib/hudi-hadoop-mr-bundle-0.14.2.jar
export HIVE_AUX_JARS_PATH=${HIVE_AUX_JARS_PATH}:/usr/odp/3.3.6.2-1/hive/lib/hudi-hive-sync-bundle-0.14.2.jar
- Configure the Hive Properties:
  Update the hive-site.xml file with the following properties:
<property>
<name>hive.input.format</name>
<value>org.apache.hudi.hadoop.HoodieParquetInputFormat</value>
</property>
<property>
<name>hive.output.format</name>
<value>org.apache.hudi.hadoop.HoodieParquetOutputFormat</value>
</property>
<property>
<name>hive.serde</name>
<value>org.apache.hudi.hadoop.hive.HoodieParquetHiveSerDe</value>
</property>
<property>
<name>hive.tez.input.format</name>
<value>org.apache.hadoop.hive.ql.io.HiveInputFormat</value>
</property>
- Restart the Hive Services:
  Restart the Hive services to apply the changes.
- Create Hudi Tables:
  Open the Hive shell and create Hudi tables as needed.
CREATE EXTERNAL TABLE hudi_cow_table (
id STRING,
name STRING,
age INT,
ts TIMESTAMP
)
PARTITIONED BY (dt STRING)
STORED AS PARQUET
LOCATION 'hdfs:///tmp/hudi/hudi_cow_table'
TBLPROPERTIES (
'type' = 'cow',
'primaryKey' = 'id',
'preCombineField' = 'ts',
'hudi.table.name' = 'hudi_cow_table'
);
INSERT INTO hudi_cow_table
VALUES ('1', 'Alice', 30, '2024-01-01 00:00:00', '2024-01-01');
To perform operations on Hudi tables and use time travel features with Hive, follow these steps.
First, create new Hudi tables with unique names to avoid conflicts with existing tables. Here are the updated CREATE TABLE statements:
Create Hudi Tables
Create Hudi COW Table
CREATE EXTERNAL TABLE hudi_cow_table_new (
id STRING,
name STRING,
age INT,
ts TIMESTAMP
)
PARTITIONED BY (dt STRING)
STORED AS PARQUET
LOCATION 'hdfs:///tmp/hudi/hudi_cow_table_new'
TBLPROPERTIES (
'type' = 'cow',
'primaryKey' = 'id',
'preCombineField' = 'ts',
'hudi.table.name' = 'hudi_cow_table_new'
);
Insert Data into Hudi COW Table
INSERT INTO hudi_cow_table_new
VALUES ('1', 'Alice', 30, '2024-01-01 00:00:00', '2024-01-01');
INSERT INTO hudi_cow_table_new
VALUES ('1', 'Alice', 31, '2024-01-02 00:00:00', '2024-01-01');
Create Hudi MOR Table
CREATE EXTERNAL TABLE hudi_mor_table_new (
id STRING,
name STRING,
age INT,
ts TIMESTAMP
)
PARTITIONED BY (dt STRING)
STORED AS PARQUET
LOCATION 'hdfs:///tmp/hudi/hudi_mor_table_new'
TBLPROPERTIES (
'type' = 'mor',
'primaryKey' = 'id',
'preCombineField' = 'ts',
'hudi.table.name' = 'hudi_mor_table_new'
);
Insert Data into Hudi MOR Table
INSERT INTO hudi_mor_table_new
VALUES ('1', 'Alice', 30, '2024-06-07 14:00:00', '2024-06-07');
Create Hudi Realtime Table
CREATE EXTERNAL TABLE hudi_test_realtime_new(
`_hoodie_commit_time` STRING,
`_hoodie_record_key` STRING,
`waybillid` STRING,
`quantity` INT,
`createtm` STRING)
PARTITIONED BY (
`sourcezonecode` STRING)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
'hdfs:///tmp/hudi/hudi_table_realtime_new'
TBLPROPERTIES (
'last_commit_time_sync' = '20190430150335',
'transient_lastDdlTime' = 'CURRENT_TIMESTAMP');
Insert Data into Hudi Realtime Table
INSERT INTO hudi_test_realtime_new
VALUES
('2024-06-07 14:00:00', 'record_key_1', 'waybill_1', 10, '2024-06-07 14:05:00', 'source_zone_1'),
('2024-06-07 14:00:00', 'record_key_2', 'waybill_2', 15, '2024-06-07 14:10:00', 'source_zone_2');
Perform Time Travel Queries
Hive does not natively support querying historical versions of a Hudi table, so use Spark for time travel queries. From Hive you can still query the latest data; from Spark you can query specific snapshots.
Query Latest Data (Hive)
SELECT * FROM hudi_cow_table_new WHERE dt = '2024-01-01';
SELECT * FROM hudi_mor_table_new WHERE dt = '2024-06-07';
Query Specific Snapshot (Using Spark)
If you need to query historical snapshots, use Spark to read from specific commit times or timestamps:
val df = spark.read.format("hudi")
.option("hoodie.snapshot.timestamp", "2024-01-01T00:00:00.000Z") // Example timestamp
.load("hdfs:///tmp/hudi/hudi_cow_table_new")
df.show()
For more details, see Hive Metastore.