May 17, 2021 Spark Programming guide
Spark provides three locations to configure the system:
- Spark properties control most application parameters and can be set through a SparkConf object.
- Environment variables set per-machine settings, such as an IP address, through the conf/spark-env.sh script on each node.
- Logging can be configured through log4j.properties.
Spark properties control most application settings and are configured separately for each application. These properties can be set directly on a SparkConf object, which is then passed to the SparkContext. SparkConf lets you configure common properties such as the master URL and the application name, as well as arbitrary key-value pairs through its set() method. For example, we can initialize an application with two threads as follows. Note that we run with local[2], meaning two threads, which represents "minimal" parallelism and can help detect bugs that only occur when running in a distributed environment.
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("CountingSheep")
.set("spark.executor.memory", "1g")
val sc = new SparkContext(conf)
Note that we can have more than one thread in local mode, and in cases like Spark Streaming we may actually need more than one thread to prevent any sort of starvation.
In some cases, you may want to avoid hard-coding configurations in the SparkConf. For example, you might want to run the same application with different masters or different amounts of memory. For this, Spark allows you to simply create an empty conf:
val sc = new SparkContext(new SparkConf())
Then you provide configuration values at runtime:
./bin/spark-submit --name "My app" --master local[4] --conf spark.shuffle.spill=false
--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" myApp.jar
The Spark shell and the spark-submit tool support two ways to load configurations dynamically. The first is command line options, such as --master, as shown above. spark-submit can accept any Spark property using the --conf flag, but it uses special flags for the properties that play a part in launching the Spark application. Running ./bin/spark-submit --help shows the entire list of these options.
bin/spark-submit will also read configuration options from conf/spark-defaults.conf, in which each line consists of a key and a value separated by whitespace. For example:
spark.master spark://5.6.7.8:7077
spark.executor.memory 512m
spark.eventLog.enabled true
spark.serializer org.apache.spark.serializer.KryoSerializer
Any values specified as flags or in the properties file are passed on to the application and merged with those specified through SparkConf. Properties set directly on the SparkConf take the highest priority, then flags passed to spark-submit or spark-shell, and finally options in the spark-defaults.conf file.
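As a minimal sketch of this precedence (the property name and values here are only examples), a value set directly on the SparkConf wins over the same key supplied on the command line or in spark-defaults.conf:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("PrecedenceCheck")
  .set("spark.executor.memory", "2g")  // set on SparkConf: highest priority
val sc = new SparkContext(conf)

// Prints "2g" even if --conf spark.executor.memory=1g was passed to spark-submit
// or the same key appears in spark-defaults.conf.
println(sc.getConf.get("spark.executor.memory"))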
The application web UI at http://<driver>:4040 lists all Spark properties in the "Environment" tab. This is useful for checking that the properties you set took effect. Note that only values explicitly specified through spark-defaults.conf, SparkConf, or the command line are displayed; for all other configuration properties, you can assume the default value is used.
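A quick way to see the same information programmatically, assuming an existing SparkContext named sc, is to dump every explicitly set property from its configuration:

// Lists only properties that were explicitly set, mirroring the "Environment" tab;
// anything not listed falls back to its default value.
sc.getConf.getAll.sorted.foreach { case (key, value) =>
  println(s"$key = $value")
}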
Most properties that control internal settings have reasonable default values. Some of the most commonly set options are:
Property Name | Default | Meaning |
---|---|---|
spark.app.name | (none) | The name of your application. This will appear in the UI and log data |
spark.master | (none) | The cluster manager to connect to |
spark.executor.memory | 512m | The amount of memory used by each executor process. The same format as the JVM memory string (e.g. 512m, 2g) |
spark.driver.memory | 512m | The amount of memory used by the driver process |
spark.driver.maxResultSize | 1g | Limit of the total size of serialized results of all partitions for each Spark action (e.g. collect). The value should be at least 1m, or 0 for unlimited. Jobs will be aborted if the total size exceeds this limit. A high limit may cause out-of-memory errors in the driver (depending on spark.driver.memory and the memory overhead of objects in the JVM), so set a reasonable limit to protect the driver from out-of-memory errors. |
spark.serializer | org.apache.spark.serializer.JavaSerializer | The class used to serialize objects. The default Java serialization works with any serializable Java object but is quite slow, so we recommend using org.apache.spark.serializer.KryoSerializer when speed matters. |
spark.kryo.classesToRegister | (none) | If you use Kryo serialization, a comma-separated list of custom class names to register with Kryo |
spark.kryo.registrator | (none) | If you use Kryo serialization, set this class to register your custom classes with Kryo. This property is useful if you need to register your classes in a custom way; otherwise spark.kryo.classesToRegister is simpler. It should be set to a class that extends KryoRegistrator (see the sketch after this table). |
spark.local.dir | /tmp | Directory to use for "scratch" space in Spark. In Spark 1.0 and later, this property is overridden by the SPARK_LOCAL_DIRS (Standalone, Mesos) or LOCAL_DIRS (YARN) environment variables set by the cluster manager. |
spark.logConf | false | Logs the effective SparkConf as INFO when a SparkContext is started. |
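The following sketch shows how spark.serializer and spark.kryo.registrator fit together; the Point and Edge classes and the registrator name are placeholders for your own types:

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// Placeholder user classes.
case class Point(x: Double, y: Double)
case class Edge(src: Int, dst: Int)

// A registrator gives full control over how classes are registered with Kryo.
class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Point])
    kryo.register(classOf[Edge])
  }
}

val conf = new SparkConf()
  .setAppName("KryoExample")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", classOf[MyRegistrator].getName)

If you only need to register a plain list of classes, setting spark.kryo.classesToRegister to a comma-separated list of class names is simpler.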
Property Name | Default | Meaning |
---|---|---|
spark.executor.extraJavaOptions | (none) | A string of extra JVM options to pass to executors, for example GC settings or other logging options. Note that it is illegal to set Spark properties or heap size settings with this option; Spark properties should be set with a SparkConf object or the spark-defaults.conf file used by the spark-submit script, and heap sizes can be set with spark.executor.memory. |
spark.executor.extraClassPath | (none) | Extra classpath entries to append to the classpath of executors. This exists mainly for backward compatibility with older versions of Spark; users generally do not need to set this option. |
spark.executor.extraLibraryPath | (none) | Set a special library path to use when launching the executor JVM |
spark.executor.logs.rolling.strategy | (none) | Set the strategy for rolling executor logs. Disabled by default. Can be set to time (time-based rolling) or size (size-based rolling). For time, use spark.executor.logs.rolling.time.interval to set the rolling interval; for size, use spark.executor.logs.rolling.size.maxBytes to set the maximum file size for rolling. |
spark.executor.logs.rolling.time.interval | daily | Set the time interval by which executor logs are rolled over. Rolling is disabled by default. Valid values are daily, hourly, minutely, or any interval in seconds. |
spark.executor.logs.rolling.size.maxBytes | (none) | Set the maximum size, in bytes, at which executor logs are rolled over. Rolling is disabled by default. |
spark.executor.logs.rolling.maxRetainedFiles | (none) | Sets the number of most recently rolled log files that are retained by the system; older log files are deleted. Disabled by default. |
spark.files.userClassPathFirst | false | (Experimental) Whether to give user-added jars precedence over Spark's own jars when loading classes in executors. This can be used to mitigate conflicts between Spark's dependencies and user dependencies. It is currently an experimental feature. |
spark.python.worker.memory | 512m | The amount of memory used by each Python worker process during aggregation. If the memory used during aggregation exceeds this amount, the data is spilled to disk. |
spark.python.profile | false | Enable profiling in the Python worker. The profile results are shown by sc.show_profiles(), or they are displayed before the driver exits. You can also dump the results to disk with sc.dump_profiles(path). If some of the profile results have been displayed manually, they will not be shown automatically before the driver exits. |
spark.python.profile.dump | (none) | The directory in which profile results are dumped before the driver exits. The results are dumped as separate files for each RDD and can be loaded with pstats.Stats(). If this property is specified, the profile results are not displayed automatically. |
spark.python.worker.reuse | true | Whether to reuse Python workers. If so, a fixed number of Python workers is used instead of fork()-ing a new Python process for every task. This setting is useful if you have a large broadcast, because the broadcast then does not need to be transferred from the JVM to the Python worker for every task. |
spark.executorEnv.[EnvironmentVariableName] | (none) | Add the environment variable specified by EnvironmentVariableName to the executor process. The user can specify multiple of these properties to set multiple environment variables. |
spark.mesos.executor.home | driver side SPARK_HOME | Set the directory in which Spark is installed on the executors in Mesos. By default, the executors use the driver's Spark home directory, which may not be visible to them. Note that this setting is only relevant if a Spark binary package is not specified through spark.executor.uri. |
spark.mesos.executor.memoryOverhead | executor memory * 0.07, with a minimum of 384m | The amount of memory, in MB, added to spark.executor.memory when calculating the total memory of a Mesos task. In addition, there is a hard-coded 7% minimum overhead; the final overhead is the larger of spark.mesos.executor.memoryOverhead and 7% of spark.executor.memory. |
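As a small illustration of a few of the runtime-environment properties above (the GC flags, the memory value, and the MY_DATA_DIR variable are placeholders, not recommendations):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("RuntimeEnvExample")
  // Extra JVM options for executors; heap size must NOT go here.
  .set("spark.executor.extraJavaOptions", "-XX:+PrintGCDetails -XX:+PrintGCTimeStamps")
  // Heap size is set separately.
  .set("spark.executor.memory", "2g")
  // Adds MY_DATA_DIR=/data to the environment of every executor process.
  .set("spark.executorEnv.MY_DATA_DIR", "/data")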
Property Name | Default | Meaning |
---|---|---|
spark.shuffle.consolidateFiles | false | If set to true, consolidated intermediate files are created during a shuffle. Creating fewer files can improve file system performance for shuffles with a large number of reduce tasks. It is recommended to set this to true when using ext4 or xfs file systems; on ext3, this option might degrade performance on machines with more than 8 cores due to file system limitations. |
spark.shuffle.spill | true | If set to true, limits the amount of memory used during reduces by spilling data out to disk. The spilling threshold is specified by spark.shuffle.memoryFraction. |
spark.shuffle.spill.compress | true | Whether to compress data spilled during shuffles. The compression codec is specified by spark.io.compression.codec. |
spark.shuffle.memoryFraction | 0.2 | If spark.shuffle.spill is true, the fraction of Java heap memory used for aggregation and cogrouping during shuffles. At any given time, the collective size of all in-memory maps used for shuffles is bounded by this limit; beyond it, the contents begin to spill to disk. If spills are too frequent, consider increasing this value. |
spark.shuffle.compress | true | Whether to compress the output file of the map operation. In general, this is a good choice. |
spark.shuffle.file.buffer.kb | 32 | Size of the in-memory buffer for each shuffle file output stream, in KB. These buffers reduce the number of disk seeks and system calls made when creating intermediate shuffle files. |
spark.reducer.maxMbInFlight | 48 | Maximum size (in MB) of map output data fetched simultaneously by each reduce task. Since each output requires a buffer to receive it, this represents a fixed memory overhead per reduce task, so keep it small unless you have a large amount of memory. |
spark.shuffle.manager | sort | Implementation to use for shuffling data. There are two implementations available: sort and hash. Sort-based shuffle is more memory-efficient and is the default option. |
spark.shuffle.sort.bypassMergeThreshold | 200 | (Advanced) In the sort-based shuffle manager, avoid merge-sorting data if there is no map-side aggregation and there are at most this many reduce partitions |
spark.shuffle.blockTransferService | netty | Implementation to use for transferring shuffle and cached blocks between executors. There are two implementations available: netty and nio. Netty-based block transfer is simpler while offering the same efficiency. |
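A brief, illustrative sketch of tuning a few of the shuffle properties above (the values are examples, not recommendations):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("ShuffleTuning")
  .set("spark.shuffle.manager", "sort")          // sort or hash
  .set("spark.shuffle.memoryFraction", "0.3")    // raise if spills are too frequent
  .set("spark.shuffle.compress", "true")         // compress map output files
  .set("spark.shuffle.spill.compress", "true")   // compress spilled data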
Property Name | Default | Meaning |
---|---|---|
spark.ui.port | 4040 | Port for your application's dashboard, which shows memory and workload data |
spark.ui.retainedStages | 1000 | The number of stages that Spark UI and the status API remember before garbage collection |
spark.ui.retainedJobs | 1000 | The number of jobs that spark UIs and status APIs remember before garbage collection |
spark.ui.killEnabled | true | Allows stages and their corresponding jobs to be killed from the web UI |
spark.eventLog.enabled | false | Whether to log Spark's events. This is useful for reconstructing the web UI after the application is complete |
spark.eventLog.compress | false | Whether to compress the event log. Requires spark.eventLog.enabled to be true. |
spark.eventLog.dir | file:///tmp/spark-events | Base directory for Spark event logging. Within this base directory, Spark creates a subdirectory for each application and logs the application's events there. Users may want to set this to a unified location, like an HDFS directory, so history files can be read by the history server. |
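For example, to persist event logs so the web UI can be reconstructed by a history server (the HDFS path below is only a placeholder):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("EventLogExample")
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.compress", "true")  // requires spark.eventLog.enabled
  .set("spark.eventLog.dir", "hdfs://namenode:8021/spark-events")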
Property Name | Default | Meaning |
---|---|---|
spark.broadcast.compress | true | Whether to compress the broadcast variable before it is sent |
spark.rdd.compress | false | Whether to compress serialized RDD partitions. Can save substantial space at the cost of some extra CPU time. |
spark.io.compression.codec | snappy | The codec used to compress internal data such as RDD partitions, broadcast variables, and shuffle outputs. By default, Spark provides three codecs: lz4, lzf, and snappy. You can also specify the codec with a fully qualified class name, e.g. org.apache.spark.io.LZ4CompressionCodec, org.apache.spark.io.LZFCompressionCodec, or org.apache.spark.io.SnappyCompressionCodec. |
spark.io.compression.snappy.block.size | 32768 | The size of the block used in Snappy compression. Reducing the size of this block also reduces shuffle memory usage |
spark.io.compression.lz4.block.size | 32768 | The size of the block used in LZ4 compression. Reducing the size of this block also reduces shuffle memory usage |
spark.closure.serializer | org.apache.spark.serializer.JavaSerializer | The serializer class used for closures. Currently only the Java serializer is supported. |
spark.serializer.objectStreamReset | 100 | When serializing with org.apache.spark.serializer.JavaSerializer, the serializer caches objects to prevent writing redundant data, but this stops garbage collection of those objects. By calling 'reset', you flush that information from the serializer and allow old objects to be collected. To turn off this periodic reset, set the value to -1. By default, the serializer is reset every 100 objects. |
spark.kryo.referenceTracking | true | Whether to track references to the same object when serializing with Kryo. This is necessary if your object graph has cycles, and useful for efficiency if it contains multiple copies of the same object. It can be disabled to improve performance if you know neither is the case. |
spark.kryo.registrationRequired | false | Whether registration with Kryo is required. If set to true, Kryo throws an exception when an unregistered class is serialized. If set to false (the default), Kryo writes the unregistered class name along with each object, which can cause a significant performance overhead. |
spark.kryoserializer.buffer.mb | 0.064 | Initial size of Kryo's serialization buffer, in megabytes. Note that there is one buffer per core on each worker. The buffer grows up to spark.kryoserializer.buffer.max.mb if needed. |
spark.kryoserializer.buffer.max.mb | 64 | Maximum allowable size of the Kryo serialization buffer, in megabytes. This must be larger than any object you attempt to serialize. |
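A minimal sketch combining a few of the compression and serialization properties above (the values are illustrative):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("CompressionExample")
  // Either the short name ("lz4", "lzf", "snappy") or a full class name works here.
  .set("spark.io.compression.codec", "org.apache.spark.io.LZ4CompressionCodec")
  // Allow the Kryo buffer to grow for large objects.
  .set("spark.kryoserializer.buffer.max.mb", "128")
  .set("spark.rdd.compress", "true")  // trade CPU time for space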
Property Name | Default | Meaning |
---|---|---|
spark.driver.host | (local hostname) | Hostname or IP address for the driver to listen on. This is used for communicating with the executors and the standalone master. |
spark.driver.port | (random) | Port for the driver to listen on. This is used for communicating with the executors and the standalone master. |
spark.fileserver.port | (random) | The port on which the driver's file server listens |
spark.broadcast.port | (random) | The port on which driver's HTTP broadcast server listens |
spark.replClassServer.port | (random) | The port on which the driver's HTTP class server listens |
spark.blockManager.port | (random) | The port on which the block manager listens. These exist in both driver and executors |
spark.executor.port | (random) | The port on which executor listens. Used to communicate with the driver |
spark.port.maxRetries | 16 | Maximum number of retries when binding to a port before giving up |
spark.akka.frameSize | 10 | The maximum message size allowed in the "control plane" communication. If your task needs to send large results to the driver, increase this value |
spark.akka.threads | 4 | Number of actor threads used for communication. Can be useful to increase when the driver has a lot of CPU cores. |
spark.akka.timeout | 100 | Communication timeout between Spark nodes, in seconds |
spark.akka.heartbeat.pauses | 6000 | This is set to a larger value to disable the failure detector that comes built in to Akka. It can be enabled again if you plan to use this feature (not recommended). Acceptable heartbeat pause, in seconds, for Akka; this can be used to control sensitivity to GC pauses. Tune this in combination with spark.akka.heartbeat.interval and spark.akka.failure-detector.threshold if you need to. |
spark.akka.failure-detector.threshold | 300.0 | This is set to a larger value to disable the failure detector that comes built in to Akka. It can be enabled again if you plan to use this feature (not recommended). This maps to Akka's akka.remote.transport-failure-detector.threshold. Tune this in combination with spark.akka.heartbeat.pauses and spark.akka.heartbeat.interval if you need to. |
spark.akka.heartbeat.interval | 1000 | This is set to a larger value to disable the failure detector that comes built in to Akka. It can be enabled again if you plan to use this feature (not recommended). A larger interval value, in seconds, reduces network overhead, while a smaller value (~1 s) might be more informative for Akka's failure detector. Tune this in combination with spark.akka.heartbeat.pauses and spark.akka.failure-detector.threshold if you need to. The only positive use case for the failure detector is that a sensitive detector can help evict rogue executors quickly; however, this is usually not worthwhile, as GC pauses and network lags are expected in a real Spark cluster. Enabling it also leads to a large number of heartbeat exchanges between nodes, flooding the network. |
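When the driver sits behind a firewall, it can help to pin the otherwise random ports from the table above; the port numbers below are placeholders:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("FixedPortsExample")
  .set("spark.driver.port", "7001")
  .set("spark.fileserver.port", "7002")
  .set("spark.broadcast.port", "7003")
  .set("spark.blockManager.port", "7004")
  .set("spark.port.maxRetries", "32")  // retries if a port is already taken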
Property Name | Default | Meaning |
---|---|---|
spark.authenticate | false | Whether Spark authenticates its internal connections. See spark.authenticate.secret if not running on YARN. |
spark.authenticate.secret | None | Sets the secret key used for authentication between Spark components. This must be set if authentication is enabled and Spark is not running on YARN. |
spark.core.connection.auth.wait.timeout | 30 | How long a connection waits for authentication to occur before timing out, in seconds |
spark.core.connection.ack.wait.timeout | 60 | How long a connection waits for an acknowledgement before timing out, in seconds. To avoid unwanted timeouts, you can set a larger value. |
spark.ui.filters | None | A comma-separated list of filter class names to apply to the Spark web UI. The filter must be a standard javax servlet Filter. Parameters for each filter can also be specified by setting Java system properties of the form spark.<class name of filter>.params='param1=value1,param2=value2'. For example: -Dspark.ui.filters=com.test.filter1 -Dspark.com.test.filter1.params='param1=foo,param2=testing'. |
spark.acls.enable | false | Whether Spark ACLs are enabled. If enabled, Spark checks whether the user has permission to view or modify a job. Note this requires the user to be known, so if the user comes across as null, no checks are done. The UI can use filters to authenticate and set the user. |
spark.ui.view.acls | empty | A comma-separated list of users who have permission to view the Spark web UI. By default, only users who start Spark job have viewing rights |
spark.modify.acls | empty | A comma-separated list of users who have permission to modify Spark job. By default, only users who start Spark job have permission to modify it |
spark.admin.acls | empty | A comma-separated list of users or administrators who have permission to view and modify all Spark jobs. This option is useful when running on a shared cluster where a group of administrators or developers helps debug applications. |
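A hedged sketch of enabling authentication and ACLs outside YARN (the secret and the user names are placeholders):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("AclExample")
  .set("spark.authenticate", "true")
  .set("spark.authenticate.secret", "my-shared-secret")  // required when not on YARN
  .set("spark.acls.enable", "true")
  .set("spark.ui.view.acls", "alice,bob")   // may view the web UI
  .set("spark.modify.acls", "alice")        // may modify (e.g. kill) the job
  .set("spark.admin.acls", "admin1")        // may view and modify everything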
Property Name | Default | Meaning |
---|---|---|
spark.streaming.blockInterval | 200 | Interval (in ms) at which data received by Spark Streaming receivers is chunked into blocks before being stored in Spark. The recommended minimum value is 50 ms. |
spark.streaming.receiver.maxRate | infinite | Maximum number of records per second that each receiver will accept. Effectively, each stream consumes at most this number of records per second. Setting this configuration to 0 or -1 puts no limit on the rate. |
spark.streaming.receiver.writeAheadLogs.enable | false | Enable write ahead logs for receivers. All the input data received through receivers will be saved to write ahead logs that will allow it to be recovered after driver failures |
spark.streaming.unpersist | true | Force RDDs generated and persisted by Spark Streaming to be automatically unpersisted from Spark's memory; the raw input data received by Spark Streaming is also automatically cleared. Setting this to false allows the streaming application to access the raw data and the persisted RDDs, since they are not automatically cleared, but at the cost of higher memory usage. |
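A small sketch of how these streaming properties might be applied when building a StreamingContext (the values are illustrative; the property names match the table above):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setMaster("local[2]")  // one thread for the receiver, one for processing
  .setAppName("StreamingConfigExample")
  .set("spark.streaming.blockInterval", "200")                    // ms per block
  .set("spark.streaming.receiver.maxRate", "10000")               // records per second
  .set("spark.streaming.receiver.writeAheadLogs.enable", "true")  // recoverable input
val ssc = new StreamingContext(conf, Seconds(1))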
Certain Spark settings are configured through environment variables, which are read from the conf/spark-env.sh script in the Spark installation directory (or conf/spark-env.cmd on Windows). In standalone and Mesos modes, this file can give machine-specific information, such as hostnames. It is also sourced when running local Spark applications or submission scripts.
Note that conf/spark-env.sh does not exist by default when Spark is installed. You can copy conf/spark-env.sh.template to create it.
You can set the following variables in spark-env.sh:
Environment Variable | Meaning |
---|---|
JAVA_HOME | The path to the java installation |
PYSPARK_PYTHON | Python binary executable to use for PySpark |
SPARK_LOCAL_IP | IP address of the machine to bind to |
SPARK_PUBLIC_DNS | Hostname your Spark program advertises to other machines |
In addition to the above, there are also options for the Spark standalone cluster scripts, such as the number of cores to use on each machine and the maximum memory. Since spark-env.sh is a shell script, some of these values can be computed programmatically; for example, you might compute SPARK_LOCAL_IP by looking up the IP of a specific network interface.
Spark uses log4j for logging. You can configure it by adding a log4j.properties file in the conf directory. One way to start is to copy the existing log4j.properties.template file located there.