Real-world Python workloads on Spark: Standalone clusters

Perhaps your Python application generates dynamic SQL for Spark to execute, or refreshes models using Spark's output.

As your Python code becomes more of an app (with a directory structure, configuration files, and library dependencies), submitting it to Spark requires a bit more consideration.

Below are the alternatives I recently considered when taking one such Python application to production using Spark 2.3.

This first article focuses on Spark standalone clusters.

A separate article covers EMR Spark (YARN).

I am far from an authority on Spark, let alone Python.

My decisions attempted to balance correctness with ease of deployment, and the limitations imposed by the app with those of the cluster.

Let me know what you think.

Sample Python application

To simulate a complete application, the scenarios below assume a Python 3 application with the following structure:

```
project.py
data/
    data_source.py
    data_source.ini
```

data_source.ini contains various configuration parameters:

```
[spark]
app_name = My PySpark App
master_url = spark://sparkmaster:7077
```

data_source.py is a module responsible for sourcing and processing data in Spark, applying math transformations with NumPy, and returning a Pandas dataframe to the client.

Dependencies:

```
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, FloatType
import pandas as pd
import numpy as np
import configparser
```

It defines a DataSource class that creates a SparkContext and SparkSession on initialization…

```
class DataSource:
    def __init__(self):
        # Read Spark connection settings from the .ini file
        config = configparser.ConfigParser()
        config.read('./data/data_source.ini')
        master_url = config['spark']['master_url']
        app_name = config['spark']['app_name']

        # Create the SparkContext and SparkSession
        conf = SparkConf() \
            .setAppName(app_name) \
            .setMaster(master_url)
        self.sc = SparkContext(conf=conf)
        self.spark = SparkSession.builder \
            .config(conf=conf) \
            .getOrCreate()
```

…and a get_data() method that:

1. Creates an RDD from a NumPy normal distribution.
2. Applies a function to double the value of every element.
3. Converts the RDD into a Spark dataframe and defines a temp view on top.
4. Applies a Python UDF that squares the contents of every dataframe element using SQL.
5. Returns the results to the client as a Pandas dataframe.

```
def get_data(self, num_elements=1000) -> pd.DataFrame:
    # Build an RDD from a NumPy normal distribution
    mu, sigma = 2, 0.5
    v = np.random.normal(mu, sigma, num_elements)
    rdd1 = self.sc.parallelize(v)

    # Double every element, then shape each row as a 1-tuple of floats
    def mult(x):
        return x * np.array([2])

    rdd2 = rdd1.map(mult).map(lambda x: (float(x),))

    # Convert the RDD into a Spark dataframe and define a temp view on top
    schema = StructType([StructField("value", FloatType(), True)])
    df1 = self.spark.createDataFrame(rdd2, schema)
    df1.registerTempTable("test")

    # Register a Python UDF and apply it via SQL
    def square(x):
        return x ** 2

    self.spark.udf.register("squared", square)
    df2 = self.spark.sql("SELECT squared(value) squared FROM test")

    # Return the results to the client as a Pandas dataframe
    return df2.toPandas()
```

project.py is our main program, acting as a client of the above module:

```
from data.data_source import DataSource

def main():
    src = DataSource()
    df = src.get_data(num_elements=100000)
    print(f"Got Pandas dataframe with {df.size} elements")
    print(df.head(10))

main()
```

Clone the repo: https://bitbucket.org/calidoteam/pyspark.git

Before we begin, let's review the options available when submitting work to Spark.

spark-submit, client and cluster modes

Spark supports various cluster managers: Standalone (i.e. built into Spark), Hadoop's YARN, Mesos and Kubernetes, all of which control how your workload runs on a set of resources.

spark-submit is the only interface that works consistently with all cluster managers.

For Python applications, spark-submit can upload and stage all dependencies you provide as .py, .zip or .egg files when needed.
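Dependencies can also be attached programmatically from the driver. The snippet below is a hypothetical sketch (not part of the sample project) using SparkContext.addPyFile, which serves the same purpose as spark-submit's --py-files flag; deps.zip is a made-up archive name, and a configured SparkContext is assumed:

```
# Hypothetical sketch: ship a bundle of pure-Python dependencies to the executors
# from the driver itself, instead of passing --py-files to spark-submit.
# Assumes sc is a SparkContext configured as in the sample app; "deps.zip" is a
# placeholder archive name.
sc.addPyFile("deps.zip")  # modules inside the archive become importable on executors
```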

In client mode, your Python program (i.e. the driver) will run on the same host where spark-submit runs. It is in your best interest to make sure that host is close to your worker nodes to reduce network latency.

In cluster mode, your Python program (i.e. the driver) and its dependencies will be uploaded to and run from one of the worker nodes. This is useful when submitting jobs from a remote host. As of Spark 2.4.0, cluster mode is not an option when running on Spark standalone.

Alternatively, it is possible to bypass spark-submit by configuring the SparkSession in your Python app to connect to the cluster.

This requires the right configuration and matching PySpark binaries.

Your Python app will effectively be running in client mode: it will run on whichever host you launched it from.

The following sections describe several deployment alternatives and what configuration was required in each.

#1: Direct connect to Spark (client mode, no spark-submit)

[Figure: Python application in client mode against Spark standalone]

This is the simplest deployment scenario: the Python app directly establishes a Spark context by pointing to a Spark master URL, and uses it to submit work:

```
conf = SparkConf() \
    .setAppName("My PySpark App") \
    .setMaster("spark://192.168.1.10:7077")
sc = SparkContext(conf=conf)
spark = SparkSession.builder \
    .config(conf=conf) \
    .getOrCreate()
```

In a standalone cluster, resources get allocated for the duration of the job, and the default configuration gives client applications all available resources, thus requiring minor tuning for multi-tenant environments.
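If the cluster is shared, that tuning usually amounts to capping what a single application may claim. A minimal sketch, with illustrative values rather than recommendations:

```
# Minimal sketch (assumed values): cap the resources this application may claim
# on a shared standalone cluster.
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("My PySpark App")
        .setMaster("spark://sparkmaster:7077")
        .set("spark.cores.max", "4")          # total cores this app may use across the cluster
        .set("spark.executor.memory", "2g"))  # memory per executor process
sc = SparkContext(conf=conf)
```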

Executor processes (JVM or Python) are launched by Worker processes local to each node.

This resembles a traditional client-server application in that the client simply “connects” to a “remote” cluster.

Recommendations:

Ensure there is plenty of bandwidth between your driver and the cluster. Most of the network activity happens between the driver and its executors, so this "remote" cluster must actually be within close proximity (LAN).

Improve Java-Python serialization by enabling Apache Arrow: Python workloads (NumPy, Pandas and other transformations applied to Spark RDDs, dataframes and datasets) by default require a lot of serialization and deserialization between the Java and Python processes, which quickly degrades performance. Starting with Spark 2.3, enabling Apache Arrow (included in the steps listed below) makes those transfers vastly more efficient.
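As a rough illustration of what Arrow changes (a sketch, assuming PyArrow is installed on the driver and all nodes), the property can also be set programmatically on a session; in Spark 2.3/2.4 it is named spark.sql.execution.arrow.enabled:

```
# Sketch, assuming PyArrow is installed: with Arrow enabled, toPandas() and
# createDataFrame(pandas_df) move data in columnar batches instead of
# row-by-row pickling. Property name applies to Spark 2.3/2.4.
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

pdf = pd.DataFrame({"value": np.random.normal(2, 0.5, 100000)})
sdf = spark.createDataFrame(pdf)   # Pandas -> Spark via Arrow
result = sdf.toPandas()            # Spark -> Pandas via Arrow
```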

Deploy dependencies across all cluster nodes and driver host. This includes downloading and installing Python 3, pip-installing PySpark (must match the version of the target cluster), PyArrow, as well as other library dependencies:

```
sudo yum install python36
pip install pyspark==2.3.1
pip install pyspark[sql]
pip install numpy pandas msgpack sklearn
```

Note: While installing a large library like PySpark (~200MB), you might run into an error ending in "MemoryError". If so, try:

```
pip install --no-cache-dir pyspark==2.3.1
```

Configuration and environment variables: On the client side, $SPARK_HOME must point to the location where pip installed PySpark:

```
$ pip show pyspark
Name: pyspark
Version: 2.3.1
Summary: Apache Spark Python API
Home-page: https://github.com/apache/spark/tree/master/python
Author: Spark Developers
Author-email: dev@spark.apache.org
License: http://www.apache.org/licenses/LICENSE-2.0
Location: /opt/anaconda/lib/python3.6/site-packages
Requires: py4j

$ export SPARK_HOME=/opt/anaconda/lib/python3.6/site-packages
```

On every cluster node, set additional default parameters and environment variables.

Specifically, for Python apps:

$SPARK_HOME/conf/spark-defaults.conf:

```
spark.sql.execution.arrow.enabled  true
```

$SPARK_HOME/conf/spark-env.sh:

```
export PYSPARK_PYTHON=/usr/bin/python3         # Python executable, all nodes
export PYSPARK_DRIVER_PYTHON=/usr/bin/python3  # Python executable for the driver, if different from executor nodes
```

Note: Environment variables are read from wherever spark-submit is launched, not necessarily from within cluster hosts.
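When the app is launched directly rather than through spark-submit, one alternative (a sketch, with assumed interpreter paths that must exist on every node) is to set these variables from the driver script itself, before any SparkContext is created:

```
# Sketch (assumed paths): set interpreter locations from the driver program,
# before the SparkContext/SparkSession is created, instead of editing spark-env.sh.
import os

os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"         # interpreter used by executors
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"  # interpreter used by the driver
```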

Running it

Submitting the workload to the cluster is simply a matter of running the Python application (i.e. spark-submit is not required):

```
$ cd my-project-dir/
$ python3 project.py
```

At runtime, it's possible to see the slave nodes running multiple python3 processes working on the job:

[Figure: Python processes on a slave node]

#2: Containerized app (client mode, no spark-submit)

[Figure: Containerized Python application in client mode against Spark standalone]

This is an extension of the previous scenario where it may be desirable to run the Python app as a Docker container, as part of a CI/CD pipeline, for portability reasons, etc.

In addition to the configuration recommended for the previous scenario, the following is required:

Build the container to include all dependencies: Start from an image that includes Python 3 and/or the Java 8 OpenJDK, then pip-install PySpark, PyArrow and all other libraries required by the application.

Configure Spark driver host and ports, and open them in the container: This is required in order for executors to reach the driver inside the container. Spark properties for the driver can be set programmatically (spark.conf.set("property", "value")):

```
spark.driver.host             : host_ip_address       (e.g. 192.168.1.10)
spark.driver.port             : static_port           (e.g. 51400)
spark.driver.bindAddress      : container_internal_ip (e.g. 10.192.6.81)
spark.driver.blockManagerPort : static_port           (e.g. 51500)
```

In Docker, these ports can then be exposed to the outside from the command line using the -p option: -p 51400:51400 -p 51500:51500. Other articles suggest simply publishing a port range instead: -p 5000-5010:5000-5010
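The sketch below shows one way those four properties might be set from inside the containerized app, using the example addresses and ports above (replace them with values that match your network layout and docker run flags):

```
# Sketch using the example values above; adjust to your own network and -p mappings.
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("My PySpark App")
        .setMaster("spark://192.168.1.10:7077")
        .set("spark.driver.host", "192.168.1.10")        # address executors use to reach the driver
        .set("spark.driver.port", "51400")               # fixed RPC port, published with -p 51400:51400
        .set("spark.driver.bindAddress", "10.192.6.81")  # address the driver binds to inside the container
        .set("spark.driver.blockManagerPort", "51500"))  # fixed block manager port, published with -p 51500:51500
sc = SparkContext(conf=conf)
```

Setting them on the SparkConf before the context is created ensures they take effect, since the driver's network bindings cannot change once it is up.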

Other articles suggest simply publishing this port range: -p 5000–5010:5000–5010Running itAs with the previous scenario, running the container will start the Python driver program:docker run -p 51400:51400 -p 51500:51500 <docker_image_url>#3: Python app via spark-submit (client mode)This scenario is virtually identical to scenario #1 and included here only for clarity.

The only difference is the fact that the Python app is launched using the spark-submitprocess.

Cluster events are sent to stdout in addition to log files:$ cd my-project-dir/$ ls -l rwxrwxr-x.

3 centos centos 70 Feb 25 02:11 data-rw-rw-r–.

1 centos centos 220 Feb 25 01:09 project.

py$ spark-submit project.

pyNotes:In my experience, it wasn’t necessary to pass dependent subdirectories/files when calling spark-submit as long as it was invoked from the project root directory (my-project-dir/).

Since the sample app already specifies a master URL, it isn’t necessary to pass one to spark-submit.

Otherwise, a more complete command would be:$ spark-submit –master spark://sparkcas1:7077 –deploy-mode client project.

pyAs of Spark 2.

3, it is not possible to submit Python apps in cluster mode to a standalone Spark cluster.

Doing so yields an error:$ spark-submit –master spark://sparkcas1:7077 –deploy-mode cluster project.

pyError: Cluster deploy mode is currently not supported for python applications on standalone clusters.

Takeaways: Python on Spark standalone clusters

- Although standalone clusters aren't popular in production (maybe because commercially supported distributions include a cluster manager), they have a smaller footprint and do a good job as long as multi-tenancy and dynamic resource allocation aren't a requirement.
- For Python apps, deployment options are limited to client mode.
- Using Docker to containerize the Python app has all the expected advantages and is well suited for client-mode deployments.
