May 17, 2021 Spark Programming guide
GraphX exposes RDD views of the vertices and edges stored in the graph. However, GraphX keeps the vertices and edges in optimized data structures, and these structures provide additional functionality. For that reason the vertices and edges are returned as VertexRDD and EdgeRDD respectively. In this chapter we review some useful features of these types.
VertexRDD[A] extends RDD[(VertexId, A)] and adds the constraint that each VertexId can occur only once. Moreover, VertexRDD[A] represents a set of vertices, each with an attribute of type A. Internally, this is achieved by storing the vertex attributes in a reusable hash-map data structure. As a consequence, if two VertexRDDs are derived from the same base VertexRDD (e.g. through filter or mapValues), they can be joined without any hash evaluations, because both share the same index.
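The sketch below is a toy model in plain Scala, not GraphX's implementation, of the idea in the paragraph above. Ids are unique, attributes live in a separate vector, and a hash index maps each id to its position. The class ToyVertexSet and its methods are hypothetical. The sketch shows that a set derived with mapValues keeps the very same index object, which is what makes index-aligned joins possible.

```scala
// Toy model, NOT the GraphX implementation: ids are unique and each id maps,
// through a hash index, to a position in a separate attribute vector.
final class ToyVertexSet[+A](
    val index: Map[Long, Int], // id -> position; shared by derived sets
    val values: Vector[A]      // attribute stored at each position
) {
  def get(id: Long): Option[A] = index.get(id).map(pos => values(pos))

  // A derived set reuses the same index object; only the values change.
  def mapValues[B](f: A => B): ToyVertexSet[B] = new ToyVertexSet(index, values.map(f))

  // Two sets whose index objects are identical can be lined up position by
  // position without hashing any id.
  def sharesIndexWith(other: ToyVertexSet[Any]): Boolean = index eq other.index
}

object ToyVertexSet {
  // Build the index once. This toy keeps the first attribute seen for each id
  // so that every id occurs only once.
  def apply[A](pairs: Seq[(Long, A)]): ToyVertexSet[A] = {
    val unique = pairs.distinctBy(_._1)
    new ToyVertexSet(unique.map(_._1).zipWithIndex.toMap, unique.map(_._2).toVector)
  }
}
```

A set built independently from equal data gets its own index object, so `sharesIndexWith` is false for it even though its contents match.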
To take advantage of this indexed data structure, VertexRDD exposes the following additional functionality:
// Abridged listing: signatures only, method bodies omitted
abstract class VertexRDD[VD] extends RDD[(VertexId, VD)] {
// Filter the vertex set but preserves the internal index
def filter(pred: Tuple2[VertexId, VD] => Boolean): VertexRDD[VD]
// Transform the values without changing the ids (preserves the internal index)
def mapValues[VD2](map: VD => VD2): VertexRDD[VD2]
def mapValues[VD2](map: (VertexId, VD) => VD2): VertexRDD[VD2]
// Remove vertices from this set that appear in the other set
def diff(other: VertexRDD[VD]): VertexRDD[VD]
// Join operators that take advantage of the internal indexing to accelerate joins (substantially)
def leftJoin[VD2, VD3](other: RDD[(VertexId, VD2)])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3]
def innerJoin[U, VD2](other: RDD[(VertexId, U)])(f: (VertexId, VD, U) => VD2): VertexRDD[VD2]
// Use the index on this RDD to accelerate a `reduceByKey` operation on the input RDD.
def aggregateUsingIndex[VD2](other: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]
}
For example, the filter operation returns a VertexRDD. Filter is actually implemented using a BitSet, so it reuses the index and retains the ability to do fast joins with other VertexRDDs. Similarly, the mapValues operations do not allow the map function to change the VertexId, so the same HashMap data structure can be reused.
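To illustrate why filter and mapValues can keep the index, here is a toy model in plain Scala. The class MaskedVertexSet is hypothetical and not part of GraphX. It assumes ids and values are stored in position-aligned vectors (standing in for the shared index) and that a BitSet marks which positions are currently members of the set.

```scala
import scala.collection.immutable.BitSet

// Toy model, NOT GraphX's code: ids and values live in position-aligned
// vectors, and a BitSet marks which positions are members of the set.
final class MaskedVertexSet[A](
    val ids: Vector[Long],
    val values: Vector[A],
    val mask: BitSet
) {
  // filter only computes a smaller mask; ids and values are reused untouched.
  def filter(pred: ((Long, A)) => Boolean): MaskedVertexSet[A] =
    new MaskedVertexSet(ids, values, mask.filter(p => pred((ids(p), values(p)))))

  // f may read the id but can only return a new value, never a new id, so the
  // id vector (the "index") stays valid and is reused. For simplicity this
  // sketch also maps slots that are masked out.
  def mapValues[B](f: (Long, A) => B): MaskedVertexSet[B] =
    new MaskedVertexSet(ids, Vector.tabulate(values.size)(p => f(ids(p), values(p))), mask)

  def toMap: Map[Long, A] = mask.iterator.map(p => ids(p) -> values(p)).toMap
}
```

Because the result of `filter` and `mapValues` holds the same `ids` object as its source, any later join can still line the sets up by position.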
Both leftJoin and innerJoin can detect when they are joining two VertexRDDs derived from the same HashMap, and in that case implement the join with a linear scan rather than costly point lookups.
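A minimal sketch of the two join paths, assuming a toy representation (the hypothetical IndexedSet, not a GraphX class) in which sets derived from the same base hold the identical id vector. When the indexes are the same object, the join walks the intersected masks in position order. Otherwise it falls back to one hash lookup per element.

```scala
import scala.collection.immutable.BitSet

// Toy model of index-aware joins (not GraphX's code). `ids` stands in for the
// shared hash index: sets derived from the same base hold the SAME vector.
final case class IndexedSet[A](ids: Vector[Long], values: Vector[A], mask: BitSet) {
  // Hash index, used only on the slow path.
  private lazy val posOf: Map[Long, Int] = ids.zipWithIndex.toMap

  def innerJoin[B, C](other: IndexedSet[B])(f: (Long, A, B) => C): Vector[(Long, C)] =
    if (other.ids eq ids) {
      // Same index: positions line up, so intersect the masks and scan
      // linearly, with no per-element lookups.
      (mask & other.mask).iterator
        .map(p => ids(p) -> f(ids(p), values(p), other.values(p)))
        .toVector
    } else {
      // Different index: one hash lookup per member of `other`.
      other.mask.iterator.flatMap { q =>
        val id = other.ids(q)
        posOf.get(id).filter(p => mask(p)).map(p => id -> f(id, values(p), other.values(q)))
      }.toVector
    }
}
```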
The aggregateUsingIndex operation is useful for efficiently building a new VertexRDD from an RDD[(VertexId, A)]. Conceptually, if I have constructed a VertexRDD[B] over a set of vertices that is a superset of the vertices in some RDD[(VertexId, A)], then I can reuse its index both to aggregate and then to index the RDD[(VertexId, A)].
For example:
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
// Assumes an existing SparkContext `sc`, e.g. in spark-shell
val setA: VertexRDD[Int] = VertexRDD(sc.parallelize(0L until 100L).map(id => (id, 1)))
val rddB: RDD[(VertexId, Double)] = sc.parallelize(0L until 100L).flatMap(id => List((id, 1.0), (id, 2.0)))
// There should be 200 entries in rddB
rddB.count
// Reuse setA's index to reduce rddB's duplicate ids
val setB: VertexRDD[Double] = setA.aggregateUsingIndex(rddB, _ + _)
// There should be 100 entries in setB
setB.count
// Joining A and B should now be fast!
val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b)
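The aggregation step can also be modeled without Spark. This sketch uses the hypothetical object AggregateUsingIndexSketch to reduce (id, value) messages into a vector aligned with an existing id-to-position index, so the result is indexed exactly like the set that owns that index. It assumes that messages whose ids are missing from the index are simply skipped. The reduce function is taken in a second parameter list only so that Scala can infer its types from the messages.

```scala
object AggregateUsingIndexSketch {
  // index: id -> position (positions assumed to be 0 until index.size),
  //        owned by an existing vertex set.
  // messages: (id, value) pairs, possibly several per id.
  // Returns one slot per index position: Some(reduced value) or None.
  def aggregateUsingIndex[B](index: Map[Long, Int], messages: Iterable[(Long, B)])(
      reduceFunc: (B, B) => B): Vector[Option[B]] = {
    var slots = Vector.fill(index.size)(Option.empty[B])
    messages.foreach { case (id, b) =>
      // Assumption of this sketch: ids missing from the index are skipped.
      index.get(id).foreach { pos =>
        slots = slots.updated(pos, slots(pos).map(acc => reduceFunc(acc, b)).orElse(Some(b)))
      }
    }
    slots
  }
}
```

Mirroring the Spark example on a smaller scale, five ids with two messages each (1.0 and 2.0) reduce to five slots, each holding the sum of its two messages.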
EdgeRDD[ED] extends RDD[Edge[ED]] and organizes the edges in blocks, partitioned using one of the partitioning strategies defined in PartitionStrategy. Within each partition, edge attributes and adjacency structure are stored separately, which enables maximum reuse when attribute values change.
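As a rough illustration of keeping structure and attributes apart, here is a toy edge partition in plain Scala. ToyEdgePartition is hypothetical and much simpler than GraphX's internal partition format. It only shows that changing attributes or reversing edges can reuse existing columns instead of copying them.

```scala
// Toy model of one edge partition (not GraphX's partition class): the
// adjacency structure (src/dst id columns) is stored apart from the attribute
// column, so attribute-only changes and reversal can reuse existing columns.
final case class ToyEdgePartition[ED](
    srcIds: Vector[Long], // structure
    dstIds: Vector[Long], // structure
    attrs: Vector[ED]     // attributes, aligned by position with the id columns
) {
  require(srcIds.size == dstIds.size && dstIds.size == attrs.size)

  // New attribute column; both id columns are shared with the original.
  def mapValues[ED2](f: (Long, Long, ED) => ED2): ToyEdgePartition[ED2] =
    ToyEdgePartition(srcIds, dstIds, Vector.tabulate(attrs.size)(i => f(srcIds(i), dstIds(i), attrs(i))))

  // Reverse every edge by swapping the id columns; attributes are shared.
  def reverse: ToyEdgePartition[ED] = ToyEdgePartition(dstIds, srcIds, attrs)
}
```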
EdgeRDD exposes three additional functions:
// Transform the edge attributes while preserving the structure
def mapValues[ED2](f: Edge[ED] => ED2): EdgeRDD[ED2]
// Reverse the edges, reusing both attributes and structure
def reverse: EdgeRDD[ED]
// Join two `EdgeRDD`s partitioned using the same partitioning strategy.
def innerJoin[ED2, ED3](other: EdgeRDD[ED2])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]
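The requirement that both EdgeRDDs use the same partitioning strategy can be sketched with local collections. In this toy model the object CoPartitionedEdgeJoin and its source-id hash in partitionOf are hypothetical stand-ins, not PartitionStrategy's actual code. An edge's partition depends only on its ids, so a given (src, dst) pair lands in the same partition on both sides, and partition i on the left only needs to be joined with partition i on the right.

```scala
object CoPartitionedEdgeJoin {
  // (srcId, dstId, attribute)
  type Edge[A] = (Long, Long, A)

  // Hypothetical 1-D placement: the partition depends only on the source id.
  def partitionOf(src: Long, numParts: Int): Int =
    Math.floorMod(src * 1125899906842597L, numParts.toLong).toInt

  // Group edges into numParts local "partitions" using partitionOf.
  def partition[A](edges: Seq[Edge[A]], numParts: Int): Vector[Seq[Edge[A]]] = {
    val byPart = edges.groupBy(e => partitionOf(e._1, numParts))
    Vector.tabulate(numParts)(p => byPart.getOrElse(p, Seq.empty[Edge[A]]))
  }

  // Join partition i of `left` only with partition i of `right`. Edges match on
  // (src, dst); if `right` repeats a pair, this sketch keeps only one of them.
  def innerJoin[A, B, C](left: Vector[Seq[Edge[A]]], right: Vector[Seq[Edge[B]]])(
      f: (Long, Long, A, B) => C): Vector[Seq[Edge[C]]] =
    left.zip(right).map { case (lp, rp) =>
      val rightByKey: Map[(Long, Long), B] = rp.map(e => (e._1, e._2) -> e._3).toMap
      lp.flatMap { case (s, d, a) => rightByKey.get((s, d)).map(b => (s, d, f(s, d, a, b))) }
    }
}
```

If the two sides were placed by different functions, matching edges could sit in different partitions and this partition-local join would miss them. That is why the sketch applies one shared placement function to both inputs.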
In most applications, we have found that operations on EdgeRDD are accomplished either through the graph operators or through operations defined in the base RDD class.