
Spark GraphX graph operator


May 17, 2021 Spark Programming guide



Just as RDDs have basic operations such as map, filter, and reduceByKey, property graphs have a collection of basic operators that take user-defined functions and produce new graphs with transformed properties and structure. The core operators, which have optimized implementations, are defined in Graph. Convenient operators that are expressed as compositions of the core operators are defined in GraphOps. However, thanks to Scala implicit conversions, the operators in GraphOps are automatically available as members of Graph. For example, we can compute the in-degree of each vertex (defined in GraphOps) as follows.

val graph: Graph[(String, String), String]
// Use the implicit GraphOps.inDegrees operator
val inDegrees: VertexRDD[Int] = graph.inDegrees

The reason for separating the core operators from GraphOps is to support different graph representations in the future. Each graph representation must provide implementations of the core operations, and can then reuse many of the useful operators defined in GraphOps.

Summary List of Operators

The following is a quick summary of the functionality defined in both Graph and GraphOps, presented as members of Graph for simplicity. Note that some function signatures have been simplified (for example, default arguments and type constraints have been removed) and some more advanced functionality has been omitted, so consult the API documentation for the official list of operations.

/** Summary of the functionality in the property graph */
class Graph[VD, ED] {
  // Information about the Graph ===================================================================
  val numEdges: Long
  val numVertices: Long
  val inDegrees: VertexRDD[Int]
  val outDegrees: VertexRDD[Int]
  val degrees: VertexRDD[Int]
  // Views of the graph as collections =============================================================
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
  val triplets: RDD[EdgeTriplet[VD, ED]]
  // Functions for caching graphs ==================================================================
  def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
  def cache(): Graph[VD, ED]
  def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]
  // Change the partitioning heuristic  ============================================================
  def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
  // Transform vertex and edge attributes ==========================================================
  def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2])
    : Graph[VD, ED2]
  // Modify the graph structure ====================================================================
  def reverse: Graph[VD, ED]
  def subgraph(
      epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
      vpred: (VertexId, VD) => Boolean = ((v, d) => true))
    : Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
  // Join RDDs with the graph ======================================================================
  def joinVertices[U](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
  def outerJoinVertices[U, VD2](other: RDD[(VertexId, U)])
      (mapFunc: (VertexId, VD, Option[U]) => VD2)
    : Graph[VD2, ED]
  // Aggregate information about adjacent triplets =================================================
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
  def aggregateMessages[Msg: ClassTag](
      sendMsg: EdgeContext[VD, ED, Msg] => Unit,
      mergeMsg: (Msg, Msg) => Msg,
      tripletFields: TripletFields = TripletFields.All)
    : VertexRDD[Msg]
  // Iterative graph-parallel computation ==========================================================
  def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED]
  // Basic graph algorithms ========================================================================
  def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
  def connectedComponents(): Graph[VertexId, ED]
  def triangleCount(): Graph[Int, ED]
  def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
}

Property Operators

Like the RDD map operator, the property graph provides the following operators:

class Graph[VD, ED] {
  def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
}

Each of these operators yields a new graph in which the vertex or edge properties have been modified by the user-defined map function.

Note that in each case the graph structure is unaffected. A key feature of these operators is that they allow the resulting graph to reuse the structural indices of the original graph. The following snippets are logically equivalent, but the first one does not preserve the structural indices and therefore does not benefit from the GraphX system optimizations:

val newVertices = graph.vertices.map { case (id, attr) => (id, mapUdf(id, attr)) }
val newGraph = Graph(newVertices, graph.edges)

Instead, use mapVertices, which preserves the indices:

val newGraph = graph.mapVertices((id, attr) => mapUdf(id, attr))

These operators are often used to initialize the graph for a particular computation or to project away properties that are not needed. For example, given a graph whose vertex properties are the out-degrees, we can initialize it for PageRank:

// Given a graph where the vertex property is the out degree
val inputGraph: Graph[Int, String] =
  graph.outerJoinVertices(graph.outDegrees)((vid, _, degOpt) => degOpt.getOrElse(0))
// Construct a graph where each edge contains the weight
// and each vertex is the initial PageRank
val outputGraph: Graph[Double, Double] =
  inputGraph.mapTriplets(triplet => 1.0 / triplet.srcAttr).mapVertices((id, _) => 1.0)
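
As a rough, self-contained illustration of the three map operators above, the following sketch builds a tiny graph and applies each one. The graph contents and every name in it (users, links, toyGraph, and so on) are hypothetical, and the snippet assumes it is pasted into spark-shell or run as a script with Spark GraphX on the classpath.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Assumed setup: reuse an existing context (e.g. in spark-shell) or start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("property-operators-sketch").setMaster("local[2]"))

// Hypothetical toy graph: three named users and two weighted edges.
val users: RDD[(VertexId, String)] =
  sc.parallelize(Seq((1L, "alice"), (2L, "bob"), (3L, "carol")))
val links: RDD[Edge[Int]] =
  sc.parallelize(Seq(Edge(1L, 2L, 10), Edge(2L, 3L, 20)))
val toyGraph: Graph[String, Int] = Graph(users, links)

// mapVertices rewrites vertex properties only; the edge set is untouched.
val upperCased: Graph[String, Int] =
  toyGraph.mapVertices((id, name) => name.toUpperCase)

// mapEdges sees only the edge itself; here it also changes the edge property type.
val halved: Graph[String, Double] = toyGraph.mapEdges(e => e.attr / 2.0)

// mapTriplets can additionally read the properties of both endpoints.
val labelled: Graph[String, String] =
  toyGraph.mapTriplets(t => s"${t.srcAttr}->${t.dstAttr}")
```

In all three results the vertex ids and edge endpoints are the same as in toyGraph; only the attached properties differ.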

Structural Operators

GraphX currently supports only a simple set of commonly used structural operators. The following is a list of the basic structural operators.

class Graph[VD, ED] {
  def reverse: Graph[VD, ED]
  def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
               vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
}

The reverse operator returns a new graph with all the edge directions reversed. This can be useful when, for example, computing the inverse PageRank. Because the reverse operation does not modify vertex or edge properties or change the number of edges, it can be implemented efficiently without moving or copying data.
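
A minimal sketch of reverse on a hypothetical two-edge graph follows. The names original and reversed are made up for illustration, and the snippet assumes spark-shell or a script with GraphX available.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

// Assumed setup: reuse an existing context or start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("reverse-sketch").setMaster("local[2]"))

// Hypothetical graph: vertex 1 points at vertices 2 and 3.
val original: Graph[Int, String] = Graph.fromEdges(
  sc.parallelize(Seq(Edge(1L, 2L, "follows"), Edge(1L, 3L, "follows"))),
  defaultValue = 0)

// Every edge now points the other way; properties are unchanged.
val reversed: Graph[Int, String] = original.reverse

// Endpoints of the reversed edges, collected for inspection.
val reversedEndpoints: Set[(VertexId, VertexId)] =
  reversed.edges.map(e => (e.srcId, e.dstId)).collect().toSet
```

One consequence is that the in-degrees of the reversed graph match the out-degrees of the original.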

The subgraph operator takes vertex and edge predicates and returns a graph containing only the vertices that satisfy the vertex predicate and the edges that both satisfy the edge predicate and connect vertices that satisfy the vertex predicate. The subgraph operator can be used in many situations, such as restricting the graph to the vertices and edges of interest or eliminating broken links. The following example removes broken links.

// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
                       (4L, ("peter", "student"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(3L, 7L, "collab"),    Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
                       Edge(4L, 0L, "student"),   Edge(5L, 0L, "colleague")))
// Define a default user in case there are relationship with missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
// Notice that there is a user 0 (for which we have no information) connected to users
// 4 (peter) and 5 (franklin).
graph.triplets.map(
    triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
  ).collect.foreach(println(_))
// Remove missing vertices as well as the edges connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// The valid subgraph will disconnect users 4 and 5 by removing user 0
validGraph.vertices.collect.foreach(println(_))
validGraph.triplets.map(
    triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
  ).collect.foreach(println(_))

Note that in the example above only the vertex predicate is provided. If the vertex or edge predicate is not provided, the subgraph operator defaults it to true.
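
To complement the vertex-predicate example, here is a small sketch that passes only an edge predicate, and then both predicates together. The graph, the 0.5 threshold, and all names are hypothetical; the snippet assumes spark-shell or a script with GraphX available.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Assumed setup: reuse an existing context or start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("subgraph-sketch").setMaster("local[2]"))

// Hypothetical graph whose edge property is a tie strength in [0, 1].
val people: RDD[(VertexId, String)] =
  sc.parallelize(Seq((1L, "alice"), (2L, "bob"), (3L, "carol")))
val ties: RDD[Edge[Double]] =
  sc.parallelize(Seq(Edge(1L, 2L, 0.9), Edge(2L, 3L, 0.1), Edge(1L, 3L, 0.6)))
val tieGraph: Graph[String, Double] = Graph(people, ties)

// Only epred is given, so vpred defaults to true and every vertex is kept.
val strongTies: Graph[String, Double] =
  tieGraph.subgraph(epred = t => t.attr >= 0.5)

// Both predicates: an edge survives only if it passes epred and both endpoints pass vpred.
val strongTiesWithoutCarol: Graph[String, Double] =
  tieGraph.subgraph(epred = t => t.attr >= 0.5, vpred = (id, name) => name != "carol")
```

With only the edge predicate, vertex 3 stays in the graph even though one of its edges is dropped; with both predicates, vertex 3 and every edge touching it disappear.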

The mask operator constructs a subgraph by returning a graph that contains only the vertices and edges that are also found in the input graph. It can be used together with the subgraph operator to restrict a graph based on the properties of another related graph. For example, we might run connected components on the graph that still contains the missing vertices, and then restrict the answer to the valid subgraph.

// Run Connected Components
val ccGraph = graph.connectedComponents() // No longer contains missing field
// Remove missing vertices as well as the edges connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// Restrict the answer to the valid subgraph
val validCCGraph = ccGraph.mask(validGraph)

The groupEdges operator merges parallel edges (that is, duplicate edges between the same pair of vertices) in a multigraph. In many numerical applications, parallel edges can be combined (their weights added together) into a single edge, reducing the size of the graph.
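
The following sketch merges two parallel edges by adding their weights. The edge data and names are hypothetical. Note that groupEdges only merges edges that sit in the same partition, so the sketch repartitions first with partitionBy; RandomVertexCut is one strategy that co-locates same-direction edges between the same two vertices.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

// Assumed setup: reuse an existing context or start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("group-edges-sketch").setMaster("local[2]"))

// Hypothetical multigraph: two parallel edges from 1 to 2, one edge from 2 to 3.
val multigraph: Graph[Int, Double] = Graph.fromEdges(
  sc.parallelize(Seq(Edge(1L, 2L, 1.0), Edge(1L, 2L, 2.5), Edge(2L, 3L, 4.0))),
  defaultValue = 0)

// Co-locate parallel edges, then merge them by summing their weights.
val merged: Graph[Int, Double] = multigraph
  .partitionBy(PartitionStrategy.RandomVertexCut)
  .groupEdges(_ + _)

// Resulting (source, destination, weight) triples, collected for inspection.
val mergedEdges: Set[(VertexId, VertexId, Double)] =
  merged.edges.map(e => (e.srcId, e.dstId, e.attr)).collect().toSet
```

Skipping the partitionBy step can leave some parallel edges unmerged, because they may live in different partitions.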

Join Operators

In many cases it is necessary to join data from external collections (RDDs) with a graph. For example, we might have extra user properties that we want to merge into an existing graph, or we might want to pull vertex properties from one graph into another. These tasks can be accomplished using the join operators. The main join operators are listed below.

class Graph[VD, ED] {
  def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD)
    : Graph[VD, ED]
  def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
    : Graph[VD2, ED]
}

The joinVertices operator joins the vertices with the input RDD and returns a new graph whose vertex properties are obtained by applying the user-defined map function to each joined vertex. Vertices without a matching value in the RDD retain their original value.

Note that if the RDD contains more than one value for a given vertex, only one of them is used. It is therefore recommended to make the input RDD unique with the following approach, which also pre-indexes the resulting values to speed up the subsequent join.

val nonUniqueCosts: RDD[(VertexId, Double)]
// Sum duplicate entries per vertex, reusing the graph's vertex index
val uniqueCosts: VertexRDD[Double] =
  graph.vertices.aggregateUsingIndex[Double](nonUniqueCosts, (a, b) => a + b)
val joinedGraph = graph.joinVertices(uniqueCosts)(
  (id, oldCost, extraCost) => oldCost + extraCost)
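
For a version that can be run end to end, the sketch below supplies hypothetical data for the graph and the cost table. All values and names other than the GraphX calls are invented for illustration; the snippet assumes spark-shell or a script with GraphX available.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Assumed setup: reuse an existing context or start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("join-vertices-sketch").setMaster("local[2]"))

// Hypothetical graph whose vertex property is a running cost.
val costGraph: Graph[Double, Int] = Graph(
  sc.parallelize(Seq[(VertexId, Double)]((1L, 1.0), (2L, 2.0), (3L, 3.0))),
  sc.parallelize(Seq(Edge(1L, 2L, 0), Edge(2L, 3L, 0))))

// Extra costs with a duplicate entry for vertex 1 and no entry for vertex 3.
val extraCosts: RDD[(VertexId, Double)] =
  sc.parallelize(Seq[(VertexId, Double)]((1L, 10.0), (1L, 5.0), (2L, 1.0)))

// Collapse duplicates (by summing) against the graph's own vertex index.
val summedCosts: VertexRDD[Double] =
  costGraph.vertices.aggregateUsingIndex[Double](extraCosts, _ + _)

// Vertices with a match get the extra cost added; vertex 3 keeps its old value.
val updated: Graph[Double, Int] =
  costGraph.joinVertices(summedCosts)((id, oldCost, extraCost) => oldCost + extraCost)
```

Because joinVertices cannot change the vertex property type, the map function here returns a Double, the same type as the original property.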

The more general outerJoinVertices operator behaves similarly to joinVertices, except that the user-defined map function is applied to all vertices and can change the vertex property type. Because not every vertex may have a matching value in the input RDD, the map function takes an Option type.

val outDegrees: VertexRDD[Int] = graph.outDegrees
val degreeGraph = graph.outerJoinVertices(outDegrees) { (id, oldAttr, outDegOpt) =>
  outDegOpt match {
    case Some(outDeg) => outDeg
    case None => 0 // No outDegree means zero outDegree
  }
}

You may have noticed the curried, multiple-parameter-list pattern (for example, f(a)(b)) used in the examples above. Although f(a)(b) could equally have been written as f(a, b), that would mean the type inference for b could not depend on a. As a result, the user would need to provide type annotations for the user-defined function:

// Hypothetical uncurried form, shown only to illustrate the annotations it would require
val joinedGraph = graph.joinVertices(uniqueCosts,
  (id: VertexId, oldCost: Double, extraCost: Double) => oldCost + extraCost)
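
The inference difference can be seen without Spark at all. The two helper functions below are hypothetical and exist only to contrast the two styles; the comments describe Scala 2 behavior, which is what the paragraph above is about.

```scala
// Curried: the first argument list is typed first, so A is already known
// when the lambda in the second list is checked.
def applyCurried[A, B](a: A)(f: A => B): B = f(a)

// Uncurried: under Scala 2 both arguments are typed together, so the
// lambda parameter cannot borrow its type from `a` and must be annotated.
def applyFlat[A, B](a: A, f: A => B): B = f(a)

val viaCurried: Int = applyCurried(21)(x => x * 2)
val viaFlat: Int = applyFlat(21, (x: Int) => x * 2)
```

This is the reason the GraphX join operators put the map function in its own parameter list.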

Neighborhood Aggregation

A key step in many graph analytics tasks is aggregating information about the neighborhood of each vertex. For example, we might want to know the number of followers each user has, or the average age of each user's followers. Many iterative graph algorithms, such as PageRank, shortest path, and connected components, repeatedly aggregate properties of neighboring vertices.

To improve performance, the primary aggregation operator changed from graph.mapReduceTriplets to the new graph.aggregateMessages. Although the API changes are relatively small, a transition guide is provided below.

Aggregate Messages (aggregateMessages)

The core aggregation operation in GraphX is aggregateMessages. This operator applies a user-defined sendMsg function to each edge triplet in the graph and then uses the mergeMsg function to aggregate those messages at their destination vertex.

class Graph[VD, ED] {
  def aggregateMessages[Msg: ClassTag](
      sendMsg: EdgeContext[VD, ED, Msg] => Unit,
      mergeMsg: (Msg, Msg) => Msg,
      tripletFields: TripletFields = TripletFields.All)
    : VertexRDD[Msg]
}

The user-defined sendMsg function takes an EdgeContext, which exposes the source and destination properties along with the edge property, as well as functions (sendToSrc and sendToDst) for sending messages to the source and destination vertices. Think of sendMsg as the map function in map-reduce. The user-defined mergeMsg function takes two messages destined for the same vertex and combines them into a single message. Think of mergeMsg as the reduce function in map-reduce. The aggregateMessages operator returns a VertexRDD[Msg] containing the aggregated message (of type Msg) destined for each vertex. Vertices that did not receive a message are not included in the returned VertexRDD.

In addition, aggregateMessages takes an optional tripletFields argument that indicates which data is accessed in the EdgeContext (for example, the source vertex property but not the destination vertex property). The possible values are defined in TripletFields, and the default is TripletFields.All. The tripletFields argument tells GraphX that only part of the EdgeContext is needed, which allows GraphX to select an optimized join strategy. For example, to compute the average age of each user's followers we only need the source field, so we would use TripletFields.Src to indicate that.

In the following example we use aggregateMessages to compute the average age of each user's older followers.

// Import random graph generation library
import org.apache.spark.graphx.util.GraphGenerators
// Create a graph with "age" as the vertex property.  Here we use a random graph for simplicity.
val graph: Graph[Double, Int] =
  GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble )
// Compute the number of older followers and their total age
val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
  triplet => { // Map Function
    if (triplet.srcAttr > triplet.dstAttr) {
      // Send message to destination vertex containing counter and age
      triplet.sendToDst((1, triplet.srcAttr))
    }
  },
  // Add counter and age
  (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
)
// Divide total age by number of older followers to get average age of older followers
val avgAgeOfOlderFollowers: VertexRDD[Double] =
  olderFollowers.mapValues( (id, value) => value match { case (count, totalAge) => totalAge / count } )
// Display the results
avgAgeOfOlderFollowers.collect.foreach(println(_))

The aggregateMessages operation performs best when the messages (and the sums of messages) are of constant size (for example, floats and addition rather than lists and concatenation).
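
The example above relies on the default tripletFields. As a sketch of the average-follower-age case mentioned earlier, the snippet below passes TripletFields.Src, since sendMsg reads only the source property. The three-user graph, the ages, and the names are hypothetical; an edge from a to b is taken to mean that a follows b.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Assumed setup: reuse an existing context or start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("triplet-fields-sketch").setMaster("local[2]"))

// Hypothetical social graph: the vertex property is the user's age.
val ages: RDD[(VertexId, Double)] =
  sc.parallelize(Seq[(VertexId, Double)]((1L, 30.0), (2L, 40.0), (3L, 50.0)))
val follows: RDD[Edge[Int]] =
  sc.parallelize(Seq(Edge(1L, 3L, 1), Edge(2L, 3L, 1), Edge(3L, 1L, 1)))
val social: Graph[Double, Int] = Graph(ages, follows)

// Each follower sends (1, own age) to the user it follows.
// Only the source property is read, hence TripletFields.Src.
val followerStats: VertexRDD[(Int, Double)] =
  social.aggregateMessages[(Int, Double)](
    ctx => ctx.sendToDst((1, ctx.srcAttr)),
    (a, b) => (a._1 + b._1, a._2 + b._2),
    TripletFields.Src)

// Total age divided by follower count gives the average follower age.
val avgFollowerAge: VertexRDD[Double] =
  followerStats.mapValues((id, stats) => stats._2 / stats._1)
```

User 2 has no followers, receives no message, and therefore does not appear in avgFollowerAge.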

Map Reduce Triplets Transition Guide

In earlier versions of GraphX, neighborhood aggregation was performed using the mapReduceTriplets operator:

class Graph[VD, ED] {
  def mapReduceTriplets[Msg](
      map: EdgeTriplet[VD, ED] => Iterator[(VertexId, Msg)],
      reduce: (Msg, Msg) => Msg)
    : VertexRDD[Msg]
}

The mapReduceTriplets operator takes a user-defined map function, which is applied to each triplet and can yield messages, and aggregates those messages using the user-defined reduce function. However, we found that the use of the returned iterator was expensive and that it inhibited our ability to apply additional optimizations, such as local vertex renumbering. aggregateMessages introduces the EdgeContext, which exposes the triplet fields and also functions to explicitly send messages to the source and destination vertices. In addition, bytecode inspection was removed; instead, the user must indicate which fields of the triplet are actually required.

The following code uses mapReduceTriplets:

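val graph: Graph[Int, Float] = ...
// Send the message "Hi" to the destination vertex of every triplet
def msgFun(triplet: EdgeTriplet[Int, Float]): Iterator[(VertexId, String)] = {
  Iterator((triplet.dstId, "Hi"))
}
// Messages are Strings, so the reduce function must combine Strings
def reduceFun(a: String, b: String): String = a + " " + b
val result = graph.mapReduceTriplets[String](msgFun, reduceFun)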

The following code uses aggregateMessages:

val graph: Graph[Int, Float] = ...
def msgFun(triplet: EdgeContext[Int, Float, String]): Unit = {
  triplet.sendToDst("Hi")
}
// Messages are Strings, so the reduce function must combine Strings
def reduceFun(a: String, b: String): String = a + " " + b
val result = graph.aggregateMessages[String](msgFun, reduceFun)

Computing Degree Information

A common aggregation task is computing the degree of each vertex, that is, the number of edges adjacent to it. In directed graphs it is often necessary to know the in-degree, out-degree, and total degree of each vertex. The GraphOps class contains a collection of operators for computing the degrees of each vertex. For example, the following code computes the maximum in-degree, out-degree, and total degree.

// Define a reduce operation to compute the highest degree vertex
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
  if (a._2 > b._2) a else b
}
// Compute the max degrees
val maxInDegree: (VertexId, Int)  = graph.inDegrees.reduce(max)
val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)
val maxDegrees: (VertexId, Int)   = graph.degrees.reduce(max)

Collecting Neighbors

In some cases it may be easier to express a computation by collecting the neighboring vertices and their properties at each vertex. This can be done with the collectNeighborIds and collectNeighbors operators.

class GraphOps[VD, ED] {
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[ Array[(VertexId, VD)] ]
}

These operators can be very expensive because they duplicate information and require substantial communication. If possible, try to express the same computation directly with the aggregateMessages operator.
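
As a rough illustration of that advice, the sketch below answers the question "how many in-neighbors does each vertex have?" in two ways: by collecting neighbor ids, and by aggregating a constant-size counter. The graph and names are hypothetical; the snippet assumes spark-shell or a script with GraphX available.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

// Assumed setup: reuse an existing context or start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("collect-neighbors-sketch").setMaster("local[2]"))

// Hypothetical graph: 1 -> 3, 2 -> 3, 3 -> 1.
val g: Graph[Int, Int] = Graph.fromEdges(
  sc.parallelize(Seq(Edge(1L, 3L, 1), Edge(2L, 3L, 1), Edge(3L, 1L, 1))),
  defaultValue = 0)

// Option 1: materialize the full array of in-neighbor ids at every vertex.
val inNeighborIds: VertexRDD[Array[VertexId]] =
  g.collectNeighborIds(EdgeDirection.In)

// Option 2: send a constant-size counter along each edge and sum at the destination.
// No vertex properties are read, hence TripletFields.None.
val inNeighborCounts: VertexRDD[Int] =
  g.aggregateMessages[Int](_.sendToDst(1), _ + _, TripletFields.None)
```

When only a count or another small summary is needed, the second form avoids shipping whole neighbor arrays around the cluster.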

Caching and Uncaching

In Spark, RDDs are not persisted in memory by default. To avoid recomputation, they must be explicitly cached when they are used multiple times. Graphs in GraphX behave the same way. When using a graph multiple times, make sure to call Graph.cache() on it first.

In iterative computations, uncaching may also be necessary for best performance. By default, cached RDDs and graphs remain in memory until memory pressure forces them to be evicted in LRU order. In iterative computations, intermediate results from previous iterations fill up the cache. Although they will eventually be evicted, the unnecessary data stored in memory slows down garbage collection. It is more efficient to uncache intermediate results as soon as they are no longer needed. This involves materializing (caching and forcing) a graph or RDD in each iteration, uncaching all other datasets, and using only the materialized dataset in future iterations. However, because graphs are composed of multiple RDDs, it can be difficult to unpersist them correctly. For iterative computation we recommend using the Pregel API, which correctly unpersists intermediate results.
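
As one possible illustration of handing the iteration to the Pregel API, the sketch below computes single-source shortest paths with the pregel operator from the summary list. The three-edge graph, the choice of source vertex, and the names are hypothetical; the snippet assumes spark-shell or a script with GraphX available.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Assumed setup: reuse an existing context or start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("pregel-sketch").setMaster("local[2]"))

// Hypothetical weighted graph: the edge property is a distance.
val roads: RDD[Edge[Double]] =
  sc.parallelize(Seq(Edge(1L, 2L, 1.0), Edge(2L, 3L, 2.0), Edge(1L, 3L, 5.0)))
val sourceId: VertexId = 1L

// Distance 0 at the source, infinity everywhere else.
val initialGraph: Graph[Double, Double] = Graph.fromEdges(roads, defaultValue = 0.0)
  .mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)

// Pregel runs the loop and manages the intermediate graphs internally.
val shortestPaths: Graph[Double, Double] =
  initialGraph.pregel(Double.PositiveInfinity)(
    // Vertex program: keep the smaller of the current and the incoming distance.
    (id, dist, newDist) => math.min(dist, newDist),
    // Send a message only when the path through this edge is an improvement.
    triplet => {
      if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
        Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
      } else {
        Iterator.empty
      }
    },
    // Merge messages: the shortest candidate wins.
    (a, b) => math.min(a, b))
```

The user code contains no cache or unpersist calls of its own; it only supplies the vertex program, the send function, and the merge function.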