老师我想统计所有进来的数据条数,window设置5s,进行所有条数的累加是这样写吗

来源:8-2 -实战之updateStateByKey算子的使用

慕容128306

2019-09-27

    val dstream = KafkaUtils.createDirectStream[String, String](
        ssc,
        PreferConsistent,
        Subscribe[String, String](Array(top), kafkaParams)
    ).map(_.value()).window(Seconds(5),Seconds(5))
    //按照进来的每一条流进行实时统计(5s)一次

   dstream.map(row => (row,1L)).reduceByKey(_+_).updateStateByKey[Int](updateFunction _)
    ssc.start()
    ssc.awaitTermination()
/**
  * 当前数据去更新已有的或者老的数据
  * @param currentValues 当前的
  * @param preValues 后面的
  */
//定义了一个求和的函数
def updateFunction(currentValues: Seq[Int], preValues: Option[Int]): Unit ={
    val current = currentValues.sum
    //因为累计加值所以有可能为空,如果为空的话则返回为none,所以给返回值为0
    val pre = preValues.getOrElse(0)
    Some(current + pre)
写回答

1回答

Michael_PK

2019-09-27

看起来是差不多这么写的。官网上update这个算子是有一段示例代码的,可以参考下

0
2
Michael_PK
回复
慕容128306
你先把代码飘红的地方解决掉,先保证你的代码能跑起来,现在飘红,编译都过不去呢。
2019-09-27
共2条回复

Spark Streaming实时流处理项目实战

Flume+Kafka+Spark Streaming 构建通用实时流处理平台

1404 学习 · 571 问题

查看课程