RDD与DataFrame相互转化的课时,按照老师给的代码在IDE里面run会报如下错误,该怎么处理?如果打包放在机器上运行就不会报错。
来源:6-6 -DataFrame与RDD互操作方式一
人唯优2018
2017-08-28
Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD
3回答
-
Michael_PK
2017-08-29
你仔细对比和上课的代码一致吗?肯定不是,自己仔细对比下!
022017-08-29 -
人唯优2018
提问者
2017-08-29
package com.immoc.spark
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession, types}
/**
 * Demonstrates the two ways of inter-operating between RDDs and DataFrames:
 *   1. `program`         — programmatic schema: build a `StructType` by hand and
 *                          apply it to an `RDD[Row]` via `createDataFrame`.
 *   2. `inferReflection` — reflection-based schema: map each line onto a case
 *                          class and call `.toDF()` via `spark.implicits._`.
 */
object DataFrameRDDApp {

  def main(args: Array[String]): Unit = {
    // Take the master URL from the first program argument; fall back to a
    // local master so the app also runs inside an IDE.
    //
    // NOTE: do NOT hard-code a remote master such as "spark://master:7077"
    // when launching from the IDE — the application classes are never shipped
    // to the remote executors, which produces exactly:
    //   java.lang.ClassCastException: cannot assign instance of
    //   scala.collection.immutable.List$SerializationProxy to field
    //   org.apache.spark.rdd.RDD...
    // For cluster runs, package the app and pass the master via spark-submit
    // (--master), which also distributes the application jar.
    val master = if (args.nonEmpty) args(0) else "local[2]"

    val spark = SparkSession.builder()
      .appName("DataFrameRDDApp")
      .master(master)
      .getOrCreate()

    try {
      //inferReflection(spark)
      program(spark)
    } finally {
      // Always release the SparkSession, even if the job above throws.
      spark.stop()
    }
  }

  /**
   * RDD -> DataFrame using a programmatically specified schema.
   *
   * Reads comma-separated lines of the form `id,name,age`, converts them to
   * `Row`s, attaches an explicit `StructType`, then queries the resulting
   * DataFrame both through the DataFrame API and through SQL.
   */
  private def program(spark: SparkSession) = {
    val rdd = spark.sparkContext.textFile("file:///usr/local/app/spark/examples/src/main/resources/info.txt")

    // Each input line "id,name,age" becomes a Row(Int, String, Int).
    // Assumes well-formed input: three fields, with fields 0 and 2 numeric.
    val infoRDD = rdd.map(_.split(",")).map(line => Row(line(0).toInt, line(1), line(2).toInt))

    // Schema must match the Row layout above, field by field (all nullable).
    val structType = StructType(Array(StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)
    ))

    val infoDF = spark.createDataFrame(infoRDD, structType)
    infoDF.printSchema()
    infoDF.show()

    // Query via the DataFrame API.
    infoDF.filter(infoDF.col("age") > 30).show

    // Query via SQL against a temporary view.
    infoDF.createOrReplaceTempView("infos")
    spark.sql("select * from infos where age > 30").show()
  }

  /**
   * RDD -> DataFrame using reflection-based schema inference: each line is
   * mapped onto the [[Info]] case class, whose fields supply the schema.
   */
  private def inferReflection(spark: SparkSession) = {
    val rdd = spark.sparkContext.textFile("file:///usr/local/app/spark/examples/src/main/resources/info.txt")

    // Implicit conversions (.toDF on RDDs of case classes) must be imported
    // from the concrete SparkSession instance.
    import spark.implicits._
    val infoDF = rdd.map(_.split(",")).map(line => Info(line(0).toInt, line(1), line(2).toInt)).toDF()
    infoDF.show()
  }

  /** Record type whose fields (id, name, age) define the inferred schema. */
  final case class Info(id: Int, name: String, age: Int)
}
00 -
Michael_PK
2017-08-28
贴上你的代码
012017-08-29
相似问题