Cleaning PySpark DataFrames

Yes, there is an empty cell in literally every row.

Here's where we benefit from passing column names to subset:

df = df.dropna(subset=['postal_code', 'city', 'country', 'address_1'])
display(df)

Things are looking cleaner already:

Replacing N/A Values

While N/A values can hurt our analysis, sometimes dropping these rows altogether is even more problematic.

Consider the case where we want to gain insights from aggregated data: dropping entire rows can easily skew aggregate stats by removing records which should have been counted from the total pool.

In these cases, fillna() is here to help.

fillna() accepts a value and will replace any empty cells it finds with that value instead of dropping rows:

df = df.fillna(0)
display(df)

fillna() also accepts an optional subset argument, much like dropna().
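For instance, a rough sketch of filling just a couple of string columns might look like this (the 'unknown' placeholder value is simply an assumption for illustration):

# fill empty cells only in these two string columns
df = df.fillna('unknown', subset=['postal_code', 'city'])
display(df)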

Dropping Duplicate Rows

Another top-10 method for cleaning data is the dropDuplicates() method.

By itself, calling dropDuplicates() on a DataFrame drops rows where all values in a row are duplicated by another row.
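A minimal sketch of that no-argument form:

# drop rows that are exact duplicates across every column
df = df.dropDuplicates()
display(df)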

Like the other two methods we've covered so far, dropDuplicates() also accepts the subset argument:

df = df.dropDuplicates(subset=["recall_number"])
display(df)

Selecting Data From a DataFrame

There's another problem: our DataFrame has a lot of useless columns.

No, seriously, check out what happens when I run df.printSchema():

root
 |-- classification: string (nullable = true)
 |-- center_classification_date: timestamp (nullable = true)
 |-- report_date: timestamp (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- termination_date: timestamp (nullable = true)
 |-- recall_initiation_date: timestamp (nullable = true)
 |-- recall_number: string (nullable = true)
 |-- city: string (nullable = true)
 |-- more_code_info: string (nullable = true)
 |-- event_id: integer (nullable = true)
 |-- distribution_pattern: string (nullable = true)
 |-- openfda_application_number: string (nullable = true)
 |-- openfda_brand_name: string (nullable = true)
 |-- openfda_dosage_form: string (nullable = true)
 |-- openfda_generic_name: string (nullable = true)
 |-- openfda_manufacturer_name: string (nullable = true)
 |-- openfda_product_ndc: string (nullable = true)
 |-- openfda_product_type: string (nullable = true)
 |-- openfda_route: string (nullable = true)
 |-- openfda_substance_name: string (nullable = true)
 |-- openfda_spl_id: string (nullable = true)
 |-- openfda_spl_set_id: string (nullable = true)
 |-- openfda_pharm_class_moa: string (nullable = true)
 |-- openfda_pharm_class_cs: string (nullable = true)
 |-- openfda_pharm_class_pe: string (nullable = true)
 |-- openfda_pharm_class_epc: string (nullable = true)
 |-- openfda_upc: string (nullable = true)
 |-- openfda_unii: string (nullable = true)
 |-- openfda_rxcui: string (nullable = true)
 |-- recalling_firm: string (nullable = true)
 |-- voluntary_mandated: string (nullable = true)
 |-- state: string (nullable = true)
 |-- reason_for_recall: string (nullable = true)
 |-- initial_firm_notification: string (nullable = true)
 |-- status: string (nullable = true)
 |-- product_type: string (nullable = true)
 |-- country: string (nullable = true)
 |-- product_description: string (nullable = true)
 |-- code_info: string (nullable = true)
 |-- address_1: string (nullable = true)
 |-- address_2: string (nullable = true)
 |-- product_quantity: string (nullable = true)

I don't even know what some of these columns are.

Even if I were to drop every column I didn’t recognize, I’d still be wasting a ton of time going down this list.

Luckily, we have select().

When we call select() on a DataFrame, we can explicitly call out which columns to keep:

df = df.select('classification', 'report_date', 'termination_date', 'city', 'distribution_pattern', 'status', 'product_type', 'country', 'product_description', 'address_1', 'address_2', 'product_quantity')
display(df)

Our data is starting to look digestible by humans (unlike the presumably horrible drugs listed in our table):

Filtering Data

Sometimes we'll work with data which has a number of outliers which skew our results.

Other times, we’ll only want to work with rows which belong to a particular subset of information.

Here's how we'd clean our data to only contain cases which occurred in South San Francisco:

df = df.filter(df.city == "South San Francisco")

The contents of filter() will always be a conditional where we compare the values in a certain column against an intended value.

The easiest way to access a DataFrame's column is by using the df.column_name syntax.

In our case, we're comparing a column holding strings against a provided string, "South San Francisco" (for numerical values, we could use the greater-than and less-than operators as well).
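That numeric case isn't shown here, so as a rough sketch: had we kept the integer event_id column from the original schema, it might look like the following (the threshold is invented purely for illustration):

# keep rows whose event_id exceeds an arbitrary cutoff
df.filter(df.event_id > 80000)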

Filtering by String Values

Aside from filtering by a perfect match, there are plenty of other powerful ways to filter by strings in PySpark. Take a look:

df.filter(df.city.contains('San Francisco')): Returns rows where the strings in a column contain a provided substring. In our example, filtering by rows which contain the substring "San Francisco" would be a good way to get all rows in San Francisco, instead of just "South San Francisco".
df.filter(df.city.startswith('San')): Returns rows where a string starts with a provided substring.
df.filter(df.city.endswith('ice')): Returns rows where a string ends with a provided substring.
df.filter(df.city.isNull()): Returns rows where values in a provided column are null.
df.filter(df.city.isNotNull()): Opposite of the above.
df.filter(df.city.like('San%')): Performs a SQL-like query containing the LIKE clause.
df.filter(df.city.rlike('[A-Z]*ice$')): Performs a regexp filter.
df.filter(df.city.isin('San Francisco', 'Los Angeles')): Looks for rows where the string value of a column matches any of the provided strings exactly.
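These expressions are all regular Column objects, so they can also be combined with the & (and) and | (or) operators; bare comparisons need their own parentheses because of Python's operator precedence. A rough sketch, with 'Oakland' invented purely for illustration:

# rows in any "San Francisco" city OR in Oakland
df.filter(df.city.contains('San Francisco') | (df.city == 'Oakland'))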

In addition to filtering by strings, we can also filter by columns where the values are stored as dates or datetimes.

Perhaps the most useful way to filter dates is by using the between() method, which allows us to find results within a certain date range.

Here we find all the results which were reported in the years 2013 and 2014:

df = df.filter(df.report_date.between('2013-01-01 00:00:00', '2015-01-11 00:00:00'))

Filtering via where()

where() is another way of achieving the same effect that we accomplished with filter():

df = df.where(df.city == "South San Francisco")

Sorting Our DataFrame

Finally, there are a few ways we can sort the data in our DataFrame to our liking.

My preferred method is by using orderBy():

df = df.orderBy('report_date', ascending=False)

orderBy() sorts results in the way we'd expect: string columns are sorted alphabetically, numerical columns numerically, date columns chronologically, and so on. The ascending keyword parameter allows us to display these results in descending order when ascending is set to False.

Notice how we passed 'report_date' as a string, as opposed to df.report_date? PySpark allows us to do this when sorting. This won't work when filtering, however, because df = df.filter("city" == "South San Francisco") asks Python to compare two plain strings (which is simply False) before filter() ever sees the result.

A shittier way of sorting our DataFrame is by using the sort() method. sort() is shittier than orderBy() because the syntax is uglier, and because it requires us to import something just to list our results in descending order:

from pyspark.sql.functions import desc

df = df.sort(desc("report_date"))

That's all the time we have for today, folks.

Join us next time when we explore the magical world of transforming DataFrames in PySpark.

Originally published at https://hackersandslackers.com on April 26, 2019.

