Coding With Fun

Spark GraphX Pregel API


May 17, 2021 Spark Programming guide



A graph is inherently a recursive data structure: the properties of a vertex depend on the properties of its neighbors, which in turn depend on the properties of their neighbors. As a result, many important graph algorithms iteratively recompute the properties of each vertex until a fixed-point condition is reached. A range of graph-parallel abstractions have been proposed to express these iterative algorithms. GraphX exposes a Pregel-like operator that is a fusion of the widely used Pregel and GraphLab abstractions.

At a high level, the Pregel operator in GraphX is a bulk-synchronous parallel messaging abstraction constrained to the topology of the graph. The Pregel operator executes a series of supersteps. In each superstep, vertices receive the combined value of their inbound messages from the previous superstep, compute a new value for the vertex property, and then send messages to neighboring vertices for the next superstep. Unlike Pregel, and more like GraphLab, messages are computed in parallel as a function of an edge triplet, and the message computation has access to both the source and the destination vertex attributes. Vertices that do not receive a message are skipped within a superstep. When no messages remain, the Pregel operator stops iterating and returns the final graph.
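To make the superstep semantics concrete, here is a toy, single-machine model of the loop in plain Scala (no Spark). It is a sketch under stated assumptions, not GraphX: `ToyPregel`, its local `Edge` and `Triplet` case classes, and `ToyPregelDemo` are hypothetical names introduced for illustration, vertices live in an in-memory `Map`, and messages are sent only along out-edges of vertices that were updated in the previous superstep. The demo propagates the maximum vertex value through a small chain whose edges run in both directions.

```scala
// Toy, local model of Pregel supersteps (hypothetical; not the GraphX implementation).
object ToyPregel {
  type VertexId = Long

  final case class Edge[ED](srcId: VertexId, dstId: VertexId, attr: ED)
  // Stand-in for GraphX's EdgeTriplet: the edge plus both endpoint attributes.
  final case class Triplet[VD, ED](srcId: VertexId, dstId: VertexId,
                                   srcAttr: VD, dstAttr: VD, attr: ED)

  def pregel[VD, ED, A](vertices: Map[VertexId, VD], edges: Seq[Edge[ED]],
                        initialMsg: A, maxIter: Int = Int.MaxValue)
                       (vprog: (VertexId, VD, A) => VD,
                        sendMsg: Triplet[VD, ED] => Iterator[(VertexId, A)],
                        mergeMsg: (A, A) => A): Map[VertexId, VD] = {
    // Run sendMsg on edges whose source is active, then merge messages per destination.
    def computeMessages(g: Map[VertexId, VD], active: Set[VertexId]): Map[VertexId, A] =
      edges.iterator
        .filter(e => active.contains(e.srcId))
        .flatMap(e => sendMsg(Triplet(e.srcId, e.dstId, g(e.srcId), g(e.dstId), e.attr)))
        .foldLeft(Map.empty[VertexId, A]) { case (acc, (id, msg)) =>
          acc.updated(id, acc.get(id).fold(msg)(prev => mergeMsg(prev, msg)))
        }

    // Superstep 0: every vertex receives the initial message, and every vertex may send.
    var g = vertices.map { case (id, v) => id -> vprog(id, v, initialMsg) }
    var messages = computeMessages(g, g.keySet)
    var i = 0
    while (messages.nonEmpty && i < maxIter) {
      // Only vertices that received a message run the vertex program...
      val updated = messages.map { case (id, msg) => id -> vprog(id, g(id), msg) }
      g = g ++ updated
      // ...and only those vertices send messages in the next superstep.
      messages = computeMessages(g, updated.keySet)
      i += 1
    }
    g
  }
}

object ToyPregelDemo {
  import ToyPregel._

  val vertices: Map[VertexId, Int] = Map(1L -> 3, 2L -> 6, 3L -> 2, 4L -> 1)
  // Chain 1 - 2 - 3 - 4, with an edge in each direction so values can flow both ways.
  val edges: Seq[Edge[Unit]] = Seq((1L, 2L), (2L, 3L), (3L, 4L)).flatMap { case (a, b) =>
    Seq(Edge(a, b, ()), Edge(b, a, ()))
  }

  val result: Map[VertexId, Int] = pregel(vertices, edges, initialMsg = Int.MinValue)(
    (_, value, msg) => math.max(value, msg),                                            // vprog
    t => if (t.srcAttr > t.dstAttr) Iterator((t.dstId, t.srcAttr)) else Iterator.empty, // sendMsg
    (a, b) => math.max(a, b))                                                           // mergeMsg

  def main(args: Array[String]): Unit = println(result)
}
```

Every vertex ends up with 6. Vertex 2 never receives a message, so it is skipped in every superstep after superstep 0, and the loop ends as soon as a superstep produces no messages.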

Note that, unlike more standard Pregel implementations, vertices in GraphX can only send messages to neighboring vertices, and messages are constructed in parallel by a user-defined messaging function. These constraints allow additional optimizations within GraphX.

The following is the type signature of the Pregel operator, along with a sketch of its implementation (note that some calls to graph.cache have been omitted):

class GraphOps[VD, ED] {
  def pregel[A]
      (initialMsg: A,
       maxIter: Int = Int.MaxValue,
       activeDir: EdgeDirection = EdgeDirection.Out)
      (vprog: (VertexId, VD, A) => VD,
       sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
       mergeMsg: (A, A) => A)
    : Graph[VD, ED] = {
    // Receive the initial message at each vertex
    var g = mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache()
    // compute the messages
    var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
    var activeMessages = messages.count()
    // Loop until no messages remain or maxIter is reached
    var i = 0
    while (activeMessages > 0 && i < maxIter) {
      // Receive the messages: -----------------------------------------------------------------------
      // Run the vertex program on all vertices that receive messages
      val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
      // Merge the new vertex values back into the graph
      g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }.cache()
      // Send Messages: ------------------------------------------------------------------------------
      // Vertices that didn't receive a message above don't appear in newVerts and therefore don't
      // get to send messages.  More precisely the map phase of mapReduceTriplets is only invoked
      // on edges in the activeDir of vertices in newVerts
      messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDir))).cache()
      activeMessages = messages.count()
      i += 1
    }
    g
  }
}

Note that pregel takes two parameter lists (i.e., graph.pregel(list1)(list2)). The first list contains the configuration parameters: the initial message, the maximum number of iterations, and the edge direction that determines which edges of the vertices that received messages run sendMsg in the next superstep (the sketch above defaults to EdgeDirection.Out, i.e. outgoing edges). The second list contains the user-defined functions for receiving messages (the vertex program vprog), computing messages (sendMsg), and combining messages (mergeMsg).
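The two-list shape is ordinary Scala currying. The plain-Scala sketch below uses a hypothetical `iterate` method (not part of GraphX) with the same layout: configuration with default values in the first list, user-defined functions in the second. Callers can override a single default by name, and in Scala 2 the type parameter `A` is inferred from the first list before the lambdas in the second list are type-checked, so those lambdas need no type annotations. That this is one motivation for pregel's signature is our assumption, not something the GraphX documentation states.

```scala
// Hypothetical helper mimicking pregel's layout: config first, user functions second.
object TwoParamLists {
  def iterate[A](initial: A, maxIter: Int = Int.MaxValue)
                (step: A => A, done: A => Boolean): (A, Int) = {
    var value = initial
    var i = 0
    // Stop when the user's condition holds or the iteration cap is hit.
    while (!done(value) && i < maxIter) {
      value = step(value)
      i += 1
    }
    (value, i) // final value and number of steps taken
  }

  // A = Int is inferred from the first list, so `_ * 2` and `_ > 100` need no annotations.
  val untilOver100: (Int, Int) = iterate(1)(_ * 2, _ > 100)
  // A named argument overrides just one default in the first list.
  val capped: (Int, Int) = iterate(1, maxIter = 3)(_ * 2, _ > 100)
}
```

Here `untilOver100` doubles 1 until it exceeds 100, giving (128, 7), while `capped` stops after three steps at (8, 3).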

The Pregel operator can express computations such as single-source shortest path (SSSP), as in the following example.

import org.apache.spark.graphx._
// Import random graph generation library
import org.apache.spark.graphx.util.GraphGenerators
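// A graph with edge attributes containing distances
// (assumes an existing SparkContext `sc`, e.g. in spark-shell;
// logNormalGraph returns Graph[Long, Int], so the vertex type is Long)
val graph: Graph[Long, Double] =
  GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)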
val sourceId: VertexId = 42 // The ultimate source
// Initialize the graph such that all vertices except the root have distance infinity.
val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
  (id, dist, newDist) => math.min(dist, newDist), // Vertex Program
  triplet => {  // Send Message
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    } else {
      Iterator.empty
    }
  },
  (a,b) => math.min(a,b) // Merge Message
  )
println(sssp.vertices.collect.mkString("\n"))
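To check what the SSSP program computes without a cluster, the following plain-Scala sketch replays the same vertex program, sendMsg, and mergeMsg logic on a small hand-built graph instead of a random one. `LocalSssp` and its `Edge` class are hypothetical names; supersteps are modeled by recomputing messages only from vertices whose distance changed in the previous step, mirroring the out-edge activity rule of the sketch above.

```scala
// Local, Spark-free model of the SSSP pregel call (hypothetical names, toy graph).
object LocalSssp {
  type VertexId = Long
  final case class Edge(srcId: VertexId, dstId: VertexId, attr: Double)

  def sssp(numVertices: Int, edges: Seq[Edge], sourceId: VertexId): Map[VertexId, Double] = {
    // Same initialization as initialGraph: 0.0 at the source, infinity elsewhere.
    var dist: Map[VertexId, Double] = (0L until numVertices.toLong)
      .map(id => id -> (if (id == sourceId) 0.0 else Double.PositiveInfinity))
      .toMap
    var active: Set[VertexId] = dist.keySet // superstep 0: every vertex may send
    while (active.nonEmpty) {
      // sendMsg: offer srcAttr + attr to dst when it beats dstAttr (active sources only);
      // mergeMsg: keep the minimum offer per destination.
      val messages: Map[VertexId, Double] = edges
        .filter(e => active(e.srcId) && dist(e.srcId) + e.attr < dist(e.dstId))
        .groupBy(_.dstId)
        .map { case (dst, es) => dst -> es.map(e => dist(e.srcId) + e.attr).min }
      // vprog: math.min(dist, newDist) on vertices that received a message.
      dist = dist ++ messages.map { case (id, d) => id -> math.min(dist(id), d) }
      active = messages.keySet
    }
    dist
  }

  // Vertex 4 has no incoming edges, so it is unreachable from the source 0.
  val edges: Seq[Edge] = Seq(Edge(0, 1, 4.0), Edge(0, 2, 1.0), Edge(2, 1, 2.0),
                             Edge(1, 3, 1.0), Edge(2, 3, 5.0))
  val result: Map[VertexId, Double] = sssp(5, edges, sourceId = 0)

  def main(args: Array[String]): Unit = println(result.toSeq.sortBy(_._1).mkString("\n"))
}
```

Vertex 3 is first reached with distance 5.0 via 0 -> 1 -> 3 and improves to 4.0 once the shorter path 0 -> 2 -> 1 -> 3 is found; vertex 4 keeps Double.PositiveInfinity, just as unreachable vertices do in the GraphX result.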