Diving Into Delta Lake: Schema Enforcement & Evolution

Delta Lake uses schema validation on write, which means that all new writes to a table are checked for compatibility with the target table’s schema at write time.

If the schema is not compatible, Delta Lake cancels the transaction altogether (no data is written), and raises an exception to let the user know about the mismatch.

To determine whether a write to a table is compatible, Delta Lake uses the following rules.

The DataFrame to be written: Can not contain any additional columns that are not present in the target table’s schema.

Conversely, it’s OK if the incoming data doesn’t contain every column in the table – those columns will simply be assigned null values.

Cannot have column data types that differ from the column data types in the target table.

If a target table’s column contains StringType data, but the corresponding column in the DataFrame contains IntegerType data, schema enforcement will raise an exception and prevent the write operation from taking place.

Can not contain column names that differ only by case.

This means that you cannot have columns such as ‘Foo’ and  ‘foo’ defined in the same table.

While Spark can be used in case sensitive or insensitive (default) mode, Delta is case-preserving but insensitive when storing the schema.

Parquet is case sensitive when storing and returning column information.

To avoid potential mistakes, data corruption or loss issues (which we’ve personally experienced at Databricks), we decided to add this restriction.

To illustrate, take a look at what happens in the code below when we attempt to append some newly calculated columns to a Delta Lake table that isn’t yet set up to accept them.

# Generate a DataFrame of loans that well append to our Delta Lake table loans = sql(“”” SELECT addr_state, CAST(rand(10)*count as bigint) AS count, CAST(rand(10) * 10000 * count AS double) AS amount FROM loan_by_state_delta “””) # Show original DataFrames schema original_loans.

printSchema() “”” root |– addr_state: string (nullable = true) |– count: integer (nullable = true) “”” # Show new DataFrames schema loans.

printSchema() “”” root |– addr_state: string (nullable = true) |– count: integer (nullable = true) |– amount: double (nullable = true) # new column “”” # Attempt to append new DataFrame (with new column) to existing table loans.

write.

format(“delta”) .

mode(“append”) .

save(DELTALAKE_PATH) “”” Returns: A schema mismatch detected when writing to the Delta table.

To enable schema migration, please set: .

option(“mergeSchema”, “true”) Table schema: root — addr_state: string (nullable = true) — count: long (nullable = true) Data schema: root — addr_state: string (nullable = true) — count: long (nullable = true) — amount: double (nullable = true) If Table ACLs are enabled, these options will be ignored.

Please use the ALTER TABLE command for changing the schema.

“”” Rather than automatically adding the new columns, Delta Lake enforces the schema and stops the write from occurring.

To help identify which column(s) caused the mismatch, Spark prints out both schemas in the stack trace for comparison.

How Is Schema Enforcement Useful?.Because it’s such a stringent check, schema enforcement is an excellent tool to use as a gatekeeper of a clean, fully transformed data set that is ready for production or consumption.

It’s typically enforced on tables that directly feed: Machine learning algorithms BI dashboards Data analytics and visualization tools Any production system requiring highly structured, strongly typed, semantic schemas In order to prepare their data for this final hurdle, many users employ a simple “multi-hop” architecture that progressively adds structure to their tables.

To learn more, take a look at the post entitled Productionizing Machine Learning With Delta Lake.

Of course, schema enforcement can be used anywhere in your pipeline, but be aware that it can be a bit frustrating to have your streaming write to a table fail because you forgot that you added a single column to the incoming data, for example.

Preventing Data Dilution At this point, you might be asking yourself, what’s all the fuss about?.After all, sometimes an unexpected “schema mismatch” error can trip you up in your workflow, especially if you’re new to Delta Lake.

Why not just let the schema change however it needs to so that I can write my DataFrame no matter what?.As the old saying goes, “an ounce of prevention is worth a pound of cure.

” At some point, if you don’t enforce your schema, issues with data type compatibility will rear their ugly heads – seemingly homogenous sources of raw data can contain edge cases, corrupted columns, misformed mappings, or other scary things that go bump in the night.

