sum出错
来源:6-15 功能二实现之统计分析及入库

begin_0002
2022-03-15
Exception in thread “main” org.apache.flink.api.common.typeutils.CompositeTypeInvalidFieldReferenceException:CannotreferencefieldbypositiononGenericType<scala.Tuple3>Referencingafieldbypositionissupportedontuples,caseclasses,andarrays.Additionally,youcanselectthe0thfieldofaprimitive/basictype(e.g.int).atorg.apache.flink.streaming.util.typeutils.FieldAccessorFactory.getAccessor(FieldAccessorFactory.java:93)atorg.apache.flink.streaming.api.functions.aggregation.SumAggregator.<init>(SumAggregator.java:41)atorg.apache.flink.streaming.api.datastream.KeyedStream.sum(KeyedStream.java:768)atcom.hnyd.flink.app.ProvinceUserCntAppInvalidFieldReferenceException: Cannot reference field by position on GenericType<scala.Tuple3>Referencing a field by position is supported on tuples, case classes, and arrays. Additionally, you can select the 0th field of a primitive/basic type (e.g. int).
at org.apache.flink.streaming.util.typeutils.FieldAccessorFactory.getAccessor(FieldAccessorFactory.java:93)
at org.apache.flink.streaming.api.functions.aggregation.SumAggregator.<init>(SumAggregator.java:41)
at org.apache.flink.streaming.api.datastream.KeyedStream.sum(KeyedStream.java:768)
at com.hnyd.flink.app.ProvinceUserCntAppInvalidFieldReferenceException:CannotreferencefieldbypositiononGenericType<scala.Tuple3>Referencingafieldbypositionissupportedontuples,caseclasses,andarrays.Additionally,youcanselectthe0thfieldofaprimitive/basictype(e.g.int).atorg.apache.flink.streaming.util.typeutils.FieldAccessorFactory.getAccessor(FieldAccessorFactory.java:93)atorg.apache.flink.streaming.api.functions.aggregation.SumAggregator.<init>(SumAggregator.java:41)atorg.apache.flink.streaming.api.datastream.KeyedStream.sum(KeyedStream.java:768)atcom.hnyd.flink.app.ProvinceUserCntApp.main(ProvinceUserCntApp.scala:49)
at com.hnyd.flink.app.ProvinceUserCntApp.main(ProvinceUserCntApp.scala)
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment()
val source: DataStream[String] = env.readTextFile("data/access.log")
val dsAccess: DataStream[Access] = source.map(new MapFunction[String, Access] {
override def map(value: String): Access = {
val access = JSON.parseObject(value, classOf[Access])
access
}
}).filter(new FilterFunction[Access] {
override def filter(value: Access): Boolean = {
value != null
}
}).filter(new FilterFunction[Access] {
override def filter(value: Access): Boolean = {
"startup".equals(value.getEvent)
}
}).map(new GaoDelLocMapFunction())
val dsAccessProvince: DataStream[(String, Int, Int)] = dsAccess.map(new MapFunction[Access, (String, Int, Int)] {
override def map(value: Access): (String, Int, Int) = {
(value.getProvince, value.getNu, 1)
}
})
dsAccessProvince.keyBy(new KeySelector[(String,Int,Int),(String,Int)] {
override def getKey(value: (String, Int, Int)): (String,Int) = {
(value._1,value._2)
}
}).sum(2)
.print("省份维度统计新老客户:").setParallelism(1)
env.execute("province")
}
package com.hnyd.flink.utils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.hnyd.flink.domain.Access;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
public class GaoDelLocMapFunction extends RichMapFunction<Access,Access> {
CloseableHttpClient client =null;
@Override
public void close() throws Exception {
client.close();
}
@Override
public void open(Configuration parameters) throws Exception {
client = HttpClients.createDefault();
}
@Override
public Access map(Access value) throws Exception {
String key="972608fa6a7ffb4219a204ebf4ba6e45";
String url=String.format("https://restapi.amap.com/v3/ip?ip=%s&output=json&key=%s",value.getIp(),key);
HttpGet httpGet = new HttpGet(url);
CloseableHttpResponse response = client.execute(httpGet);
HttpEntity entity = response.getEntity();
String result = EntityUtils.toString(entity,"utf-8");
JSONObject jsonObject = JSON.parseObject(result);
String province = jsonObject.get("province").toString();
String city = jsonObject.getString("city");
value.setProvince(province);
value.setCity(city);
return value;
}
}
1回答
-
Michael_PK
2022-03-18
错误信息截图贴下,上面的堆栈信息,密密麻麻的,看不了
00
相似问题