R is one of the most popular programming languages in data science, dedicated to statistical analysis and extended by a number of tools, such as RStudio addins and other R packages, for data processing and machine learning tasks.
Moreover, it enables data scientists to easily visualize their data sets.
By using SparkR in Apache Spark™, R code can easily be scaled.
To run jobs interactively, you can launch an R shell and run distributed computations directly from it.
When SparkR does not require interaction with the R process, the performance is virtually identical to other language APIs such as Scala, Java and Python.
However, significant performance degradation happens when SparkR jobs interact with native R functions or data types.
Databricks Runtime introduced vectorization in SparkR to improve the performance of data I/O between Spark and R.
We are excited to announce that, using the R APIs from Apache Arrow 0.15.1, vectorization is now available in the upcoming Apache Spark 3.0, with substantial performance improvements.
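As a minimal sketch of how this is switched on (assuming Spark 3.0 with the arrow R package installed; the configuration key below is the Arrow optimization flag for SparkR in Spark 3.0 and is disabled by default):

```r
library(SparkR)

# Start a SparkR session with Arrow-based vectorization enabled.
# Requires the 'arrow' R package on the driver and executors.
sparkR.session(
  sparkConfig = list(spark.sql.execution.arrow.sparkr.enabled = "true"))
```

With the flag set to "false" (the default in Spark 3.0), SparkR falls back to the native row-at-a-time serialization described below.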
This blog post outlines the Spark and R interaction inside SparkR, the current native implementation, and the vectorized implementation in SparkR, together with benchmark results.
Spark and R interaction

SparkR supports not only a rich set of ML and SQL-like APIs but also a set of APIs commonly used to interact directly with R code: for example, the seamless conversion of a Spark DataFrame from/to an R data.frame, and the execution of R native functions on a Spark DataFrame in a distributed manner.
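As a concrete illustration of these APIs (a sketch assuming a running SparkR session; mtcars is a built-in R data set used here purely as an example):

```r
library(SparkR)

# R data.frame -> Spark DataFrame
df <- createDataFrame(mtcars)

# Spark DataFrame -> R data.frame (collects all rows to the driver)
rdf <- collect(df)

# Run a native R function on each partition of the Spark DataFrame,
# returning rows with mpg > 20; the output keeps the input schema.
result <- dapply(df, function(pdf) pdf[pdf$mpg > 20, ], schema(df))
```

Each of these calls crosses the JVM/R boundary, which is exactly where the serialization cost discussed below arises.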
In most cases, the performance is virtually consistent with the other language APIs in Spark. For example, when user code relies on Spark UDFs and/or SQL APIs, the execution happens entirely inside the JVM with no performance penalty in I/O.
See the cases below, which each take about one second:

// Scala API
// ~1 second
sql("SELECT id FROM range(2000000000)").filter("id > 10").count()

# R API
# ~1 second
count(filter(sql("SELECT * FROM range(2000000000)"), "id > 10"))

However, in cases that require executing a native R function or converting from/to native R types, the performance differs hugely, as shown below.
// Scala API
val ds = (1L to 100000L).toDS
// ~1 second
ds.mapPartitions(iter => iter.filter(_ < 50000)).count()

# R API
df.