老师我想统计所有进来的数据条数,window设置5s,进行所有条数的累加是这样写吗
来源:8-2 -实战之updateStateByKey算子的使用

慕容128306
2019-09-27
val dstream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](Array(top), kafkaParams)
).map(_.value()).window(Seconds(5),Seconds(5))
//按照进来的每一条流进行实时统计(5s)一次
dstream.map(row => (row,1L)).reduceByKey(_+_).updateStateByKey[Int](updateFunction _)
ssc.start()
ssc.awaitTermination()
/**
* 当前数据去更新已有的或者老的数据
* @param currentValues 当前的
* @param preValues 后面的
*/
//定义了一个求和的函数
def updateFunction(currentValues: Seq[Int], preValues: Option[Int]): Unit ={
val current = currentValues.sum
//因为累计加值所以有可能为空,如果为空的话则返回为none,所以给返回值为0
val pre = preValues.getOrElse(0)
Some(current + pre)
写回答
1回答
-
Michael_PK
2019-09-27
看起来是差不多这么写的。官网上update这个算子是有一段示例代码的,可以参考下
022019-09-27
相似问题