Distributed Data Pre-processing using Dask, Amazon ECS and Python (Part 2)

Source: pixabay.com

Using Dask for EDA and Hyperparameter Optimization (HPO)

Will Badr, Jan 3

In Part 1 of this series, I explained how to build a serverless cluster of Dask scheduler and workers on AWS Fargate.

Scaling the number of workers up and down is quite simple.

You can achieve that by running the AWS CLI commands below:

    bash~# aws ecs update-service --service Dask-Workers --desired-count 10 --cluster Fargate-Dask-Cluster > /dev/null
    bash~# aws ecs update-service --service Dask-Scheduler --desired-count 1 --cluster Fargate-Dask-Cluster > /dev/null

Now that I have scaled up the serverless Fargate cluster, let’s try some exploratory data analysis (EDA).
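The same scaling step can also be scripted from Python. The sketch below is only an illustration: the helper name scale_ecs_service is hypothetical, and it assumes a boto3 ECS client is passed in, so the function itself can be tried without AWS credentials. The cluster and service names are the ones used in the CLI commands.

```python
def scale_ecs_service(ecs_client, cluster, service, desired_count):
    """Set the desired task count of an ECS service and return the new value.

    `ecs_client` is expected to behave like boto3.client("ecs").
    """
    response = ecs_client.update_service(
        cluster=cluster,
        service=service,
        desiredCount=desired_count,
    )
    return response["service"]["desiredCount"]


# Example usage (needs the boto3 package and AWS credentials, so it is
# left commented out here):
# import boto3
# ecs = boto3.client("ecs")
# scale_ecs_service(ecs, "Fargate-Dask-Cluster", "Dask-Workers", 10)
# scale_ecs_service(ecs, "Fargate-Dask-Cluster", "Dask-Scheduler", 1)
```

Passing the client in, rather than creating it inside the function, keeps the helper easy to test with a stand-in object.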

I am using the famous NY Yellow Taxi 2017 Dataset.

First, let’s load our dataframe into the cluster memory.

To do so, simply import the dataframe class from the dask library, then load the files using the read_csv() function.

The client.persist() function works by persisting the data into memory asynchronously; it returns right away.

It submits the task graph to the cluster and returns a new dataframe backed by Future objects that point to the results.

We can then run fast queries on the output dataframe.

Let’s run some analytics:

1.1 Dataframe Analysis

Let’s get some descriptive statistics on the dataframe.

The code looks exactly the same as with Pandas, except that with Dask dataframes you need to call compute() to get the results back in the notebook.

As you can see, it took about 7 seconds to run several computations on the dataframe: quantiles, mean, count, min, max and standard deviation.


1.2 Count the total trip distance and count for each vendor:

It took about 5.5 seconds to finish this groupby() statement.

As you can see, vendor 2 made many more trips than vendor 1 and covered a lot more distance.


1.3 Count Missing Values for Each Feature:

There are no missing values in the dataset.


1.4 Display the correlation between Features:

There are some clear correlations, such as the high positive correlation between fare_amount and trip_distance, and between tip_amount and fare_amount.

I will need to remove the highly correlated feature (total_amount) to avoid problems such as multicollinearity.

Let’s try 1.2 again, but this time I will persist the data in memory and let it work in the background while I do some other tasks.

I will also print out the progress bar to see when the task is finished.

Persist function progress bar

Notice that the prompt returns right away as the data is being loaded/computed in the background using the cluster workers.

When the progress bar is finished (all green), you can query the output of the data quickly.


Using Dask for Machine Learning:

One of the most computationally expensive tasks in machine learning is hyperparameter optimization (HPO).

HPO is a technique used to tune the ML parameters that are not learned during the training process, such as the learning rate, optimizer, regularization or number of hidden layers in a neural network.

It works by exploring the search space of a pre-defined range for each of the hyperparameters you are going to tune.

There are many libraries and techniques to carry out the HPO process, but in this article I will focus on tuning the hyperparameters using the Grid Search technique and the Scikit-Learn library in Python.

HPO using Grid Search:

The Grid Search technique is an exhaustive search over a manually specified subset of the hyperparameter space of a learning algorithm.

The algorithm then goes through every combination of the hyperparameters, aiming to maximize or minimize the objective metric (accuracy or loss).

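It will end up finding the best combination within the grid, but the more hyperparameters you tune, the more combinations there are to evaluate, since the grid size is the product of the number of values chosen for each hyperparameter.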
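To get a feel for how quickly the search grows, the short sketch below enumerates the candidates for the parameter grid used in the full example further down in this article, using only the standard library. The 8 cross-validation folds also come from that example.

```python
from itertools import product

param_grid = {
    "max_depth": [3, 4, 5, 6, None],
    "max_features": [1, 3, 10, None],
    "min_samples_split": [2, 3, 10],
    "bootstrap": [True, False],
    "criterion": ["gini", "entropy"],
}

# Every combination of one value per hyperparameter is one candidate.
names = list(param_grid)
candidates = [dict(zip(names, values)) for values in product(*param_grid.values())]

cv_folds = 8
total_fits = len(candidates) * cv_folds  # each candidate is trained once per fold

print(len(candidates), total_fits)  # prints: 240 1920
```

Adding just one more hyperparameter with three values would triple both numbers, which is why spreading the fits over many workers pays off.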

2-dimensional Grid Search for HPO

HPO on Dask Cluster using Grid Search:

Since the HPO process is computationally expensive, we will run it on a Dask cluster in order to take advantage of the scale and elasticity.

Scikit-learn uses a very powerful library called joblib to parallelize processes across multiple CPU cores.

Joblib also provides an interface for other parallel systems to become an execution engine.

We can achieve this by using the parallel_backend context manager to run with thousands of cores in a cluster.

First, we need to import joblib from sklearn externals, then register Dask Distributed as a parallel backend engine for joblib:

    from sklearn.externals.joblib import _dask, parallel_backend
    from sklearn.utils import register_parallel_backend

    register_parallel_backend('distributed', _dask.DaskDistributedBackend)

Then, we need to run the following line to start using the cluster as an execution engine:

    with parallel_backend('distributed', scheduler_host='dask-Scheduler.local-dask:8786'):
        <Normal sklearn Code>

Then your sklearn code logic stays exactly the same with no changes.
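The sklearn.externals.joblib import path belongs to older scikit-learn releases and, as far as I know, has been removed from recent ones. On a newer stack, the same idea can be sketched with the standalone joblib package and its "dask" backend. This is an assumption about the reader's environment, not the setup used in this article: the helper name grid_search_on_dask is hypothetical, the grid is deliberately tiny, and when no scheduler address is given an in-process Dask cluster is started so the sketch can be tried locally.

```python
import joblib
from dask.distributed import Client
from sklearn.datasets import load_digits
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV


def grid_search_on_dask(scheduler_address=None):
    """Run a small grid search through joblib's "dask" backend.

    With no address, an in-process cluster is started for local testing;
    pass an address such as 'dask-Scheduler.local-dask:8786' to use a
    real cluster instead.
    """
    if scheduler_address is None:
        client = Client(processes=False, dashboard_address=None)
    else:
        client = Client(scheduler_address)
    try:
        X, y = load_digits(return_X_y=True)
        search = GridSearchCV(
            RandomForestClassifier(n_estimators=10, random_state=0),
            param_grid={"max_depth": [3, None], "criterion": ["gini", "entropy"]},
            cv=3,
        )
        # Inside this block, scikit-learn's joblib tasks go to the Dask workers.
        with joblib.parallel_backend("dask"):
            search.fit(X, y)
        return search.best_params_, search.best_score_
    finally:
        client.close()


best_params, best_score = grid_search_on_dask()
print(best_params, best_score)
```

The scikit-learn part is unchanged; only the context manager decides where the individual fits are executed.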

Here is the full code for using GridSearch HPO to find the best hyperparameters for a Random Forest Classifier that will classify the handwritten digits from scikit-learn's built-in digits dataset:

    from sklearn import datasets
    from sklearn.model_selection import train_test_split
    from sklearn.model_selection import GridSearchCV
    from sklearn.ensemble import RandomForestClassifier
    import numpy as np
    from time import time
    from sklearn.externals.joblib import _dask, parallel_backend
    from sklearn.utils import register_parallel_backend

    register_parallel_backend('distributed', _dask.DaskDistributedBackend)

    # Loading the Digits dataset
    digits = datasets.load_digits()

    # To apply a classifier on this data, we need to flatten the image, to
    # turn the data into a (samples, feature) matrix:
    n_samples = len(digits.images)
    X = digits.images.reshape((n_samples, -1))
    y = digits.target

    # Split the dataset into train (70%) and test (30%) parts
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=0)

    clf = RandomForestClassifier(n_estimators=20)

    # use a full grid over all parameters
    param_grid = {"max_depth": [3, 4, 5, 6, None],
                  "max_features": [1, 3, 10, None],
                  "min_samples_split": [2, 3, 10],
                  "bootstrap": [True, False],
                  "criterion": ["gini", "entropy"]}

    # run grid search
    grid_search = GridSearchCV(clf, param_grid=param_grid, cv=8, iid=True)

    start = time()
    with parallel_backend('distributed', scheduler_host='dask-Scheduler.local-dask:8786'):
        grid_search.fit(X, y)
        clf.fit(X, y)

    print("GridSearchCV took %.2f seconds for %d candidate parameter settings."
          % (time() - start, len(grid_search.cv_results_['params'])))

    results = grid_search.cv_results_

    # Return the index of the best validation score
    idx = np.flatnonzero(results['rank_test_score'] == 1)
    print("The best score is: " + str(results['mean_test_score'][idx[0]]))

    # print the parameters for the best job
    print("Parameters: {0}".format(results['params'][idx[0]]))

The output of the above code

It took about 40 seconds on a 10-node cluster to find the best combination of hyperparameters for the classifier, whereas on a single machine (even with multiple cores or CPUs) it would take many minutes without the parallelization feature.

Summary:

From my experience using Dask, it is a great library for preprocessing large datasets in a distributed fashion.

If you are a fan of Pandas and NumPy and have trouble fitting your data into memory, then Dask should definitely be the way to go.

It is a great solution for time- and cost-sensitive machine learning tasks such as HPO, data imputation, data preprocessing and exploratory analysis.

