在spark中使用for循环会不会对性能造成影响

来源:5-5 将统计结果写入到MySQL中

孤独观测者

2020-08-11

在spark中使用for循环会不会对性能造成影响

写回答

2回答

Michael_PK

2020-08-11

举个例子看看

0
3
Michael_PK
回复
孤独观测者
groupByKey().collect() 这种方法使用的时候要注意,因为collect是将所有数据返回到driver的,你driver是否能抗的住,这是个问题
2020-08-11
共3条回复

孤独观测者

提问者

2020-08-11

hbaseRDD.filter(x =>{
 val time = Bytes.toString(x._2.getValue(Bytes.toBytes("info"), Bytes.toBytes("time")))
 time > "2020-08-11T00:00:00"
}).map(x => {
 // 根据mac来对数据进行分组
 val id = Bytes.toString(x._2.getValue(Bytes.toBytes("info"), Bytes.toBytes("id")))
 val mac = Bytes.toString(x._2.getValue(Bytes.toBytes("info"), Bytes.toBytes("mac")))
 val time = Bytes.toString(x._2.getValue(Bytes.toBytes("info"), Bytes.toBytes("time")))
 val companyId = Bytes.toString(x._2.getValue(Bytes.toBytes("info"), Bytes.toBytes("companyId")))

 (mac, Bike(mac, time, id, companyId))
}).groupByKey().collect().map(x => {
 //得到同一单车mac下所有的Bike
 val mac = x._1
 val value:Iterable[Bike] = x._2
 var list:ListBuffer[Bike] = new ListBuffer[Bike]()

 var tmp:String = value.iterator.next().id
 list+=value.iterator.next()
 println("单车:"+mac)

 for (elem <- value) {

   println("    时间: "+ elem.time+",单车轨迹"+elem.id)
   if(elem.id != tmp){
     list+=elem
     tmp = elem.id
   }
 }
 println("测试:"+ list)
 list
})

0
4
孤独观测者
回复
Michael_PK
算子用的还不熟练,很多业务不知道用算子怎么处理。举个例子 (id='1',mac='aa',time='2020-08-12 10:10:10'),(id='1',mac='aa',time='2020-08-12 10:20:10'), (id='1',mac='aa',time='2020-08-12 10:20:15'),(id='2',mac='aa',time='2020-08-12 10:30:10'),(id='1',mac='aa',time='2020-08-12 10:40:10'). 我想根据id去重,最后结果是 (id='1',mac='aa',time='2020-08-12 10:10:10'),(id='2',mac='aa',time='2020-08-12 10:30:10'),(id='1',mac='aa',time='2020-08-12 10:40:10'). 如果用算子进行,去重,我只能做到保留 (id='1',mac='aa',time='2020-08-12 10:10:10'),(id='2',mac='aa',time='2020-08-12 10:30:10')。 对于这种处理,老师有什么思路吗
2020-08-12
共4条回复

Spark进阶 大数据离线与实时项目实战

大数据生态圈实用框架(Spark/Hbase/Redis/Hadoop)整合应用及调优

700 学习 · 190 问题

查看课程