flink1.9版直接运行出错

来源:2-8 Flink批处理应用Java开发之功能实现

杰夫斯基

2019-10-31

使用了1.9版本的Flink模板来创建项目的。
根据本课学习的内容,运行代码如下:

    public static void main(String[] args) throws Exception{

        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> input = env.readTextFile("file:///Users/jeff/projects/study/flink-train-java/test.txt");
        input.print();
        input.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] tokens = s.toLowerCase().split(" ");
                for (String token: tokens){
                    if (token.length() > 0) {
                        collector.collect(new Tuple2<String, Integer>(token, 1));
                    }

                }
            }
        }).groupBy(0).sum(1).print();
        env.execute("Flink Batch Java API Skeleton");

    }

执行该Main函数时,并没有输出想要的结果,并且程序不会自动停止,而是处于一个运行着的状态。
翻了一下日志,有报一个错:
00:42:34,972 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - DataSource (at main(BatchWCJavaApp.java:12) (org.apache.flink.api.java.io.TextInputFormat)) (1/8) (98a5cbecb493ed76e173902afdfaad42) switched from CREATED to SCHEDULED.
00:42:34,980 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{858aa7d798c6f34199c63394476feacd}]

如何解决这个问題?

写回答

2回答

杰夫斯基

提问者

2019-11-10

Java运行版本问題,虽然是Java8,但是几年前的环境,手动升级到java8的最新版本u231后,问題就解决了。

1
0

Michael_PK

2019-10-31

请先使用上课的版本,不同版本有区别。另外请提供error日志

0
11
Michael_PK
回复
杰夫斯基
换个环境试试吧
2019-10-31
共11条回复

新一代大数据计算引擎 Flink从入门到实战

入行或转型大数据新姿势,多语言系统化讲解,极速入门Flink

969 学习 · 296 问题

查看课程