Problem with Spark SQL reading external data and saving it into a Hive database
Source: 5-2 -A Using SQLContext
慕斯卡4516047
2017-08-14
Hello, teacher:
I wrote a program that reads external data and then stores it into Hive, and I have a few questions I'd like your help with. Thank you.
The code is packaged as a jar and run on the server with spark-submit. It reads the file "/home/hadoop/data/student.data" on the server, creates a student table in Hive on that server, and saves the data from student.data into the student table. The code is as follows:
object SaveDataToHive {
  def main(args: Array[String]) {
    val path = args(0)
    println("============================= " + path + " =============================")
    val spark = SparkSession.builder().appName("SaveDataToHive").getOrCreate()
    val rdd = spark.sparkContext.textFile(path)
    import spark.implicits._
    val studentDF = rdd.map(_.split("\\|")).map(line => Student(line(0).toInt, line(1), line(2), line(3))).toDF()
    studentDF.show(30, false)
    studentDF.registerTempTable("table1")
    val hiveContext = new HiveContext(spark.sparkContext)
    hiveContext.sql("drop table if exists student")
    hiveContext.sql("CREATE TABLE IF NOT EXISTS student(id string, name string, phone string, email string)")
    hiveContext.sql("insert into student from table1")
    hiveContext.table("student").show()
    spark.stop()
  }

  case class Student(id: Int, name: String, phone: String, email: String)
}
Launch script:
spark-submit \
--name SaveDataToHive \
--class com.imooc.spark.SaveDataToHive \
--jars /home/hadoop/software/mysql-connector-java-5.1.27-bin.jar \
--master local[2] \
/home/hadoop/lib/sql-1.0.jar \
/home/hadoop/data/student.data
When I run the script on the server, it fails at hiveContext.sql("insert into student from table1") with the following error:
Exception in thread "main" org.apache.spark.sql.AnalysisException:
Hive support is required to insert into the following tables:
`default`.`student`
;;
'InsertIntoTable 'SimpleCatalogRelation default, CatalogTable(
Table: `default`.`student`
Created: Mon Aug 14 05:50:09 PDT 2017
Last Access: Wed Dec 31 15:59:59 PST 1969
Type: MANAGED
Schema: [StructField(id,StringType,true), StructField(name,StringType,true), StructField(phone,StringType,true), StructField(email,StringType,true)]
Provider: hive
Storage(Location: file:/home/hadoop/shell/spark-warehouse/student, InputFormat: org.apache.hadoop.mapred.TextInputFormat, OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat)), OverwriteOptions(false,Map()), false
+- SubqueryAlias table1, `table1`
+- SerializeFromObject [assertnotnull(input[0, com.imooc.spark.SaveDataToHive$Student, true], top level Product input object).id AS id#5, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.imooc.spark.SaveDataToHive$Student, true], top level Product input object).name, true) AS name#6, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.imooc.spark.SaveDataToHive$Student, true], top level Product input object).phone, true) AS phone#7, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.imooc.spark.SaveDataToHive$Student, true], top level Product input object).email, true) AS email#8]
+- ExternalRDD [obj#4]
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:57)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:383)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:128)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:67)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:57)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:48)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:63)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:592)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:699)
at com.imooc.spark.SaveDataToHive$.main(SaveDataToHive.scala:25)
at com.imooc.spark.SaveDataToHive.main(SaveDataToHive.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Then I modified the code as follows:
object SaveDataToHive {
  def main(args: Array[String]) {
    val path = args(0)
    // val path = "file:///home/hadoop/data/student.data"
    println("============================= " + path + " =============================")
    val sparkConf = new SparkConf()
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    val rdd = sc.textFile(path)
    val hiveContext = new HiveContext(sc)
    import sqlContext.implicits._
    val studentDF = rdd.map(_.split("\\|")).map(line => Student(line(0).toInt, line(1), line(2), line(3))).toDF()
    // studentDF.show(30, false)
    studentDF.registerTempTable("table1")
    hiveContext.sql("drop table if exists student")
    hiveContext.sql("CREATE TABLE IF NOT EXISTS student(id string, name string, phone string, email string)")
    hiveContext.sql("insert into student from table1")
    hiveContext.table("student").show()
    sc.stop()
  }

  case class Student(id: Int, name: String, phone: String, email: String)
}
With this version the script runs normally. I don't quite understand whether this is caused by the different way the SparkContext is created. What is the difference between these two ways of creating it?
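For reference, the error above says "Hive support is required" for the insert; in Spark 2.x that support is normally switched on when the SparkSession itself is built, rather than by wrapping a separate HiveContext around its SparkContext. Below is a minimal sketch (not the course's reference solution; it assumes Spark 2.x and that hive-site.xml is visible to the application) of the same job done through a single Hive-enabled session:

import org.apache.spark.sql.SparkSession

object SaveDataToHiveWithSession {

  case class Student(id: Int, name: String, phone: String, email: String)

  def main(args: Array[String]): Unit = {
    val path = args(0)

    // enableHiveSupport() wires the session to the Hive metastore, so the
    // CREATE TABLE / INSERT below target a real Hive table instead of the
    // in-memory catalog (the SimpleCatalogRelation shown in the error).
    val spark = SparkSession.builder()
      .appName("SaveDataToHiveWithSession")
      .enableHiveSupport()
      .getOrCreate()

    import spark.implicits._

    val studentDF = spark.sparkContext.textFile(path)
      .map(_.split("\\|"))
      .map(f => Student(f(0).toInt, f(1), f(2), f(3)))
      .toDF()

    // createOrReplaceTempView is the Spark 2.x name for registerTempTable
    studentDF.createOrReplaceTempView("table1")

    spark.sql("drop table if exists student")
    spark.sql("CREATE TABLE IF NOT EXISTS student(id string, name string, phone string, email string)")
    spark.sql("INSERT INTO TABLE student SELECT * FROM table1")
    spark.table("student").show()

    spark.stop()
  }
}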
2 answers
-
Can df use the saveAsTable method?
2017-08-14
-
慕斯卡4516047 (asker)
2017-08-14
Do you mean the studentDF object? I tried it, and studentDF doesn't have a saveAsTable method.
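For reference, saveAsTable is exposed on the DataFrame's writer rather than on the DataFrame itself, so a call along these lines (assuming a Hive-enabled SparkSession and the studentDF built above) would be the DataFrame-API equivalent of the SQL insert:

// df.write returns a DataFrameWriter, which is where saveAsTable is defined;
// this still requires a session created with enableHiveSupport()
studentDF.write.mode("overwrite").saveAsTable("student")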