Guest Blog: How Virgin Hyperloop One reduced processing time from hours to minutes with Koalas

For most of the pandas scripts, you can even try to change the import from pandas to databricks.koalas as pd, and some scripts will run fine with minor adjustments, with some limitations explained below.
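As a minimal sketch of that swap (the CSV file name below is made up purely for illustration), the only line that has to change is the import:

# before: import pandas as pd
import databricks.koalas as pd

df = pd.read_csv('pod_telemetry.csv')        # hypothetical input file
df['speed_kph'] = df['speed_mph'] * 1.609    # same pandas-style syntax
df.describe()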

Results

All the snippets have been verified to return the same pod-trip-times results.

The describe and summary methods for pandas and Spark are slightly different, as explained here, but this should not affect performance too much.

Sample results:

Advanced Example: UDFs and complicated operations

We're now going to try to solve a more complex problem with the same dataframe and see how the pandas and Koalas implementations differ.

Goal: Analyze the average speed per pod-trip:

- Group by [pod_id, trip_id].
- For every pod-trip, calculate the total distance travelled by finding the area below the velocity(time) chart (method explained here):
  - Sort the grouped df by the timestamp column.
  - Calculate diffs of the timestamps.
  - Multiply the diffs by the speed; this gives the distance travelled in that time diff.
  - Sum the distance_travelled column; this gives the total distance travelled per pod-trip.
- Calculate the trip time as timestamp.last - timestamp.first (as in the previous paragraph).
- Calculate the average_speed as distance_travelled / trip_time.
- Calculate the distribution of the pod-trip times (mean, stddev).

We decided to implement this task using a custom apply function and UDFs (user-defined functions).

The pandas way: (snippet #4)

import pandas as pd

def calc_distance_from_speed(gdf):
    gdf = gdf.sort_values('timestamp')
    gdf['time_diff'] = gdf['timestamp'].diff()
    return pd.DataFrame({
        'distance_miles': [(gdf['time_diff'] * gdf['speed_mph']).sum()],
        'travel_time_sec': [gdf['timestamp'].iloc[-1] - gdf['timestamp'].iloc[0]]
    })

results = df.groupby(['pod_id', 'trip_id']).apply(calc_distance_from_speed)
results['distance_km'] = results['distance_miles'] * 1.609
results['avg_speed_mph'] = results['distance_miles'] / results['travel_time_sec'] / 60.0
results['avg_speed_kph'] = results['avg_speed_mph'] * 1.609
results.describe()

The PySpark way: (snippet #5)

import pandas as pd
import databricks.koalas as ks
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *
import pyspark.sql.functions as F

schema = StructType([
    StructField("pod_id", StringType()),
    StructField("trip_id", StringType()),
    StructField("distance_miles", DoubleType()),
    StructField("travel_time_sec", DoubleType())
])

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def calculate_distance_from_speed(gdf):
    gdf = gdf.sort_values('timestamp')
    gdf['time_diff'] = gdf['timestamp'].diff()
    return pd.DataFrame({
        'pod_id': [gdf['pod_id'].iloc[0]],
        'trip_id': [gdf['trip_id'].iloc[0]],
        'distance_miles': [(gdf['time_diff'] * gdf['speed_mph']).sum()],
        'travel_time_sec': [gdf['timestamp'].iloc[-1] - gdf['timestamp'].iloc[0]]
    })

sdf = spark_df.groupby("pod_id", "trip_id").apply(calculate_distance_from_speed)
sdf = sdf.withColumn('distance_km', F.col('distance_miles') * 1.609)
sdf = sdf.withColumn('avg_speed_mph', F.col('distance_miles') / F.col('travel_time_sec') / 60.0)
sdf = sdf.withColumn('avg_speed_kph', F.col('avg_speed_mph') * 1.609)
sdf = sdf.orderBy(sdf.pod_id, sdf.trip_id)
sdf.summary().toPandas()  # summary calculates almost the same results as describe

The Koalas way: (snippet #6)

import databricks.koalas as ks

def calc_distance_from_speed_ks(gdf) -> ks.DataFrame[str, str, float, float]:
    gdf = gdf.sort_values('timestamp')
    gdf['meanspeed'] = (gdf['timestamp'].diff() * gdf['speed_mph']).sum()
    gdf['triptime'] = (gdf['timestamp'].iloc[-1] - gdf['timestamp'].iloc[0])
    return gdf[['pod_id', 'trip_id', 'meanspeed', 'triptime']].iloc[0:1]

kdf = ks.from_pandas(df)
results = kdf.groupby(['pod_id', 'trip_id']).apply(calc_distance_from_speed_ks)
# due to current limitations of the package, groupby.apply() returns c0 .. c3 column names
results.columns = ['pod_id', 'trip_id', 'distance_miles', 'travel_time_sec']
# Spark groupby does not set the groupby cols as index and does not sort them
results = results.set_index(['pod_id', 'trip_id']).sort_index()
results['distance_km'] = results['distance_miles'] * 1.609
results['avg_speed_mph'] = results['distance_miles'] / results['travel_time_sec'] / 60.0
results['avg_speed_kph'] = results['avg_speed_mph'] * 1.609
results.describe()

Koalas' implementation of apply is based on PySpark's pandas_udf, which requires schema information; this is why the definition of the function also has to define a return type hint.

The authors of the package introduced new custom type hints, ks.DataFrame and ks.Series.

Unfortunately, the current implementation of the apply method is quite cumbersome, and it took a bit of effort to arrive at the same result (the column names change, and the groupby keys are not returned).

However, all the behaviors are appropriately explained in the package documentation.
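To make the requirement concrete, here is a minimal, hypothetical sketch following the same pattern as snippet #6 (the column names and values are invented): the return annotation lists only the output dtypes, not names, which is exactly why the result comes back with generic c0, c1, ... columns that have to be renamed by hand.

import databricks.koalas as ks

def first_row_per_pod(gdf) -> ks.DataFrame[str, float]:
    # the annotation declares the output schema (dtypes only, no column names)
    return gdf[['pod_id', 'speed_mph']].iloc[0:1]

kdf = ks.DataFrame({'pod_id': ['A', 'A', 'B'], 'speed_mph': [150.0, 250.0, 400.0]})
out = kdf.groupby('pod_id').apply(first_row_per_pod)
out.columns = ['pod_id', 'speed_mph']   # restore readable names, as in snippet #6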

Performance

To assess the performance of Koalas, we profiled the code snippets for different numbers of rows. The profiling experiment was done on the Databricks platform, using the following cluster configuration:

- Spark driver node (also used to execute the pandas scripts): 8 CPU cores, 61 GB RAM.
- 15 Spark worker nodes: 4 CPU cores, 30.5 GB RAM each (total: 60 CPUs / 457.5 GB RAM).

Every experiment was repeated 10 times, and the clips shown below indicate the min and max execution times.

Basic ops

When the data is small, the initialization operations and data transfer are huge in comparison to the computations, so pandas is much faster (marker a). For larger amounts of data, pandas' processing times exceed those of the distributed solutions (marker b). We can then observe some performance hits for Koalas, but it gets closer to PySpark as the amount of data increases (marker c).

UDFs

For the UDF profiling, as noted in the PySpark and Koalas documentation, performance decreases dramatically.

This is why we needed to decrease the number of rows we tested with by 100x vs the basic ops case.

For each test case, Koalas and PySpark show a striking similarity in performance, indicating a consistent underlying implementation.

During experimentation, we discovered that there is a much faster way of executing that set of operations using PySpark's window functions; however, this is not currently implemented in Koalas, so we decided to compare only the UDF versions.
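For illustration only (this is not the exact code used in the benchmark), a minimal sketch of that window-based approach, assuming the same spark_df and column names as in snippet #5, could look like this:

from pyspark.sql import Window
import pyspark.sql.functions as F

# per-trip window ordered by time, so lag() returns the previous timestamp within the trip
w = Window.partitionBy('pod_id', 'trip_id').orderBy('timestamp')

sdf_win = (spark_df
           .withColumn('time_diff', F.col('timestamp') - F.lag('timestamp').over(w))
           .groupBy('pod_id', 'trip_id')
           .agg(F.sum(F.col('time_diff') * F.col('speed_mph')).alias('distance_miles'),
                (F.max('timestamp') - F.min('timestamp')).alias('travel_time_sec')))

This avoids the grouped-map pandas UDF entirely, which is where most of the serialization overhead comes from.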

Discussion

Koalas seems to be the right choice if you want to make your pandas code immediately scalable and executable on bigger datasets that cannot be processed on a single node. After the quick swap to Koalas, just by scaling your Spark cluster, you can handle bigger datasets and improve processing times significantly. Your performance should be comparable to PySpark's (though 5 to 50% lower, depending on the dataset size and the cluster). On the other hand, the Koalas API layer does cause a visible performance hit, especially in comparison to native Spark.

At the end of the day, if computational performance is your key priority, you should consider switching from Python to Scala.

Limitations and differences

During your first few hours with Koalas, you might wonder, "Why is this not implemented?!" Currently, the package is still under development and is missing some pandas API functionality, but much of it should be implemented in the next few months (for example, groupby.diff() or kdf.rename()).

Also, from my experience as a contributor to the project, some features are either too complicated to implement with the Spark API or were skipped due to a significant performance hit.

For example, DataFrame.values requires materializing the entire working set in a single node's memory, and so is suboptimal and sometimes not even possible. Instead, if you need to retrieve some final results on the driver, you can call DataFrame.to_pandas() or DataFrame.to_numpy().
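For instance (a minimal sketch with made-up column names and values, just to show the pattern), pulling small, already-aggregated results back to the driver is safe, whereas calling .values on a full-size dataframe would try to collect everything:

import databricks.koalas as ks

results = ks.DataFrame({'pod_id': ['A', 'B'], 'avg_speed_kph': [480.2, 512.7]})  # made-up values

pdf = results.to_pandas()   # pandas DataFrame materialized on the driver
arr = results.to_numpy()    # NumPy array materialized on the driver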

Another important thing to mention is that Koalas' execution chain is different from pandas': when you execute operations on the dataframe, they are put on a queue of operations but not executed. Only when the results are needed, e.g., when calling kdf.head() or kdf.to_pandas(), will the operations be executed. That can be misleading for somebody who has never worked with Spark, since pandas executes everything eagerly, line by line.
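As a minimal sketch of this lazy behavior (the column name "doubled" is made up for illustration):

import databricks.koalas as ks

kdf = ks.range(1_000_000)
kdf['doubled'] = kdf['id'] * 2           # queued: builds up the Spark plan, nothing runs yet
filtered = kdf[kdf['doubled'] > 100]     # still lazy: only extends the plan

filtered.head(5)                         # an action: Spark now actually executes the plan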

Conclusions

Koalas helped us to reduce the burden of "Spark-ifying" our pandas code.

If you're also struggling with scaling your pandas code, you should try it too! If you are desperately missing any behavior or have found inconsistencies with pandas, please open an issue so that as a community we can ensure that the package is actively and continually improved.

Also, feel free to contribute!

Resources

- Koalas GitHub: https://github.com/databricks/koalas
- Koalas documentation: https://koalas.readthedocs.io
- Code snippets from this article: https://gist.github.com/patryk-oleniuk/043f97ae9c405cbd13b6977e7e6d6fbc
