May 17, 2021 Spark Streaming Programming Guide
A streaming application must run around the clock, so it must be resilient to failures unrelated to the application logic (e.g. system errors, JVM crashes, etc.). To make this possible, Spark Streaming needs to checkpoint enough information to a fault-tolerant storage system to recover from a failure.
Metadata checkpointing: Saves the information defining the streaming computation to a fault-tolerant storage system such as HDFS. This is used to recover from a failure of the node running the driver of the streaming application. Metadata includes:
Configuration: The configuration used to create the streaming application
DStream operations: The set of DStream operations that define the streaming application
Incomplete batches: Batches whose jobs are queued but have not yet completed
Metadata checkpointing is primarily intended for recovery from a driver failure. If stateful transformations are used, data (RDD) checkpointing is required even for basic functioning.
The application must turn on checkpointing if it uses either of the following stateful transformations:
updateStateByKey
reduceByKeyAndWindow
In these cases, a checkpoint directory must be provided to allow periodic checkpointing of RDDs.
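As a minimal sketch of such a case (the host, port, and checkpoint path below are placeholders, not from the original guide), a running word count built on updateStateByKey will fail at runtime unless a checkpoint directory is set before the context starts:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StatefulWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("StatefulWordCount")
    val ssc  = new StreamingContext(conf, Seconds(1))

    // Required because updateStateByKey is used below (placeholder path).
    ssc.checkpoint("hdfs://namenode:8020/checkpoints")

    val words = ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map((_, 1))

    // Running count per word, carried across batches as state.
    val counts = words.updateStateByKey[Int] { (newValues, state) =>
      Some(newValues.sum + state.getOrElse(0))
    }
    counts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Without the ssc.checkpoint(...) call, updateStateByKey throws an exception when the context starts, because the state RDDs have nowhere to be periodically checkpointed.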
Note that simple streaming applications without the stateful transformations described above can run without turning on checkpointing. In that case, recovery from a driver failure will only be partial (data received but not yet processed will be lost). This is often acceptable, and many Spark Streaming applications are run this way.
Set up a directory in a fault-tolerant, reliable file system (HDFS, S3, etc.) to hold the checkpoint information. This is done with:
streamingContext.checkpoint(checkpointDirectory)
This lets you use the stateful transformations described earlier.
Also, if you want to recover from a driver failure, you should rewrite your streaming application so that the StreamingContext is obtained with StreamingContext.getOrCreate before the start() method is called, as follows.
// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
  val ssc = new StreamingContext(...)   // new context
  val lines = ssc.socketTextStream(...) // create DStreams
  ...
  ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
  ssc
}
// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...
// Start the context
context.start()
context.awaitTermination()
If checkpointDirectory exists, the context is recreated from the checkpoint data. If the directory does not exist, functionToCreateContext is called to create a new context and set up the DStreams.
Take a look at the RecoverableNetworkWordCount example.
In addition to using getOrCreate, developers must ensure that the driver process restarts automatically in the event of a failure. This can only be achieved by the infrastructure used to deploy the application.
There will be further discussion in the deployment section.
Note that checkpointing RDDs incurs the cost of saving to reliable storage. This increases the processing time of those batches whose RDDs are checkpointed. Therefore, the checkpoint interval needs to be set carefully. At small batch sizes (say, 1 second of data), checkpointing every batch may significantly reduce operation throughput. Conversely, checkpointing too infrequently causes lineage and task sizes to grow, which can have detrimental effects. For stateful transformations that require RDD checkpointing, the default interval is a multiple of the batch interval that is at least 10 seconds. It can be set using dstream.checkpoint(checkpointInterval).
Typically, a checkpoint interval of 5 to 10 sliding intervals of a DStream is a good setting to try.
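For example (a sketch; the existing StreamingContext ssc, host, and port are assumed, not from the original guide), a windowed count that slides every 10 seconds could be checkpointed every 5 sliding intervals, i.e. every 50 seconds:

```scala
import org.apache.spark.streaming.Seconds

// Word pairs from a socket source (placeholder host and port).
val pairs = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split(" "))
  .map((_, 1))

// 30-second window, sliding every 10 seconds; this stateful windowed
// operation requires a checkpoint directory to be set on ssc.
val windowedCounts = pairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b, Seconds(30), Seconds(10))

// Checkpoint every 5 sliding intervals (5 x 10s = 50s),
// within the recommended 5-10x range.
windowedCounts.checkpoint(Seconds(50))
windowedCounts.print()
```

Raising the interval above the default trades longer lineage (and slower recovery) for less time spent writing checkpoints during normal operation.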