Delta Lake
Delta Lake is an open source project that enables building a Lakehouse architecture on top of data lakes. Delta Lake provides ACID transactions and scalable metadata handling, and unifies streaming and batch data processing on top of existing data lakes such as S3, ADLS, GCS, and HDFS.
Delta Lake offers the following:
- ACID transactions in Spark: The serializable isolation levels ensure that readers never see inconsistent data.
- Scalable Metadata Management: Leverages Spark's distributed processing power to easily handle all the metadata for petabyte-scale tables with billions of files.
- Streaming and Batch Processing: A table in Delta Lake is a batch table as well as a streaming source and sink. Streaming data ingest, batch historic backfill, and interactive queries all just work out of the box.
- Schema Enforcement: Automatically manages schema variations to prevent the insertion of bad records during data ingestion.
- Time Travel: Data versioning enables rollbacks, full historical audit trails, and reproducible machine learning experiments.
- Upserts and Deletes: Supports merge, update, and delete operations to enable complex use cases such as change data capture (CDC), slowly changing dimension (SCD) operations, and streaming upserts.
The following table lists each Delta Lake release and the Apache Spark version it is compatible with:

| Delta Lake Version | Apache Spark Version |
|---|---|
| 3.3.x | 3.5.x |
| 3.2.x | 3.5.x |
| 3.1.x | 3.5.x |
| 3.0.x | 3.5.x |
| 2.4.x | 3.4.x |
| 2.3.x | 3.3.x |
| 2.2.x | 3.3.x |
| 2.1.x | 3.3.x |
| 2.0.x | 3.2.x |
| 1.2.x | 3.2.x |
| 1.1.x | 3.2.x |
Spark Shell
Launch the Spark shell with the configurations that enable Delta Lake's SQL commands and catalog integration. If the Delta Lake jars are not already on the classpath, also pass them to the shell, for example with --packages io.delta:delta-spark_2.12:<version> (the artifact was named delta-core in Delta Lake 2.x and earlier).
bin/spark-shell --master local[*] \
  --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
Import Statements
Import the Delta Lake Scala API; the io.delta.tables package provides the DeltaTable class used for table utilities and DML operations later on this page.
import io.delta.tables._
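With this import in place, the DeltaTable companion object is available in the shell. As a minimal sketch (hdfs:///<table_path> is a placeholder for a real table location, as elsewhere on this page), it can be used to check whether a location holds a Delta table:
// Returns true if the given location contains a Delta table.
// hdfs:///<table_path> is a placeholder, not a real path.
DeltaTable.isDeltaTable(spark, "hdfs:///<table_path>")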
Create Table, Insert Data, and Read Data
Create a Delta Lake table, insert data into it, and read the data back. The same table, default.delta_tablev2, is used for the remaining steps on this page.
spark.sql("CREATE TABLE if not exists delta_table ( ts BIGINT, uuid STRING, rider STRING, driver STRING, fare DOUBLE, city STRING ) USING DELTA")
val columns = Seq("ts","uuid","rider","driver","fare","city")
val data =
Seq((1695159649087L,"334e26e9-8355-45cc-97c6-c31daf0df330","rider-A","driver-K",19.10,"san_francisco"),
(1695091554788L,"e96c4396-3fad-413a-a942-4cb36106d721","rider-C","driver-M",27.70 ,"san_francisco"),
(1695046462179L,"9909a8b1-2d15-4d3d-8ec9-efc48c536a00","rider-D","driver-L",33.90 ,"san_francisco"),
(1695516137016L,"e3cf430c-889d-4015-bc98-59bdce1e530c","rider-F","driver-P",34.15,"sao_paulo"),
(1695115999911L,"c8abbe79-8d89-47ea-b4ce-4d224bae5bfa","rider-J","driver-T",17.85,"chennai"))
var inserts = spark.createDataFrame(data).toDF(columns:_*)
inserts.write.format("delta").saveAsTable("default.delta_tablev2")
// Describe the table
val deltaTable = DeltaTable.forPath(spark, "hdfs:///<table_path>") // or DeltaTable.forName(spark, "default.delta_tablev2")
val detailDF = deltaTable.detail()
detailDF.show(false)
spark.table("default.delta_tablev2").show(false)
Update Data and Read Data
Update records in the Delta Lake table with SQL, then read the table to verify the change.
spark.sql("UPDATE default.delta_tablev2 SET fare = fare * 10 WHERE rider = 'rider-A'")
spark.table("default.delta_tablev2").show(false)
Merge Data and Read Data
Merge the contents of a second Delta Lake table into the first one, then read the resulting data.
spark.sql("CREATE TABLE fare_adjustment_delta (ts BIGINT, uuid STRING, rider STRING, driver STRING, fare DOUBLE, city STRING) USING delta")
spark.sql("INSERT INTO fare_adjustment_delta VALUES (1695091554788,'e96c4396-3fad-413a-a942-4cb36106d721','rider-C','driver-M',-2.70 ,'san_francisco'),(1695530237068,'3f3d9565-7261-40e6-9b39-b8aa784f95e2','rider-K','driver-U',64.20 ,'san_francisco'),(1695241330902,'ea4c36ff-2069-4148-9927-ef8c1a5abd24','rider-H','driver-R',66.60 ,'sao_paulo' ), (1695115999911,'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa','rider-J','driver-T',1.85,'chennai')")
spark.sql("MERGE INTO delta_tablev2 AS target USING fare_adjustment_delta AS source ON target.uuid = source.uuid WHEN MATCHED THEN UPDATE SET target.fare = target.fare + source.fare WHEN NOT MATCHED THEN INSERT * ")
spark.table("default.delta_tablev2").show(false)
Delete Data
Delete specific records from the Delta Lake table, then read it back to confirm the rows are gone.
spark.sql("DELETE FROM default.delta_tablev2 WHERE rider = 'rider-K'")
spark.table("default.delta_tablev2").show(false)
Time Travel Query
Use Delta Lake's time travel feature to query earlier versions of the table. DESCRIBE HISTORY lists every version of the table; the example below looks up the latest version number and queries the snapshot just before the most recent change.
val history = spark.sql("DESCRIBE HISTORY default.delta_tablev2")
history.select("version", "timestamp", "operation").show(false)
val latestVersion = history.selectExpr("max(version)").collect()(0).getLong(0)
// Query the snapshot as it was before the most recent change (the delete above)
spark.sql(s"SELECT * FROM default.delta_tablev2 VERSION AS OF ${latestVersion - 1}").show(false)
For more details, see the Delta Lake Documentation.