自定义watermark生成器如何使用

来源:7-16 Flink watermark概述

bking3629688

2021-04-29

/**
 * 该 watermark 生成器可以覆盖的场景是:数据源在一定程度上乱序。
 * 即某个最新到达的时间戳为 t 的元素将在最早到达的时间戳为 t 的元素之后最多 n 毫秒到达。
 */
class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {

    val maxOutOfOrderness = 3500L // 3.5 秒

    var currentMaxTimestamp: Long = _

    override def onEvent(element: MyEvent, eventTimestamp: Long): Unit = {
        currentMaxTimestamp = max(eventTimestamp, currentMaxTimestamp)
    }

    override def onPeriodicEmit(): Unit = {
        // 发出的 watermark = 当前最大时间戳 - 最大乱序时间
        output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
    }
}

官方文档中给出了一个自定义watermark生成器的方法,请问这个要怎么调用呢?

input.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessGenerator)

会报错Cannot resolve overloaded method ‘assignTimestampsAndWatermarks’

写回答

1回答

Michael_PK

2021-04-30

你要看看入参传什么呢,你的代码里面java new东西都不需要括号的吗?官网上是给参考的,具体的还是要适当调整的

0
2
Michael_PK
回复
bking3629688
你这代码idea不报错吗
2021-04-30
共2条回复

新一代大数据计算引擎 Flink从入门到实战

入行或转型大数据新姿势,多语言系统化讲解,极速入门Flink

969 学习 · 296 问题

查看课程