
Spark shared variables


May 17, 2021 Spark Programming guide



Shared variables

Normally, when a function passed to a Spark operation such as map or reduce runs on a remote node, it works on separate copies of the variables used in the function. These variables are copied to each machine, and no updates to the variables on the remote machines are propagated back to the driver program. Supporting general, cross-task read-write variables would be inefficient, so Spark instead provides two limited types of shared variables for two common usage patterns: broadcast variables and accumulators.

Broadcast variables

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.

A broadcast variable is created from a variable v by calling the SparkContext.broadcast(v) method. The broadcast variable is a wrapper around v, and its value can be accessed by calling the value method, as the following code illustrates:

 scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
 broadcastVar: spark.Broadcast[Array[Int]] = spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c)
 scala> broadcastVar.value
 res0: Array[Int] = Array(1, 2, 3)

Once the broadcast variable is created, it should be used instead of the original variable v in any functions run on the cluster, so that v does not have to be shipped to each node again. In addition, the object v should not be modified after it is broadcast, in order to ensure that all nodes see the same value of the broadcast variable.
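For example, here is a rough sketch of this usage in the Spark shell (the lookup map and the names lookup and named are purely illustrative, assuming sc is already defined as in the examples above):

// Illustrative lookup table, shipped to every node once via broadcast
val lookup = sc.broadcast(Map(1 -> "one", 2 -> "two", 3 -> "three"))

// Tasks read the data through lookup.value instead of capturing the map in their closures
val named = sc.parallelize(Array(1, 2, 3))
  .map(id => lookup.value.getOrElse(id, "unknown"))
  .collect()
// named: Array[String] = Array(one, two, three)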

Accumulators

As the name implies, accumulators are variables that can only be "added" to through an associative operation, so they can be efficiently supported in parallel. They can be used to implement counters or sums. Spark natively supports accumulators of numeric types, and developers can add support for new types themselves. If an accumulator is created with a name, it will be displayed in Spark's UI, which is useful for understanding the progress of running stages. (Note: this is not yet supported in Python.)

An accumulator is created from an initial value v by calling the SparkContext.accumulator(v) method. Tasks running on the cluster can then add to it using the add method or the += operator. However, they cannot read its value; only the driver program can read the accumulator's value, using its value method. The following code shows how to use an accumulator to add up all the elements of an array:

scala> val accum = sc.accumulator(0, "My Accumulator")
accum: spark.Accumulator[Int] = 0
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
res2: Int = 10

This example uses the built-in accumulator support for the integer type. Developers can create their own accumulator types by subclassing AccumulatorParam. The AccumulatorParam interface has two methods: zero, which provides a "zero value" for your data type, and addInPlace, which adds two values together. For example, supposing we had a Vector class representing mathematical vectors, we could define the accumulator as follows:

object VectorAccumulatorParam extends AccumulatorParam[Vector] {
  // Provide a "zero value" with the same dimension as the initial value
  def zero(initialValue: Vector): Vector = {
    Vector.zeros(initialValue.size)
  }
  // Merge two partial results by adding them element-wise
  def addInPlace(v1: Vector, v2: Vector): Vector = {
    v1 += v2
  }
}
// Then, create an Accumulator of this type:
val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)
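
Because the Vector class above is only sketched, here is a small, self-contained sketch of the same pattern with a concrete type. MaxInt and MaxIntAccumulatorParam are illustrative names rather than part of Spark, and sc is assumed to be a SparkContext as in the examples above:

import org.apache.spark.AccumulatorParam

// Illustrative wrapper type that tracks a running maximum
case class MaxInt(value: Int)

object MaxIntAccumulatorParam extends AccumulatorParam[MaxInt] {
  // The "zero value": the identity element for max
  def zero(initial: MaxInt): MaxInt = MaxInt(Int.MinValue)
  // Merging two partial results keeps the larger value
  def addInPlace(a: MaxInt, b: MaxInt): MaxInt = MaxInt(math.max(a.value, b.value))
}

val maxAccum = sc.accumulator(MaxInt(Int.MinValue))(MaxIntAccumulatorParam)
sc.parallelize(Array(1, 7, 3)).foreach(x => maxAccum += MaxInt(x))
maxAccum.value  // MaxInt(7), readable only on the driver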

In Scala, Spark also supports accumulating data through the more general Accumulable interface, where the result type can be different from the type of the elements being added (for example, building up a list of the elements that were collected). Spark additionally supports accumulating standard Scala collection types through the SparkContext.accumulableCollection method.
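
As a rough sketch of SparkContext.accumulableCollection (assuming a Spark 1.x shell where sc is already defined; the name wordsAccum is illustrative), elements added in tasks are gathered into a mutable collection that only the driver can read:

import scala.collection.mutable

// Accumulate individual strings into an ArrayBuffer held on the driver
val wordsAccum = sc.accumulableCollection(mutable.ArrayBuffer[String]())
sc.parallelize(Array("a", "b", "c")).foreach(word => wordsAccum += word)

// Only the driver may read the result; ordering across tasks is not guaranteed
wordsAccum.value  // ArrayBuffer containing "a", "b", "c" in some order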