checkpoint问题
来源:11-6 Mock数据之Kafka生产者代码日志生产开发

慕粉3334211
2019-07-27
PK哥,我想做一个能记住上次结果的wordcount统计,比如不断的输入单词”a“,会出现(a,1), (a,2), (a,3) …,但是,当我关闭再重启它又从头统计了,又出现了(a,1),我想要接着关闭之前统计。我已经设置了checkpoint和state。代码如下:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val topic = "test"
val properties = new Properties()
properties.setProperty(“bootstrap.servers”, “localhost:9092”)
properties.setProperty(“zookeeper.connect”, “localhost:2181”)
properties.setProperty(“group.id”, “test”)
val kafkaSource = new FlinkKafkaConsumer011[String](topic, new SimpleStringSchema(), properties)
val data = env.addSource(kafkaSource)
env.setStateBackend(new RocksDBStateBackend(“file:///Users/kewei.mao/app/tmp/checkpoint”))
env.enableCheckpointing(1000)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
env.getCheckpointConfig.setCheckpointTimeout(60000)
env.getCheckpointConfig.setFailOnCheckpointingErrors(true)
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
val counts: DataStream[(String, Int)] = data
.flatMap(.toLowerCase.split(“W+”))
.filter(.nonEmpty)
.map((_, 1))
.keyBy(0)
.sum(1)
env.execute(“Streaming WordCount”)
2回答
-
liangyuchong_712
2019-11-26
楼主,想知道你重启job的时候也是用上面的代码吗?还是说使用flink run -s 这样的命令来重启作业,如果是在idea中直接重启的代码的话?有没有使用什么特殊的方法来指定重启的时候使用哪个checkpoint文件呀
00 -
Michael_PK
2019-07-27
这种建议每次处理完保存起来,后续的可以进行累加
00
相似问题