为何checkpoint data size大小 持续增长?
来源:9-6 Flink Checkpoint机制
幕布斯7407014
2025-02-17
.keyBy(new TwoPhaseSelector())
.process(new LogKeyedProcessFunction())
public class LogKeyedProcessFunction extends KeyedProcessFunction<Tuple3<String,String,String>, CollectorGroup<SlowSqlBody>, CollectorGroup<SlowSqlBody>> {
private final static Logger logger = LoggerFactory.getLogger(LogKeyedProcessFunction.class);
private final int phaseWait;
private volatile ListState<CollectorGroup<SlowSqlBody>> listState;
@Override
public void open(Configuration parameters) throws Exception {
ListStateDescriptor<CollectorGroup<SlowSqlBody>> slowSqlCollectorGroupState = new ListStateDescriptor<>("collect-group-list", TypeInformation.of(new TypeHint<CollectorGroup<SlowSqlBody>>() {}));
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(120))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
slowSqlCollectorGroupState.enableTimeToLive(ttlConfig);
listState = getRuntimeContext().getListState(slowSqlCollectorGroupState);
}
@Override
public void processElement(CollectorGroup<SlowSqlBody> value, KeyedProcessFunction<Tuple3<String,String,String>, CollectorGroup<SlowSqlBody>, CollectorGroup<SlowSqlBody>>.Context ctx, Collector<CollectorGroup<SlowSqlBody>> out) throws Exception {
listState.add(value);
ctx.timerService().registerProcessingTimeTimer(value.getTimestamp()+phaseWait);
}
@Override
public void onTimer(long timestamp, KeyedProcessFunction<Tuple3<String,String,String>, CollectorGroup<SlowSqlBody>, CollectorGroup<SlowSqlBody>>.OnTimerContext ctx, Collector<CollectorGroup<SlowSqlBody>> out) throws Exception {
try {
Tuple3<String,String,String> currentKey = ctx.getCurrentKey();
Iterable<CollectorGroup<SlowSqlBody>> collectorGroups = listState.get();
CollectorGroup<SlowSqlBody> collectorGroup = null;
int index=0;
for (CollectorGroup<SlowSqlBody> group : collectorGroups) {
if(collectorGroup == null){
collectorGroup = group;
}else{
collectorGroup = collectorGroup.merge(group);
}
index++;
}
if(collectorGroup!=null){
out.collect(collectorGroup);
}
}finally {
listState.clear();
}
}
}

注册的定时任务是2分钟执行一次
现在发现 LogKeyedProcessFunction 这个算子的checkpoint data size大小 从几kb 持续慢慢涨到几十Mb , 感觉有点不正常。状态做了清空,为何还一直涨,没有停下来的意思
1回答
-
Michael_PK
2025-02-20
在LogKeyedProcessFunction中,虽然你在onTimer方法中调用了listState.clear()来清空状态,但状态大小仍然持续增长,可能有以下几个原因:
1. 状态未及时清理
虽然你在onTimer中调用了listState.clear(),但可能由于某些原因,状态并没有被及时清理。例如,定时器触发的时间间隔可能不足以让状态在每次触发时都被清理干净。
你可以通过在 `onTimer` 方法中添加日志来确认状态是否被正确清理。
2. 状态 TTL 配置问题
你为状态配置了 TTL(Time-To-Live),并且设置为 120 秒。TTL 的清理是异步的,可能不会立即生效。即使你调用了 listState.clear(),状态的物理删除可能会延迟。
你可以尝试调整 TTL 的配置,或者手动触发状态的清理。
3. 状态后端配置问题
如果你使用的是 RocksDB 状态后端,RocksDB 的压缩和清理机制可能会导致状态数据的物理删除延迟。你可以检查 RocksDB 的配置,确保它能够及时清理过期数据。
你可以尝试调整 RocksDB 的压缩策略,或者增加压缩的频率。
4. 定时器未正确触发
如果定时器没有正确触发,状态可能不会被清理。你可以通过日志确认定时器是否按预期触发。
你可以尝试在processElement方法中添加日志,确认定时器是否正确注册。
5. 状态合并导致的状态增长
在onTimer方法中,你通过collectorGroup.merge(group)合并了多个CollectorGroup对象。如果合并操作导致状态对象变大,可能会导致状态大小持续增长。
你可以检查merge方法的实现,确保它不会导致状态对象的无限增长。
6. 状态序列化问题
如果CollectorGroup<SlowSqlBody>的序列化方式有问题,可能会导致状态大小异常增长。你可以检查CollectorGroup和SlowSqlBody的序列化实现,确保它们不会导致状态大小异常增长。
7. 状态分区问题
如果你的 key 空间非常大,可能会导致状态分区过多,从而导致状态大小增长。你可以尝试减少 key 空间的大小,或者调整状态分区的配置。
综上所述:参考解决方案如下
1. 增加日志:在onTimer方法中添加日志,确认状态是否被正确清理。
2. 调整 TTL 配置:尝试调整 TTL 的配置,或者手动触发状态的清理。
3. 检查状态后端配置:检查 RocksDB 的配置,确保它能够及时清理过期数据。
4. 检查定时器触发:确认定时器是否正确触发。
5. 检查状态合并逻辑:确保merge方法不会导致状态对象的无限增长。
6. 检查序列化实现:确保CollectorGroup和SlowSqlBody的序列化实现没有问题。
7. 减少 key 空间:尝试减少key空间的大小,或者调整状态分区的配置。
032025-02-26
相似问题