May 17, 2021 Spark Programming Guide
Similar to RDDs, transformations allow the data from the input DStream to be modified. DStreams support many of the transformations available on normal Spark RDDs. Some of the commonly used ones are as follows:
Transformation | Meaning |
---|---|
map(func) | Returns a new DStream by passing each element of the source DStream through the function func |
flatMap(func) | Similar to map, but each input can be mapped to 0 or more output items |
filter(func) | Returns a new DStream that contains only the items in the source DStream that satisfy the function func |
repartition(numPartitions) | Changes the level of parallelism in this DStream by creating more or fewer partitions |
union(otherStream) | Returns a new DStream that contains the combined elements of the source DStream and otherStream |
count() | Returns a new DStream of single-element RDDs by counting the number of elements in each RDD of the source DStream |
reduce(func) | Returns a new DStream of single-element RDDs by aggregating the elements of each RDD of the source DStream using the function func. The function should be associative so that the computation can be parallelized |
countByValue() | When called on a DStream of elements of type K, returns a new DStream of (K, Long) pairs where the value of each key is its frequency in each RDD of the source DStream |
reduceByKey(func, [numTasks]) | When called on a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs where the values of each key are aggregated using the given reduce function. Note: by default, this uses Spark's default number of parallel tasks to do the grouping. You can set a different number of tasks with the numTasks parameter |
join(otherStream, [numTasks]) | When called on two DStreams (one of (K, V) pairs and one of (K, W) pairs), returns a new DStream of (K, (V, W)) pairs |
cogroup(otherStream, [numTasks]) | When called on two DStreams (one of (K, V) pairs and one of (K, W) pairs), returns a new DStream of (K, Seq[V], Seq[W]) tuples |
transform(func) | Creates a new DStream by applying an RDD-to-RDD function to each RDD of the source DStream. This can be used to apply arbitrary RDD operations on the DStream |
updateStateByKey(func) | Returns a new "state" DStream where the state of each key is updated by applying the given function to the previous state and the new values of the key |
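As a quick illustration, here is a minimal sketch that chains several of the transformations above. It assumes lines is a DStream[String], such as the socket stream created later in this guide.
// A minimal sketch chaining several transformations from the table above.
// Assumption: `lines` is a DStream[String], like the socket stream created below.
val words = lines.flatMap(_.split(" "))    // flatMap: 0 or more outputs per input
val longWords = words.filter(_.length > 3) // filter: keep only matching elements
val pairs = words.map(word => (word, 1))   // map: transform each element into a (K, V) pair
val counts = pairs.reduceByKey(_ + _)      // reduceByKey: aggregate the values of each key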
The last two transformations need to be highlighted:
The updateStateByKey operation allows you to maintain arbitrary state while continuously updating it with new information. To use it, you need two steps: define the state (which can be an arbitrary data type), and define the state update function (which specifies how to update the state using the previous state and the new values from the input stream).
Let's illustrate this with an example. Say you want to maintain a running count of each word seen in a text stream. Here, the running count is the state, and its type is an integer:
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
  val newCount = ... // add the new values with the previous running count to get the new count
  Some(newCount)
}
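The elided line adds the new values to the previous running count; a completed version of the sketch could look like this:
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
  // add the new values to the previous running count (0 if no previous count exists)
  val newCount = runningCount.getOrElse(0) + newValues.sum
  Some(newCount)
}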
This function is applied to the DStream of (word, 1) pairs built from the words in the text stream:
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
// Create a local StreamingContext with two working threads and batch interval of 1 second
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
// Split each line into words
val words = lines.flatMap(_.split(" "))
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
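Note that updateStateByKey requires a checkpoint directory to be configured, and as usual the processing only begins once the context is started. A minimal completion of the sketch (the checkpoint path is illustrative) is:
ssc.checkpoint("checkpoint")  // required by updateStateByKey; path is illustrative
runningCounts.print()         // print the counts of each batch
ssc.start()                   // start the computation
ssc.awaitTermination()        // wait for the computation to terminate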
The update function will be called for each word, with newValues being a sequence of 1s (from the (word, 1) pairs) and runningCount holding the previous count.
To see the full code, see the StatefulNetworkWordCount example.
The transform operation (along with its variants like transformWith) allows arbitrary RDD-to-RDD functions to be applied on a DStream. It can be used to apply any RDD operation that is not exposed in the DStream API. For example, the functionality of joining every batch in a data stream with another dataset is not directly exposed in the DStream API, but you can easily do this with the transform method.
For instance, if you want to do real-time data cleaning by joining the input data stream with precomputed spam information and then filtering based on it, you can do so as follows:
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information
val cleanedDStream = wordCounts.transform(rdd => {
  rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
  ...
})
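Note that the supplied function is called in every batch interval. This lets you do time-varying RDD operations: the RDD operations, number of partitions, broadcast variables, and so on can be changed between batches.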
In fact, you can also use machine learning and graph computation algorithms in the transform method.
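For example, each batch of streaming data can be scored with a model trained offline with MLlib. The following is only a sketch: the input path, the feature parsing, and the parameters of KMeans.train are all assumptions made for illustration.
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

// Train a model offline on historical data (path and parsing are illustrative).
val historicalData = ssc.sparkContext
  .textFile("hdfs://...") // illustrative path to historical feature vectors
  .map(s => Vectors.dense(s.split(",").map(_.toDouble)))
val model = KMeans.train(historicalData, 5, 20) // 5 clusters, 20 iterations (assumed)

// Score each incoming batch with the pre-trained model via transform.
val points = lines.map(s => Vectors.dense(s.split(",").map(_.toDouble)))
val clusterIds = points.transform(rdd => model.predict(rdd)) // DStream of cluster ids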
Spark Streaming also provides windowed computations, which allow you to apply transformations over a sliding window of data. The following illustration shows this sliding window.
As shown in the image above, every time the window slides over the source DStream, the source RDDs that fall within the window are combined and operated upon to produce the RDDs of the windowed DStream. In this particular example, the operation is applied over the last 3 time units of data and slides by 2 time units. This means that any window operation needs to specify two parameters:

  * window length: the duration of the window (3 time units in the figure)
  * sliding interval: the interval at which the window operation is performed (2 time units in the figure)

These two parameters must be multiples of the batch interval of the source DStream.
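As a small sketch, reusing the words DStream from the earlier example (which had a 1-second batch interval), a plain window over the last 30 seconds of data, recomputed every 10 seconds, looks like this; both durations are valid multiples of the batch interval:
// Window the last 30 seconds of data, recomputed every 10 seconds.
// Both durations are multiples of the 1-second batch interval set earlier.
val windowedWords = words.window(Seconds(30), Seconds(10))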
Let's illustrate the window operations with an example. Say you want to extend the previous example by computing word counts over the last 30 seconds of data, every 10 seconds. To do this, we have to apply the reduceByKey operation to the pairs DStream of (word, 1) pairs over the last 30 seconds of data. This is done with the reduceByKeyAndWindow operation:
// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
Some common window operations are shown below. All of them take the two parameters mentioned above: the window length and the sliding interval.
Transformation | Meaning |
---|---|
window(windowLength, slideInterval) | Returns a new DStream which is computed based on windowed batches of the source DStream |
countByWindow(windowLength, slideInterval) | Returns a sliding window count of the elements in the stream |
reduceByWindow(func, windowLength, slideInterval) | Returns a new single-element stream, created by aggregating the elements of the stream over a sliding interval using the function func. The function must be associative so that the computation can be done correctly in parallel |
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) | When called on a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs where the values of each key are aggregated over batches in a sliding window using the given reduce function. Note: by default, this uses Spark's default number of parallel tasks to do the grouping. You can set a different number of tasks with the numTasks parameter |
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) | A more efficient version of the above reduceByKeyAndWindow(), where the reduce value of each window is calculated incrementally using the reduce values of the previous window. This is done by reducing the new data that enters the sliding window and "inverse reducing" the old data that leaves the window. An example would be "adding" and "subtracting" counts of keys as the window slides. However, it is applicable only to "invertible reduce functions", that is, reduce functions which have a corresponding "inverse reduce" function (taken as parameter invFunc). As in reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument. Note that checkpointing must be enabled to use this operation |
countByValueAndWindow(windowLength, slideInterval, [numTasks]) | When called on a DStream of (K, V) pairs, returns a new DStream of (K, Long) pairs where the value of each key is its frequency within a sliding window. As in reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument |
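As a sketch of the more efficient incremental variant described above (reusing pairs and ssc from the earlier example), the inverse function simply subtracts the counts of the data leaving the window; the checkpoint path below is illustrative:
ssc.checkpoint("checkpoint") // checkpointing is required by the invFunc variant; path is illustrative
val windowedWordCounts = pairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b, // reduce the new data entering the window
  (a: Int, b: Int) => a - b, // "inverse reduce" the old data leaving the window
  Seconds(30),
  Seconds(10)
)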