How to Run Parallel Data Analysis in Python using Dask Dataframes

For this article I set out to try Dask Dataframes, and ran a couple of benchmarks on them.

Reading the docs

The first thing I did was read the official documentation, to see exactly which kinds of work it recommends doing with Dask Dataframes instead of regular Dataframes.

Here are the relevant parts from the official docs. Dask Dataframes are recommended for:

- Manipulating large datasets, even when those datasets don’t fit in memory
- Accelerating long computations by using many cores
- Distributed computing on large datasets with standard Pandas operations like groupby, join, and time series computations

Below that, the docs list some of the things that are really fast if you use Dask Dataframes:

- Arithmetic operations (multiplying or adding to a Series)
- Common aggregations (mean, min, max, sum, etc.)
- Calling apply (as long as it’s along the index, that is, not after a groupby(‘y’) where ‘y’ is not the index)
- Calling value_counts(), drop_duplicates() or corr()
- Filtering with loc, isin, and row-wise selection

Just a small brush up on filtering Dataframes, in case you find it useful.

How to use Dask Dataframes

Dask Dataframes mirror most of the Pandas Dataframe API, except that operations such as aggregations and applys are evaluated lazily, and need to be computed by calling the compute method.

To create a Dask Dataframe you can simply call the read_csv method just as you would in Pandas or, given a Pandas Dataframe df, you can call:

dd = ddf.from_pandas(df, npartitions=N)

where ddf is the name you imported Dask Dataframes with, and npartitions is an argument telling the Dataframe how many partitions to split it into.

According to StackOverflow, it is advisable to split the Dataframe into about as many partitions as your computer has cores, or a couple of times that number, as each partition will run on a different thread and communication between them becomes too costly if there are too many.

Getting dirty: Let’s benchmark!

I made a Jupyter Notebook to try out the framework, and made it available on GitHub in case you want to check it out or even run it for yourself.

The benchmarking tests I ran are available in the notebook on GitHub; the main ones time a mean, a max, a sum, a filter, an apply and a value_counts call on the data. In them, df3 is a regular Pandas Dataframe with 25 million rows, generated using the script from the previous article (its columns are name, surname and salary, sampled randomly from a list).

I took a 50-row Dataset and concatenated it 500,000 times (50 × 500,000 = 25 million rows), since I wasn’t too interested in the analysis per se, only in the time it took to run it.
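The generation script from the previous article isn’t reproduced here, so the following is only a hypothetical stand-in. It samples name, surname and salary at random from small invented lists to build a 50-row base, then repeats it with NumPy’s tile, which yields the same rows as concatenating the base frame end to end without building hundreds of thousands of separate frames.

```python
import numpy as np
import pandas as pd

# Hypothetical value lists; the original script's lists are not shown in the article.
NAMES = ["Ana", "Bruno", "Carla", "Diego", "Elena"]
SURNAMES = ["Garcia", "Lopez", "Martinez", "Perez", "Sanchez"]
SALARIES = [1000, 1500, 2000, 2500, 3000]

def make_base(rows=50, seed=0):
    """Build a small frame by sampling each column at random from its list."""
    rng = np.random.default_rng(seed)
    return pd.DataFrame({
        "name": rng.choice(NAMES, size=rows),
        "surname": rng.choice(SURNAMES, size=rows),
        "salary": rng.choice(SALARIES, size=rows),
    })

def repeat_frame(base, times):
    """Stack `base` on top of itself `times` times, like pd.concat([base] * times)."""
    return pd.DataFrame({col: np.tile(base[col].to_numpy(), times)
                         for col in base.columns})

base = make_base()
# The article used 500,000 repetitions (25 million rows); a small value keeps this quick.
df3 = repeat_frame(base, times=1_000)
print(len(df3))  # 50000
```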

dfn is simply the Dask Dataframe based on df3.

First batch of results: not too optimistic

I first tried the test with 3 partitions, as I only have 4 cores and didn’t want to overwork my PC.

I had pretty bad results with Dask and had to wait a lot to get them too, but I feared it may have been because I’d made too few partitions:

204.313940048 seconds for get_big_mean
39.7543280125 seconds for get_big_mean_old
131.600986004 seconds for get_big_max
43.7621600628 seconds for get_big_max_old
120.027213097 seconds for get_big_sum
7.49701309204 seconds for get_big_sum_old
0.581165790558 seconds for filter_df
226.700095892 seconds for filter_df_old

You can see most of the operations got a lot slower when I used Dask.

That gave me the hint that I might need more partitions.

The time it took to build the lazy computations was negligible as well (less than half a second in some cases), so this was not an up-front cost that would have been amortized over time if I had reused them.

I also tried the test with the apply method, and had pretty similar results:

369.541605949 seconds for apply_random
157.643756866 seconds for apply_random_old

So generally, most operations became at least twice as slow as the original, though filter was a lot faster.

I worry that I should have called compute on that one as well, so take that result with a grain of salt.

More partitions: amazing speed up

After such discouraging results, I decided maybe I just wasn’t using enough partitions.

The whole point of this is running things in parallel, after all, so maybe I just needed to parallelize more? So I tried the same tests with 8 partitions, and here’s what I got (I omitted the results from the non-parallel Dataframe, since they were basically the same):

3.08352184296 seconds for get_big_mean
1.3314101696 seconds for get_big_max
1.21639800072 seconds for get_big_sum
0.228978157043 seconds for filter_df
112.135010004 seconds for apply_random
50.2007009983 seconds for value_count_test

That’s right! Most operations are now running many times faster than the regular Dataframe’s (get_big_mean went from about 40 seconds to about 3, and get_big_max from about 44 to about 1.3), and even the apply got faster! I also ran the value_count test, which just calls the value_counts method on the salary Series.

For context, when I ran this test on a regular Dataframe I had to kill the process after ten whole minutes of waiting.

This time it only took 50 seconds! So basically I was just using the tool wrong, and it’s pretty darn fast.

A lot faster than regular Dataframes.

Final take-away

Given that we just aggregated 25 million rows in a few seconds, and counted their values in under a minute, on a pretty old 4-core PC, I can see how this would be huge in the industry.

So my advice is to try this framework out next time you have to process a dataset locally or on a single AWS instance.

It’s pretty fast.

I hope you found this article interesting or useful! It took a lot more time to write than I anticipated, as some of the benchmarks took so long.

Please tell me if you’d ever heard of Dask before reading this, and whether you’ve ever used it in your job or for a project.

Also tell me if there are any other cool features I didn’t cover, or some things I did plain wrong! Your feedback and comments are the biggest reason I write, as I am also learning from this.

Follow me for more Python tutorials, tips and tricks! If you really liked this article, please consider supporting my writing.

