老师:RichMapFunction 这个函数能通俗解释一下作用么,为什么做计数器的时候要用到这个啊,还有其他的函数可以辅助完成计数器的功能吗

来源:4-34 基于Flink编程的计时器之Scala实现

passmefive

2019-06-03

val counterName="DataSetCounterApp"
val info = data.map(new RichMapFunctionString, String {
// step1:定义计算器
var counter = new LongCounter()

  override def open(parameters: Configuration): Unit = {
    //step2:注册计数器,参数1:计数器名,自定义,计数器类
    getRuntimeContext.addAccumulator(counterName, counter)
  }
  override def map(value: String): String = {
    // step3计数器 累加数字
    counter.add(1)
    value
  }
})
info.setParallelism(4)
info.writeAsText("data/sink/counter", WriteMode.OVERWRITE)
//step4:根据计数名,获取计数结果
val jobReuslt = env.execute("DataSetCounterApp")

问题:RichMapFunction 这个函数能通俗解释一下作用么,为什么做计数器的时候要用到这个啊,还有其他的函数可以辅助完成计数器的功能吗

写回答

2回答

passmefive

提问者

2019-06-13

那第三个参数呢。应该如何传入什么样的参数。或者说应该根据什么依据传入参数呢

0
0

Michael_PK

2019-06-03

这是一个增强的方法,这里计数只是个例子

0
0

新一代大数据计算引擎 Flink从入门到实战

入行或转型大数据新姿势,多语言系统化讲解,极速入门Flink

969 学习 · 296 问题

查看课程