Basic usage of Spark RDDs and DataFrames

Ramesh Ganesan, May 31

In today's cluster-computing arena, Spark is widely used for its fast and scalable application model.

Compared with traditional MapReduce, Spark offers in-memory computing that is around 10x faster, and it provides near-real-time data processing through Spark Streaming.

RDDs

Spark provides an immutable, distributed collection object called a Resilient Distributed Dataset (RDD).

RDDs are one of the core components of Spark; each RDD is split into multiple partitions and processed on multiple nodes of the cluster.

Spark has four main integrated components:

- Spark Core (RDDs)
- Spark SQL
- Spark Streaming
- MLlib (machine learning libraries)

Rather than going through the detailed architecture of Spark, let's walk through a small Python code example that shows how Spark RDDs and Spark SQL are used.

In this example, we have Uber data, which contains the number of trips and active vehicles per location by date.

The file is stored in the Hadoop cluster as "uber.txt". It is a comma-delimited text file with the following columns:

- dispatching_base_number (location number)
- date (trip date)
- active_vehicles (vehicles available for the date)
- trips (number of trips for the day)
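Purely as an illustration (these are hypothetical values, not rows taken from the actual file), the contents of uber.txt would be shaped like this:

dispatching_base_number,date,active_vehicles,trips
B02512,1/1/2015,190,1132
B02765,1/1/2015,225,1765

Note that the date is in month/day/year format, which is what the parsing code below expects.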

Before proceeding to the code, we need to understand two important aspects of Spark operations:

1. Transformations and actions
2. Lazy evaluation

Transformations and actions

Transformations create a new RDD from an existing RDD. Since RDDs are immutable, we cannot update or modify an RDD in place, but we can derive a new RDD from an existing one. Actions are the operations that produce results from RDDs and return them to the driver.
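As a minimal sketch of the difference (assuming a SparkContext named sc is already available; it is created later in this post), filter and map are transformations that only define new RDDs, while collect is an action that actually runs the computation:

numbers = sc.parallelize([1, 2, 3, 4, 5])       # build an RDD from a local Python list
evens = numbers.filter(lambda n: n % 2 == 0)    # transformation: defines a new RDD, nothing executes yet
doubled = evens.map(lambda n: n * 2)            # another transformation, still nothing executes
print(doubled.collect())                        # action: runs the job and returns [4, 8] to the driver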

Lazy evaluation

With lazy evaluation, Spark processes RDD transformations only when an RDD action occurs. In a sequence of transformations, Spark materializes the RDDs only when an action actually needs them. This is a sensible design for the data volumes involved in big data processing: eagerly materializing every intermediate RDD would put unnecessary pressure on memory for data that is only an intermediate stage of the pipeline.
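A small sketch of this behaviour, again assuming an existing SparkContext sc and the same uber.txt file:

lines = sc.textFile("uber.txt")                                         # no data is read yet
no_header = lines.filter(lambda x: "dispatching_base_number" not in x)  # still nothing is read
print(no_header.count())                                                # action: only now does Spark read the file and count the rows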

With that brief background, let's go through the program and find the total number of trips per day from our data. We will use Spark SQL to make the grouping easier.

For any Spark program, we first need to set an application name; it is used to track the Spark job in the Spark UI.

conf = SparkConf().setAppName("UBER_APP")

Create the Spark context from the Spark configuration.

sc = SparkContext(conf=conf)

Create the first main RDD from the uber.txt file.

uber_main_rdd = sc.textFile("uber.txt")

The file contains a header line with the column names, so let's capture the first line separately.

The split call breaks the line into a list of column names.

uber_main_rdd_head = uber_main_rdd.map(lambda x: x.split(",")).first()

The code snippet below loads the data, without the header line, into a new RDD. The filter condition ("dispatching_base_number" not in x) skips the header line, and the map converts each column to the appropriate data type (the date is reformatted to the day of the week).

uber_main_rdd_wo_head = uber_main_rdd \
    .filter(lambda x: "dispatching_base_number" not in x) \
    .map(lambda x: (x.split(",")[0],
                    datetime.strptime(x.split(",")[1], '%m/%d/%Y').strftime('%A'),
                    int(x.split(",")[2]),
                    int(x.split(",")[3])))

To make this RDD a schema RDD (also called a DataFrame, or a table in regular SQL terms), we need Spark SQL, so we create an SQLContext from the SparkContext.

sqlctx = SQLContext(sc)

By applying the column names from the header to the data RDD, we can create the DataFrame and register it as a temporary table named "uber_tab".

df1 = uber_main_rdd_wo_head.toDF(uber_main_rdd_head)
df1.registerTempTable("uber_tab")

The registered "uber_tab" can now be queried like a table. We can write a query that sums the trips by day, using a GROUP BY clause in Spark SQL.

results1 = sqlctx.sql("select a.date, a.tot_trips from (select date, sum(trips) tot_trips from uber_tab group by date) a order by a.tot_trips desc")

Below is the final action, which displays the data from the result set.

results1.show()

This action is where lazy evaluation kicks in: Spark starts the physical execution by building the dependent RDDs from the transformations above.

The final result of the code is a table of the total number of trips for each day, sorted in descending order.

I hope the above code gives an idea of basic Spark programming through RDD execution and Spark SQL. Spark RDDs also provide many APIs for key-value pairs and support multiple file formats, including JSON and Hadoop files. All of this shows that Spark is a very useful tool for big data analytics.
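As one example of the key-value pair API, the same per-day totals computed above with Spark SQL could also be produced directly on the RDD (a sketch that reuses uber_main_rdd_wo_head from the code above):

# Build (day, trips) pairs and sum the trips for each day.
day_trip_pairs = uber_main_rdd_wo_head.map(lambda row: (row[1], row[3]))
trips_by_day = day_trip_pairs.reduceByKey(lambda a, b: a + b)
# sortBy is a transformation; collect is the action that triggers the computation.
for day, total in trips_by_day.sortBy(lambda kv: kv[1], ascending=False).collect():
    print(day, total)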

The full code snippet, ready for execution:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, Row
from datetime import datetime
import calendar
import sys

if __name__ == "__main__":
    # Application name shown in the Spark UI
    conf = SparkConf().setAppName("UBER_APP")
    # conf = conf.setMaster("local[*]")
    sc = SparkContext(conf=conf)

    # Load the raw file and capture the column names from the header line
    uber_main_rdd = sc.textFile("uber.txt")
    uber_main_rdd_head = uber_main_rdd.map(lambda x: x.split(",")).first()

    # Drop the header line and convert each column to the proper type
    # (the date is converted to the day of the week)
    uber_main_rdd_wo_head = uber_main_rdd \
        .filter(lambda x: "dispatching_base_number" not in x) \
        .map(lambda x: (x.split(",")[0],
                        datetime.strptime(x.split(",")[1], '%m/%d/%Y').strftime('%A'),
                        int(x.split(",")[2]),
                        int(x.split(",")[3])))

    # Create the DataFrame and register it as a temporary table
    sqlctx = SQLContext(sc)
    df1 = uber_main_rdd_wo_head.toDF(uber_main_rdd_head)
    df1.registerTempTable("uber_tab")

    # Total trips per day, sorted in descending order
    results1 = sqlctx.sql("select a.date, a.tot_trips from (select date, sum(trips) tot_trips from uber_tab group by date) a order by a.tot_trips desc")
    results1.show()
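Assuming the script is saved as uber_app.py (a hypothetical filename; any name works) and uber.txt is reachable from the cluster, it can be submitted with something like:

spark-submit uber_app.py

For a quick local test, uncomment the setMaster("local[*]") line so the job runs on the local machine instead of the cluster.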
