May 17, 2021 Spark Programming guide
To install Spark in standalone mode, you simply place a compiled version of Spark on each node of the cluster. You can download a prebuilt package for each stable release, or build it yourself.
You can start a standalone master server by executing:
./sbin/start-master.sh
Once started, the master will print out a `spark://HOST:PORT` URL for itself, which you can use to connect workers to it, or pass as the "master" argument to `SparkContext`. You can also find this URL on the master's web UI, which is http://localhost:8080 by default.
Similarly, you can start one or more workers and connect them to the master:
./bin/spark-class org.apache.spark.deploy.worker.Worker spark://IP:PORT
Once you have started a worker, look at the master's web UI. You should see the new node listed there, along with its number of CPUs and its memory.
The following configuration options can be passed to the master and worker:
Argument | Meaning |
---|---|
-h HOST, --host HOST | Hostname to listen on |
-i HOST, --ip HOST | Hostname to listen on (deprecated, use -h or --host) |
-p PORT, --port PORT | Port for the service to listen on (default: 7077 for the master, random for workers) |
--webui-port PORT | Web UI port (master default is 8080, worker default is 8081) |
-c CORES, --cores CORES | Total CPU cores to allow Spark applications to use (default: all available) |
-m MEM, --memory MEM | Total amount of memory to allow Spark applications to use (default: your machine's total RAM minus 1 GB) |
-d DIR, --work-dir DIR | Directory to use for scratch space and job output logs (default: SPARK_HOME/work) |
--properties-file FILE | Path to a custom Spark properties file to load (default: conf/spark-defaults.conf) |
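As an illustration, the daemons might be started with explicit options like the following (the IP address, port numbers, and resource sizes here are placeholder values to adapt to your own machines):

```shell
# Start the master bound to a specific host and its default ports
./sbin/start-master.sh --host 192.168.1.100 --port 7077 --webui-port 8080

# Start a worker offering 4 cores and 8 GB of memory, pointed at that master
./bin/spark-class org.apache.spark.deploy.worker.Worker \
  --cores 4 --memory 8g spark://192.168.1.100:7077
```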
To launch a Spark standalone cluster with the launch scripts, you should create a file called `conf/slaves` in your Spark directory, which must contain the hostnames of all the machines where you intend to start Spark workers, one per line. If `conf/slaves` does not exist, the launch scripts default to a single machine (localhost), which is useful for testing. Note that the master machine accesses each of the workers via ssh. By default, ssh is run in parallel and requires password-less access (using a private key). If you do not have password-less access set up, you can set the environment variable `SPARK_SSH_FOREGROUND` and serially provide a password for each worker.
Once you have set up this file, you can launch or stop your cluster with the launch scripts. Note that these scripts must be executed on the machine you want to run the Spark master on, not on your local machine.
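For reference, the launch scripts live in Spark's `sbin/` directory; a sketch of the usual ones (script names as shipped in Spark's own `sbin/` layout):

```shell
./sbin/start-master.sh    # start a master instance on this machine
./sbin/start-slaves.sh    # start a worker on each machine listed in conf/slaves
./sbin/start-all.sh       # start a master and all workers in one step

./sbin/stop-master.sh     # stop the master started by start-master.sh
./sbin/stop-slaves.sh     # stop all workers listed in conf/slaves
./sbin/stop-all.sh        # stop the master and all workers
```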
You can optionally configure the cluster further by setting environment variables in `conf/spark-env.sh`. Create this file by starting with `conf/spark-env.sh.template`, and copy it to all your worker machines for the settings to take effect. The following settings are available:
Environment Variable | Meaning |
---|---|
SPARK_MASTER_IP | Bind the master to a specific IP address |
SPARK_MASTER_PORT | Start master on different ports (default 7077) |
SPARK_MASTER_WEBUI_PORT | Port of master web UI (8080 by default) |
SPARK_MASTER_OPTS | Configuration properties that apply only to the master, in the form "-Dx=y" (default: none). See the table below for a list of possible options |
SPARK_LOCAL_DIRS | Directory to use for scratch space in Spark, including map output files and RDDs that get stored on disk. This should be on a fast, local disk in your system. It can also be a comma-separated list of multiple directories on different disks |
SPARK_WORKER_CORES | The number of cores that Spark applications can use (all available by default) |
SPARK_WORKER_MEMORY | Total amount of memory to allow Spark applications to use on the machine (default: total memory minus 1 GB); note that each application's individual memory is configured with its spark.executor.memory property |
SPARK_WORKER_PORT | Start Spark worker on the specified port (random by default) |
SPARK_WORKER_WEBUI_PORT | Port of worker UI (8081 by default) |
SPARK_WORKER_INSTANCES | Number of worker instances to run on each machine (default: 1). You can make this more than 1 if you have very large machines and would like multiple worker processes. If you do set this, make sure to also set SPARK_WORKER_CORES explicitly to limit the cores per worker, or else each worker will try to use all the cores |
SPARK_WORKER_DIR | Directory to run Spark workers in, which includes both logs and scratch space (default: SPARK_HOME/work) |
SPARK_WORKER_OPTS | Configuration properties that apply only to the worker, in the form "-Dx=y" (default: none). See the table below for a list of possible options |
SPARK_DAEMON_MEMORY | Memory allocated to Spark master and worker daemons (512m by default) |
SPARK_DAEMON_JAVA_OPTS | JVM options for the Spark master and worker daemons, in the form "-Dx=y" (default: none) |
SPARK_PUBLIC_DNS | Spark master and worker public DNS name (none by default) |
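As an illustration, a minimal `conf/spark-env.sh` combining several of these settings might look like this (addresses, core counts, memory sizes, and disk paths are placeholder values for your own machines):

```shell
# conf/spark-env.sh -- sourced by the launch scripts on each node
export SPARK_MASTER_IP=192.168.1.100               # bind the master to this address
export SPARK_WORKER_CORES=8                        # cores offered by each worker
export SPARK_WORKER_MEMORY=24g                     # memory offered by each worker
export SPARK_LOCAL_DIRS=/disk1/spark,/disk2/spark  # fast local scratch disks
```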
Note that the launch scripts do not currently support Windows. To run a Spark cluster on Windows, start the master and workers by hand.

SPARK_MASTER_OPTS supports the following system properties:
Property Name | Default | Meaning |
---|---|---|
spark.deploy.retainedApplications | 200 | The maximum number of completed applications to display. Older applications will be dropped to maintain this limit |
spark.deploy.retainedDrivers | 200 | The maximum number of completed drivers to display. Older drivers will be dropped to maintain this limit |
spark.deploy.spreadOut | true | Whether the standalone cluster manager should spread applications out across nodes or try to consolidate them onto as few nodes as possible. Spreading out is usually better for data locality in HDFS, but consolidating is more efficient for compute-intensive workloads |
spark.deploy.defaultCores | (infinite) | Default number of cores to give to applications in Spark's standalone mode if they don't set spark.cores.max. If not set, applications always get all available cores unless they configure spark.cores.max themselves. Set this lower on a shared cluster to prevent users from grabbing the whole cluster by default |
spark.worker.timeout | 60 | Number of seconds after which the standalone master considers a worker lost because it has received no heartbeats |

SPARK_WORKER_OPTS supports the following system properties:
Property Name | Default | Meaning |
---|---|---|
spark.worker.cleanup.enabled | false | Enable periodic cleanup of worker/application directories. Note that this only affects standalone mode. Only the directories of stopped applications are cleaned up |
spark.worker.cleanup.interval | 1800 (30 minutes) | Interval, in seconds, at which the worker cleans up old application work directories on the local machine |
spark.worker.cleanup.appDataTtl | 7 * 24 * 3600 (7 days) | How long to retain application work directories on each worker. How long you can afford depends on the amount of disk space you have available. Application logs and jars are downloaded to each application's work directory, which over time can fill up disk space quickly, especially if you run jobs very frequently |
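For instance, to enable periodic cleanup with a shorter retention period than the 7-day default, you might add something like the following to `conf/spark-env.sh` (the interval and TTL values here are illustrative):

```shell
# Clean up stopped applications' work directories every 30 minutes,
# keeping each application's data for 1 day (86400 seconds)
export SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true \
  -Dspark.worker.cleanup.interval=1800 \
  -Dspark.worker.cleanup.appDataTtl=86400"
```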
To run an application on the Spark cluster, simply pass the master's `spark://IP:PORT` URL to the `SparkContext` constructor.
To run an interactive Spark shell against the cluster, run:
./bin/spark-shell --master spark://IP:PORT
You can also pass the option `--total-executor-cores <numCores>` to control the number of cores that spark-shell uses on the cluster.
The spark-submit script provides the most straightforward way to submit a Spark application to the cluster. For standalone clusters, Spark currently supports two deploy modes. In client mode, the driver is launched in the same process as the client that submits the application. In cluster mode, however, the driver is launched from one of the cluster's worker processes, and the client process exits as soon as it fulfills its responsibility of submitting the application, without waiting for the application to finish.
If your application is launched through spark-submit, the application jar is automatically distributed to all worker nodes. For any additional jars that your application depends on, you should specify them through the --jars flag as a comma-separated list (e.g. --jars jar1,jar2).
In addition, cluster mode supports restarting your application automatically if it exited with a non-zero exit code. To use this feature, you may pass in the --supervise flag to spark-submit when launching your application.
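Putting these flags together, a cluster-mode submission with supervision might look like this (the master address, main class, dependency jars, and application jar are all placeholders for your own application):

```shell
./bin/spark-submit \
  --master spark://192.168.1.100:7077 \
  --deploy-mode cluster \
  --supervise \
  --class com.example.MyApp \
  --jars dep1.jar,dep2.jar \
  myapp.jar
```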
If you then wish to kill an application that is failing repeatedly, you may do so through:
./bin/spark-class org.apache.spark.deploy.Client kill <master url> <driver ID>
You can find the driver ID through the standalone master's web UI at http://<master url>:8080.
The standalone cluster mode currently only supports a simple FIFO scheduler across applications. However, to allow multiple concurrent users, you can control the maximum number of resources each application will use. By default, an application will acquire all cores in the cluster, which only makes sense if you run just one application at a time. You can cap the number of cores by setting spark.cores.max in your SparkConf:
val conf = new SparkConf()
.setMaster(...)
.setAppName(...)
.set("spark.cores.max", "10")
val sc = new SparkContext(conf)
In addition, you can configure spark.deploy.defaultCores on the cluster master process to change the default for applications that don't set spark.cores.max. Do this by adding the following to conf/spark-env.sh:
export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=<value>"
This is useful on shared clusters where individual users may not have configured a maximum number of cores themselves.
By default, standalone scheduling clusters are resilient to worker failures (insofar as Spark itself is resilient to losing work, by moving it to other workers). However, the scheduler uses the master to make scheduling decisions, and this creates a single point of failure: if the master crashes, no new applications can be created. To circumvent this, there are two high-availability modes.
Utilizing ZooKeeper to provide leader election and some state storage, you can launch multiple masters in your cluster connected to the same ZooKeeper instance. One will be elected "leader" and the others will remain in standby mode. If the current leader dies, another master will be elected, recover the old master's state, and then resume scheduling. The entire recovery process should take between 1 and 2 minutes. Note that this delay only affects scheduling new applications; applications that were already running during the master failure are unaffected.
To enable this recovery mode, you can set SPARK_DAEMON_JAVA_OPTS in spark-env.sh with the following properties:
System property | Meaning |
---|---|
spark.deploy.recoveryMode | Set to ZOOKEEPER to enable standby-master recovery mode (default: NONE) |
spark.deploy.zookeeper.url | The ZooKeeper cluster URL (e.g., 192.168.1.100:2181,192.168.1.101:2181) |
spark.deploy.zookeeper.dir | The directory in ZooKeeper used to store recovery state (default: /spark) |
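For example, the properties above could be set in `conf/spark-env.sh` like so (the ZooKeeper addresses are placeholders for your own ensemble):

```shell
# Enable ZooKeeper-based standby-master recovery on every master node
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER \
  -Dspark.deploy.zookeeper.url=192.168.1.100:2181,192.168.1.101:2181 \
  -Dspark.deploy.zookeeper.dir=/spark"
```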
Possible gotcha: if you have multiple masters in your cluster but fail to configure them correctly to use ZooKeeper, the masters will not discover each other and will each think they are the leader. This will leave the cluster in an unhealthy state, as every master will schedule independently.
Once the ZooKeeper cluster is up, enabling high availability is straightforward: simply start multiple master processes on different nodes with the same ZooKeeper configuration (ZooKeeper URL and directory). Masters can be added and removed at any time.
In order to schedule new applications or add workers to the cluster, they need to know the IP address of the current leader. This can be accomplished by simply passing in a list of masters. For example, you might start your SparkContext pointing to spark://host1:port1,host2:port2. Your SparkContext will then try registering with both masters; if host1 goes down, this configuration is still correct, as the new leader, host2, will be found.
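Concretely, connecting a shell to a pair of masters would look like this (hostnames and ports are placeholders for your own masters):

```shell
# List every master; the client registers with whichever is the current leader
./bin/spark-shell --master spark://host1:7077,host2:7077
```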
There is an important distinction between "registering with a master" and normal operation. When starting up, an application or worker needs to be able to find and register with the current lead master. Once it successfully registers, though, it is in the system. If a failover occurs, the new leader will contact all previously registered applications and workers to inform them of the change in leadership, so they need not even have known in advance that the newly elected leader existed.
Thanks to this property, new masters can be created at any time. The only thing you need to worry about is that new applications and workers can discover and register with a master in case it becomes the leader.
ZooKeeper is the best choice for a production environment, but if you just want to be able to restart the master after it dies, FILESYSTEM mode can take care of it. When applications and workers register, they write enough state to the provided directory that they can be recovered upon a restart of the master process.
To enable this recovery mode, you can set SPARK_DAEMON_JAVA_OPTS in spark-env.sh with the following properties:
System property | Meaning |
---|---|
spark.deploy.recoveryMode | Set to FILESYSTEM to enable single-node recovery mode (default: NONE) |
spark.deploy.recoveryDirectory | The directory in which Spark stores recovery state |
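As a sketch, the FILESYSTEM variant could be configured in `conf/spark-env.sh` like this (the recovery directory path is a placeholder; it must be accessible to the master process):

```shell
# Enable single-node filesystem recovery for the master
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=FILESYSTEM \
  -Dspark.deploy.recoveryDirectory=/var/spark/recovery"
```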
Note that stopping a master with stop-master.sh does not clear its recovery state, so whenever you start a new master afterwards it will enter recovery mode. This can increase the startup time by up to 1 minute.