How to handle large datasets in Python with Pandas and Dask

if [ ! -f "./data/clickstream_data.tsv.gz" ]; then
  wget https://dumps.wikimedia.org/other/clickstream/2018-12/clickstream-enwiki-2018-12.tsv.gz -O ./data/clickstream_data.tsv.gz
fi

gunzip ./data/clickstream_data.tsv.gz

Now let's have a look at what kind of data we have here and import it into the dataframe.

The very first memory optimization step we can perform at this point (assuming we know our table structure by now) is specifying the column data types during the import (via the dtype= input parameter).

That way, we can force Pandas to convert some values into types with a significantly lower memory footprint.

That may not make much sense if you're dealing with a few thousand rows, but it will make a noticeable difference with a few million! For example, if you know that a column should only contain positive integers, use an unsigned integer type (uint32) instead of the regular int type (or worse, float, which may sometimes happen automatically).
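To get a feel for the difference, here is a quick illustration of my own (not part of the original workflow) comparing one million counts stored as int64 versus uint32:

import numpy as np
import pandas as pd

# One million click counts with an explicit int64 dtype: roughly 8 MB.
counts = pd.Series(np.random.randint(0, 1000, size=1_000_000), dtype='int64')
print(counts.memory_usage(deep=True))

# The same values as uint32: about half of that.
print(counts.astype('uint32').memory_usage(deep=True))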

df = pd.read_csv('data/clickstream_data.tsv',
                 delimiter='\t',
                 names=['coming_from', 'article', 'referrer_type', 'n'],
                 dtype={'referrer_type': 'category', 'n': 'uint32'})

Finally, let's limit the data frame size to the first 100k rows for the sake of speed.

Note that this is usually a bad idea; when sampling a subset, it's far more appropriate to take every nth row in order to get as uniform a sample as possible.
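For the record, taking every nth row is a one-liner; here is a minimal sketch of my own, assuming we still want roughly 100k rows:

# Keep roughly every nth row instead of the first 100k rows (illustration only).
step = max(len(df) // 100_000, 1)
df_sample = df.iloc[::step]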

But since we're only using it to demonstrate the analysis process, we're not going to bother:

df = df.iloc[:100000]

Q1: Which links do people click on most often in a given article?

To answer this question, we need to create a table where we can see the aggregated sum of visitors per article and per source of origin (the coming_from column).

So, let's aggregate the table over the article and coming_from columns, sum up the 'n' values, and then order the rows according to the 'n' sums.

Here's how we approach it in Pandas:

top_links = (
    df.loc[df['referrer_type'].isin(['link']),
           ['coming_from', 'article', 'n']]
      .groupby(['coming_from', 'article'])
      .sum()
      .sort_values(by='n', ascending=False)
)

And the resulting table:

Pandas + Dask

Now let's recreate this table using the Dask library.

from dask import dataframe as dd

dfd = dd.read_csv(
    'data/clickstream_data.tsv',
    delimiter='\t',
    names=['coming_from', 'article', 'referrer_type', 'n'],
    dtype={'referrer_type': 'category', 'n': 'uint32'},
    blocksize=64000000  # = 64 MB chunks
)

Note that the read_csv function is pretty similar to the Pandas one, except here we also specify the byte size per chunk.
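As a quick sanity check (my own addition; the exact number depends on the file size), you can see how many partitions that blocksize produced:

# Number of ~64 MB chunks the file was split into.
print(dfd.npartitions)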

We perform the aggregation logic, which is also almost identical to Pandas:

top_links_grouped_dask = dfd.loc[
    dfd['referrer_type'].isin(['link']),
    ['coming_from', 'article', 'n']
].groupby(['coming_from', 'article'])

That won't do any calculations yet; top_links_grouped_dask will be a Dask delayed dataframe object.

We can then launch the computation via the .compute() method.
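For illustration (my own sketch, not part of the original pipeline), computing it straight into a Pandas dataframe would look like this:

# This pulls the whole aggregated result into memory as a regular Pandas dataframe.
top_links_in_memory = top_links_grouped_dask.sum().compute()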

But we don't want to clog our memory, so let's save the result directly to the hard drive instead. We will use the HDF5 file format to do that.

Let's declare the HDF5 store then:

store = pd.HDFStore('./data/clickstream_store.h5')

And compute the data frame into it.

Note that ordering column values with Dask isn’t that easy (after all, the data is read one chunk at a time), so we cannot use the sort_values() method like we did in the Pandas example.

Instead, we need to use the nlargest() Dask method and specify the number of top values we'd like to determine:

top_links_dask = top_links_grouped_dask.sum().nlargest(20, 'n')

It too returns a delayed Dask object, so to finally compute it (and save it to the store) we run the following:

store.put('top_links_dask', top_links_dask.compute(), format='table', data_columns=True)

In this case, the result is different from the values in the Pandas example, since here we work on the entire dataset, not just the first 100k rows:

Q2: What are the most popular articles users access from all the external search engines?

That one is easy.

All we need to do is filter for the rows that contain the 'external' referrer_type and 'other-search' coming_from values:

external_searches = df.loc[
    (df['referrer_type'].isin(['external'])) &
    (df['coming_from'].isin(['other-search'])),
    ['article', 'n']
]

We then only have to sort the values according to the number of visitors:

most_popular_articles = external_searches.sort_values(by='n', ascending=False).head(40)

And voila!

Pandas + Dask

How about doing the same, but on the full dataset with Dask this time?

external_searches_dask = dfd.loc[
    (dfd['referrer_type'].isin(['external'])) &
    (dfd['coming_from'].isin(['other-search'])),
    ['article', 'n']
]

Since we only need to store the top 40 results, we can simply store them directly in a Pandas dataframe:

external_searches_dask = external_searches_dask.nlargest(40, 'n').compute()

which returns this (showing only the first 5 rows here):

This is a great question to answer graphically, so let's plot the first 40 top values:

sns.barplot(data=external_searches_dask, y='article', x='n')
plt.gca().set_ylabel('')

Q3: What percentage of visitors to a given article page have clicked on a link to get there?

The framing of this question suggests that we need to be able to calculate the fraction for a specific article title.

So let’s create a function that will take a dataframe and the desired article title, and then return the percentage value.

The function will have to filter the rows for a given article, sum up all the visitor counts, and then find the cumulative sum of n for the subset of visits with the 'link' value in the referrer_type column:

def visitors_clicked_link_pandas(dataframe, article):
    df_article = dataframe.loc[dataframe['article'].isin([article])]
    a = df_article['n'].sum()
    l = df_article.loc[df_article['referrer_type'].isin(['link']), 'n'].sum()
    return round((l * 100) / a, 2)

And let's test it on one of the articles, say one with the title "Jehangir_Wadia":

>>> visitors_clicked_link_pandas(df, 'Jehangir_Wadia')
81.1

Which suggests that ~81% of the "Jehangir_Wadia" article visitors arrive there by clicking on an external link.

Pandas + Dask

How can we extend that to the entire dataset using Dask? Quite easily.

All we have to do is use the Dask dataframe instead of the Pandas one and add the .compute() methods to two of the inner statements in the function, like this:

def visitors_clicked_link_dask(dataframe, article):
    df_article = dataframe.loc[dataframe['article'].isin([article])]
    a = df_article['n'].sum().compute()
    l = df_article.loc[df_article['referrer_type'].isin(['link']), 'n'].sum().compute()
    return round((l * 100) / a, 2)

Running the function will return the same result:

>>> visitors_clicked_link_dask(dfd, 'Jehangir_Wadia')
81.1

Q4: What is the most common source of visits for each article?

To answer this question, we need two columns: the destination article and the origin (coming_from) title, along with the sum of the number of visits.

Furthermore, we have to keep only the rows with the highest number of visitors per article.

First, let's get rid of all the unnecessary extra columns by aggregating and summing up all the 'n' counts over the referrer_type for every coming_from/article combination:

summed_articles = df.groupby(['article', 'coming_from']).sum()

Next, let's find the referrers (coming_from) that generated the highest number of visitors for each article page.

One way to do that is to use a filter table with the desired row indices via the df.iloc[] method. So let's find the indices in the summed_articles table which correspond to the highest 'n' per article. We'll use a nifty Pandas method called idxmax, which returns the indices of the grouped column with max values.
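If idxmax is new to you, here is a tiny toy example of my own showing what it returns: the index label of the row holding the maximum value within each group.

toy = pd.DataFrame({'article': ['A', 'A', 'B'], 'n': [3, 7, 5]})
print(toy.groupby('article').idxmax())
# article 'A' -> row 1, article 'B' -> row 2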

Aggregating summed_articles again, this time over the article column, we can run it like this:

max_n_filter = summed_articles.reset_index().groupby('article').idxmax()

Let's preview the filter first:

Now we can filter the summed_articles rows with this table:

summed_articles.iloc[max_n_filter['n']].head(4)

Finally, we need to sort the values by the highest number of visitors:

summed_articles.iloc[max_n_filter['n']].sort_values(by='n', ascending=False).head(10)

Done!

Pandas + Dask

Now, let's try recreating this moderately complex task in Dask on the full dataset.

The first step is easy.

We can create a table with summed_articles like this without any issues:

summed_articles = (
    dfd.groupby(['article', 'coming_from'])
       .sum()
       .reset_index()
       .compute()
)

But it's best not to store it in memory; we'll have to perform the aggregation later on, and that will be memory-demanding.

So let's write it down (as it's being calculated) directly to the hard drive instead, for example to an HDF5 or a Parquet file:

dfd.groupby(['article', 'coming_from'])\
   .sum()\
   .reset_index()\
   .to_parquet('./summed_articles.parquet', engine='pyarrow')

So far so good.

Step two is creating the filter table.

That’s where the problems begin: at the time of writing this article, Dask dataframes have no idxmax() implementation available.

We’d have to improvise somehow.

1st method

For example, we could copy the summed_articles index into a new column and output it via a custom apply function.

However, there's another problem: Dask's partitioning of the data means that we can't use iloc to filter specific rows (its row indexer only accepts ":", i.e. all rows).
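In other words (a quick illustration of my own, where some_row_indices is hypothetical), Dask's iloc can only pick columns:

dfd.iloc[:, [0, 2]]            # fine: every row, a subset of columns
# dfd.iloc[some_row_indices]   # row selection like this raises an error in Dask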

We could try using the loc method instead and select rows by checking whether their indices are present in the previously determined filter table, but that would be a huge computational overhead.

What a bummer.

2nd method

Here's another approach: we could write a custom function for processing aggregated data and use it with the groupby-apply combination.

That way, we can overcome all the above issues quite easily.
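A rough sketch of what I mean, reading back the summed_articles Parquet file from the previous step; the meta layout and the lambda are my own illustration, not code from the original notebook:

import pandas as pd
import dask.dataframe as dd

summed = dd.read_parquet('./summed_articles.parquet', engine='pyarrow')

# meta tells Dask what the output of the custom function looks like.
meta = pd.DataFrame({
    'article': pd.Series(dtype='object'),
    'coming_from': pd.Series(dtype='object'),
    'n': pd.Series(dtype='uint32'),
})

# For every article group, keep only the row with the largest 'n'.
top_source_per_article = (
    summed.groupby('article')
          .apply(lambda g: g.nlargest(1, 'n'), meta=meta)
          .compute()
)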

But then... the apply method works by concatenating all of the data output from the individually processed subsets of rows into one final table, which means it will have to be transiently stored in one piece in memory, unfortunately. Depending on our luck with the data, it may or may not be small enough.

I tried that a couple of times and found that it clogs my memory (on a 16 GB RAM laptop), eventually forcing the notebook kernel to restart.

3rd method

Not giving up, I resorted to the dark side of solutions by attempting to iterate over individual groups, find the right row, and append it to an HDF5/Parquet storage on disk.

First problem: the DaskGroupBy object has no implementation of the iteritem method (at the time of writing), so we can't use the for-in logic.

Finally, we can find all the unique article/coming_from combinations and iterate over these values to group the summed_articles rows ourselves with the get_group() method:

dfd[['article', 'coming_from']]\
    .drop_duplicates()\
    .to_parquet('./uniques.parquet')

for item in pd.read_parquet('./uniques.parquet', engine='pyarrow').itertuples():
    t = dfd.groupby(['article', 'coming_from'])\
           .get_group(item)\
           .compute()
    ...

That should work, but the process would be incredibly slow.

That’s why I gave up on using Dask for this problem.

The point I’m trying to make here is that not all data-oriented problems can be solved (easily) with Pandas.

Sure, one can invest in massive amounts of RAM, but most of the time that's just not the way to go; certainly not for a regular data person with a laptop.

That type of problem is still best tackled with good old SQL and a relational database, where even a simple SQLite instance could perform better and in a very reasonable time.

We can solve this problem in several ways.

Here’s one of them.

My solution is based on storing data in a PostgreSQL database and performing a composite query with the help of PARTITION BY and ROW_NUMBER functions.

I use a PostgreSQL database here, but it could just as well be the latest SQLite3 (version 3.25 or later), as it now supports the PARTITION BY functionality, and that's what we need for our solution.

To enable saving the results, I created a new PostgreSQL database called 'clickstream' running locally in a Docker container and connected to it from the Jupyter Notebook via an SQLAlchemy interface engine:

import psycopg2
from sqlalchemy import create_engine

engine = create_engine('postgres://<db hostname>/clickstream')
conn = psycopg2.connect(
    dbname="clickstream",
    user="postgres",
    password="<secure-password>",
    host="0.0.0.0"
)
cur = conn.cursor()

We then perform the summation over the Dask dataframe grouped by the article and coming_from columns, and clean up the string data of tabs and return characters, which would interfere with the PostgreSQL upload:

summed_articles = (
    dfd.groupby(['article', 'coming_from'])
       .sum()
       .reset_index()
)

for c in ['\t', '\n', '\r']:
    summed_articles['article'] = summed_articles['article'].str.replace(c, ' ')

summed_articles['coming_from'] = summed_articles['coming_from'].str.replace('\t', ' ')

Again, at this point we still haven't actually performed any of this editing; summed_articles is still a delayed Dask object.

One last thing to do before uploading the dataframe is to create an empty table in the database, so sending an empty dataframe with the right column names will do the trick quite well:

pd.DataFrame(columns=summed_articles.columns).to_sql(
    'summed_articles',
    con=engine,
    if_exists='replace',
    index=False
)

And finally, let's upload the data into it.

Note that at the time of writing, Dask dataframes offer no to_sql method, so we can use another trick to do it quickly, chunk by chunk:

import io

err_tables = []

for n in range(summed_articles.npartitions):
    table_chunk = summed_articles.get_partition(n).compute()
    output = io.StringIO()
    table_chunk.to_csv(output, sep='\t', header=False, index=False)
    output.seek(0)
    try:
        cur.copy_from(output, 'summed_articles', null="")
    except Exception:
        err_tables.append(table_chunk)
        conn.rollback()
        continue
    conn.commit()

Next, we create a SELECT statement that partitions the rows by article, orders them locally by the number-of-visits column 'n', and indexes the ordered groups incrementally with integers (starting from 1 for every partitioned subset):

SELECT
    row_number() OVER (
        PARTITION BY article
        ORDER BY n DESC
    ) ArticleNR,
    article,
    coming_from,
    n
FROM summed_articles

Then we aggregate the rows again by the article column and return only those with the index equal to 1, essentially keeping just the row with the maximum 'n' value for each article.

Here is the full SQL query:

SELECT t.article, t.coming_from, t.n
FROM (
    SELECT
        row_number() OVER (
            PARTITION BY article
            ORDER BY n DESC
        ) ArticleNR,
        article,
        coming_from,
        n
    FROM summed_articles
) t
WHERE t.ArticleNR = 1
ORDER BY n DESC;

The above SQL query was then executed against the database via:

q = engine.execute('''<SELECT statement here>''').fetchall()
pd.DataFrame(q, columns=['article', 'coming_from', 'n']).head(20)

And voila, our table is ready.

Also, apparently the difference between a hyphen and a minus sign kept a lot of people awake at night in 2018:

I hope this guide helps you deal with larger datasets in Python using the Pandas + Dask combo.

It’s clear that some complex analytical tasks are still best handled with other technologies like the good old relational database and SQL.

Note 1: While using Dask, every Dask dataframe chunk, as well as the final output (converted into a Pandas dataframe), MUST be small enough to fit into memory.

Note 2: Here are some useful tools that help to keep an eye on data-size-related issues:

- the %timeit magic function in the Jupyter Notebook
- df.memory_usage()
- ResourceProfiler from dask.diagnostics
- ProgressBar
- sys.getsizeof
- gc.collect()
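For example, a minimal sketch of my own showing ProgressBar and ResourceProfiler wrapped around a Dask computation:

from dask.diagnostics import ProgressBar, ResourceProfiler

# Prints a progress bar and records CPU/memory usage while the graph executes.
with ProgressBar(), ResourceProfiler(dt=1) as rprof:
    result = top_links_grouped_dask.sum().nlargest(20, 'n').compute()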
