ListState 搞了三小时 没搞出来 实在看不出问题

来源:9-23 【任务题】使用ListState实现求平均数

qq_无妄_3

2021-07-22

sum.get() iterator一直没有值;不知道为什么。

package com.shentu.state;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

import java.util.Iterator;

public class AvgWithListState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    private transient ListState<Tuple2<Long,Long>> sum;

    @Override
    public void open(Configuration parameters) {
        ListStateDescriptor<Tuple2<Long, Long>> listStateDescriptor = new ListStateDescriptor<>(
                "average",
                TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {
                })
        );
        sum = getRuntimeContext().getListState(listStateDescriptor);
    }

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
        Iterator<Tuple2<Long, Long>> iterator = sum.get().iterator();

        Long countNum=0L;

        while (iterator.hasNext()){
            countNum+=1;
        }

        if(countNum==2){
            Long count = 0L;
            Long sum2 = 0L;

            while (iterator.hasNext()){
                Tuple2<Long, Long> next = iterator.next();
                count+=1;
                sum2+=next.f1;
            }

            out.collect(new Tuple2<>(input.f0, sum2/count));
            sum.clear();
        }


    }

}

package com.shentu.state;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TestStreamState {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(Tuple2.of(1L,3L),Tuple2.of(1L, 5L), Tuple2.of(1L, 7L)
                , Tuple2.of(1L, 4L), Tuple2.of(1L, 2L)).keyBy(value -> value.f0)
                .flatMap(new AvgWithListState()).print();

        env.execute("TestStreamState");
    }
}

输出为空

写回答

1回答

Michael_PK

2021-07-23

两个思路:

1)拿你的代码和git上提供的代码找个对比工具对比下,看看哪里的问题。 这种方式仅供参考

2)建议的方式:加断点,以debug的方式运行。

0
3
qq_无妄_3
已经解决了;感谢老师;debug发现并未向state里保存数据
2021-07-23
共3条回复

Flink+ClickHouse 玩转企业级实时大数据开发

已经在做大数据,Flink助力轻松提薪;尚未入行,让你弯道超车

1000 学习 · 225 问题

查看课程