Problem with sum

Source: 4-13 Transformation operators: keyBy

慕容3565349

2022-01-15

Exception in thread "main" org.apache.flink.api.common.typeutils.CompositeType$InvalidFieldReferenceException: Cannot reference field by field expression on GenericType<com.imooc.flink.transformation.Access>Field expressions are only supported on POJO types, tuples, and case classes. (See the Flink documentation on what is considered a POJO.)
    at org.apache.flink.streaming.util.typeutils.FieldAccessorFactory.getAccessor(FieldAccessorFactory.java:224)
    at org.apache.flink.streaming.api.functions.aggregation.SumAggregator.<init>(SumAggregator.java:53)
    at org.apache.flink.streaming.api.datastream.KeyedStream.sum(KeyedStream.java:782)
    at com.imooc.flink.transformation.TransformationApp.keyBy(TransformationApp.java:49)
    at com.imooc.flink.transformation.TransformationApp.main(TransformationApp.java:22)

Code:
public static void keyBy(StreamExecutionEnvironment env) {
    DataStreamSource<String> source = env.readTextFile("data/access.log");
    // Parse each "time,domain,traffic" line into an Access object
    SingleOutputStreamOperator<Access> mapStream = source.map(new MapFunction<String, Access>() {
        @Override
        public Access map(String value) throws Exception {
            String[] splits = value.split(",");
            Long time = Long.parseLong(splits[0].trim());
            String domain = splits[1].trim();
            double traffic = Double.parseDouble(splits[2].trim());
            return new Access(time, domain, traffic);
        }
    });
    // keyBy("domain"): group by the domain field
    // mapStream.keyBy("domain").sum("traffic").print();
    mapStream.keyBy(new KeySelector<Access, String>() {
        @Override
        public String getKey(Access value) throws Exception {
            return value.getDomain();
        }
    }).sum("traffic").print();
}

Access.class
package com.imooc.flink.transformation;

public class Access {
    private Long time;
    private String domain;
    private Double traffic;

    public Access() {
    }

    public Access(Long time, String domain, double traffic) {
        this.time = time;
        this.domain = domain;
        this.traffic = traffic;
    }

    public Long getTime() {
        return time;
    }

    public void setTime(Long time) {
        this.time = time;
    }

    public String getDomain() {
        return domain;
    }

    public void setDomain(String domain) {
        this.domain = domain;
    }

    public double getTraffic() {
        return traffic;
    }

    public void setTraffic(Double traffic) {
        this.traffic = traffic;
    }

    @Override
    public String toString() {
        return "Access{" +
                "time=" + time +
                ", domain='" + domain + '\'' +
                ", traffic=" + traffic +
                '}';
    }
}


1 answer

慕容3565349

Asker

2022-01-15

Solved. I worked it out from the official website.
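For anyone who hits the same exception: the asker did not post the actual fix, but judging from the Access class in the question, a likely cause is that the `traffic` field is declared as `Double` while `getTraffic()` returns the primitive `double`. Flink's POJO rules expect each non-public field to have a getter and setter that match the field, and when that check fails the class is treated as a `GenericType`, which is exactly what the exception reports. The plain-Java sketch below only illustrates the mismatch; `PojoGetterCheck`, `AccessAsPosted`, `AccessFixed`, and `mismatchedGetters` are hypothetical names, not Flink APIs, and the check is a simplified (stricter) approximation of what Flink does, trimmed to the `traffic` field.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

class PojoGetterCheck {

    // As posted in the question: field is Double, getter returns primitive double.
    static class AccessAsPosted {
        private Double traffic;
        public AccessAsPosted() {}
        public double getTraffic() { return traffic; }
        public void setTraffic(Double traffic) { this.traffic = traffic; }
    }

    // Adjusted version: field, getter, and setter all use Double.
    static class AccessFixed {
        private Double traffic;
        public AccessFixed() {}
        public Double getTraffic() { return traffic; }
        public void setTraffic(Double traffic) { this.traffic = traffic; }
    }

    // Hypothetical helper (not part of Flink): returns the names of declared
    // fields whose getXxx() is missing or returns a type other than the field type.
    static List<String> mismatchedGetters(Class<?> clazz) {
        List<String> result = new ArrayList<>();
        for (Field f : clazz.getDeclaredFields()) {
            if (f.isSynthetic()) {
                continue;
            }
            String name = f.getName();
            String getter = "get" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
            try {
                Method m = clazz.getMethod(getter);
                if (!m.getReturnType().equals(f.getType())) {
                    result.add(name);
                }
            } catch (NoSuchMethodException e) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(mismatchedGetters(AccessAsPosted.class)); // prints [traffic]
        System.out.println(mismatchedGetters(AccessFixed.class));    // prints []
    }
}
```

If this is indeed the cause, changing `getTraffic()` in the posted class to return `Double` (or making the field a primitive `double`) should let `sum("traffic")` resolve the field.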

故渊7613667
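Could you share how you solved it? I ran into the same problem. I ended up replacing sum with reduce and writing the aggregation myself, but I still don't understand why sum fails.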
2022-05-09
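On why reduce works where sum does not: the stack trace in the question shows `sum("traffic")` going through `FieldAccessorFactory.getAccessor`, which looks the field up by name and rejects a `GenericType`. A reduce function never looks fields up by name; it just calls the getters, so it does not depend on the class being recognized as a POJO. The sketch below shows that aggregation logic in plain Java so it can run without Flink. `ReduceSketch` and its nested `Access` are stand-ins for illustration, and the static `reduce` method only mirrors the shape of `ReduceFunction<Access>.reduce(value1, value2)`; keeping the first record's `time` is an arbitrary choice.

```java
class ReduceSketch {

    // Minimal stand-in for the Access class from the question.
    static class Access {
        private final Long time;
        private final String domain;
        private final Double traffic;

        Access(Long time, String domain, Double traffic) {
            this.time = time;
            this.domain = domain;
            this.traffic = traffic;
        }

        Long getTime() { return time; }
        String getDomain() { return domain; }
        Double getTraffic() { return traffic; }
    }

    // Same shape as ReduceFunction<Access>.reduce: combine two records
    // that share a key into one record with the traffic added up.
    static Access reduce(Access value1, Access value2) {
        return new Access(
                value1.getTime(),
                value1.getDomain(),
                value1.getTraffic() + value2.getTraffic());
    }

    public static void main(String[] args) {
        Access a = new Access(1L, "a.com", 100.0);
        Access b = new Access(2L, "a.com", 50.5);
        Access total = reduce(a, b);
        System.out.println(total.getDomain() + " " + total.getTraffic()); // prints a.com 150.5
    }
}
```

In the job from the question, the same body would go inside a `ReduceFunction<Access>` passed to `reduce(...)` on the keyed stream in place of `sum("traffic")`.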