A much better approach is to stop these enemies at the gates – using schema enforcement – and deal with them in the daylight rather than later on, when they’ll be lurking in the shadowy recesses of your production code.

Schema enforcement provides peace of mind that your table’s schema will not change unless you make the affirmative choice to change it.

It prevents data “dilution,” which can occur when new columns are appended so frequently that formerly rich, concise tables lose their meaning and usefulness due to the data deluge.

By encouraging you to be intentional, set high standards, and expect high quality, schema enforcement is doing exactly what it was designed to do – keeping you honest, and your tables clean.

If, upon further review, you decide that you really did mean to add that new column, it’s an easy, one line fix, as discussed below.

The solution is schema evolution!.What Is Schema Evolution?.Schema evolution is a feature that allows users to easily change a table’s current schema to accommodate data that is changing over time.

Most commonly, it’s used when performing an append or overwrite operation, to automatically adapt the schema to include one or more new columns.

How Does Schema Evolution Work?.Following up on the example from the previous section, we can easily use schema evolution to add the new columns that were previously rejected due to a schema mismatch.

Schema evolution is activated by adding  .

option(mergeSchema, true) to your .

write or .

writeStream Spark command.

# Add the mergeSchema option loans.

write.

format(“delta”) .

option(“mergeSchema”, “true”) .

mode(“append”) .

save(DELTALAKE_SILVER_PATH) To view the plot, execute the following Spark SQL statement.

# Create a plot with the new column to confirm the write was successful %sql SELECT addr_state, sum(`amount`) AS amount FROM loan_by_state_delta GROUP BY addr_state ORDER BY sum(`amount`) DESC LIMIT 10 Alternatively, you can set this option for the entire Spark session by adding spark.

databricks.

delta.

schema.

autoMerge = True to your Spark configuration.

Use with caution, as schema enforcement will no longer warn you about unintended schema mismatches.

By including the mergeSchema option in your query, any columns that are present in the DataFrame but not in the target table are automatically added on to the end of the schema as part of a write transaction.

Nested fields can also be added, and these fields will get added to the end of their respective struct columns as well.

Data engineers and scientists can use this option to add new columns (perhaps a newly tracked metric, or a column of this month’s sales figures) to their existing machine learning production tables without breaking existing models that rely on the old columns.

The following types of schema changes are eligible for schema evolution during table appends or overwrites: Adding new columns (this is the most common scenario) Changing of data types from NullType -> any other type, or upcasts from ByteType -> ShortType -> IntegerType Other changes, which are not eligible for schema evolution, require that the schema and data are overwritten by adding .

option("mergeSchema", "true").

Those changes include: Dropping a column Changing an existing column’s data type (in place) Renaming column names that differ only by case (e.

g.

“Foo” and “foo”) Finally, with the upcoming release of Spark 3.

0, explicit DDL (using ALTER TABLE) will be fully supported, allowing users to perform the following actions on table schemas: Adding  columns Changing column comments Setting table properties that define the behavior of the table, such as setting the retention duration of the transaction log How is Schema Evolution Useful?.Schema evolution can be used anytime you intend   to change the schema of your table (as opposed to where you accidentally added columns to your DataFrame that shouldn’t be there).

It’s the easiest way to migrate your schema because it automatically adds the correct column names and data types, without having to declare them explicitly.

Summary Schema enforcement rejects any new columns or other schema changes that aren’t compatible with your table.

By setting and upholding these high standards, analysts and engineers can trust that their data has the highest levels of integrity, and reason about it with clarity, allowing them to make better business decisions.

On the flip side of the coin, schema evolution complements enforcement by making it easy for intended schema changes to take place automatically.

After all, it shouldn’t be hard to add a column.

Schema enforcement is the yin to schema evolution’s yang.

When used together, these features make it easier than ever to block out the noise, and tune in to the signal.

  We’d also like to thank Mukul Murthy and Pranav Anand for their contributions to this blog.

Related Articles Productionizing Machine Learning With Delta Lake Other articles in this series: Diving Into Delta Lake: Unpacking the Transaction Log Try Databricks for free.

Get started today.

. More details

Leave a Reply