Problem with sum
Source: 4-13 Transformation operators: keyBy

慕容3565349
2022-01-15
Exception in thread "main" org.apache.flink.api.common.typeutils.CompositeType$InvalidFieldReferenceException: Cannot reference field by field expression on GenericType<com.imooc.flink.transformation.Access>Field expressions are only supported on POJO types, tuples, and case classes. (See the Flink documentation on what is considered a POJO.)
    at org.apache.flink.streaming.util.typeutils.FieldAccessorFactory.getAccessor(FieldAccessorFactory.java:224)
    at org.apache.flink.streaming.api.functions.aggregation.SumAggregator.<init>(SumAggregator.java:53)
    at org.apache.flink.streaming.api.datastream.KeyedStream.sum(KeyedStream.java:782)
    at com.imooc.flink.transformation.TransformationApp.keyBy(TransformationApp.java:49)
    at com.imooc.flink.transformation.TransformationApp.main(TransformationApp.java:22)
Code:
public static void keyBy(StreamExecutionEnvironment env) {
    DataStreamSource<String> source = env.readTextFile("data/access.log");
    // Parse each "time,domain,traffic" line into an Access object
    SingleOutputStreamOperator<Access> mapStream = source.map(new MapFunction<String, Access>() {
        @Override
        public Access map(String value) throws Exception {
            String[] splits = value.split(",");
            Long time = Long.parseLong(splits[0].trim());
            String domain = splits[1].trim();
            double traffic = Double.parseDouble(splits[2].trim());
            return new Access(time, domain, traffic);
        }
    });
    // keyBy("domain") groups the stream by the domain field
    // mapStream.keyBy("domain").sum("traffic").print();
    mapStream.keyBy(new KeySelector<Access, String>() {
        @Override
        public String getKey(Access value) throws Exception {
            return value.getDomain();
        }
    }).sum("traffic").print();
}
Access.class
package com.imooc.flink.transformation;

public class Access {
    private Long time;
    private String domain;
    private Double traffic;

    public Access() {
    }

    public Access(Long time, String domain, double traffic) {
        this.time = time;
        this.domain = domain;
        this.traffic = traffic;
    }

    public Long getTime() {
        return time;
    }

    public void setTime(Long time) {
        this.time = time;
    }

    public String getDomain() {
        return domain;
    }

    public void setDomain(String domain) {
        this.domain = domain;
    }

    public double getTraffic() {
        return traffic;
    }

    public void setTraffic(Double traffic) {
        this.traffic = traffic;
    }

    @Override
    public String toString() {
        return "Access{" +
                "time=" + time +
                ", domain='" + domain + '\'' +
                ", traffic=" + traffic +
                '}';
    }
}
1 answer
慕容3565349
Asker
2022-01-15
Solved. I fixed it by following the official documentation.
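
The answer above does not say what the fix was. One plausible cause, not confirmed by the asker, is the type mismatch in the posted Access class: the field traffic is declared as Double, but getTraffic() returns the primitive double. The exception says Flink treated Access as a GenericType rather than a POJO, and a getter whose return type does not match its field is a common reason for that. The sketch below needs no Flink dependency. AccessAsPosted, AccessFixed, and hasMatchingGetter are hypothetical names for illustration, and the reflection check is a simplified stand-in for this one rule, not Flink's actual type extraction code.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;

public class PojoGetterCheck {

    /** Mirrors the traffic field of the posted class: Double field, double getter. */
    static class AccessAsPosted {
        private Double traffic;
        public AccessAsPosted() {}
        public double getTraffic() { return traffic; }
        public void setTraffic(Double traffic) { this.traffic = traffic; }
    }

    /** Candidate fix (an assumption): field, getter, and setter all use Double. */
    static class AccessFixed {
        private Double traffic;
        public AccessFixed() {}
        public Double getTraffic() { return traffic; }
        public void setTraffic(Double traffic) { this.traffic = traffic; }
    }

    /**
     * Simplified rule: a public no-argument method named get<FieldName>
     * must return exactly the declared type of the field.
     */
    static boolean hasMatchingGetter(Class<?> clazz, String fieldName) {
        Field field;
        try {
            field = clazz.getDeclaredField(fieldName);
        } catch (NoSuchFieldException e) {
            return false;
        }
        String getterName = "get"
                + Character.toUpperCase(fieldName.charAt(0))
                + fieldName.substring(1);
        for (Method m : clazz.getMethods()) {
            if (m.getName().equals(getterName)
                    && m.getParameterCount() == 0
                    && m.getReturnType().equals(field.getType())) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // prints "as posted: false"
        System.out.println("as posted: " + hasMatchingGetter(AccessAsPosted.class, "traffic"));
        // prints "fixed: true"
        System.out.println("fixed: " + hasMatchingGetter(AccessFixed.class, "traffic"));
    }
}
```

If this is indeed the cause, changing getTraffic() in Access to return Double (or declaring the field, getter, and setter all as double) should let sum("traffic") resolve the field. Another option that avoids field expressions entirely is to aggregate with reduce on the keyed stream.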