老师:RichMapFunction 这个函数能通俗解释一下作用么,为什么做计数器的时候要用到这个啊,还有其他的函数可以辅助完成计数器的功能吗
来源:4-34 基于Flink编程的计时器之Scala实现

passmefive
2019-06-03
val counterName="DataSetCounterApp"
val info = data.map(new RichMapFunctionString, String {
// step1:定义计算器
var counter = new LongCounter()
override def open(parameters: Configuration): Unit = {
//step2:注册计数器,参数1:计数器名,自定义,计数器类
getRuntimeContext.addAccumulator(counterName, counter)
}
override def map(value: String): String = {
// step3计数器 累加数字
counter.add(1)
value
}
})
info.setParallelism(4)
info.writeAsText("data/sink/counter", WriteMode.OVERWRITE)
//step4:根据计数名,获取计数结果
val jobReuslt = env.execute("DataSetCounterApp")
问题:RichMapFunction 这个函数能通俗解释一下作用么,为什么做计数器的时候要用到这个啊,还有其他的函数可以辅助完成计数器的功能吗
写回答
2回答
-
passmefive
提问者
2019-06-13
那第三个参数呢。应该如何传入什么样的参数。或者说应该根据什么依据传入参数呢
00 -
Michael_PK
2019-06-03
这是一个增强的方法,这里计数只是个例子
00
相似问题