Batch insert assignment

Source: 7-5 API Operations: Inserting Data, Dropping Tables, and Querying Data

慕九州3016327

2020-09-20

After half a day of research, I implemented it with multiple threads:
import java.util.concurrent.{ExecutorService, Executors, TimeUnit}

import org.apache.kudu.client.{Insert, KuduClient, KuduSession, KuduTable, PartialRow}

object kudu {

  // Number of rows still to insert; all access goes through nextNumber().
  private var number = 20

  // Atomically claim the next value, or return -1 when none are left.
  // The original read-then-decrement was racy across threads, so the
  // whole step runs under the object's monitor.
  def nextNumber(): Int = synchronized {
    if (number > 0) { number -= 1; number } else -1
  }

  class InsertThread(client: KuduClient, tableName: String, threadName: String) extends Runnable {
    val table: KuduTable = client.openTable(tableName)
    val session: KuduSession = client.newSession()

    override def run(): Unit = {
      var n = nextNumber()
      while (n >= 0) {
        val insert: Insert = table.newInsert()
        val row: PartialRow = insert.getRow
        row.addString("word", s"mengfansong_$n")
        row.addInt("cnd", n)
        session.apply(insert)
        println(threadName + "---" + n)
        n = nextNumber()
      }
      println(threadName + ": no rows left")
      session.close()
    }
  }

  def doubleInsert(client: KuduClient, tableName: String): Unit = {
    val threadPool: ExecutorService = Executors.newFixedThreadPool(5)
    try {
      for (i <- 0 to 2) {
        threadPool.execute(new InsertThread(client, tableName, "ThreadName" + i))
      }
    } finally {
      // Stop accepting tasks and wait for the inserts to finish
      // before main() closes the shared client.
      threadPool.shutdown()
      threadPool.awaitTermination(1, TimeUnit.MINUTES)
    }
  }

  def main(args: Array[String]): Unit = {
    val KUDU_MASTERS = "hadoop000"
    val client: KuduClient = new KuduClient.KuduClientBuilder(KUDU_MASTERS).build()
    val tableName = "meng"
    doubleInsert(client, tableName)
    //creatTable(client, tableName)
    //insertRows(client, tableName)
    //deleteTable(client, tableName)
    //query(client, tableName)
    client.close()
  }
}

Tested and working. Please review, teacher.


1 Answer

Michael_PK

2020-09-20

This implementation works. But you could dig further into whether the rows can be inserted in batch mode.

慕九州3016327
So you mean: instead of executing apply for each write right away, first put the data into a container and then write it out in one batch?
2020-09-20
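For reference, here is a minimal sketch of the batch approach Michael_PK is hinting at, assuming the same meng table with a word string column and a cnd int column. Switching the KuduSession to MANUAL_FLUSH buffers applied operations on the client and sends them in one flush, instead of one synchronous round trip per apply (the AUTO_FLUSH_SYNC default):

import org.apache.kudu.client.{KuduClient, KuduSession, KuduTable, SessionConfiguration}

object KuduBatchInsert {
  def main(args: Array[String]): Unit = {
    val client = new KuduClient.KuduClientBuilder("hadoop000").build()
    val table: KuduTable = client.openTable("meng")
    val session: KuduSession = client.newSession()

    // Buffer operations client-side instead of sending one RPC per row.
    session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH)
    session.setMutationBufferSpace(1000) // max operations held in the buffer

    for (i <- 0 until 20) {
      val insert = table.newInsert()
      val row = insert.getRow
      row.addString("word", s"mengfansong_$i")
      row.addInt("cnd", i)
      session.apply(insert) // buffered, not yet written
    }

    // One flush ships the whole buffer to the tablet servers. Its return
    // value holds one response per operation; check those for per-row
    // errors in real code.
    session.flush()
    session.close()
    client.close()
  }
}

There is also AUTO_FLUSH_BACKGROUND, which flushes the buffer from a background thread as it fills, so the threads in the assignment above could keep their per-row apply calls and still get batching.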
