May 17, 2021 Spark Programming guide
Just as RDDs have basic operations like map, filter, and reduceByKey, property graphs also have a collection of basic operators that take user-defined functions and produce new graphs with transformed properties and structure. The core operators, which have optimized implementations, are defined in Graph, and convenience operators expressed as compositions of the core operators are defined in GraphOps. However, thanks to Scala implicits, the operators in GraphOps are automatically available as members of Graph. For example, we can compute the in-degree of each vertex (defined in GraphOps) as follows:
val graph: Graph[(String, String), String]
// Use the implicit GraphOps.inDegrees operator
val inDegrees: VertexRDD[Int] = graph.inDegrees
The reason for separating the core graph operations from GraphOps is to be able to support different graph representations in the future. Each graph representation must provide implementations of the core operations and can reuse many of the useful operations defined in GraphOps.
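To make the implicit mechanism concrete, here is a minimal plain-Scala sketch of the same pattern. MiniGraph, MiniGraphOps, and toOps are hypothetical names invented for illustration and are far simpler than the real Graph and GraphOps; the sketch only assumes that a class of derived operators is reachable through an implicit conversion defined in the core class's companion object.

```scala
import scala.language.implicitConversions

// Hypothetical, heavily simplified stand-in for Graph (not the GraphX class).
// Vertex ids are Longs; each edge is a (srcId, dstId) pair.
class MiniGraph(val vertexIds: Seq[Long], val edges: Seq[(Long, Long)]) {
  // "Core" member: implemented directly by this representation.
  def numEdges: Long = edges.size.toLong
}

// "Derived" operators, written only in terms of the core members (a stand-in for GraphOps).
class MiniGraphOps(graph: MiniGraph) {
  def inDegrees: Map[Long, Int] =
    graph.edges.groupBy(_._2).map { case (dst, es) => dst -> es.size }
}

object MiniGraph {
  // An implicit conversion in the companion object is found automatically,
  // so MiniGraphOps methods can be called as if they were MiniGraph members.
  implicit def toOps(g: MiniGraph): MiniGraphOps = new MiniGraphOps(g)
}

object ImplicitOpsDemo {
  val g = new MiniGraph(Seq(1L, 2L, 3L), Seq((1L, 2L), (3L, 2L), (2L, 3L)))
  val inDeg: Map[Long, Int] = g.inDegrees // resolved through MiniGraph.toOps
}
```

As with GraphX's inDegrees, a vertex with no incoming edges (vertex 1 here) is simply absent from the result.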
The following is a quick summary of the functionality defined in both Graph and GraphOps, presented as members of Graph for simplicity. Note that some function signatures have been simplified (e.g., default arguments and type constraints removed) and some more advanced functionality has been omitted, so please consult the API documentation for the official list of operations.
/** Summary of the functionality in the property graph */
class Graph[VD, ED] {
// Information about the Graph ===================================================================
val numEdges: Long
val numVertices: Long
val inDegrees: VertexRDD[Int]
val outDegrees: VertexRDD[Int]
val degrees: VertexRDD[Int]
// Views of the graph as collections =============================================================
val vertices: VertexRDD[VD]
val edges: EdgeRDD[ED]
val triplets: RDD[EdgeTriplet[VD, ED]]
// Functions for caching graphs ==================================================================
def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
def cache(): Graph[VD, ED]
def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]
// Change the partitioning heuristic ============================================================
def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
// Transform vertex and edge attributes ==========================================================
def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2])
: Graph[VD, ED2]
// Modify the graph structure ====================================================================
def reverse: Graph[VD, ED]
def subgraph(
epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
vpred: (VertexId, VD) => Boolean = ((v, d) => true))
: Graph[VD, ED]
def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
// Join RDDs with the graph ======================================================================
def joinVertices[U](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
def outerJoinVertices[U, VD2](other: RDD[(VertexId, U)])
(mapFunc: (VertexId, VD, Option[U]) => VD2)
: Graph[VD2, ED]
// Aggregate information about adjacent triplets =================================================
def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
def aggregateMessages[Msg: ClassTag](
sendMsg: EdgeContext[VD, ED, Msg] => Unit,
mergeMsg: (Msg, Msg) => Msg,
tripletFields: TripletFields = TripletFields.All)
: VertexRDD[Msg]
// Iterative graph-parallel computation ==========================================================
def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
vprog: (VertexId, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED]
// Basic graph algorithms ========================================================================
def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
def connectedComponents(): Graph[VertexId, ED]
def triangleCount(): Graph[Int, ED]
def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
}
Like the RDD map operator, the property graph offers the following map operators:
class Graph[VD, ED] {
def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
}
Each of these operators yields a new graph with the vertex or edge properties modified by the user-defined map function.
Note that in each case the graph structure is unaffected. This is a key feature of these operators: it allows the resulting graph to reuse the structural indices of the original graph. The following two snippets are logically equivalent, but the first one does not preserve the structural indices and therefore does not benefit from the GraphX system optimizations.
val newVertices = graph.vertices.map { case (id, attr) => (id, mapUdf(id, attr)) }
val newGraph = Graph(newVertices, graph.edges)
Instead, use mapVertices, which preserves the indices:
val newGraph = graph.mapVertices((id, attr) => mapUdf(id, attr))
These operators are often used to initialize the graph for a particular computation or to project away unnecessary properties. For example, given a graph whose vertex property is the out-degree, we initialize it for PageRank:
// Given a graph where the vertex property is the out degree
val inputGraph: Graph[Int, String] =
graph.outerJoinVertices(graph.outDegrees)((vid, _, degOpt) => degOpt.getOrElse(0))
// Construct a graph where each edge contains the weight
// and each vertex is the initial PageRank
val outputGraph: Graph[Double, Double] =
inputGraph.mapTriplets(triplet => 1.0 / triplet.srcAttr).mapVertices((id, _) => 1.0)
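The arithmetic behind this initialization can be checked without a cluster. The following plain-Scala sketch (not GraphX code; the tiny graph and the PageRankInit name are hypothetical) mirrors the three steps: out-degree as the vertex attribute, 1.0 / srcAttr as the edge weight, and 1.0 as the starting rank. Because every edge's source has at least one out-edge, the division never sees a zero.

```scala
// Plain-Scala model of the PageRank initialization above, on a hypothetical 3-vertex graph.
object PageRankInit {
  val vertexIds: Seq[Long] = Seq(1L, 2L, 3L)
  val edges: Seq[(Long, Long)] = Seq((1L, 2L), (1L, 3L), (2L, 3L), (3L, 1L))

  // Step 1: vertex attribute = out-degree, 0 for vertices without out-edges
  // (mirrors outerJoinVertices(outDegrees)(... degOpt.getOrElse(0))).
  val outDegree: Map[Long, Int] = {
    val counts = edges.groupBy(_._1).map { case (src, es) => src -> es.size }
    vertexIds.map(id => id -> counts.getOrElse(id, 0)).toMap
  }

  // Step 2: edge weight = 1.0 / out-degree of the source (mirrors mapTriplets(1.0 / triplet.srcAttr)).
  val edgeWeights: Seq[((Long, Long), Double)] =
    edges.map { case e @ (src, _) => e -> 1.0 / outDegree(src) }

  // Step 3: every vertex starts with rank 1.0 (mirrors mapVertices((id, _) => 1.0)).
  val initialRank: Map[Long, Double] = vertexIds.map(_ -> 1.0).toMap
}
```

Each vertex's outgoing weights sum to 1.0, which is what lets PageRank treat them as transition probabilities.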
Currently GraphX supports only a simple set of commonly used structural operators. The following is a list of the basic structural operators.
class Graph[VD, ED] {
def reverse: Graph[VD, ED]
def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
}
The reverse operator returns a new graph with all edge directions reversed. This can be useful, for example, when computing the inverse PageRank. Because the reverse operation does not modify vertex or edge properties or change the number of edges, it can be implemented efficiently without moving or duplicating data.
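A plain-Scala sketch of what reverse does to an edge list, using hypothetical (src, dst, attr) triples rather than GraphX's EdgeRDD: only the two endpoints are swapped, so applying it twice gives back the original edges.

```scala
object ReverseDemo {
  // Hypothetical edge list of (srcId, dstId, attribute) triples.
  val edges: Seq[(Long, Long, String)] = Seq((3L, 7L, "collab"), (5L, 3L, "advisor"))

  // Swap source and destination; attributes and the number of edges are unchanged.
  def reverse[E](es: Seq[(Long, Long, E)]): Seq[(Long, Long, E)] =
    es.map { case (src, dst, attr) => (dst, src, attr) }

  val reversed: Seq[(Long, Long, String)] = reverse(edges)
}
```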
The subgraph operator takes vertex and edge predicates and returns a graph containing only the vertices that satisfy the vertex predicate (evaluate to true), and only the edges that satisfy the edge predicate and connect vertices that satisfy the vertex predicate.
The subgraph operator can be used in many situations, such as restricting the graph to the vertices and edges of interest or eliminating broken links.
The following example removes broken links.
// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
(5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
(4L, ("peter", "student"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
Edge(4L, 0L, "student"), Edge(5L, 0L, "colleague")))
// Define a default user in case there are relationships with missing users
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
// Notice that there is a user 0 (for which we have no information) connected to users
// 4 (peter) and 5 (franklin).
graph.triplets.map(
triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
).collect.foreach(println(_))
// Remove missing vertices as well as the edges connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// The valid subgraph will disconnect users 4 and 5 by removing user 0
validGraph.vertices.collect.foreach(println(_))
validGraph.triplets.map(
triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
).collect.foreach(println(_))
Note that in the example above only the vertex predicate is provided.
If the vertex or edge predicate is not provided, the subgraph operator defaults it to true.
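The predicate semantics can be reproduced on ordinary Scala collections. In this sketch, SubgraphDemo and its tuple-based edge predicate are hypothetical simplifications (the real epred receives an EdgeTriplet): an edge survives only if it passes epred and both of its endpoints pass vpred, which is why dropping vertex 0 also drops the two edges that touch it.

```scala
object SubgraphDemo {
  type Attr = (String, String)

  // Same data as the example above; vertex 0 carries the default ("John Doe", "Missing").
  val vertices: Map[Long, Attr] = Map(
    (3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
    (5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
    (4L, ("peter", "student")), (0L, ("John Doe", "Missing")))
  val edges: Seq[(Long, Long, String)] = Seq(
    (3L, 7L, "collab"), (5L, 3L, "advisor"), (2L, 5L, "colleague"),
    (5L, 7L, "pi"), (4L, 0L, "student"), (5L, 0L, "colleague"))

  // Both predicates default to "always true", as in GraphX.
  def subgraph(
      vpred: (Long, Attr) => Boolean = (_, _) => true,
      epred: ((Long, Attr), (Long, Attr), String) => Boolean = (_, _, _) => true)
      : (Map[Long, Attr], Seq[(Long, Long, String)]) = {
    val keptV = vertices.filter { case (id, attr) => vpred(id, attr) }
    // An edge is kept only if both endpoints survived and the edge predicate holds.
    val keptE = edges.filter { case (src, dst, attr) =>
      keptV.contains(src) && keptV.contains(dst) &&
        epred((src, keptV(src)), (dst, keptV(dst)), attr)
    }
    (keptV, keptE)
  }

  val (validV, validE) = subgraph(vpred = (_, attr) => attr._2 != "Missing")
}
```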
The mask operator constructs a subgraph by returning a graph that contains the vertices and edges that are also found in the input graph. This can be used in conjunction with the subgraph operator to restrict a graph based on the properties of another related graph. For example, we might run connected components using the graph with missing vertices and then restrict the answer to the valid subgraph.
// Run Connected Components
val ccGraph = graph.connectedComponents() // No longer contains missing field
// Remove missing vertices as well as the edges connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// Restrict the answer to the valid subgraph
val validCCGraph = ccGraph.mask(validGraph)
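The following plain-Scala sketch models mask on the same data. The names are hypothetical, and it only assumes the behavior described above: the result keeps the attributes of the graph being masked (here, component labels) but only the vertices and edges that also occur in the other graph. Every vertex reaches vertex 0 in the full graph, so with lowest-id component labels every vertex is labeled 0.

```scala
object MaskDemo {
  // Hypothetical connected-components result: vertex id -> component label; edges as (src, dst).
  val ccVertices: Map[Long, Long] = Map((0L, 0L), (2L, 0L), (3L, 0L), (4L, 0L), (5L, 0L), (7L, 0L))
  val ccEdges: Seq[(Long, Long)] = Seq((3L, 7L), (5L, 3L), (2L, 5L), (5L, 7L), (4L, 0L), (5L, 0L))

  // Structure of the "valid" graph; only its vertex ids and edge endpoints matter to mask.
  val validVertexIds: Set[Long] = Set(2L, 3L, 4L, 5L, 7L)
  val validEdges: Set[(Long, Long)] = Set((3L, 7L), (5L, 3L), (2L, 5L), (5L, 7L))

  // Keep this graph's attributes, restricted to vertices/edges present in the other graph.
  val maskedVertices: Map[Long, Long] = ccVertices.filter { case (id, _) => validVertexIds.contains(id) }
  val maskedEdges: Seq[(Long, Long)] = ccEdges.filter(validEdges.contains)
}
```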
The groupEdges operator merges parallel edges (i.e., duplicate edges between pairs of vertices) in the multigraph. In many numerical applications, parallel edges can be added (their weights combined) into a single edge, thereby reducing the size of the graph.
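A minimal plain-Scala sketch of the merging semantics, using a hypothetical groupEdges helper over (src, dst, weight) triples: edges with the same (src, dst) pair are combined with the user's merge function, while an edge in the opposite direction is a different pair and stays separate. One caveat about the real operator: GraphX's groupEdges only merges duplicates that sit in the same partition, so its API documentation asks you to call partitionBy first.

```scala
object GroupEdgesDemo {
  // Hypothetical multigraph: two parallel edges 1 -> 2 and one edge 2 -> 1.
  val edges: Seq[(Long, Long, Double)] = Seq((1L, 2L, 0.5), (1L, 2L, 1.5), (2L, 1L, 3.0))

  // Merge edges sharing the same (src, dst) pair, mirroring groupEdges(merge: (ED, ED) => ED).
  def groupEdges[E](es: Seq[(Long, Long, E)])(merge: (E, E) => E): Seq[(Long, Long, E)] =
    es.groupBy { case (src, dst, _) => (src, dst) }
      .map { case ((src, dst), group) => (src, dst, group.map(_._3).reduce(merge)) }
      .toSeq

  // Combine weights by addition.
  val merged: Seq[(Long, Long, Double)] = groupEdges(edges)(_ + _)
}
```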
In many cases it is necessary to join data from external collections (RDDs) with graphs. For example, we might have extra user properties that we want to merge with an existing graph, or we might want to pull vertex properties from one graph into another. These tasks can be accomplished using the join operators. The key join operators are listed below.
class Graph[VD, ED] {
def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD)
: Graph[VD, ED]
def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
: Graph[VD2, ED]
}
The joinVertices operator joins the vertices with the input RDD and returns a new graph whose vertex properties are obtained by applying the user-defined map function to the result of the join.
Vertices without a matching value in the RDD retain their original value.
Note that if the RDD contains more than one value for a given vertex, only one of them is used. It is therefore recommended to make the input RDD unique using the following approach, which also pre-indexes the resulting values to speed up the subsequent join.
val nonUniqueCosts: RDD[(VertexId, Double)]
val uniqueCosts: VertexRDD[Double] =
graph.vertices.aggregateUsingIndex(nonUniqueCosts, (a, b) => a + b)
val joinedGraph = graph.joinVertices(uniqueCosts)(
(id, oldCost, extraCost) => oldCost + extraCost)
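The effect of pre-aggregating before the join can be sketched with plain Scala maps. In this hypothetical example the duplicate entries for vertex 1 are summed first (the role aggregateUsingIndex plays above), vertex 3 has no entry and keeps its old value, and the id 9, which is not a vertex, is dropped in this sketch; all names and numbers are illustrative only.

```scala
object JoinVerticesDemo {
  val oldCost: Map[Long, Double] = Map((1L, 10.0), (2L, 20.0), (3L, 30.0))
  // Hypothetical input with a duplicate key (1L) and a key (9L) that is not a vertex.
  val nonUniqueCosts: Seq[(Long, Double)] = Seq((1L, 1.0), (1L, 2.0), (2L, 5.0), (9L, 100.0))

  // Step 1: keep only known vertex ids and sum duplicates per id.
  val uniqueCosts: Map[Long, Double] =
    nonUniqueCosts
      .filter { case (id, _) => oldCost.contains(id) }
      .groupBy(_._1)
      .map { case (id, pairs) => id -> pairs.map(_._2).sum }

  // Step 2: joinVertices semantics: matched vertices get the map function applied,
  // unmatched vertices keep their original value.
  val joined: Map[Long, Double] = oldCost.map { case (id, cost) =>
    id -> uniqueCosts.get(id).map(extra => cost + extra).getOrElse(cost)
  }
}
```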
The more general outerJoinVertices behaves similarly to joinVertices, except that the user-defined map function is applied to all vertices and can change the vertex property type.
Because not all vertices may have a matching value in the input RDD, the map function takes an Option type.
val outDegrees: VertexRDD[Int] = graph.outDegrees
val degreeGraph = graph.outerJoinVertices(outDegrees) { (id, oldAttr, outDegOpt) =>
outDegOpt match {
case Some(outDeg) => outDeg
case None => 0 // No outDegree means zero outDegree
}
}
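A plain-Scala sketch of the outerJoinVertices semantics on hypothetical data: every vertex is kept, the map function receives an Option because some vertices (vertex 3 below) have no entry, and the attribute type may change, here from String to Int. The helper imitates the curried shape of the real operator but is not the GraphX API.

```scala
object OuterJoinDemo {
  // Hypothetical String-valued vertices, and out-degrees only for vertices that have out-edges.
  val vertices: Map[Long, String] = Map((1L, "a"), (2L, "b"), (3L, "c"))
  val outDegrees: Map[Long, Int] = Map((1L, 2), (2L, 1)) // vertex 3 has no out-edges

  // Every vertex is kept; f sees Option[U] and may return a new attribute type VD2.
  def outerJoinVertices[VD, U, VD2](vs: Map[Long, VD], table: Map[Long, U])
      (f: (Long, VD, Option[U]) => VD2): Map[Long, VD2] =
    vs.map { case (id, attr) => id -> f(id, attr, table.get(id)) }

  val degreeGraph: Map[Long, Int] = outerJoinVertices(vertices, outDegrees) { (_, _, outDegOpt) =>
    outDegOpt match {
      case Some(outDeg) => outDeg
      case None => 0 // no out-degree entry means zero out-degree
    }
  }
}
```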
You may have noticed the curried function pattern with multiple parameter lists (e.g., f(a)(b)) used in the examples above. While we could equally have written f(a)(b) as f(a, b), this would mean that type inference on b would not depend on a. As a consequence, the user would need to provide type annotations for the user-defined function:
// Hypothetical uncurried form, shown only to illustrate the annotations it would require
val joinedGraph = graph.joinVertices(uniqueCosts,
(id: VertexId, oldCost: Double, extraCost: Double) => oldCost + extraCost)
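The type-inference point can be demonstrated without Spark. In the sketch below, joinCurried and joinUncurried are hypothetical helpers. With the curried form, U is fixed by the first argument list before the function literal is type-checked, so the literal needs no annotations. With the uncurried form, Scala 2 (which Spark is built on) rejects the unannotated literal with a "missing parameter type" error, so that call is left commented out.

```scala
object CurryingDemo {
  // Curried: U is inferred from `table` before the function literal is typed.
  def joinCurried[U](table: Map[Long, U])(f: (Long, Double, U) => Double): Map[Long, Double] =
    table.map { case (id, u) => id -> f(id, 0.0, u) }

  // Uncurried: U is still unknown while the function literal is typed.
  def joinUncurried[U](table: Map[Long, U], f: (Long, Double, U) => Double): Map[Long, Double] =
    table.map { case (id, u) => id -> f(id, 0.0, u) }

  val costs: Map[Long, Double] = Map((1L, 2.5))

  // No annotations needed on the literal.
  val a: Map[Long, Double] = joinCurried(costs)((_, old, extra) => old + extra)
  // Annotations required on the literal.
  val b: Map[Long, Double] = joinUncurried(costs, (_: Long, old: Double, extra: Double) => old + extra)
  // joinUncurried(costs, (_, old, extra) => old + extra) // rejected by Scala 2: missing parameter type
}
```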
A key step in many graph analytics tasks is aggregating information about the neighborhood of each vertex. For example, we might want to know the number of followers each user has, or the average age of each user's followers. Many iterative graph algorithms, such as PageRank, shortest paths, and connected components, repeatedly aggregate the properties of neighboring vertices.
To improve performance, the primary aggregation operator changed from graph.mapReduceTriplets to the new graph.aggregateMessages.
Although the API changes are relatively small, we provide a transition guide later in this section.
The core aggregation operation in GraphX is aggregateMessages.
This operator applies a user-defined sendMsg function to each edge triplet in the graph and then uses the mergeMsg function to aggregate those messages at their destination vertex.
class Graph[VD, ED] {
def aggregateMessages[Msg: ClassTag](
sendMsg: EdgeContext[VD, ED, Msg] => Unit,
mergeMsg: (Msg, Msg) => Msg,
tripletFields: TripletFields = TripletFields.All)
: VertexRDD[Msg]
}
The user-defined sendMsg function takes an EdgeContext, which exposes the source and destination vertex attributes along with the edge attribute, as well as functions (sendToSrc and sendToDst) to send messages to the source and destination vertices.
Think of sendMsg as the map function in map-reduce. The user-defined mergeMsg function takes two messages destined for the same vertex and combines them into a single message. Think of mergeMsg as the reduce function in map-reduce.
The aggregateMessages operator returns a VertexRDD[Msg] containing the aggregated message (of type Msg) destined for each vertex.
Vertices that did not receive a message are not included in the returned VertexRDD.
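The map/reduce reading of aggregateMessages can be modeled on in-memory collections. The sketch below is a hypothetical single-machine imitation, not the GraphX implementation: Ctx stands in for EdgeContext and carries only the fields used here. It computes, for three hand-made users, the count and total age of each user's older followers, so the output can be checked by hand; vertex 3 has no older follower and therefore does not appear in the result.

```scala
import scala.collection.mutable.ArrayBuffer

object AggregateMessagesDemo {
  // Hypothetical stand-in for EdgeContext: endpoint ids/attributes, edge attribute, send functions.
  final case class Ctx[VD, ED, M](srcId: Long, dstId: Long, srcAttr: VD, dstAttr: VD, attr: ED,
      sendToSrc: M => Unit, sendToDst: M => Unit)

  def aggregateMessages[VD, ED, M](vertices: Map[Long, VD], edges: Seq[(Long, Long, ED)])
      (sendMsg: Ctx[VD, ED, M] => Unit, mergeMsg: (M, M) => M): Map[Long, M] = {
    // "Map" phase: run sendMsg on every edge and collect (targetVertex, message) pairs.
    val sent = ArrayBuffer.empty[(Long, M)]
    for ((src, dst, attr) <- edges) {
      sendMsg(Ctx[VD, ED, M](src, dst, vertices(src), vertices(dst), attr,
        m => sent += ((src, m)), m => sent += ((dst, m))))
    }
    // "Reduce" phase: merge messages per vertex; vertices with no messages are absent.
    sent.groupBy(_._1).map { case (id, msgs) => id -> msgs.map(_._2).reduce(mergeMsg) }
  }

  // Vertex attribute = age; edge (src, dst) means src follows dst (edge attribute unused).
  val ages: Map[Long, Double] = Map((1L, 30.0), (2L, 40.0), (3L, 50.0))
  val follows: Seq[(Long, Long, Int)] = Seq((2L, 1L, 0), (3L, 1L, 0), (1L, 2L, 0), (3L, 2L, 0))

  val olderFollowers: Map[Long, (Int, Double)] =
    aggregateMessages[Double, Int, (Int, Double)](ages, follows)(
      ctx => if (ctx.srcAttr > ctx.dstAttr) ctx.sendToDst((1, ctx.srcAttr)),
      (a, b) => (a._1 + b._1, a._2 + b._2))

  val avgAge: Map[Long, Double] = olderFollowers.map { case (id, (count, total)) => id -> total / count }
}
```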
In addition, aggregateMessages takes an optional tripletFields argument that indicates which data in the EdgeContext is accessed (e.g., the source vertex attribute but not the destination vertex attribute).
The possible options for tripletFields are defined in TripletFields, and the default, TripletFields.All, indicates that sendMsg may access any field of the EdgeContext.
The tripletFields argument tells GraphX that only part of the EdgeContext will be needed, allowing GraphX to select an optimized join strategy. For example, if we are computing the average age of each user's followers, we only need the source field, so we would use TripletFields.Src to indicate that only the source field is required.
In the following example, we use the aggregateMessages operator to compute the average age of the older followers of each user.
// Import random graph generation library
import org.apache.spark.graphx.util.GraphGenerators
// Create a graph with "age" as the vertex property. Here we use a random graph for simplicity.
val graph: Graph[Double, Int] =
GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble )
// Compute the number of older followers and their total age
val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
triplet => { // Map Function
if (triplet.srcAttr > triplet.dstAttr) {
// Send message to destination vertex containing counter and age
triplet.sendToDst((1, triplet.srcAttr))
}
},
// Add counter and age
(a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
)
// Divide total age by number of older followers to get average age of older followers
val avgAgeOfOlderFollowers: VertexRDD[Double] =
olderFollowers.mapValues( (id, value) => value match { case (count, totalAge) => totalAge / count } )
// Display the results
avgAgeOfOlderFollowers.collect.foreach(println(_))
aggregateMessages works best when the messages (and the sums of messages) are constant-sized, e.g., floats and addition instead of lists and concatenation.
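A small plain-Scala illustration of the difference, with hypothetical ages: merging list messages yields a message that grows with every merge, while merging (count, sum) pairs keeps every intermediate message the same size, and both give the same average.

```scala
object MessageSizeDemo {
  val ages: Seq[Double] = Seq(40.0, 50.0, 60.0, 70.0)

  // Growing messages: each merged message carries every age seen so far.
  val asList: List[Double] = ages.map(List(_)).reduce(_ ++ _)
  // Constant-size messages: each merged message is just (count, sum).
  val asPair: (Int, Double) = ages.map(a => (1, a)).reduce((x, y) => (x._1 + y._1, x._2 + y._2))

  val avgFromList: Double = asList.sum / asList.size
  val avgFromPair: Double = asPair._2 / asPair._1
}
```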
In earlier versions of GraphX, neighborhood aggregation was performed using the mapReduceTriplets operator:
class Graph[VD, ED] {
def mapReduceTriplets[Msg](
map: EdgeTriplet[VD, ED] => Iterator[(VertexId, Msg)],
reduce: (Msg, Msg) => Msg)
: VertexRDD[Msg]
}
The mapReduceTriplets operator applies a user-defined map function to each triplet, which can yield messages that are then aggregated with the user-defined reduce function.
However, we found that using the returned iterator was expensive, and it inhibited our ability to apply additional optimizations such as local vertex renumbering.
In aggregateMessages we introduced the EdgeContext, which exposes the triplet fields as well as functions to explicitly send messages to the source and destination vertices.
Furthermore, we removed bytecode inspection and instead require the user to indicate which fields of the triplet are actually needed.
The following code uses mapReduceTriplets:
val graph: Graph[Int, Float] = ...
def msgFun(triplet: EdgeTriplet[Int, Float]): Iterator[(VertexId, String)] = {
Iterator((triplet.dstId, "Hi"))
}
def reduceFun(a: String, b: String): String = a + " " + b
val result = graph.mapReduceTriplets[String](msgFun, reduceFun)
The equivalent code using aggregateMessages:
val graph: Graph[Int, Float] = ...
def msgFun(triplet: EdgeContext[Int, Float, String]): Unit = {
triplet.sendToDst("Hi")
}
def reduceFun(a: String, b: String): String = a + " " + b
val result = graph.aggregateMessages[String](msgFun, reduceFun)
The most common aggregation task is computing the degree of each vertex, that is, the number of edges adjacent to it. In a directed graph, it is often necessary to know the in-degree, out-degree, and total degree of each vertex. The GraphOps class contains a collection of operators to compute the degrees of each vertex. For example, the following code computes the maximum in-, out-, and total degrees.
// Define a reduce operation to compute the highest degree vertex
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
if (a._2 > b._2) a else b
}
// Compute the max degrees
val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(max)
val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)
val maxDegrees: (VertexId, Int) = graph.degrees.reduce(max)
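The degree computations and the max reducer can be tried on a hypothetical edge list with plain Scala collections. As in GraphX, a vertex with degree 0 simply has no entry. The graph is chosen so that no ties occur, because with ties this max returns whichever element arrives as b, so the result would depend on reduction order.

```scala
object DegreesDemo {
  val edges: Seq[(Long, Long)] = Seq((1L, 2L), (1L, 3L), (2L, 3L), (4L, 3L))

  // Count occurrences of each vertex id; ids that never occur get no entry.
  private def count(ids: Seq[Long]): Seq[(Long, Int)] =
    ids.groupBy(identity).map { case (id, xs) => (id, xs.size) }.toSeq

  val inDegrees: Seq[(Long, Int)] = count(edges.map(_._2))
  val outDegrees: Seq[(Long, Int)] = count(edges.map(_._1))
  val degrees: Seq[(Long, Int)] = count(edges.flatMap { case (s, d) => Seq(s, d) })

  // Same reducer as in the text.
  def max(a: (Long, Int), b: (Long, Int)): (Long, Int) = if (a._2 > b._2) a else b

  val maxInDegree: (Long, Int) = inDegrees.reduce(max)
  val maxOutDegree: (Long, Int) = outDegrees.reduce(max)
  val maxDegrees: (Long, Int) = degrees.reduce(max)
}
```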
In some cases it may be easier to express a computation by collecting the neighboring vertices and their attributes at each vertex. This can be done easily with the collectNeighborIds and collectNeighbors operators.
class GraphOps[VD, ED] {
def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[ Array[(VertexId, VD)] ]
}
These operators can be quite costly because they duplicate information and require substantial communication.
If possible, try to express the same computation directly with the aggregateMessages operator.
In Spark, RDDs are not persisted in memory by default. To avoid recomputation, they must be explicitly cached when they are used multiple times. Graphs in GraphX behave the same way. When using a graph multiple times, make sure to call Graph.cache() on it first.
In iterative computations, uncaching may also be necessary for the best performance. By default, cached RDDs and graphs remain in memory until memory pressure forces them to be evicted in LRU order. In iterative computations, intermediate results from previous iterations fill up the cache. Although they will eventually be evicted, the unnecessary data stored in memory slows down garbage collection. It is more efficient to uncache intermediate results as soon as they are no longer needed. This involves materializing (caching and forcing) a graph or RDD in every iteration, uncaching all other datasets, and using only the materialized dataset in future iterations. However, because graphs are composed of multiple RDDs, it can be difficult to unpersist them correctly. For iterative computation we recommend using the Pregel API, which correctly unpersists intermediate results.