Spark Streaming Checkpointing


May 17, 2021 Spark Programming guide


A streaming application must run around the clock, so it has to be resilient to failures unrelated to the application logic (e.g. system errors, JVM crashes). To make this possible, Spark Streaming needs to checkpoint enough information to a fault-tolerant storage system that it can recover from such failures. Two types of data are checkpointed:

  • Metadata checkpointing: Saves the information that defines the streaming computation to fault-tolerant storage such as HDFS. This is used to recover from a failure of the node running the driver of the streaming application. The metadata includes:

      • Configuration: The configuration that was used to create the Spark Streaming application.
      • DStream operations: The set of DStream operations that define the streaming application.
      • Incomplete batches: Batches whose jobs are queued but have not yet completed.

  • Data checkpointing: Saves the generated RDDs to reliable storage. This is required by stateful transformations, which combine data across multiple batches. In such transformations, each generated RDD depends on the RDDs of previous batches, so the dependency chain keeps growing over time. To keep recovery time from growing without bound along with it, the intermediate RDDs of stateful transformations are periodically checkpointed to reliable storage, which truncates the dependency chain.

Metadata checkpointing is primarily needed to recover from driver failures. Data checkpointing, by contrast, is required even for basic functioning whenever stateful transformations are used.
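The effect of periodic data checkpointing on the dependency chain can be illustrated with a toy model. This is plain Scala, not Spark code: the object LineageToyModel and its method chainLengths are hypothetical names introduced only for this sketch, and the model assumes that each batch adds exactly one link to the chain and that a checkpoint resets the chain length to zero.

```scala
object LineageToyModel {
  // Length of the dependency chain after each batch when the state RDD is
  // checkpointed every `checkpointEvery` batches (0 means "never checkpoint").
  // Assumption: one new dependency per batch, and a checkpoint cuts the chain to 0.
  def chainLengths(batches: Int, checkpointEvery: Int): Seq[Int] = {
    val lengths = (1 to batches).scanLeft(0) { (length, batch) =>
      if (checkpointEvery > 0 && batch % checkpointEvery == 0) 0 // chain truncated
      else length + 1 // the new RDD depends on the previous one
    }
    lengths.tail // drop the initial 0 that scanLeft prepends
  }
}
```

Without checkpointing, chainLengths(6, 0) grows steadily from 1 to 6. With a checkpoint every third batch, chainLengths(6, 3) never exceeds 2, which is why recovery time stays bounded.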

When to enable checkpointing

Checkpointing must be enabled for applications with either of the following requirements:

  • Use of stateful transformations. If the application uses updateStateByKey or reduceByKeyAndWindow (with an inverse function), a checkpoint directory must be provided so that RDDs can be checkpointed periodically.
  • Recovery from failures of the driver running the application. Metadata checkpoints are used to recover the progress information.

Note that simple streaming applications without the stateful transformations described above can run without enabling checkpointing. In that case, recovery from a driver failure is only partial (data that was received but not yet processed may be lost). This is often acceptable, and many Spark Streaming applications run this way.
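As a minimal sketch of the first case, the following running word count uses updateStateByKey and therefore sets a checkpoint directory. The object name StatefulWordCount, the local master setting, the socket host and port, and the directory /tmp/spark-checkpoint are all assumptions made for illustration. A real deployment would point at fault-tolerant storage such as HDFS or S3.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StatefulWordCount {
  // Merge the counts seen in the current batch into the running total for a key.
  def updateCount(newValues: Seq[Int], running: Option[Int]): Option[Int] =
    Some(running.getOrElse(0) + newValues.sum)

  def main(args: Array[String]): Unit = {
    // Assumed settings for a local test run: two threads, 1-second batches.
    val conf = new SparkConf().setAppName("StatefulWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Required because updateStateByKey is a stateful transformation.
    // Hypothetical local path, for illustration only.
    ssc.checkpoint("/tmp/spark-checkpoint")

    // Hypothetical text source, e.g. one started with `nc -lk 9999`.
    val lines = ssc.socketTextStream("localhost", 9999)
    val counts = lines
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .updateStateByKey[Int](updateCount _)

    counts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```

Keeping updateCount as a separate pure function makes the state logic easy to check in isolation, independently of any running stream.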

How to configure checkpointing

Checkpointing is enabled by setting a directory in a fault-tolerant, reliable file system (HDFS, S3, etc.) where the checkpoint information will be saved. This is done with streamingContext.checkpoint(checkpointDirectory), which allows you to use the stateful transformations described earlier. In addition, if you want the application to recover from driver failures, you should rewrite your streaming application so that it behaves as follows:

  • When the application is started for the first time, it creates a new StreamingContext, sets up all the streams, and then calls start().
  • When the application is restarted after a failure, it re-creates a StreamingContext from the checkpoint data in the checkpoint directory.

import org.apache.spark.streaming.StreamingContext

// Function to create and set up a new StreamingContext
def functionToCreateContext(): StreamingContext = {
    val ssc = new StreamingContext(...)   // new context
    val lines = ssc.socketTextStream(...) // create DStreams
    ...
    ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
    ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()

If checkpointDirectory exists, the context is re-created from the checkpoint data. If the directory does not exist (i.e. the application is running for the first time), functionToCreateContext is called to create a new context and set up the DStreams. See the RecoverableNetworkWordCount example.

In addition to using getOrCreate, developers must ensure that the driver process is restarted automatically after a failure. This can only be done by the deployment infrastructure that runs the application, and is discussed further in the deployment section.

Note that checkpointing RDDs incurs the cost of saving them to reliable storage. This can increase the processing time of the batches in which RDDs are checkpointed, so the checkpoint interval needs to be set carefully. At small batch sizes (say, 1 second), checkpointing every batch can significantly reduce throughput. Conversely, checkpointing too infrequently lets the lineage and task sizes grow, which can have harmful effects. For stateful transformations that require RDD checkpointing, the default interval is a multiple of the batch interval that is at least 10 seconds. It can be set with dstream.checkpoint(checkpointInterval). Typically, a checkpoint interval of 5 to 10 times the sliding interval of a DStream is a good setting to try.
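The interval arithmetic above can be worked through in a small helper. This is a sketch in plain Scala, not part of the Spark API: the object CheckpointInterval and both method names are hypothetical, and the default rule is implemented only as it is described in this section (the smallest multiple of the batch interval that is at least 10 seconds).

```scala
object CheckpointInterval {
  // Smallest multiple of the batch interval that is at least 10 seconds.
  // All values are in milliseconds.
  def defaultIntervalMs(batchIntervalMs: Long): Long = {
    require(batchIntervalMs > 0, "batch interval must be positive")
    val minimumMs = 10000L
    // Ceiling division: how many batch intervals are needed to reach 10 seconds.
    val multiples = (minimumMs + batchIntervalMs - 1) / batchIntervalMs
    batchIntervalMs * multiples
  }

  // Rule-of-thumb range: 5 to 10 times the sliding interval of the DStream.
  def suggestedRangeMs(slidingIntervalMs: Long): (Long, Long) =
    (5 * slidingIntervalMs, 10 * slidingIntervalMs)
}
```

For a 1-second batch interval the default works out to 10 seconds, and for a 3-second batch interval to 12 seconds. For a DStream with a 2-second sliding interval, the rule of thumb suggests trying values between 10 and 20 seconds. A chosen value could then be applied with dstream.checkpoint(Milliseconds(intervalMs)).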