
Spark Standalone Mode


May 17, 2021 Spark Programming guide


Spark stand-alone deployment mode

Install the Spark stand-alone mode cluster

To install Spark in standalone mode, you simply need to place a compiled version of Spark on each node of the cluster. You can download a prebuilt package for each release, or build it yourself.

Start the cluster manually

You can start a standalone master server by executing:

./sbin/start-master.sh

Once started, the master will print out a spark://HOST:PORT URL for itself, which you can use to connect workers to it, or pass as the "master" argument to SparkContext. You can also find this URL on the master's web UI, which is http://localhost:8080 by default.

Similarly, you can start one or more workers and connect them to the master via:

./bin/spark-class org.apache.spark.deploy.worker.Worker spark://IP:PORT

Once you have started a worker, look at the master's web UI. You should see the new node listed there, along with its number of CPUs and memory.

The following configuration parameters can be passed to the master and worker.

Argument Meaning
-h HOST, --host HOST Hostname to listen on
-i HOST, --ip HOST Hostname to listen on (deprecated, use -h or --host)
-p PORT, --port PORT Port for the service to listen on (default: 7077 for the master, random for workers)
--webui-port PORT Port for the web UI (default: 8080 for the master, 8081 for workers)
-c CORES, --cores CORES Total CPU cores to allow Spark applications to use on the machine (default: all available); only on workers
-m MEM, --memory MEM Total amount of memory to allow Spark applications to use on the machine (default: your machine's total RAM minus 1 GB); only on workers
-d DIR, --work-dir DIR Directory to use for scratch space and job output logs (default: SPARK_HOME/work); only on workers
--properties-file FILE Path to a custom Spark properties file to load (default: conf/spark-defaults.conf)
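
For example, to start a worker by hand that offers only part of a machine's resources to Spark, you could pass some of these options to the worker class (the master URL, core count, and memory value below are placeholders):

./bin/spark-class org.apache.spark.deploy.worker.Worker --cores 4 --memory 8g spark://master-host:7077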

Cluster launch scripts

To launch a Spark standalone cluster with the launch scripts, you should create a conf/slaves file in your Spark directory, which must contain the hostnames of all the machines where you intend to start Spark workers, one per line. If conf/slaves does not exist, the launch scripts default to a single machine (localhost), which is useful for testing. Note that the master machine accesses each of the worker machines via ssh. By default, ssh is run in parallel and requires password-less access to be set up (using a private key). If you do not have a password-less setup, you can set the SPARK_SSH_FOREGROUND environment variable and serially provide a password for each worker.
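
As a sketch, a conf/slaves file simply lists one worker host per line (the hostnames below are hypothetical):

# conf/slaves
worker1.example.com
worker2.example.com
worker3.example.com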

Once you have set up this file, you can launch or stop your cluster with the following shell scripts:

  • sbin/start-master.sh: starts a master instance on the machine the script is executed on
  • sbin/start-slaves.sh: starts a slave instance on each machine specified in the conf/slaves file
  • sbin/start-all.sh: starts both a master and all slave instances as described above
  • sbin/stop-master.sh: stops the master that was started via sbin/start-master.sh
  • sbin/stop-slaves.sh: stops all slave instances on the machines specified in the conf/slaves file
  • sbin/stop-all.sh: stops both the master and the slaves as described above

Note that these scripts must be executed on the machine on which your Spark master runs, not on your local machine.

You can optionally configure the cluster further by setting environment variables in conf/spark-env.sh. Create this file by copying conf/spark-env.sh.template, and distribute it to all your worker machines for the settings to take effect. The following settings are available:

Environment Variable Meaning
SPARK_MASTER_IP Bind the master to a specific IP address
SPARK_MASTER_PORT Start the master on a different port (default: 7077)
SPARK_MASTER_WEBUI_PORT Port for the master web UI (default: 8080)
SPARK_MASTER_OPTS Configuration properties that apply only to the master, in the form "-Dx=y" (default: none); see the list of possible options below
SPARK_LOCAL_DIRS Directory to use for scratch space in Spark, including map output files and RDDs that get stored on disk. This should be on a fast, local disk in your system. It can also be a comma-separated list of multiple directories on different disks
SPARK_WORKER_CORES Total number of cores to allow Spark applications to use on the machine (default: all available cores)
SPARK_WORKER_MEMORY Total amount of memory to allow Spark applications to use on the machine (default: total memory minus 1 GB); note that each application's individual memory is configured with its spark.executor.memory setting
SPARK_WORKER_PORT Start the Spark worker on a specific port (default: random)
SPARK_WORKER_WEBUI_PORT Port for the worker web UI (default: 8081)
SPARK_WORKER_INSTANCES Number of worker instances to run on each machine (default: 1). You can make this more than 1 if you have very large machines and would like multiple Spark worker processes. If you do set this, make sure to also set SPARK_WORKER_CORES explicitly to limit the cores per worker, or else each worker will try to use all the cores
SPARK_WORKER_DIR Directory to run worker processes in, which includes both logs and scratch space (default: SPARK_HOME/work)
SPARK_WORKER_OPTS Configuration properties that apply only to the worker, in the form "-Dx=y" (default: none); see the list of possible options below
SPARK_DAEMON_MEMORY Memory to allocate to the Spark master and worker daemons themselves (default: 512m)
SPARK_DAEMON_JAVA_OPTS JVM options for the Spark master and worker daemons themselves, in the form "-Dx=y" (default: none)
SPARK_PUBLIC_DNS The public DNS name of the Spark master and workers (default: none)
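
As an illustration, a conf/spark-env.sh that limits what each worker offers to Spark might contain the following (the values and path are only examples):

# conf/spark-env.sh (copied to every worker machine)
export SPARK_WORKER_CORES=8               # allow applications to use at most 8 cores on this worker
export SPARK_WORKER_MEMORY=24g            # and at most 24 GB of memory
export SPARK_WORKER_DIR=/data/spark/work  # logs and scratch space (path is a placeholder)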

Note that the launch scripts do not currently support Windows. To run a Spark cluster on Windows, the master and workers need to be started by hand.

SPARK_MASTER_OPTS system properties:

Property Name Default Meaning
spark.deploy.retainedApplications 200 The maximum number of completed applications to display. Older applications will be dropped to maintain this limit
spark.deploy.retainedDrivers 200 The maximum number of completed drivers to display. Older drivers will be dropped to maintain this limit
spark.deploy.spreadOut true Whether the standalone cluster manager should spread applications out across nodes or try to consolidate them onto as few nodes as possible. Spreading out is usually better for data locality in HDFS, but consolidating is more efficient for compute-intensive workloads
spark.deploy.defaultCores (infinite) Default number of cores to give to applications in Spark's standalone mode if they don't set spark.cores.max. If not set, applications always get all available cores unless they configure spark.cores.max themselves. Set this lower on a shared cluster to prevent users from grabbing the whole cluster by default
spark.worker.timeout 60 Number of seconds after which the standalone master considers a worker lost if it receives no heartbeats
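
For example, these properties can be passed to the master through SPARK_MASTER_OPTS in conf/spark-env.sh; the values below are purely illustrative:

export SPARK_MASTER_OPTS="-Dspark.deploy.retainedApplications=50 -Dspark.worker.timeout=120"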

SPARK_WORKER_OPTS system properties:

Property Name Default Meaning
spark.worker.cleanup.enabled false Enable periodic cleanup of worker/application directories. Note that this only affects standalone mode. Application directories are cleaned up regardless of whether the application is still running
spark.worker.cleanup.interval 1800 (30 minutes) The interval, in seconds, at which the worker cleans up old application work directories on the local machine
spark.worker.cleanup.appDataTtl 7 * 24 * 3600 (7 days) How long, in seconds, to retain application work directories on each worker. This should depend on the amount of disk space you have available. Application logs and jars are copied into each application's work directory, so over time the work directories can quickly fill up disk space, especially if you run jobs very frequently
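
For example, to enable periodic cleanup and keep application data for one day only, you could set the following in conf/spark-env.sh (the values are illustrative):

export SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.appDataTtl=86400"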

Connect an application to the cluster

To run an application on the Spark cluster, simply pass the master's spark://IP:PORT URL to the SparkContext constructor.

In order to run an interactive Spark shell on the cluster, run the command:

./bin/spark-shell --master spark://IP:PORT

You can also pass the option --total-executor-cores <numCores> to control the number of cores that spark-shell uses on the cluster.
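
For example, the following starts a shell that uses at most four cores across the cluster (the master URL is a placeholder):

./bin/spark-shell --master spark://IP:PORT --total-executor-cores 4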

Start the Spark application

The spark-submit script provides the most straightforward way to submit a Spark application to the cluster. For standalone clusters, Spark currently supports two deploy modes. In client mode, the driver is launched in the same process as the client that submits the application. In cluster mode, however, the driver is launched from one of the worker processes inside the cluster, and the client process exits as soon as it has fulfilled its responsibility of submitting the application, without waiting for the application to finish.

If your application is launched through spark-submit, the application jar is automatically distributed to all worker nodes. For any additional jars that your application depends on, specify them through the --jars flag using a comma-separated list (e.g. --jars jar1,jar2).
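
As a sketch, a cluster-mode submission with extra dependencies might look like this; the class name, jar names, and master URL are placeholders:

./bin/spark-submit \
  --master spark://master-host:7077 \
  --deploy-mode cluster \
  --class com.example.MyApp \
  --jars dep1.jar,dep2.jar \
  my-app.jar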

In addition, cluster mode supports restarting your application automatically if it exited with a non-zero exit code. To use this feature, pass the --supervise flag to spark-submit when launching your application. Then, if you wish to kill an application that is failing repeatedly, you may do so through:

./bin/spark-class org.apache.spark.deploy.Client kill <master url> <driver ID>

You can find the driver ID through the standalone master web UI at http://<master url>:8080.

Resource scheduling

The standalone cluster mode currently only supports a simple FIFO scheduler across applications. However, to allow multiple concurrent users, you can control the maximum number of resources each application will use. By default, an application will acquire all the cores in the cluster, which only makes sense if you run just one application at a time. You can cap the number of cores by setting spark.cores.max in your SparkConf. For example:

val conf = new SparkConf()
             .setMaster(...)
             .setAppName(...)
             .set("spark.cores.max", "10")
val sc = new SparkContext(conf)

In addition, you can configure spark.deploy.defaultCores on the cluster master to change the default for applications that do not set spark.cores.max. Do this by adding the following to conf/spark-env.sh:

export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=<value>"

This is useful on shared clusters where users might not have configured a maximum number of cores individually.

High availability

By default, a standalone scheduling cluster is resilient to worker failures (insofar as Spark itself is resilient to losing work, by moving it to other workers). However, the scheduler makes its scheduling decisions through the master, which creates a single point of failure: if the master dies, no new applications can be created. To circumvent this, there are two high availability modes, described below.

Standby masters with ZooKeeper

Using ZooKeeper to provide leader election and some state storage, you can launch multiple masters in your cluster connected to the same ZooKeeper instance. One will be elected "leader" and the others will remain in standby mode. If the current leader dies, another master will be elected, recover the old master's state, and then resume scheduling. The entire recovery process should take between 1 and 2 minutes. Note that this delay only affects scheduling new applications - applications that were already running during the master failure are unaffected.

Configuration

To enable this recovery mode, set SPARK_DAEMON_JAVA_OPTS in spark-env.sh using the following configuration properties:

System property Meaning
spark.deploy.recoveryMode Set to ZOOKEEPER to enable standby master recovery mode (default: NONE)
spark.deploy.zookeeper.url The ZooKeeper cluster URL (e.g. 192.168.1.100:2181,192.168.1.101:2181)
spark.deploy.zookeeper.dir The directory in ZooKeeper to store recovery state (default: /spark)
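
For example, you might add the following to conf/spark-env.sh on each master node (the ZooKeeper addresses are placeholders):

export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=192.168.1.100:2181,192.168.1.101:2181 -Dspark.deploy.zookeeper.dir=/spark"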

Possible pitfall: if you have multiple masters in the cluster but fail to configure them correctly to use ZooKeeper, the masters will not discover each other and will all think they are leaders. This leads to an unhealthy cluster state, because all the masters will schedule independently.

Details

After the ZooKeeper cluster is started, enabling high availability is straightforward. Simply start multiple master processes on different nodes with the same ZooKeeper configuration (ZooKeeper URL and directory). Masters can be added and removed at any time.

In order to schedule new applications or add workers to the cluster, they need to know the IP address of the current leader. This can be accomplished by simply passing in a list of masters where you used to pass in a single one. For example, you might start your SparkContext pointing to spark://host1:port1,host2:port2. This will cause your SparkContext to try registering with both masters - if host1 goes down, this configuration will still be correct, since the new leader, host2, will be found.
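
For instance, with two masters configured as above you might start a shell like this (host names and ports are placeholders):

./bin/spark-shell --master spark://host1:7077,host2:7077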

There is an important distinction between "registering with a master" and normal operation. When starting up, an application or worker needs to be able to find and register with the current lead master. Once it successfully registers, it is "in the system". If failover occurs, the new leader will contact all previously registered applications and workers to inform them of the change in leadership, so they need not even have known in advance that the new leader exists.

Because of this property, new masters can be created at any time. The only thing you need to worry about is that new applications and workers can find and register with it in case it becomes the leader.

Use the local file system for single-node recovery

ZooKeeper is the best choice for a production environment, but if you just want to be able to restart the master after it dies, FILESYSTEM mode can take care of it. When applications and workers register, they have enough state written to the provided directory that they can be recovered upon a restart of the master process.

Configuration

To enable this recovery mode, set SPARK_DAEMON_JAVA_OPTS in spark-env.sh using the following configuration properties:

System property Meaning
spark.deploy.recoveryMode Set to FILESYSTEM to enable single-node recovery mode (default: NONE)
spark.deploy.recoveryDirectory The directory in which Spark will store recovery state, accessible from the master
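
For example, you might add the following to conf/spark-env.sh on the master node (the recovery directory path is a placeholder):

export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=FILESYSTEM -Dspark.deploy.recoveryDirectory=/var/spark/recovery"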

Details

  • This solution can be used in tandem with a process monitor/manager such as monit, or simply to enable manual recovery via restart.
  • While filesystem recovery may seem better than not doing any recovery at all, this mode may be suboptimal for certain development or experimental purposes. In particular, killing a master via stop-master.sh does not clear its recovery state, so whenever you start a new master, it will enter recovery mode. This can increase the startup time by up to 1 minute.
  • While it is not officially supported, you can also mount an NFS directory as the recovery directory. If the original master node dies completely, you can then start a master on a different node, and it will correctly recover all previously registered workers and applications. Future applications will have to be able to find the new master in order to register.