老师请问用flink本地跑没问题,在集群上跑就报空指针,加了判断还是有,详细报错如下:

来源:11-19 完成两个流关联的数据清洗功能

小蜗牛快点儿跑

2020-09-10

用flink的时候,有咩有遇到在本地跑是正常没问题的,在集群上跑的时候就报错如下:
java.lang.NullPointerException
at com.datamysql.program.CountDao.open(WsgCount.scala:123)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)

写回答

2回答

小蜗牛快点儿跑

提问者

2020-09-10

val PlayerUid = WarDataStream.map(data=>((data.strPlayerUid,"player_num"),1L)).keyBy(_._1)
  .timeWindow(Time.minutes(5)).sum(1)
if (PlayerUid != null){
   PlayerUid.addSink(new CountDao())
}


0
1
Michael_PK
123行代码,加上日志,判断可能的对象是否是空就行了
2020-09-10
共1条回复

Michael_PK

2020-09-10

空指针异常,肯定是你的代码open里面没有真正获取到对象,随意导致后续该对象的方法调用时出现的npe异常了。根据异常信息直接定位到你的代码,你可以加个日志把空指针的对象输出就知道肯定没有获取到对象了

0
1
小蜗牛快点儿跑
老师我的是流数据进来的,可能会没有数据,可是我设置了判断空值 val PlayerUid = WarDataStream.map(data=>((data.strPlayerUid,"player_num"),1L)).keyBy(_._1) .timeWindow(Time.minutes(5)).sum(1) if (PlayerUid != null){ PlayerUid.addSink(new CountDao()) } open的是MySQL jdbc 代码如下: try { classOf[com.mysql.jdbc.Driver] conn = DriverManager.getConnection(mysql_conn, mysql_user, mysql_pass) insertStmt = conn.prepareStatement("INSERT into count SQL") upavgStmt = conn.prepareStatement("update count SQL") } catch { case e: Exception => e.printStackTrace() } finally { if (conn == null) { conn.close() } }
2020-09-10
共1条回复

新一代大数据计算引擎 Flink从入门到实战

入行或转型大数据新姿势,多语言系统化讲解,极速入门Flink

969 学习 · 296 问题

查看课程