checkpoint问题

来源:11-6 Mock数据之Kafka生产者代码日志生产开发

慕粉3334211

2019-07-27

PK哥,我想做一个能记住上次结果的wordcount统计,比如不断的输入单词”a“,会出现(a,1), (a,2), (a,3) …,但是,当我关闭再重启它又从头统计了,又出现了(a,1),我想要接着关闭之前统计。我已经设置了checkpoint和state。代码如下:
val env = StreamExecutionEnvironment.getExecutionEnvironment

val topic = "test"
val properties = new Properties()
properties.setProperty(“bootstrap.servers”, “localhost:9092”)
properties.setProperty(“zookeeper.connect”, “localhost:2181”)
properties.setProperty(“group.id”, “test”)
val kafkaSource = new FlinkKafkaConsumer011[String](topic, new SimpleStringSchema(), properties)

val data = env.addSource(kafkaSource)

env.setStateBackend(new RocksDBStateBackend(“file:///Users/kewei.mao/app/tmp/checkpoint”))
env.enableCheckpointing(1000)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
env.getCheckpointConfig.setCheckpointTimeout(60000)
env.getCheckpointConfig.setFailOnCheckpointingErrors(true)
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

val counts: DataStream[(String, Int)] = data
.flatMap(.toLowerCase.split(“W+”))
.filter(
.nonEmpty)
.map((_, 1))
.keyBy(0)
.sum(1)

env.execute(“Streaming WordCount”)

写回答

2回答

liangyuchong_712

2019-11-26

楼主,想知道你重启job的时候也是用上面的代码吗?还是说使用flink run -s 这样的命令来重启作业,如果是在idea中直接重启的代码的话?有没有使用什么特殊的方法来指定重启的时候使用哪个checkpoint文件呀

0
0

Michael_PK

2019-07-27

这种建议每次处理完保存起来,后续的可以进行累加

0
0

新一代大数据计算引擎 Flink从入门到实战

入行或转型大数据新姿势,多语言系统化讲解,极速入门Flink

969 学习 · 296 问题

查看课程