Efficient Upserts into Data Lakes with Databricks Delta

A paradigm shift in building big data pipelines for change data capture (CDC) and GDPR use cases.

Databricks Delta, the next-generation unified analytics engine built on top of Apache Spark™, now supports the MERGE command, which allows you to efficiently upsert and delete records in your data lakes.

MERGE dramatically simplifies how a number of common data pipelines can be built: all the complicated multi-hop processes that inefficiently rewrote entire partitions can now be replaced by simple MERGE queries.

This finer-grained update capability creates a paradigm shift in how you build your big data pipelines for use cases ranging from change data capture to GDPR.

Need for upserts in various use cases

There are a number of common use cases where existing data in a data lake needs to be updated or deleted:

General Data Protection Regulation (GDPR) compliance: With the introduction of the right to be forgotten (also known as data erasure) in GDPR, organizations must remove a user’s information upon request.

This data erasure includes deleting user information in the data lakes as well.

Change data capture from traditional databases: In a service-oriented architecture, web and mobile applications are typically served by microservices built on traditional SQL/NoSQL databases that are optimized for low latency.

One of the biggest challenges organizations face is joining data across these siloed data systems, so data engineers build pipelines to consolidate all data sources into a central data lake to facilitate analytics.

These pipelines often have to periodically read changes made on a traditional SQL/NoSQL table and apply them to corresponding tables in the data lake.

Such changes can take various forms – a table with slowly changing dimensions, change data capture of all inserted/updated/deleted rows, etc.

Sessionization: Grouping multiple events into a single session is a common use case in many areas ranging from product analytics and targeted advertising to predictive maintenance.

Building continuous applications that track sessions and record the results in data lakes is difficult because data lakes have always been optimized for appending data.

Deduplication: A common data pipeline use case is to collect system logs into a Delta table by appending data to the table.

However, the sources often generate duplicate records, and downstream deduplication steps are needed to take care of them.

Challenges of upserts into data lakes

Since data lakes are fundamentally based on files, they have always been optimized for appending data rather than for changing existing data.

Hence, building the above use cases has always been challenging.

Users typically read the entire table (or a subset of its partitions) and then overwrite them.
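For instance, a minimal sketch of this pattern, assuming hypothetical users and updates tables partitioned by a date column, rewrites a whole partition just to change a few addresses:

-- Rewrite an entire partition of a plain Parquet table to apply a handful of address changes
-- (table, column, and partition names here are hypothetical).
INSERT OVERWRITE TABLE users PARTITION (date = '2019-01-01')
SELECT u.userId,
       COALESCE(up.address, u.address) AS address
FROM users u
LEFT JOIN updates up
  ON u.userId = up.userId
WHERE u.date = '2019-01-01'

Every file in the partition is rewritten even if only a few rows changed, and brand-new users are still not inserted – handling them correctly is exactly where such hand-written pipelines start to get complicated.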

Therefore, every organization ends up reinventing the wheel for its requirements by hand-writing complicated queries in SQL, Spark, etc.

This approach is:

Inefficient: Reading and rewriting entire partitions (or entire tables) to update a few records causes pipelines to be slow and costly.

Hand-tuning the table layout and query is tedious and requires deep domain knowledge.

Possibly incorrect: Hand-written code modifying data is very prone to logical and human errors.

For example, multiple pipelines concurrently modifying the same table without any transactional support can lead to unpredictable data inconsistencies and in the worst case, data losses.

Often, even a single hand-written pipeline can easily cause data corruptions due to errors in encoding the business logic.

Hard to maintain: Fundamentally, such hand-written code is hard to understand, keep track of, and maintain.

In the long term, this alone can significantly increase organizational and infrastructural costs.

Introducing Merge in Databricks Delta

With Databricks Delta, you can easily address the use cases above without any of the aforementioned problems using the following MERGE command:

MERGE INTO <target-table>
USING <source-table-or-view>
ON <merge-condition>
[ WHEN MATCHED [ AND <condition> ] THEN <matched-action> ]
[ WHEN MATCHED [ AND <condition> ] THEN <matched-action> ]
[ WHEN NOT MATCHED [ AND <condition> ] THEN <not-matched-action> ]

where

<matched-action> = DELETE |
                   UPDATE SET * |
                   UPDATE SET column1 = value1 [, column2 = value2 ...]

<not-matched-action> = INSERT * |
                       INSERT (column1 [, column2 ...]) VALUES (value1 [, value2 ...])

See our docs (Azure | AWS) for a more detailed explanation of the syntax.

Let’s understand how to use MERGE with a simple example.

Suppose you have a slowly changing dimension table that maintains user information like addresses.

Furthermore, you have a table of new addresses for both existing and new users.

To merge all the new addresses into the main user table, you can run the following:

MERGE INTO users
USING updates
ON users.userId = updates.userId
WHEN MATCHED THEN
  UPDATE SET address = updates.address
WHEN NOT MATCHED THEN
  INSERT (userId, address) VALUES (updates.userId, updates.address)

This does exactly what the syntax says: for existing users (i.e., the MATCHED clause), it updates the address column, and for new users (i.e., the NOT MATCHED clause), it inserts all the columns.

For large tables with TBs of data, this Delta MERGE operation can be orders of magnitude faster than overwriting entire partitions or tables since Delta reads only relevant files and updates them.

Specifically, Delta’s MERGE has the following advantages:

Fine-grained: The operation rewrites data at the granularity of files and not partitions. This eliminates all the complications of rewriting partitions, updating the Hive metastore with MSCK, and so on.

Efficient: Delta’s data skipping makes MERGE efficient at finding files to rewrite, eliminating the need to hand-optimize your pipeline. Furthermore, Delta with all its I/O and processing optimizations makes all the reading and writing of data by MERGE significantly faster than similar operations in Apache Spark.

Transactional: Delta uses optimistic concurrency control to ensure that concurrent writers update the data correctly with ACID transactions, and concurrent readers always see a consistent snapshot of the data.
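When the target table is partitioned, you can further reduce the files MERGE has to consider by adding a known partition predicate to the merge condition. A minimal sketch, assuming a hypothetical events table partitioned by a date column:

-- Restrict the merge to recent partitions so only files in those partitions are scanned
-- (events, updates, eventId, and date are hypothetical names).
MERGE INTO events
USING updates
ON events.eventId = updates.eventId
   AND events.date >= '2019-01-01'
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *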

Here is a visual explanation of how MERGE compares with hand-written pipelines.

Figure: the steps involved in merging records into a data lake, with and without Delta.

Simplifying use cases with MERGE

Deleting data due to GDPR

Complying with the right to be forgotten clause of GDPR for data in data lakes cannot get easier.

You can set up a simple scheduled job with code like the following to delete all the users who have opted out of your service.

MERGE INTO users
USING opted_out_users
ON opted_out_users.userId = users.userId
WHEN MATCHED THEN DELETE

Equivalently, you can express the same deletion with a DELETE statement and a subquery:

DELETE FROM users AS u1
WHERE EXISTS (SELECT userId FROM opted_out_users u2 WHERE u1.userId = u2.userId)

Applying change data from databases

You can easily apply all data changes – updates, deletes, inserts – generated from an external database into a Delta table with the MERGE syntax as follows:

MERGE INTO users
USING (
  SELECT userId, latest.address AS address, latest.deleted AS deleted
  FROM (
    SELECT userId, MAX(struct(TIME, address, deleted)) AS latest
    FROM changes
    GROUP BY userId
  )
) latestChange
ON latestChange.userId = users.userId
WHEN MATCHED AND latestChange.deleted = TRUE THEN
  DELETE
WHEN MATCHED THEN
  UPDATE SET address = latestChange.address
WHEN NOT MATCHED AND latestChange.deleted = FALSE THEN
  INSERT (userId, address) VALUES (userId, address)

Updating session information from streaming pipelines

If you have streaming event data flowing in and you want to sessionize it and incrementally update and store the sessions in a Delta table, you can accomplish this using foreachBatch in Structured Streaming together with MERGE.

Here is an example.

Suppose you have a Structured Streaming DataFrame that computes updated session information for each user.

You can start a streaming query that applies all the session updates to a Delta table as follows:

streamingSessionUpdatesDF.writeStream
  .foreachBatch { (microBatchOutputDF: DataFrame, batchId: Long) =>
    microBatchOutputDF.createOrReplaceTempView("updates")
    microBatchOutputDF.sparkSession.sql(s"""
      MERGE INTO sessions
      USING updates
      ON sessions.sessionId = updates.sessionId
      WHEN MATCHED THEN UPDATE SET *
      WHEN NOT MATCHED THEN INSERT *
    """)
  }.start()

For a complete working example of foreachBatch and MERGE, see this notebook (Azure | AWS).
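The deduplication use case mentioned earlier follows the same pattern with an insert-only merge: a log record is appended only if no record with the same key already exists. A minimal sketch, assuming a hypothetical logs table and a newDedupedLogs source keyed by a uniqueId column:

-- Insert-only merge: append a log record only when its uniqueId is not already in the table
-- (logs, newDedupedLogs, and uniqueId are hypothetical names).
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED THEN
  INSERT *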

Conclusion

The fine-grained update capability in Databricks Delta creates a paradigm shift in how you build your big data pipelines.

You no longer need to write complicated logic to overwrite tables and work around the lack of snapshot isolation.

With fine-grained update, your pipelines will also be more efficient since you don’t need to read and overwrite entire tables.

With changing data, another critical capability is the ability to roll back in case of bad writes.

Delta also offers rollback capabilities with the time travel feature, so that if you do a bad merge, you can easily roll back.
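For example, a minimal sketch of that recovery path, assuming a hypothetical version number taken from the table history, might look like this:

-- Inspect the users table as it was before the bad merge (the version number is hypothetical)
SELECT * FROM users VERSION AS OF 10

-- If needed, overwrite the current table with that earlier snapshot to undo the merge
INSERT OVERWRITE TABLE users
SELECT * FROM users VERSION AS OF 10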

Read more (Azure | AWS) about the fine-grained updates feature.

To see the feature in action, sign up for a free trial of Databricks and try it out.
