table.incrementColumnValue这个函数的作用是什么?不太理解

来源:7-18 需求三功能实现

BillyWebber

2021-01-04

logStream.map(x => {
  ((x._1,x._3), x._2)
}).reduceByKey(_+_)
  .foreachRDD(rdd => {
    rdd.foreachPartition(partition => {
      val table = HBaseClient.getTable("access_user_hour")
      partition.foreach(x => {
        table.incrementColumnValue(
          (x._1._1+"_"+x._1._2).getBytes,
          "o".getBytes,
          "time".getBytes,
          x._2
        )
      })
      table.close()
    })
  })

课程7-18的这段代码里面,视频上说下一个批次也会存在同样key的值,因此需要追加进去,您说的追加是什么意思?是覆盖掉之前那个批次的同样key的value值吗?

写回答

1回答

Michael_PK

2021-01-04

根据库里存的已有值进行increment操作

0
3
Michael_PK
回复
BillyWebber
假设a列原来有值是2,又来了3,就加起来是5
2021-01-04
共3条回复

Spark3实时处理-Streaming+StructuredStreaming实战

实战Spark3实时处理,掌握两套企业级处理方案

340 学习 · 238 问题

查看课程