Vectorized R I/O in Upcoming Apache Spark 3.0

R is one of the most popular programming languages in data science, dedicated to statistical analysis and extended by a rich ecosystem, such as RStudio add-ins and other R packages, for data processing and machine learning tasks.

Moreover, it enables data scientists to easily visualize their data sets.

By using SparkR in Apache Spark™, R code can easily be scaled.

To run jobs interactively, you can easily launch distributed computations from an R shell.

When SparkR does not require interaction with the R process, the performance is virtually identical to other language APIs such as Scala, Java and Python.

However, significant performance degradation happens when SparkR jobs interact with native R functions or data types.
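To see why crossing the boundary between the JVM and the R process row by row is costly, here is a small, hypothetical Python sketch (not SparkR's actual protocol): it contrasts serializing rows one at a time against serializing a single columnar batch, which is the essence of the vectorized approach.

```python
import pickle

# Hypothetical illustration of the I/O overhead, not SparkR internals.
rows = [(i, float(i) * 0.5) for i in range(10_000)]

# Row-at-a-time: each row is serialized separately, paying the framing
# overhead once per row, analogous to the native (non-vectorized)
# SparkR worker protocol.
per_row_payloads = [pickle.dumps(r) for r in rows]
per_row_bytes = sum(len(p) for p in per_row_payloads)

# Vectorized: transpose the rows into columns and serialize once per
# batch, analogous to shipping Arrow record batches between processes.
columns = {
    "id": [r[0] for r in rows],
    "value": [r[1] for r in rows],
}
batch_payload = pickle.dumps(columns)

# The single batched payload carries the same data in far fewer bytes.
print(per_row_bytes, len(batch_payload))
```

The same data crosses the process boundary either way; batching it into a columnar payload simply amortizes the per-transfer overhead.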

Databricks Runtime introduced vectorization in SparkR to improve the performance of data I/O between Spark and R.

We are excited to announce that, using the R APIs from Apache Arrow 0.15.1, the vectorization is now available in the upcoming Apache Spark 3.0 with substantial performance improvements.
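As a sketch of how this is turned on (assuming Spark 3.0+ and the arrow R package installed alongside SparkR), the Arrow-based vectorization is an opt-in session configuration; the property name is spark.sql.execution.arrow.sparkr.enabled:

```r
library(SparkR)

# Sketch: enable the Arrow optimization for SparkR (off by default).
# Assumes Spark 3.0+ and the 'arrow' R package are installed.
sparkR.session(
  master = "local[*]",
  sparkConfig = list(spark.sql.execution.arrow.sparkr.enabled = "true")
)
```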

This blog post outlines the interaction between Spark and R inside SparkR, the current native implementation, and the new vectorized implementation, together with benchmark results.

Spark and R interaction

SparkR supports not only a rich set of ML and SQL-like APIs but also a set of APIs commonly used to interact directly with R code: for example, the seamless conversion of a Spark DataFrame from/to an R data.frame, and the distributed execution of native R functions over a Spark DataFrame.
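For instance, the from/to conversions and native-function execution mentioned above look roughly like this in the SparkR API (a sketch; createDataFrame, collect, and dapplyCollect are actual SparkR functions, but the sample data is made up):

```r
library(SparkR)
sparkR.session()

# R data.frame -> distributed Spark DataFrame
rdf <- data.frame(id = 1:3, value = c("a", "b", "c"))
sdf <- createDataFrame(rdf)

# Spark DataFrame -> local R data.frame
back <- collect(sdf)

# Run a native R function over each partition in a distributed manner
counted <- dapplyCollect(sdf, function(pdf) data.frame(n = nrow(pdf)))
```

Each of these calls moves data between the JVM and the R process, which is exactly where the vectorized I/O path pays off.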

In most cases, performance is virtually on par with the other language APIs in Spark. For example, when user code relies on Spark UDFs and/or SQL APIs, the execution happens entirely inside the JVM with no performance penalty in I/O.

See the cases below, which both take about 1 second:

// Scala API
// ~1 second
sql("SELECT id FROM range(2000000000)").filter("id > 10").count()

# R API
# ~1 second
count(filter(sql("SELECT * FROM range(2000000000)"), "id > 10"))

However, when the job has to execute a native R function or convert to/from native R types, the performance differs dramatically, as below.

// Scala API
val ds = (1L to 100000L).toDS
// ~1 second
ds.mapPartitions(iter => iter.filter(_ < 50000)).count()

# R API
df.
