Speeding Up and Perfecting Your Work Using Parallel Computing

A detailed guide to Python multiprocessing vs. PySpark mapPartitions

Yitong Ren · Mar 18

In science, behind every achievement is grinding, rigorous work.

And success rarely comes on the first attempt.

As a data scientist, you probably deal with huge amounts of data and computation, performing repeated tests and experiments in your day-to-day work.

But you don't want to turn a rewarding, stimulating job into a tedious one by waiting for the same time-consuming operation to run again and again, observation after observation.

Photo from Unsplash

Today's computers have multiple processors that allow multiple functions to run simultaneously.

However, this doesn't help if the programmer isn't aware of it or doesn't know how to use it, which is what led me to write this post: to demonstrate how to parallelize your processing using Python multiprocessing and PySpark mapPartitions.
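Before parallelizing anything, it is worth checking how many cores your machine actually exposes. A minimal sketch, using only the standard library:

import multiprocessing

# number of CPUs visible to the operating system on this machine
print(multiprocessing.cpu_count())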

Machine learning model

Data science is a varied field that includes a wide range of work.

One important application is building machine learning pipelines and personalized data products to better target customers and enable more accurate decision making.

In this post, I use cross-validation with stratified sampling, a common step in building a machine learning model, as the example to parallelize in both the Python and Spark environments.

The data I use is the Bank Marketing data set from the UCI Machine Learning Repository, chosen for its relatively clean nature.

Since this post focuses on parallelism, I skip the steps of exploratory data analysis, feature engineering, and feature selection.

After some pre-processing, I obtained a data set ready for modeling; the column y is the target variable, indicating whether the client subscribed to a term deposit during the campaign.

Now let's get into the details of the model selection step with cross-validation.

As a common practice, I include most classifiers from scikit-learn in a list called classifiers and use the StratifiedShuffleSplit method from sklearn.model_selection to perform this step.
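To make concrete what stratified sampling guarantees, here is a minimal, self-contained sketch with a synthetic, imbalanced target (the 90/10 class ratio is made up for the demo): both folds preserve roughly the class proportions of the full data.

import numpy as np
from sklearn.model_selection import StratifiedShuffleSplit

# toy target: 90% negatives, 10% positives
y_demo = np.array([0] * 90 + [1] * 10)
X_demo = np.arange(100).reshape(-1, 1)

sss_demo = StratifiedShuffleSplit(n_splits=1, test_size=0.1, random_state=0)
train_idx, test_idx = next(sss_demo.split(X_demo, y_demo))
# both folds keep the ~10% positive rate of the original data
print(y_demo[train_idx].mean(), y_demo[test_idx].mean())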

For model comparison purposes, I calculate the accuracy score, log loss, and AUC.

This step took 46 s; below is the code:

import time

from sklearn.model_selection import StratifiedShuffleSplit
from sklearn.metrics import accuracy_score, log_loss, roc_curve, auc

sss = StratifiedShuffleSplit(n_splits=10, test_size=0.1, random_state=0)

start = time.time()
res = []
for train_index, test_index in sss.split(X, y):
    X_train, X_test = X[train_index], X[test_index]
    y_train, y_test = y[train_index], y[test_index]
    for clf in classifiers:
        name = clf.__class__.__name__
        clf.fit(X_train, y_train)
        y_pred = clf.predict(X_test)
        y_pred_probas = clf.predict_proba(X_test)
        acc = accuracy_score(y_test, y_pred)
        # log loss expects predicted probabilities, not hard labels
        loss = log_loss(y_test, y_pred_probas)
        fpr, tpr, thresholds = roc_curve(y_test, y_pred_probas[:, 1])
        auc_score = auc(fpr, tpr)
        res.append([name, acc, loss, auc_score])
print('The cross-validation with stratified sampling took time (s): {}'.format(time.time() - start))

Just like in many Kaggle competitions, the winner is again gradient boosting.

Python multiprocessing

In Python, the multiprocessing module is designed to divide work among multiple processes to improve performance.

Here I use the Pool class with its map method to split the iterable into separate tasks.

The code below runs the same workload across 4 worker processes; it completed within 20 s.

import itertools
import multiprocessing
from multiprocessing import Pool

# pair every (train_index, test_index) split with every classifier,
# so that each combination becomes one independent task
cv_index = [(i, j) for i, j in sss.split(X, y)]
params = list(itertools.product(cv_index, classifiers))

def cv_test(params):
    global X
    global y
    train_index = params[0][0]
    test_index = params[0][1]
    clf = params[1]
    X_train, X_test = X[train_index], X[test_index]
    y_train, y_test = y[train_index], y[test_index]
    name = clf.__class__.__name__
    clf.fit(X_train, y_train)
    y_pred = clf.predict(X_test)
    y_pred_probas = clf.predict_proba(X_test)
    acc = accuracy_score(y_test, y_pred)
    # log loss expects predicted probabilities, not hard labels
    loss = log_loss(y_test, y_pred_probas)
    fpr, tpr, thresholds = roc_curve(y_test, y_pred_probas[:, 1])
    auc_score = auc(fpr, tpr)
    return [name, acc, loss, auc_score]

p = Pool(processes=4)
start = time.time()
res = p.map(cv_test, params)
p.close()
p.join()
print('The cross-validation with stratified sampling on 4 cores took time (s): {}'.format(time.time() - start))
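As a side note, since Python 3.3 Pool can also be used as a context manager, which tears the pool down automatically when the block exits; map() itself blocks until all results are in. A minimal sketch of the same call:

from multiprocessing import Pool

# the with-block shuts the worker processes down on exit
with Pool(processes=4) as p:
    res = p.map(cv_test, params)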

Spark mapPartitions

I am always a big fan of Spark.

The RDD (resilient distributed dataset) is the fundamental data structure of Spark.

Imagine an RDD as a collection of many rows; Spark distributes these rows across multiple partitions.

mapPartitions calls your function once per partition, and the content of each partition is made available as a sequential stream of values through the function's input argument, an iterator.
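To make those semantics concrete, here is a minimal sketch, independent of the cross-validation example and assuming the SparkSession named spark that is created in the next snippet: each partition's iterator is reduced to a single value.

rdd = spark.sparkContext.parallelize(range(8), 2)  # 2 partitions

def sum_partition(iterator):
    # receives every value of one partition as a lazy stream
    yield sum(iterator)

print(rdd.mapPartitions(sum_partition).collect())  # [6, 22]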

The code below shows how to rewrite the same process in the PySpark environment; it also completed within 20 s. Note that X and y are shared with the executors as broadcast variables, so each worker holds a single read-only copy of the data instead of receiving one copy per task.

import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf

spark = SparkSession.builder.appName("pyspark-test").getOrCreate()

bc_X = spark.sparkContext.broadcast(X)
bc_y = spark.sparkContext.broadcast(y)

def map_partitions_exec(bc_X, bc_y):
    def cv_test(iterator):
        # dereference the broadcast variables on the executor, so the data
        # travels via the broadcast mechanism rather than the task closure
        X = bc_X.value
        y = bc_y.value
        for i in iterator:
            train_index = i[0][0]
            test_index = i[0][1]
            clf = i[1]
            X_train, X_test = X[train_index], X[test_index]
            y_train, y_test = y[train_index], y[test_index]
            name = clf.__class__.__name__
            clf.fit(X_train, y_train)
            y_pred = clf.predict(X_test)
            y_pred_probas = clf.predict_proba(X_test)
            acc = accuracy_score(y_test, y_pred)
            # log loss expects predicted probabilities, not hard labels
            loss = log_loss(y_test, y_pred_probas)
            fpr, tpr, thresholds = roc_curve(y_test, y_pred_probas[:, 1])
            auc_score = auc(fpr, tpr)
            yield [name, acc, loss, auc_score]
    return cv_test

# distribute the (split, classifier) pairs over 4 partitions
params_spark = spark.sparkContext.parallelize(params, 4)
res = params_spark.mapPartitions(map_partitions_exec(bc_X, bc_y))
start = time.time()
res = res.collect()
print('The cross-validation with stratified sampling using spark took time (s): {}'.format(time.time() - start))

Hopefully, these little coding tricks can help you be more productive in your daily work.

