An Introduction to Apache Spark, PySpark and Dataframe Transformations

A Comprehensive Guide to Master Big Data Analysis

Victor Roman
Jun 12

Introduction: The Big Data Problem

Apache Spark emerged as a new engine and programming model for data analytics.
Its origins go back to 2009, and it has gained so much importance in recent years because of changes in the economic factors that underlie computer applications and hardware.
Historically, the power of computers only grew with time.
Each year, new processors were able to perform operations faster, and the applications that ran on top of them automatically got faster.
All of this changed in 2005, when limits in heat dissipation caused a switch from making individual processors faster to adding more CPU cores that work in parallel.
This meant that applications, and the code behind them, had to change too.
All of this laid the groundwork for new models like Apache Spark.
In addition, the cost of sensors and storage has only decreased in recent years.
Nowadays it is inexpensive to collect and store vast amounts of information.
There is so much data available that the way we process and analyze it must change radically too, by running large parallel computations on clusters of computers.
These clusters combine the power of many computers at the same time, and make it much easier to tackle expensive computational tasks like data processing.
And this is where Apache Spark comes into play.
What is Apache Spark

As found in the great book Spark: The Definitive Guide:

“Apache Spark is a unified computing engine and a set of libraries for parallel data processing on clusters of computers”

Nowadays, Apache Spark is one of the most popular open source engines for Big Data processing.
The main reasons are:
It supports widely used programming languages: Python, Scala, Java and R.
It supports SQL tasks.
It enables data streaming.
It has libraries for Machine Learning and Deep Learning.
It can be run on a single machine or on a cluster of computers.

The following is a sketch that illustrates the different libraries available in the Spark ecosystem.
How to Set Up and Run Apache Spark

Throughout this series of articles, we will focus on Apache Spark’s Python library, PySpark.
As stated before, Spark can be run both locally and on a cluster of computers.
There are several ways to configure our machines to run Spark locally, but they are out of the scope of these articles.
One of the simplest and fastest ways to work with PySpark and unlock its immense processing power is the free Community Edition of the Databricks website.
To get started, we simply go to the Try Databricks page at databricks.com and select its Community Edition.
Then, we must create an account.
Running a Temporary Cluster

Once we have created an account, we need to create a temporary cluster before we can start working.
As it is a free version, these clusters have a default of 6 GB of RAM and can run for 6 hours each.
To develop industrial projects or work with data pipelines, the premium platform is suggested.
But for the aim of these tutorials, the Community Edition will be more than enough.
Adding Data

In order to add data to work with:
Click on the data tab.
Then add data.
You can work both with available data uploaded by other users and with data uploaded from your computer.
Once done, we can create a table in a notebook and we are all set up!

PySpark Applications & Partitions

To understand how Apache Spark works, we should talk about the core components of a Spark Application: the Driver, the Executors and the Cluster Manager.
The following is a very illustrative sketch of a Spark Application architecture:

Driver

The driver is located on a node of the cluster and performs three main tasks:
It holds information about the Spark Application.
It responds to input, for example a user’s program.
It analyzes, distributes and schedules the tasks to be done by the executors.

Executors

The executors are the ones that actually perform the work assigned by the driver.
They do two things:
They execute the code assigned to them.
They report the state of the computation to the driver.

Cluster Manager

The cluster manager is responsible for:
Controlling the physical computers.
Distributing resources to Spark Applications.
There can be several Spark Applications running on the same cluster at the same time, and all of them will be managed by the cluster manager.
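The division of labour between driver and executors can be sketched with a plain-Python analogy. This is not how Spark is implemented: the names driver and run_task are made up for illustration, and a thread pool stands in very loosely for the resources a cluster manager would hand out.

```python
from concurrent.futures import ThreadPoolExecutor

def run_task(task):
    """Executor side: run the assigned code and report the result back."""
    task_id, numbers = task
    return task_id, sum(numbers)

def driver(data, num_executors=3):
    """Driver side: split the job into tasks, schedule them, collect the reports."""
    # One task per executor: task i takes every num_executors-th element.
    tasks = [(i, data[i::num_executors]) for i in range(num_executors)]
    # The pool plays the role of the workers made available to the application.
    with ThreadPoolExecutor(max_workers=num_executors) as pool:
        reports = dict(pool.map(run_task, tasks))
    # Combine the partial results reported by each executor.
    return reports, sum(reports.values())

reports, total = driver(list(range(1, 11)))
print(reports, total)
```

Each worker only sees its own slice of the data, and only the driver knows about the whole job, which is the property the architecture above relies on.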
PySpark Dataframes

Apache Spark works with several data abstractions, each with a specific interface to work with.
The most common abstractions are:
Datasets
Dataframes
SQL Tables
Resilient Distributed Datasets

Throughout this series we will focus on the most common unit to represent and store data in Apache Spark: Dataframes.
Dataframes are data tables with rows and columns; the closest analogy is a spreadsheet with labeled columns.
One important feature of Dataframes is their schema.
A Dataframe’s schema is a list of its column names and the type of data that each column stores.
Another relevant attribute of Dataframes is that they do not have to live on a single computer; in fact, they can be split across hundreds of machines.
This is done to optimize the processing of the information, and because the data may be too large to fit on a single machine.
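To make the idea of a schema concrete, here is a minimal plain-Python sketch. It is only an analogy: Spark has its own schema and type objects. The subset of columns, the conforms helper and the sample rows are assumptions made up for illustration.

```python
# Hypothetical schema for a few columns of a stock table: (column name, Python type).
schema = [("Date", str), ("Open", float), ("Close", float), ("Volume", int)]

def conforms(row, schema):
    """Return True if the row has one value per column, each of the declared type."""
    if len(row) != len(schema):
        return False
    return all(isinstance(value, col_type) for value, (_, col_type) in zip(row, schema))

# Made-up rows, for illustration only.
good_row = ("2010-01-04", 213.4, 214.0, 123432400)
bad_row = ("2010-01-05", 214.6, "n/a", 150476200)

column_names = [name for name, _ in schema]
print(column_names, conforms(good_row, schema), conforms(bad_row, schema))
```

The schema is what lets every operation know, before touching any data, which columns exist and what kind of values they hold.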
Spark Partitions

As stated before, the executors perform the work assigned by the driver, and they do it in parallel.
To make this possible, Spark splits the data into different partitions.
These partitions are collections of rows located on a single computer within the cluster.
When we talk about a Dataframe’s partitions, we are talking about how the data is distributed across all the machines in our cluster.
Most of the time we will not specify explicitly how the data should be partitioned.
Instead, our code expresses high-level transformations of the data, and Spark works out by itself the best way to partition it, always looking for the maximum processing efficiency.
Low-level APIs to perform these operations are out of the scope of this series.
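As a rough illustration of why partitions help, the following plain-Python sketch splits a small table into chunks, computes a partial result per chunk, and then merges the partial results. The helper names and the sample rows are made up, and Spark decides its own partitioning; this only shows the general pattern.

```python
def split_into_partitions(rows, num_partitions):
    """Split a list of rows into contiguous chunks of near-equal size."""
    size, extra = divmod(len(rows), num_partitions)
    partitions, start = [], 0
    for i in range(num_partitions):
        end = start + size + (1 if i < extra else 0)
        partitions.append(rows[start:end])
        start = end
    return partitions

def partial_max(partition, column):
    """Work done independently on one partition (assumes it is not empty)."""
    return max(row[column] for row in partition)

# Made-up rows, for illustration only.
rows = [{"Date": f"2010-01-{d:02d}", "Close": 200.0 + d} for d in range(1, 8)]

partitions = split_into_partitions(rows, 3)
partial_results = [partial_max(p, "Close") for p in partitions]  # one result per partition
overall_max = max(partial_results)                               # merge step
print([len(p) for p in partitions], partial_results, overall_max)
```

Because each partial result depends only on its own partition, the per-partition step could run on different machines at the same time, and only the small partial results need to be brought together.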
Dataframe Transformations

First of all, we have to understand that transformations are modifications that we specify for our dataframes.
These transformations are specified in a high-level fashion and will not be executed until we explicitly call for an action.
This way of working is called lazy evaluation, and the aim is to improve efficiency.
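Lazy evaluation can be sketched in plain Python with a toy class that only records transformations and runs them when an action is requested. LazyTable is a made-up name and not a Spark API, and unlike Spark this sketch does not optimize the recorded plan; it only shows that nothing runs before the action.

```python
class LazyTable:
    """Records transformations and only runs them when an action is called."""

    def __init__(self, rows, plan=()):
        self._rows = rows
        self._plan = plan  # tuple of (kind, function) steps, not yet executed

    def filter(self, predicate):
        # Transformation: returns a new LazyTable with a longer plan; no data is touched.
        return LazyTable(self._rows, self._plan + (("filter", predicate),))

    def select(self, mapper):
        # Transformation: also just recorded in the plan.
        return LazyTable(self._rows, self._plan + (("select", mapper),))

    def collect(self):
        # Action: only now is the recorded plan applied to the data.
        result = []
        for row in self._rows:
            keep = True
            for kind, fn in self._plan:
                if kind == "filter":
                    if not fn(row):
                        keep = False
                        break
                else:
                    row = fn(row)
            if keep:
                result.append(row)
        return result

calls = []

def is_cheap(row):
    calls.append(row["Close"])  # record every time the predicate really runs
    return row["Close"] < 500

# Made-up rows, for illustration only.
table = LazyTable([{"Close": 450.0, "Open": 440.0}, {"Close": 620.0, "Open": 600.0}])
pending = table.filter(is_cheap).select(lambda row: row["Open"])
calls_before_action = len(calls)  # the predicate has not run yet
opens = pending.collect()         # the plan runs here
print(calls_before_action, opens)
```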
When we call for transformations, Spark designs a plan to perform these tasks optimally, and does not execute it until the very last moment, when we call an action (like .show() or .collect()).

Apple Stock Price

Now, we will explore some of the most common actions and transformations.
We are going to work with Apple stock price data from 2010 to 2016.
We will perform some exploratory data analysis and data transformations, deal with missing values, and perform grouping and aggregating.
Import Dataframe

To initialize and display a dataframe, the code will be the following:

    # File location and type
    file_location = "/FileStore/tables/appl_stock.csv"
    file_type = "csv"

    # CSV options
    infer_schema = "true"
    first_row_is_header = "true"
    delimiter = ","

    # The applied options are for CSV files. For other file types, these will be ignored.
    # (spark is the session object that Databricks notebooks provide)
    df = spark.read.format(file_type) \
        .option("inferSchema", infer_schema) \
        .option("header", first_row_is_header) \
        .option("sep", delimiter) \
        .load(file_location)

    # Display Dataframe
    display(df)

Get Dataframe’s Schema

The schema of a dataframe is the description of the structure of the data. It is a collection of StructField objects and provides information about the type of the data in a dataframe.
Displaying the Dataframe’s schema is as simple as:

    # Display Dataframe's Schema
    df.printSchema()

Perform Filtering and Transformations

To filter our data and get only those rows that have a closing price smaller than $500, we could run the following line of code:

    # Filter data using pyspark
    df.filter("Close < 500").show()

We can also keep only certain columns of the filtered rows:

    # Filter data and keep only some columns
    df.filter("Close < 500").select(['Open', 'Close']).show()

To filter by one column and show another, we will use the .select() method.
    # Filter by one column and show another
    df.filter(df['Close'] < 500).select('Volume').show()

To filter by multiple conditions:

    # Filter by multiple conditions: closing price < $200 and opening price > $200
    df.filter((df['Close'] < 200) & (df['Open'] > 200)).show()

Obtain a Statistical Summary of the Data

Similarly to other libraries like Pandas, we can obtain a statistical summary of the Dataframe by simply running the .describe() method.
    # Display Statistical Summary
    df.describe().show()

Add and Rename Columns

To add a new column to the dataframe, we will use the .withColumn() method as follows.
    # Display Dataframe with new column
    df.withColumn('Doubled Adj Close', df['Adj Close'] * 2).select('Adj Close', 'Doubled Adj Close').show()

To rename an existing column, we will use the .withColumnRenamed() method.
    # Display Dataframe with renamed column
    df.withColumnRenamed('Adj Close', 'Adjusted Close Price').show()

Grouping and Aggregating Data

Now, we will perform some grouping and aggregation of our data, in order to obtain meaningful insights.
But first, we should import some functions:

    # Import relevant functions
    from pyspark.sql.functions import (dayofmonth, hour, dayofyear, weekofyear, month, year,
                                       format_number, date_format, mean, datediff, to_date, lit)

Now, let us create a new column with the year of each row:

    # Add a 'Year' column, to find the average closing price per year
    new_df = df.withColumn('Year', year(df['Date']))
    new_df.show()

Now, let’s group by this recently created ‘Year’ column and aggregate by the maximum, minimum and average closing prices of each year, to obtain meaningful insights into the status and evolution of the price.
    # Group and aggregate data
    # (the calls below use the alias f, so the functions module is imported under that name)
    from pyspark.sql import functions as f

    new_df.groupBy('Year').agg(
        f.max('Close').alias('Max Close'),
        f.min('Close').alias('Min Close'),
        f.mean('Close').alias('Average Close')
    ).orderBy('Year').show()

We have achieved our goal!
However, the result is still hard to read.
In fact we have way more decimals than we need.
Taking into account that we are working with prices of hundreds of dollars, more than two decimals do not provide us with relevant information.
So let’s take advantage and learn to format the results to show us the number of decimals we want.
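Formatting Our Data

To format our data we will use the format_number() function as follows:

    # Import relevant functions
    from pyspark.sql import functions as F
    from pyspark.sql.functions import format_number, col

    # Keep the aggregated result in a dataframe, so that its columns can be formatted
    agg_df = new_df.groupBy('Year').agg(
        F.max('Close').alias('Max Close'),
        F.min('Close').alias('Min Close'),
        F.mean('Close').alias('Average Close')
    ).orderBy('Year')

    # Select the appropriate columns to format
    cols = ['Max Close', 'Min Close', 'Average Close']

    # Format the columns to two decimals
    formatted_df = agg_df.select('Year', *[format_number(col(col_name), 2).name(col_name) for col_name in cols])
    formatted_df.show()

User Defined Functions

Let’s now learn how to apply functions defined by us to our dataframes.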
We will use it in this example to get a column with the month of the year in which each row was recorded.
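Before wrapping the logic in a Spark function, the month lookup itself can be checked in plain Python. The helper name month_name is an assumption made for this sketch. It also checks that the expression used in the article's lambda, month_lst[int(x % 12) - 1], gives the same answer for every month, because 12 % 12 is 0 and index -1 reaches the last element, December.

```python
month_lst = ['January', 'February', 'March', 'April', 'May', 'June', 'July',
             'August', 'September', 'October', 'November', 'December']

def month_name(month_number):
    """Map a month number (1-12) to its name."""
    return month_lst[int(month_number) - 1]

# Direct lookup for every month of the year.
names = [month_name(m) for m in range(1, 13)]

# Same lookup written with the modulo form used in the article's lambda.
names_with_modulo = [month_lst[int(m % 12) - 1] for m in range(1, 13)]

print(names == month_lst, names_with_modulo == month_lst)
```

Testing the pure function first keeps the Spark-specific part down to registering it and applying it to a column.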
    # Import relevant functions
    from pyspark.sql.functions import month, udf
    from pyspark.sql.types import StringType

    # Create month list
    month_lst = ['January', 'February', 'March', 'April', 'May', 'June', 'July',
                 'August', 'September', 'October', 'November', 'December']

    # Define the function: x % 12 maps 12 to 0, so December is reached through index -1
    month_name_udf = udf(lambda x: month_lst[int(x % 12) - 1], StringType())

    # Add column to df with the number of the month of the year
    df = df.withColumn('moy_number', month(df.Date))

    # Apply function and generate a column with the name of the month of the year
    df = df.withColumn('moy_name', month_name_udf("moy_number"))

Success!

Conclusion

Throughout this article we have:
Covered the basics of Apache Spark.
Gained an intuition of why it is important and how it operates.
Performed analysis operations with PySpark and Dataframes.

In the next articles we will learn how to apply Machine Learning in PySpark and apply this knowledge to some projects.
Stay tuned!