May 17, 2021 Spark Programming guide
GraphX includes a set of algorithms to simplify analysis tasks.
These algorithms are contained in the org.apache.spark.graphx.lib package and can be accessed directly.
PageRank measures the importance of each vertex in a graph, assuming that an edge from u to v represents an endorsement of v's importance by u. For example, if a Twitter user is followed by many others, that user will be ranked highly. GraphX comes with static and dynamic implementations of PageRank as methods on the PageRank object. Static PageRank runs for a fixed number of iterations, while dynamic PageRank runs until the ranks converge, that is, until they stop changing by more than a specified tolerance. GraphOps allows these algorithms to be called directly as methods on the graph.
GraphX also includes an example social network dataset on which we can run PageRank. The set of users is given in graphx/data/users.txt, and the relationships between users are given in graphx/data/followers.txt. We compute the PageRank of each user as follows.
// Assumes an existing SparkContext named sc (as provided by spark-shell)
import org.apache.spark.graphx.GraphLoader
// Load the edges as a graph
val graph = GraphLoader.edgeListFile(sc, "graphx/data/followers.txt")
// Run dynamic PageRank until the ranks change by less than the tolerance 0.0001
val ranks = graph.pageRank(0.0001).vertices
// Join the ranks with the usernames
val users = sc.textFile("graphx/data/users.txt").map { line =>
  val fields = line.split(",")
  (fields(0).toLong, fields(1))
}
val ranksByUsername = users.join(ranks).map {
  case (id, (username, rank)) => (username, rank)
}
// Print the result
println(ranksByUsername.collect().mkString("\n"))
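The difference between static and dynamic PageRank can be illustrated without Spark. The following is a minimal plain-Scala sketch, not GraphX's implementation: the toy edge list, the object name PageRankSketch, the reset probability of 0.15, and the initial rank of 1.0 are all assumptions chosen for illustration. It shows one variant that stops after a fixed number of iterations and one that stops once no rank changes by more than a tolerance.

```scala
object PageRankSketch {
  // Hypothetical toy edge list (src -> dst); not the GraphX sample data.
  val edges: Seq[(Long, Long)] = Seq((1L, 2L), (2L, 3L), (3L, 1L), (4L, 1L))
  val vertices: Seq[Long] = edges.flatMap { case (s, d) => Seq(s, d) }.distinct.sorted
  val outDegree: Map[Long, Int] = edges.groupBy(_._1).map { case (v, es) => v -> es.size }
  val resetProb = 0.15 // assumed value for this sketch

  // One synchronous update: each vertex splits its rank evenly over its out-edges.
  def step(ranks: Map[Long, Double]): Map[Long, Double] = {
    val incoming: Map[Long, Double] = edges
      .map { case (s, d) => d -> ranks(s) / outDegree(s) }
      .groupBy(_._1)
      .map { case (v, contribs) => v -> contribs.map(_._2).sum }
    vertices.map(v => v -> (resetProb + (1 - resetProb) * incoming.getOrElse(v, 0.0))).toMap
  }

  private def initial: Map[Long, Double] = vertices.map(v => v -> 1.0).toMap

  // "Static" variant: run a fixed number of iterations.
  def staticRanks(numIter: Int): Map[Long, Double] =
    (1 to numIter).foldLeft(initial)((ranks, _) => step(ranks))

  // "Dynamic" variant: iterate until no rank changes by more than tol.
  def dynamicRanks(tol: Double): Map[Long, Double] = {
    @scala.annotation.tailrec
    def loop(ranks: Map[Long, Double]): Map[Long, Double] = {
      val next = step(ranks)
      val delta = vertices.map(v => math.abs(next(v) - ranks(v))).max
      if (delta < tol) next else loop(next)
    }
    loop(initial)
  }
}
```

In this toy graph, vertex 1 receives edges from both vertex 3 and vertex 4, so it ends up with the highest rank, while vertex 4 has no incoming edges and keeps only the reset probability.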
The connected components algorithm labels each connected component of the graph with the ID of its lowest-numbered vertex. For example, in a social network, connected components can approximate clusters. GraphX contains an implementation of the algorithm in the ConnectedComponents object, and we compute the connected components of the example social network dataset as follows.
// Assumes an existing SparkContext named sc (as provided by spark-shell)
import org.apache.spark.graphx.GraphLoader
// Load the graph as in the PageRank example
val graph = GraphLoader.edgeListFile(sc, "graphx/data/followers.txt")
// Find the connected components
val cc = graph.connectedComponents().vertices
// Join the connected components with the usernames
val users = sc.textFile("graphx/data/users.txt").map { line =>
  val fields = line.split(",")
  (fields(0).toLong, fields(1))
}
val ccByUsername = users.join(cc).map {
  case (id, (username, cc)) => (username, cc)
}
// Print the result
println(ccByUsername.collect().mkString("\n"))
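The labeling rule described above, where every vertex ends up with the smallest vertex ID in its component, can be sketched in plain Scala. This is only an illustration under stated assumptions, not GraphX's implementation: the toy edge list and the name ConnectedComponentsSketch are hypothetical, and edges are treated as undirected for the purpose of connectivity.

```scala
object ConnectedComponentsSketch {
  // Hypothetical toy graph with two components: {1, 2, 3} and {4, 5}.
  val edges: Seq[(Long, Long)] = Seq((2L, 1L), (3L, 2L), (5L, 4L))

  def label(edges: Seq[(Long, Long)]): Map[Long, Long] = {
    val vertices = edges.flatMap { case (s, d) => Seq(s, d) }.distinct
    // Treat every edge as undirected by adding its reverse.
    val neighbors: Map[Long, Seq[Long]] =
      (edges ++ edges.map(_.swap)).groupBy(_._1).map { case (v, es) => v -> es.map(_._2) }

    // Repeatedly replace each label with the minimum of its own label and
    // its neighbors' labels, until nothing changes.
    @scala.annotation.tailrec
    def propagate(labels: Map[Long, Long]): Map[Long, Long] = {
      val next = labels.map { case (v, l) =>
        v -> (l +: neighbors(v).map(labels)).min
      }
      if (next == labels) labels else propagate(next)
    }

    // Every vertex starts out labeled with its own ID.
    propagate(vertices.map(v => v -> v).toMap)
  }
}
```

Calling ConnectedComponentsSketch.label on the toy edges assigns label 1 to vertices 1, 2, and 3, and label 4 to vertices 4 and 5.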
A vertex is part of a triangle when it has two adjacent vertices with an edge between them. GraphX implements a triangle counting algorithm in the TriangleCount object, which determines the number of triangles passing through each vertex. We compute the triangle count of the social network dataset. Note that TriangleCount requires the edges to be in canonical orientation (srcId < dstId) and the graph to be partitioned using Graph.partitionBy.
// Assumes an existing SparkContext named sc (as provided by spark-shell)
import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}
// Load the edges in canonical order and partition the graph for triangle count
val graph = GraphLoader.edgeListFile(sc, "graphx/data/followers.txt", true).partitionBy(PartitionStrategy.RandomVertexCut)
// Find the triangle count for each vertex
val triCounts = graph.triangleCount().vertices
// Join the triangle counts with the usernames
val users = sc.textFile("graphx/data/users.txt").map { line =>
  val fields = line.split(",")
  (fields(0).toLong, fields(1))
}
val triCountByUsername = users.join(triCounts).map { case (id, (username, tc)) =>
  (username, tc)
}
// Print the result
println(triCountByUsername.collect().mkString("\n"))
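To make the canonical orientation requirement and the per-vertex count concrete, here is a small plain-Scala sketch. It is not how TriangleCount is implemented in GraphX: the toy edge list and the names TriangleCountSketch, canonical, and triangleCounts are hypothetical, and the counting is a brute-force check over pairs of neighbors.

```scala
object TriangleCountSketch {
  // Hypothetical toy edge list containing a duplicate edge in both orientations.
  val rawEdges: Seq[(Long, Long)] =
    Seq((1L, 2L), (2L, 1L), (2L, 3L), (3L, 1L), (3L, 4L))

  // Canonical orientation: srcId < dstId, with self-loops and duplicates dropped.
  def canonical(edges: Seq[(Long, Long)]): Seq[(Long, Long)] =
    edges.collect { case (a, b) if a != b => (math.min(a, b), math.max(a, b)) }.distinct

  // For each vertex, count the pairs of its neighbors that are themselves adjacent.
  def triangleCounts(edges: Seq[(Long, Long)]): Map[Long, Int] = {
    val es = canonical(edges)
    val vertices = es.flatMap { case (s, d) => Seq(s, d) }.distinct
    val neighbors: Map[Long, Set[Long]] =
      (es ++ es.map(_.swap)).groupBy(_._1).map { case (v, ns) => v -> ns.map(_._2).toSet }

    vertices.map { v =>
      val ns = neighbors(v).toSeq
      val closedPairs = for {
        a <- ns
        b <- ns
        if a < b && neighbors(a).contains(b) // a < b avoids counting a pair twice
      } yield (a, b)
      v -> closedPairs.size
    }.toMap
  }
}
```

In the toy graph, vertices 1, 2, and 3 form a single triangle, so each of them gets a count of 1, while vertex 4 is attached only to vertex 3 and gets a count of 0.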