May 17, 2021 Spark Programming Guide
Spark supports two ways to convert existing RDDs into SchemaRDDs. The first method uses reflection to infer the schema of an RDD that contains specific types of objects. This reflection-based approach leads to more concise code and works well when you already know the schema while writing your Spark application.
The second way to create SchemaRDDs is through a programmatic interface that lets you construct a schema and then apply it to an existing RDD. Although this approach is more verbose, it lets you construct SchemaRDDs when the columns and their types are not known until runtime.
Spark SQL's Scala interface automatically converts an RDD containing case classes to a SchemaRDD. The case class defines the schema of the table.
The names of the arguments to the case class are read via reflection and become the names of the columns. Case classes can also be nested or contain complex types such as sequences or arrays (a short sketch of a nested case class follows the example below). This RDD can be implicitly converted into a SchemaRDD and then registered as a table, and the table can be used in subsequent SQL statements.
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD.
import sqlContext.createSchemaRDD
// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)
// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
people.registerTempTable("people")
// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
// The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
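As mentioned above, case classes can be nested or contain complex types such as sequences or arrays. Below is a minimal sketch, reusing the sc and sqlContext from the example above; the Contact and Address classes and the sample record are illustrative assumptions and are not part of the people.txt data set.
// A nested case class with a complex (sequence) field.
// Contact, Address, and the sample record below are hypothetical examples.
case class Address(city: String, zip: String)
case class Contact(name: String, age: Int, phones: Seq[String], address: Address)
// The inferred schema has an array column for phones and a nested struct column for address.
val contacts = sc.parallelize(Seq(
  Contact("Michael", 29, Seq("555-0100", "555-0101"), Address("Berkeley", "94704"))))
contacts.registerTempTable("contacts")
// Complex columns can be selected just like simple ones.
sqlContext.sql("SELECT name, phones FROM contacts").collect().foreach(println)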
When case classes cannot be defined ahead of time (for example, the structure of the records is encoded in a string, or a text dataset will be parsed and its fields projected differently for different users), a SchemaRDD can be created programmatically in three steps:
1. Create an RDD of Rows from the original RDD.
2. Create the schema, represented by a StructType, that matches the row structure of the RDD created in the first step.
3. Apply the schema to the RDD of Rows via the applySchema method provided by SQLContext.
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Create an RDD
val people = sc.textFile("examples/src/main/resources/people.txt")
// The schema is encoded in a string
val schemaString = "name age"
// Import Spark SQL data types and Row.
import org.apache.spark.sql._
// Generate the schema based on the string of schema
val schema =
StructType(
schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
// Convert records of the RDD (people) to Rows.
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
// Apply the schema to the RDD.
val peopleSchemaRDD = sqlContext.applySchema(rowRDD, schema)
// Register the SchemaRDD as a table.
peopleSchemaRDD.registerTempTable("people")
// SQL statements can be run by using the sql methods provided by sqlContext.
val results = sqlContext.sql("SELECT name FROM people")
// The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
results.map(t => "Name: " + t(0)).collect().foreach(println)
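In the example above every field is declared as StringType because the schema string carries no type information. The same programmatic approach can assign other data types. The sketch below is an assumed variation that declares age as an integer and converts the field accordingly when building the Rows; the typedSchema, typedRowRDD, typedPeople, and typed_people names are illustrative only.
// Build a schema with an explicitly typed age column.
val typedSchema = StructType(Seq(
  StructField("name", StringType, true),
  StructField("age", IntegerType, true)))
// Convert the age field to Int so that the Rows match the declared schema.
val typedRowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim.toInt))
val typedPeople = sqlContext.applySchema(typedRowRDD, typedSchema)
typedPeople.registerTempTable("typed_people")
// Numeric comparisons now work directly on the age column.
sqlContext.sql("SELECT name FROM typed_people WHERE age >= 13 AND age <= 19").collect().foreach(println)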