PySpark Macro DataFrame Methods: join() and groupBy()

Perform SQL-like joins and aggregations on your PySpark DataFrames.

Todd Birchard, Jun 24

We’ve had quite a journey exploring the magical world of PySpark together.

After covering DataFrame transformations, structured streams, and RDDs, there are only so many things left to cross off the list before we’ve gone too deep.

To round things up for this series, we’re going to take a look back at some powerful DataFrame operations we missed.

In particular, we’ll be focusing on operations which modify DataFrames as a whole, such as Joins and Aggregations.

Let’s start with joins, then visit aggregation, and close out with some thoughts on visualization.

Joining DataFrames in PySpark

I’m going to assume you’re already familiar with the concept of SQL-like joins.

To demonstrate these in PySpark, I’ll create two simple DataFrames:

- A customers DataFrame (designated DataFrame 1);
- An orders DataFrame (designated DataFrame 2).

Our code to create the two DataFrames follows:

from pyspark.sql import SparkSession

# In a Databricks notebook `spark` already exists; elsewhere, create a session first
spark = SparkSession.builder.getOrCreate()

# DataFrame 1
valuesA = [(1, 'bob', 3462543658686), (2, 'rob', 9087567565439), (3, 'tim', 5436586999467), (4, 'tom', 8349756853250)]
customersDF = spark.createDataFrame(valuesA, ['id', 'name', 'credit_card_number'])

# DataFrame 2
valuesB = [(1, 'ketchup', 'bob', 1.20), (2, 'rutabaga', 'bob', 3.35), (3, 'fake vegan meat', 'rob', 13.99), (4, 'cheesey poofs', 'tim', 3.99), (5, 'ice cream', 'tim', 4.95), (6, 'protein powder', 'tom', 49.95)]
ordersDF = spark.createDataFrame(valuesB, ['id', 'product_name', 'customer', 'price'])

# Show tables
customersDF.show()
ordersDF.show()

Here’s how they look:

[Figure: The DataFrames we just created.]

Now we have two simple data tables to work with.

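Before we join these two tables, it’s important to realize that joins in Spark are relatively “expensive” operations: unless one side is small enough to be broadcast to every executor, Spark has to shuffle rows across the cluster so that matching keys end up in the same partition, which costs time, memory, and network I/O.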

Inner Joins

Without specifying the type of join we’d like to execute, PySpark will default to an inner join. Joins are possible by calling the join() method on a DataFrame:

joinedDF = customersDF.join(ordersDF, customersDF.name == ordersDF.customer)

The join() method is called on an existing DataFrame, and the DataFrame passed to it is joined onto that existing one.

The first argument in the join() method is the DataFrame to be added or joined.

Next, we specify the “on” condition of our join.

In our example, we’re telling our join to compare the “name” column of customersDF to the “customer” column of ordersDF.

Here’s how it turned out:

Right, Left, and Outer Joins

We can pass the keyword argument “how” into join(), which specifies the type of join we’d like to execute.

As you might imagine, the how parameter accepts inner, outer, left, and right, along with a few others such as left_semi and left_anti.

We can also pass a few aliases via the how parameter, such as leftOuter (equivalent to left).

Cross Joins

The last type of join we can execute is a cross join, also known as a Cartesian join. Cross joins are a bit different from the other types of joins, so they get their very own DataFrame method:

joinedDF = customersDF.crossJoin(ordersDF)

A cross join pairs every row in DataFrame #1 with every row in DataFrame #2, so the result has (rows in #1) × (rows in #2) rows:

[Figure: Anatomy of a cross join.]

Using our simple example, you can see that PySpark supports the same types of join operations as traditional, persistent database systems such as Oracle, IBM DB2, Postgres, and MySQL.

Under the hood, PySpark DataFrames are built on Resilient Distributed Datasets (RDDs), which Spark processes in memory wherever it can.

As we mentioned, these kinds of join operations can be expensive and time-consuming across the cluster.

Next, we will discuss aggregating data, which is a core strength of Spark.

Aggregating Data

Spark allows us to perform powerful aggregate functions on our data, similar to what you’re probably already used to in either SQL or Pandas.

The data I’ll be aggregating is a dataset of NYC motor vehicle collisions because I’m a sad and twisted human being!

We’re going to become familiar with two functions here: agg() and groupBy().

These are typically used in tandem, but agg() can be used on a dataset without groupBy():

df.agg({"*": "count"}).show()  # df holds the NYC collisions data

Aggregating without performing groupBy() typically isn't entirely useful:

+--------+
|count(1)|
+--------+
|    1000|
+--------+

Let’s derive some deeper meaning from our data by combining agg() with groupBy().

Using groupBy()

Let’s see which Boroughs lead the way in terms of the number of accidents:

import pyspark.sql.functions as f

df.groupby('borough').agg(f.count('borough').alias('count')).show()

The results:

+-------------+-----+
|      borough|count|
+-------------+-----+
|       QUEENS|  241|
|     BROOKLYN|  182|
|        BRONX|  261|
|    MANHATTAN|  272|
|STATEN ISLAND|   44|
+-------------+-----+

Manhattan leads the way with 272 accidents from our sample!

Get your shit together, Manhattan.

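Let’s see which borough is the deadliest:

# The summed column's name was lost from this copy of the article;
# 'number_of_persons_injured' is an assumed name for the injuries column
(df.groupby('borough')
   .agg(f.sum('number_of_persons_injured').alias('injuries'))
   .orderBy('injuries', ascending=False)
   .show())

Here we go:

+-------------+--------+
|      borough|injuries|
+-------------+--------+
|    MANHATTAN|      62|
|       QUEENS|      59|
|        BRONX|      57|
|     BROOKLYN|      47|
|STATEN ISLAND|      14|
+-------------+--------+

Well… alright then.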

Let’s avoid Manhattan!

Grouping By Multiple Columns

Often, we’ll want to group by multiple columns to see more complex breakdowns.

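Here we group by both Borough and “main contributing factor”:

# 'number_of_persons_injured' is an assumed name for the injuries column
aggDF = (df.groupby('borough', 'contributing_factor_vehicle_1')
           .agg(f.sum('number_of_persons_injured').alias('injuries'))
           .orderBy('borough', 'injuries', ascending=False))
aggDF = aggDF.filter(aggDF.injuries > 1)
display(aggDF)  # display() is a Databricks notebook command; use aggDF.show() elsewhere

This will show us which contributing factors account for the most injuries in each borough.

Drivers in Manhattan need to pay attention! Get off your phones!!

So far we’ve aggregated by using the count and sum functions.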

As you might imagine, we could also aggregate by using the min, max, and avg functions.

One additional function worth special mention is corr().

The corr function helps us determine the strength of correlations between columns.

Determining Column Correlation

If you’re the Data Science type, you’re going to love aggregating using corr().

corr() computes the Pearson correlation coefficient of two columns and outputs a floating-point number between -1 and 1 that represents the strength of the correlation:

from pyspark.sql.functions import corr

df.agg(corr("a", "b").alias('correlation')).collect()

Example output:

[Row(correlation=1.0)]

You will find that PySpark’s aggregation functions let you build powerful aggregation pipelines that answer genuinely complicated questions.

The answers to those questions need to be presented in a pleasing, easy-to-understand visual form.

Let’s consider visualizations of those aggregates.

Databricks Visualizations on Aggregations

If you’re following along in a Databricks notebook, there are tonnes of cool visualizations that come standard with the display() command to complement any aggregations we perform.

These are especially useful when trying to understand the distribution of aggregates we create.

I went ahead and pieced together a breakdown of people injured in accidents below.

We’re splitting our results by Borough, and then seeing the distribution of people injured between cyclists and motorists:

[Figure: Creating a visualization in Databricks.]

While creating a bar plot, “keys” determines the values across the x-axis.

I’m measuring by a number of “values” here, which is to say that multiple measurements across the y-axis will be shown.

This particular chart lends itself well to a stacked bar chart, which we create by specifying bar chart as our display type, and then specifying stacked in the additional options.

Databricks allows for all sorts of additional cool visualizations like geographical charts, scatter plots, and way more.

It seems it is far safer to walk in Manhattan!

Happy Trails

We’ve been through a lot on this PySpark journey together.

As much as I’d love to keep you here forever, every good parent knows when it’s time for their children to leave the nest and fly on their own.

I’ll leave you with some advice my parents gave me: go get a job and get out of my god-damn house.

Originally published at https://hackersandslackers.com on June 24, 2019.

