ListState 搞了三小时 没搞出来 实在看不出问题
来源:9-23 【任务题】使用ListState实现求平均数

qq_无妄_3
2021-07-22
sum.get() iterator一直没有值;不知道为什么。
package com.shentu.state;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import java.util.Iterator;
public class AvgWithListState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
private transient ListState<Tuple2<Long,Long>> sum;
@Override
public void open(Configuration parameters) {
ListStateDescriptor<Tuple2<Long, Long>> listStateDescriptor = new ListStateDescriptor<>(
"average",
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {
})
);
sum = getRuntimeContext().getListState(listStateDescriptor);
}
@Override
public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
Iterator<Tuple2<Long, Long>> iterator = sum.get().iterator();
Long countNum=0L;
while (iterator.hasNext()){
countNum+=1;
}
if(countNum==2){
Long count = 0L;
Long sum2 = 0L;
while (iterator.hasNext()){
Tuple2<Long, Long> next = iterator.next();
count+=1;
sum2+=next.f1;
}
out.collect(new Tuple2<>(input.f0, sum2/count));
sum.clear();
}
}
}
package com.shentu.state;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class TestStreamState {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements(Tuple2.of(1L,3L),Tuple2.of(1L, 5L), Tuple2.of(1L, 7L)
, Tuple2.of(1L, 4L), Tuple2.of(1L, 2L)).keyBy(value -> value.f0)
.flatMap(new AvgWithListState()).print();
env.execute("TestStreamState");
}
}
输出为空
写回答
1回答
-
两个思路:
1)拿你的代码和git上提供的代码找个对比工具对比下,看看哪里的问题。 这种方式仅供参考
2)建议的方式:加断点,以debug的方式运行。
032021-07-23
相似问题