
Spark GraphX Vertex and Edge RDDs


May 17, 2021 Spark Programming guide



GraphX exposes RDD views of the vertices and edges stored in the graph. However, because GraphX maintains the vertices and edges in optimized data structures, these data structures provide additional functionality. They are exposed as VertexRDD and EdgeRDD, and in this chapter we will learn some of their useful features.

VertexRDDs

VertexRDD[A] extends RDD[(VertexID, A)] and adds the additional constraint that each VertexID occurs only once. Moreover, VertexRDD[A] represents a set of vertices, each with an attribute of type A. Internally, this is achieved by storing the vertex attributes in a reusable hash-map data structure. As a consequence, if two VertexRDDs are derived from the same base VertexRDD (e.g., via filter or mapValues), they can be joined in constant time without hash evaluations. To take advantage of this indexed data structure, VertexRDD exposes the following additional functionality:

class VertexRDD[VD] extends RDD[(VertexID, VD)] {
  // Filter the vertex set but preserve the internal index
  def filter(pred: Tuple2[VertexId, VD] => Boolean): VertexRDD[VD]
  // Transform the values without changing the ids (preserves the internal index)
  def mapValues[VD2](map: VD => VD2): VertexRDD[VD2]
  def mapValues[VD2](map: (VertexId, VD) => VD2): VertexRDD[VD2]
  // Remove vertices from this set that appear in the other set
  def diff(other: VertexRDD[VD]): VertexRDD[VD]
  // Join operators that take advantage of the internal indexing to accelerate joins (substantially)
  def leftJoin[VD2, VD3](other: RDD[(VertexId, VD2)])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3]
  def innerJoin[U, VD2](other: RDD[(VertexId, U)])(f: (VertexId, VD, U) => VD2): VertexRDD[VD2]
  // Use the index on this RDD to accelerate a `reduceByKey` operation on the input RDD.
  def aggregateUsingIndex[VD2](other: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]
}

For example, the filter operator returns a VertexRDD. Filter is actually implemented using a BitSet, so it reuses the index and preserves the ability to do fast joins with other VertexRDDs. Similarly, the mapValues operators do not allow the map function to change the VertexID, which enables the same HashMap data structure to be reused. Both leftJoin and innerJoin are able to identify when two VertexRDDs are derived from the same HashMap and implement the join by linear scan rather than costly point lookups.
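To make the reuse concrete, here is a minimal, hypothetical sketch of one VertexRDD partition in plain Scala: a slot-to-id array (the shared index), an attribute column, and a BitSet mask. The names `VertexPartition`, `ids`, `values`, and `mask` are illustrative assumptions, not Spark's actual internals, but they show why filter and mapValues can reuse the index.

```scala
import scala.collection.immutable.BitSet
import scala.reflect.ClassTag

// Hypothetical, simplified model of one VertexRDD partition. Spark's
// real implementation differs, but the structure-reuse idea is the same.
final case class VertexPartition[VD](
    ids: Array[Long],      // slot -> vertex id (the shared "index")
    values: Array[VD],     // one attribute per slot
    mask: BitSet) {        // which slots are currently present

  // filter only shrinks the mask; ids and values are reused untouched,
  // so the result is still join-compatible with the original partition.
  def filter(pred: (Long, VD) => Boolean): VertexPartition[VD] =
    copy(mask = mask.filter(i => pred(ids(i), values(i))))

  // mapValues may not change vertex ids, so ids and mask are reused.
  def mapValues[VD2: ClassTag](f: VD => VD2): VertexPartition[VD2] =
    VertexPartition(ids, values.map(f), mask)
}
```

Because filter and mapValues both leave `ids` untouched, two partitions derived this way can be joined slot-by-slot with a linear scan, which is the property the real VertexRDD exploits.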

The aggregateUsingIndex operator is useful for efficiently constructing a new VertexRDD from an RDD[(VertexID, A)]. Conceptually, if I have constructed a VertexRDD[B] over a set of vertices that is a super-set of the vertices in some RDD[(VertexID, A)], then I can reuse the index both to aggregate and to subsequently index the RDD[(VertexID, A)]. For example:

val setA: VertexRDD[Int] = VertexRDD(sc.parallelize(0L until 100L).map(id => (id, 1)))
val rddB: RDD[(VertexId, Double)] = sc.parallelize(0L until 100L).flatMap(id => List((id, 1.0), (id, 2.0)))
// There should be 200 entries in rddB
rddB.count
val setB: VertexRDD[Double] = setA.aggregateUsingIndex(rddB, _ + _)
// There should be 100 entries in setB
setB.count
// Joining A and B should now be fast!
val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b)
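The idea behind aggregateUsingIndex can be sketched in plain Scala: reuse an existing id-to-slot index to reduce incoming (id, value) pairs, instead of hashing the keys again. The function below is a hypothetical illustration, not Spark's actual code; pairs whose id is not in the index are dropped, mirroring the super-set assumption above.

```scala
import scala.collection.immutable.BitSet

// Hypothetical sketch: aggregate (id, value) pairs using a pre-built
// id -> slot index, producing a value column and a presence mask.
def aggregateUsingIndex(
    index: Map[Long, Int],                    // index of the base vertex set
    pairs: Seq[(Long, Double)],               // incoming key/value pairs
    reduce: (Double, Double) => Double): (Array[Double], BitSet) = {
  val values = new Array[Double](index.size)
  var mask = BitSet.empty
  for ((id, v) <- pairs; slot <- index.get(id)) { // ids outside the index are dropped
    values(slot) = if (mask(slot)) reduce(values(slot), v) else v
    mask += slot
  }
  (values, mask)
}
```

Since the result is laid out by the same index as the base set, a subsequent join (like the innerJoin of setA and setB above) can proceed slot-by-slot without any hashing.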

EdgeRDDs

EdgeRDD[ED] extends RDD[Edge[ED]] and organizes the edges in blocks partitioned using one of the various partitioning strategies defined in PartitionStrategy. Within each partition, edge attributes and the adjacency structure are stored separately, enabling maximum reuse when the attribute values change.

EdgeRDD exposes three additional functions:

// Transform the edge attributes while preserving the structure
def mapValues[ED2](f: Edge[ED] => ED2): EdgeRDD[ED2]
// Reverse the edges reusing both attributes and structure
def reverse: EdgeRDD[ED]
// Join two `EdgeRDD`s partitioned using the same partitioning strategy.
def innerJoin[ED2, ED3](other: EdgeRDD[ED2])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]
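The separate storage of structure and attributes can be sketched with a minimal, hypothetical edge partition in plain Scala. The name `EdgePartition` and its columnar layout are illustrative assumptions (Spark's real EdgePartition is more elaborate), but they show why mapValues and reverse are cheap: each reuses the part it does not change.

```scala
import scala.reflect.ClassTag

// Hypothetical, simplified edge partition: the adjacency structure
// (srcIds, dstIds) is stored separately from the attribute column.
final case class EdgePartition[ED](
    srcIds: Array[Long],
    dstIds: Array[Long],
    attrs: Array[ED]) {

  // Only the attribute column is rebuilt; the structure is reused.
  def mapValues[ED2: ClassTag](f: Edge2Attr[ED] => ED2): EdgePartition[ED2] =
    EdgePartition(srcIds, dstIds, attrs.map(f))

  // Only the structure arrays swap roles; the attributes are reused.
  def reverse: EdgePartition[ED] =
    EdgePartition(dstIds, srcIds, attrs)
}
```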

In most applications, we find that operations on an EdgeRDD are accomplished either through the graph operators or through operations defined in the base RDD class.