Automated Data Quality Testing at Scale using Apache Spark

With an open source library from Amazon — Deequ

Tanmay Deshpande · Jun 29

Photo by Stephen Dawson on Unsplash

I have been working as a Technology Architect, mainly responsible for Data Lake/Hub/Platform kinds of projects.

Every day we ingest data from 100+ business systems so that the data can be made available to the analytics and BI teams for their projects.

Problem Statement

While ingesting data, we avoid any transformations.

The data is replicated as it is from the source.

The sources can be of type MySQL, SQL Server, Oracle, DB2, etc.

The target systems can be Hadoop/Hive or Big Query.

Even though no transformation is done on the data, the fact that the source and target systems are different means these simple data ingestions can sometimes cause data quality issues.

Source and target systems can have different data types which might cause more issues.

Special characters in the data might cause row/column shifting.

Possible Solution

In order to solve this problem, most developers take a manual approach to data quality testing after they have built the data pipelines.

This can be done by running some simple tests like:

- Sample data comparison between source and target
- Null checks on primary key columns
- Null checks on date columns
- Count comparison for the categorical columns
- etc.
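For illustration only, here is a rough sketch of what such manual checks often look like in Spark. The table names, the sourceDf/targetDf DataFrames, and the chosen key column are my own assumptions, not something from this tutorial's dataset setup:

import org.apache.spark.sql.functions.col

// hypothetical handles on the source table and its ingested copy
val sourceDf = spark.table("source_db.customers")
val targetDf = spark.table("datalake.customers")

// count comparison between source and target
val sourceCount = sourceDf.count()
val targetCount = targetDf.count()
println(s"source=$sourceCount, target=$targetCount, match=${sourceCount == targetCount}")

// null check on the primary key column
val nullKeys = targetDf.filter(col("customerNumber").isNull).count()
println(s"null primary keys in target: $nullKeys")

// sample comparison: a few source rows whose key is missing in the target
sourceDf.sample(0.01).join(targetDf, Seq("customerNumber"), "left_anti").show(5)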

This approach sometimes works well but it is time-consuming and error-prone.

Hence, I started looking for an automated option.

Deequ at Amazon

My search for an open source data quality testing framework stopped at the Deequ library from Amazon.

Deequ is being used at Amazon for verifying the quality of many large production datasets.

The system keeps on computing data quality metrics on a regular basis.

Source — https://aws.amazon.com/blogs/big-data/test-data-quality-at-scale-with-deequ/

Deequ is built on top of Apache Spark, hence it scales naturally to huge amounts of data.

The best part is, you don’t need to know Spark in detail to use this library.

Deequ provides features like —

- Constraint Suggestions — what to test. Sometimes it might be difficult to find what to test for in a particular object. Deequ provides built-in functionality to identify constraints to be tested.
- Metrics Computation — once we know what to test, we can use the suggestions given by the library and run the tests to compute the metrics.
- Constraint Verification — using Deequ, we can also define test cases and get results to be used for reporting.

Let’s get into action

In order to run Deequ, we first need to prepare our workstation.

You can try this out on a simple Windows/Linux/Mac machine.

Pre-requisites

- Install Scala — you can download and install Scala from https://www.scala-lang.org/
- Install Apache Spark — you can download and install Spark from https://spark.apache.org/
- Download the Deequ library — you can download the Deequ JAR as shown below: wget http://repo1.maven.org/maven2/com/amazon/deequ/deequ/1.0.1/deequ-1.0.1.jar
- Prepare the data to be tested — if you don’t have any data to be tested, you can prepare some. For this tutorial, I have a MySQL instance installed and I have loaded some sample data from http://www.mysqltutorial.org/mysql-sample-database.aspx
- Download JDBC jars — for whichever type of database you want to run these tests on, please make sure to add the JDBC jars to $SPARK_HOME/jars.

Since I am going to run my tests on MySQL & Hive, I have added respective JDBC jars.

Start Spark in interactive mode

In order to run tests, we will start Spark in interactive mode using the library downloaded in the previous step, as shown below —

PS D:\work\DataTesting> spark-shell --conf spark.jars=deequ-1.0.1.jar
Spark context Web UI available at http://localhost:4040
Spark context available as 'sc' (master = local[*], app id = local-1561783362821).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.3
      /_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_171)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

Constraint Suggestions

I am planning to run tests on the customers table loaded in the earlier steps.

You can use MySQL Workbench/CLI to verify the data is loaded properly.

In order to run constraint suggestion, we need to first connect to the DB using Spark.

Please note that with this approach, we are pushing queries down to the underlying database.

So please be careful while running on production systems directly.
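One simple way to reduce the load on a production source (my own suggestion, not something the Deequ docs prescribe) is to point the JDBC reader at a bounded subquery rather than the full table. The connection code below uses the full customers table, but the same pattern applies:

// hypothetical example: profile only a bounded subset instead of the whole table
val sampleSource = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://<IP>:3306/classicmodels")
  .option("driver", "com.mysql.jdbc.Driver")
  // Spark pushes this subquery down to MySQL, so only 10,000 rows are read
  .option("dbtable", "(SELECT * FROM customers LIMIT 10000) AS customers_sample")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchsize", "1000") // stream rows in batches instead of all at once
  .load()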

// read the customers table from MySQL over JDBC (the query is pushed down to the source)
val datasource = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://<IP>:3306/classicmodels")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "customers")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("useSSL", "false")
  .load()

On a valid connection, you can check the schema of the table —

scala> datasource.printSchema()
root
 |-- customerNumber: integer (nullable = true)
 |-- customerName: string (nullable = true)
 |-- contactLastName: string (nullable = true)
 |-- contactFirstName: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- addressLine1: string (nullable = true)
 |-- addressLine2: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- postalCode: string (nullable = true)
 |-- country: string (nullable = true)
 |-- salesRepEmployeeNumber: integer (nullable = true)
 |-- creditLimit: decimal(10,2) (nullable = true)

Now, let’s run the constraint suggestions —

import com.amazon.deequ.suggestions.{ConstraintSuggestionRunner, Rules}
import spark.implicits._ // for the toDS method

// We ask Deequ to compute constraint suggestions for us on the data
val suggestionResult = {
  ConstraintSuggestionRunner()
    // data to suggest constraints for
    .onData(datasource)
    // default set of rules for constraint suggestion
    .addConstraintRules(Rules.DEFAULT)
    // run data profiling and constraint suggestion
    .run()
}

// We can now investigate the constraints that Deequ suggested
val suggestionDataFrame = suggestionResult.constraintSuggestions.flatMap {
  case (column, suggestions) =>
    suggestions.map { constraint =>
      (column, constraint.description, constraint.codeForConstraint)
    }
}.toSeq.toDS()

Once the execution is complete, you can print the suggestions as shown below —

scala> suggestionDataFrame.toJSON.collect.foreach(println)
{"_1":"addressLine1","_2":"'addressLine1' is not null","_3":".isComplete(\"addressLine1\")"}
{"_1":"city","_2":"'city' is not null","_3":".isComplete(\"city\")"}
{"_1":"contactFirstName","_2":"'contactFirstName' is not null","_3":".isComplete(\"contactFirstName\")"}
{"_1":"state","_2":"'state' has less than 69% missing values","_3":".hasCompleteness(\"state\", _ >= 0.31, Some(\"It should be above 0.31!\"))"}
{"_1":"salesRepEmployeeNumber","_2":"'salesRepEmployeeNumber' has less than 25% missing values","_3":".hasCompleteness(\"salesRepEmployeeNumber\", _ >= 0.75, Some(\"It should be above 0.75!\"))"}
{"_1":"salesRepEmployeeNumber","_2":"'salesRepEmployeeNumber' has no negative values","_3":".isNonNegative(\"salesRepEmployeeNumber\")"}
{"_1":"customerName","_2":"'customerName' is not null","_3":".isComplete(\"customerName\")"}
{"_1":"creditLimit","_2":"'creditLimit' is not null","_3":".isComplete(\"creditLimit\")"}
{"_1":"creditLimit","_2":"'creditLimit' has no negative values","_3":".isNonNegative(\"creditLimit\")"}
{"_1":"country","_2":"'country' is not null","_3":".isComplete(\"country\")"}
{"_1":"country","_2":"'country' has value range 'USA', 'Germany', 'France', 'Spain', 'UK', 'Australia', 'Italy', 'New Zealand', 'Switzerland', 'Singapore', 'Finland', 'Canada', 'Portugal', 'Ireland', 'Norway ', 'Austria', 'Sweden', 'Belgium' for at least 84.0% of values","_3":".isContainedIn(\"country\", Array(\"USA\", \"Germany\", \"France\", \"Spain\", \"UK\", \"Australia\", \"Italy\", \"New Zealand\", \"Switzerland\", \"Singapore\", \"Finland\", \"Canada\", \"Portugal\", \"Ireland\", \"Norway \", \"Austria\", \"Sweden\", \"Belgium\"), _ >= 0.84, Some(\"It should be above 0.84!\"))"}
{"_1":"postalCode","_2":"'postalCode' has less than 9% missing values","_3":".hasCompleteness(\"postalCode\", _ >= 0.9, Some(\"It should be above 0.9!\"))"}
{"_1":"customerNumber","_2":"'customerNumber' is not null","_3":".isComplete(\"customerNumber\")"}
{"_1":"customerNumber","_2":"'customerNumber' has no negative values","_3":".isNonNegative(\"customerNumber\")"}
{"_1":"contactLastName","_2":"'contactLastName' is not null","_3":".isComplete(\"contactLastName\")"}
{"_1":"phone","_2":"'phone' is not null","_3":".isComplete(\"phone\")"}

This means your test cases are ready.

Now let’s run the metrics computation.

Metrics Computation

Looking at the columns and suggestions, now I want to run the metrics computations.

Here is how you can do so —

import com.amazon.deequ.analyzers.runners.{AnalysisRunner, AnalyzerContext}
import com.amazon.deequ.analyzers.runners.AnalyzerContext.successMetricsAsDataFrame
import com.amazon.deequ.analyzers.{Compliance, Correlation, Size, Completeness, Mean, ApproxCountDistinct, Maximum, Minimum, Entropy, GroupingAnalyzer}

val analysisResult: AnalyzerContext = {
  AnalysisRunner
    // data to run the analysis on
    .onData(datasource)
    // define analyzers that compute metrics
    .addAnalyzer(Size())
    .addAnalyzer(Completeness("customerNumber"))
    .addAnalyzer(ApproxCountDistinct("customerNumber"))
    .addAnalyzer(Minimum("creditLimit"))
    .addAnalyzer(Mean("creditLimit"))
    .addAnalyzer(Maximum("creditLimit"))
    .addAnalyzer(Entropy("creditLimit"))
    .run()
}

On a successful run, you can see the results —

// retrieve successfully computed metrics as a Spark data frame
val metrics = successMetricsAsDataFrame(spark, analysisResult)
metrics.show()

scala> metrics.show()
+-------+--------------+-------------------+-----------------+
| entity|      instance|               name|            value|
+-------+--------------+-------------------+-----------------+
| Column|   creditLimit|            Entropy|4.106362796873961|
| Column|customerNumber|       Completeness|              1.0|
| Column|customerNumber|ApproxCountDistinct|            119.0|
| Column|   creditLimit|            Minimum|              0.0|
| Column|   creditLimit|               Mean|67659.01639344262|
| Column|   creditLimit|            Maximum|         227600.0|
|Dataset|             *|               Size|            122.0|
+-------+--------------+-------------------+-----------------+

You can also store these numbers for further verification or even to show trends.
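As a rough illustration (the output path and extra columns here are my own assumptions, not part of Deequ), one simple way to keep a history of these metrics is to stamp each run and append it somewhere queryable:

import org.apache.spark.sql.functions.{current_timestamp, lit}

// tag this run and append it to a hypothetical metrics history location
metrics
  .withColumn("dataset", lit("classicmodels.customers"))
  .withColumn("run_ts", current_timestamp())
  .write
  .mode("append")
  .parquet("/data/dq_metrics/customers")

// later, trend a single metric over time
spark.read.parquet("/data/dq_metrics/customers")
  .filter("name = 'Size'")
  .orderBy("run_ts")
  .show()

Deequ itself also ships a metrics repository abstraction for persisting metrics between runs; see the examples in the GitHub repository.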

In this example, we are running ApproxCountDistinct, which is calculated using the HyperLogLog algorithm.

This reduces the burden on the source system by approximating the distinct count.
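If you want a feel for how close the approximation is, you can compare it against an exact distinct count. This little check is just for illustration and uses plain Spark SQL functions, not Deequ analyzers:

import org.apache.spark.sql.functions.{approx_count_distinct, countDistinct}

// exact vs. approximate distinct counts for the primary key column
datasource
  .select(
    countDistinct("customerNumber").as("exact_distinct"),
    approx_count_distinct("customerNumber").as("approx_distinct"))
  .show()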

The full list of available analyzers can be found at — https://github.com/awslabs/deequ/tree/master/src/main/scala/com/amazon/deequ/analyzers

Constraint Verification

Now, let’s run test cases using the verification suites.

import com.amazon.deequ.{VerificationSuite, VerificationResult}
import com.amazon.deequ.VerificationResult.checkResultsAsDataFrame
import com.amazon.deequ.checks.{Check, CheckLevel}

val verificationResult: VerificationResult = {
  VerificationSuite()
    // data to run the verification on
    .onData(datasource)
    // define a data quality check
    .addCheck(
      Check(CheckLevel.Error, "Data Validation Check")
        .hasSize(_ == 122)
        .isComplete("customerNumber")  // should never be NULL
        .isUnique("customerNumber")    // should not contain duplicates
        .isNonNegative("creditLimit")) // should not contain negative values
    // compute metrics and verify check conditions
    .run()
}

Once the run is complete, you can look at the results —

// convert check results to a Spark data frame
val resultDataFrame = checkResultsAsDataFrame(spark, verificationResult)
resultDataFrame.show()

scala> resultDataFrame.show()
+-------------------+-----------+------------+--------------------+-----------------+------------------+
|              check|check_level|check_status|          constraint|constraint_status|constraint_message|
+-------------------+-----------+------------+--------------------+-----------------+------------------+
|Data Validate Check|      Error|     Success|SizeConstraint(Si...|          Success|                  |
|Data Validate Check|      Error|     Success|CompletenessConst...|          Success|                  |
|Data Validate Check|      Error|     Success|UniquenessConstra...|          Success|                  |
|Data Validate Check|      Error|     Success|ComplianceConstra...|          Success|                  |
+-------------------+-----------+------------+--------------------+-----------------+------------------+

If a particular constraint has failed, you can take a look at the details as shown below —

resultDataFrame
  .filter(resultDataFrame("constraint_status") === "Failure")
  .toJSON.collect.foreach(println)

Data Validation on Incremental Data

Deequ also provides a way to validate incremental data loads.

You can read more about this approach at — https://github.com/awslabs/deequ/blob/master/src/main/scala/com/amazon/deequ/examples/algebraic_states_example.md
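That example uses Deequ's algebraic states to merge metrics across loads. A much simpler (and cruder) pattern, sketched below purely as an illustration, is to run the same kind of verification suite only on the newly arrived slice of data; the incrementalData DataFrame and the load_date column are hypothetical:

import com.amazon.deequ.VerificationSuite
import com.amazon.deequ.checks.{Check, CheckLevel}

// hypothetical: only the rows that arrived in today's load
val incrementalData = spark.table("datalake.customers")
  .filter("load_date = current_date()")

// run the usual checks on just the increment
val incrementalResult = VerificationSuite()
  .onData(incrementalData)
  .addCheck(
    Check(CheckLevel.Error, "Incremental Load Check")
      .isComplete("customerNumber")
      .isUnique("customerNumber")
      .isNonNegative("creditLimit"))
  .run()

// incrementalResult.status tells you whether the new slice passed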

Anomaly Detection

Deequ also provides an approach to detect anomalies.

The GitHub page lists some approaches and strategies.

Details can be found here — https://github.com/awslabs/deequ/tree/master/src/main/scala/com/amazon/deequ/anomalydetection
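Based on the anomaly detection example in the AWS blog post linked earlier, here is a sketch of how this can look. It assumes a metrics repository plus two hypothetical DataFrames, yesterdaysData and todaysData; treat the strategy and thresholds as placeholders:

import com.amazon.deequ.VerificationSuite
import com.amazon.deequ.analyzers.Size
import com.amazon.deequ.anomalydetection.RelativeRateOfChangeStrategy
import com.amazon.deequ.checks.CheckStatus
import com.amazon.deequ.repository.ResultKey
import com.amazon.deequ.repository.memory.InMemoryMetricsRepository

val metricsRepository = new InMemoryMetricsRepository()

// baseline run on yesterday's data, stored in the repository
VerificationSuite()
  .onData(yesterdaysData)
  .useRepository(metricsRepository)
  .saveOrAppendResult(ResultKey(System.currentTimeMillis() - 24 * 60 * 60 * 1000, Map("dataset" -> "customers")))
  .addAnomalyCheck(RelativeRateOfChangeStrategy(maxRateIncrease = Some(2.0)), Size())
  .run()

// today's run fails if the row count more than doubles compared to yesterday
val todaysResult = VerificationSuite()
  .onData(todaysData)
  .useRepository(metricsRepository)
  .saveOrAppendResult(ResultKey(System.currentTimeMillis(), Map("dataset" -> "customers")))
  .addAnomalyCheck(RelativeRateOfChangeStrategy(maxRateIncrease = Some(2.0)), Size())
  .run()

if (todaysResult.status != CheckStatus.Success) {
  println("Anomaly detected in the row count of today's load!")
}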

Conclusion

Overall, I see Deequ as a great tool to be used for data validation and quality testing in Data Lake/Hub/Data Warehouse kinds of use cases.

Amazon has even published a research paper about this approach.

This can be viewed at — http://www.vldb.org/pvldb/vol11/p1781-schelter.pdf

If you try this out, do let me know your experience.

If you have some interesting ideas to take this further, please don’t forget to mention them in the comments.
