Problem using timeWindow
Source: 5-9 Transformation functions map and filter: Java implementation
慕粉2110073833
2020-07-08
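Hello PK,
I've run into a problem in practice. Could you help me figure out what I'm doing wrong?
1. Receive data from Kafka.
2. Pull some configuration from MySQL and merge it into each record, producing tuples like:
(13820323689,1590845607000,128,39,450.0,0.9932432,-29.121622)
Everything works correctly up to this step.
3. Create a sliding window to aggregate the data above. Once I add this part, the job no longer runs, and no error is reported.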
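import java.text.SimpleDateFormat
import java.util.Date

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

// 1. Receive data from Kafka
val data = env.addSource(consumer)
// Clean and filter the data
val logData = data.map(x => {
  val splits = x.split(" ")
  val deviceNo = splits(0)
  val timeStr = splits(1)
  // Stays 0 if the timestamp cannot be parsed
  var time = 0L
  try {
    val sourceFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    time = sourceFormat.parse(timeStr).getTime
  } catch {
    case e: Exception =>
      // Pass the exception itself so the logger records it
      logger.error(s"time parse err : $timeStr", e)
  }
  val oil = splits(2).toInt
  val id = splits(3)
  (deviceNo, time, oil, id)
}).filter(_._3 > 10)
  .map(x => {
    (x._1, x._2, x._3, x._4)
  })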
// 2. Connect to the MySQL stream to add the configuration stored in MySQL
val mysqlData = env.addSource(new MySQLSource("select device_no,tank_vol,k,b from device_config"))
val connectedStream = logData.connect(mysqlData)
val coMapDataStream = connectedStream.flatMap(new CoFlatMapFunction[
  (String, Long, Int, String),
  mutable.HashMap[String, (Float, Float, Float)],
  (String, Long, Int, String, Float, Float, Float)] {
  var deviceConfig = mutable.HashMap[String, (Float, Float, Float)]()

  // Oil data: enrich each record with the cached config (zeros if none is known for the device)
  override def flatMap1(value: (String, Long, Int, String),
                        out: Collector[(String, Long, Int, String, Float, Float, Float)]): Unit = {
    val config = deviceConfig.getOrElse(value._1, (0.0f, 0.0f, 0.0f))
    out.collect((value._1, value._2, value._3, value._4, config._1, config._2, config._3))
  }

  // MySQL: replace the cached device configuration
  override def flatMap2(value: mutable.HashMap[String, (Float, Float, Float)],
                        out: Collector[(String, Long, Int, String, Float, Float, Float)]): Unit = {
    deviceConfig = value
  }
})
// The enriched data; printing at this point gives, for example:
// (13820323689,1590845607000,128,39,450.0,0.9932432,-29.121622)
val resultData = coMapDataStream
  .assignAscendingTimestamps(_._2)
  .keyBy(0)
  .timeWindow(Time.seconds(60 * 5), Time.seconds(60))
  .apply(new WindowFunction[(String, Long, Int, String, Float, Float, Float), ArrayBuffer[(String, String, Float)], Tuple, TimeWindow] {
    override def apply(key: Tuple, window: TimeWindow, input: Iterable[(String, Long, Int, String, Float, Float, Float)], out: Collector[ArrayBuffer[(String, String, Float)]]): Unit = {
      // Explicit type argument, so Scala does not infer Nothing for the Java generic method
      val deviceNo = key.getField[String](0)
      val max = input.iterator.maxBy(_._3)
      val min = input.iterator.minBy(_._3)
      val sumOil = input.iterator.foldLeft(0)(_ + _._3)
      // Average as a Float to avoid integer truncation
      val avg = sumOil.toFloat / input.iterator.size
      // Buffer for the records to emit
      val dataArray = ArrayBuffer[(String, String, Float)]()
      var addOil = false
      // If the max-min spread exceeds a threshold, tentatively treat it as a refueling event
      if ((max._3 - min._3) > 35) {
        addOil = true
        System.out.println("========refueling point detected=========" + addOil + "============" + max + "====" + min + "===")
      } else {
        System.out.print(".")
      }
      val iterator = input.iterator
      // Record the timestamps: when averaging, use the earliest time in the window;
      // the sliding window advances by one or two records each time
      val times = ArrayBuffer[Long]()
      val str = new mutable.StringBuilder()
      var k = 0.0f
      var b = 0.0f
      while (iterator.hasNext) {
        val next = iterator.next()
        val time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(next._2))
        // With refueling, keep every record including the max and min;
        // without refueling, skip the max and min records
        if (addOil) {
          dataArray.append((next._1, time, next._3 * next._6 + next._7))
        } else {
          // Keep only records that are neither the min nor the max
          if (next._4 != min._4 && next._4 != max._4) {
            dataArray.append((next._1, time, next._3 * next._6 + next._7))
          }
        }
        times.append(next._2)
        str.append(time + "," + next._3 + ";")
        k = next._6
        b = next._7
      }
      val time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(times.min))
      // Without refueling, return only a single averaged value
      if (!addOil) {
        dataArray.clear()
        dataArray.append((deviceNo, time, avg * k + b))
      }
      // println(str.toString())
      // Emit the output
      /**
       * Field 1: device number
       * Field 2: time
       * Field 3: oil-level sensor reading
       */
      out.collect(dataArray)
    }
  })
resultData.print().setParallelism(1)
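The per-window statistics in the listing above can be checked without a running Flink job. The following is only a minimal sketch in plain Scala: `Reading`, `Stats`, and `stats` are hypothetical names introduced for illustration, and the record is simplified to the four fields that come out of the cleaning step. It assumes the same rule as the listing, a max-min spread above 35 marks a refueling event, and computes the average as a `Float` so that integer division does not truncate it.

```scala
object WindowStatsSketch {
  // Hypothetical simplified record: (deviceNo, timestampMs, oil, id)
  final case class Reading(deviceNo: String, ts: Long, oil: Int, id: String)

  // Hypothetical result type for one window
  final case class Stats(min: Reading, max: Reading, avg: Float, refuel: Boolean)

  // Compute min, max, average, and the refueling flag for one window's records.
  // The threshold of 35 mirrors the value used in the listing.
  def stats(input: Iterable[Reading], threshold: Int = 35): Stats = {
    require(input.nonEmpty, "expected at least one record in the window")
    val max = input.maxBy(_.oil)
    val min = input.minBy(_.oil)
    // Convert to Float before dividing so the average keeps its fraction
    val avg = input.map(_.oil).sum.toFloat / input.size
    Stats(min, max, avg, (max.oil - min.oil) > threshold)
  }

  def main(args: Array[String]): Unit = {
    val window = Seq(
      Reading("13820323689", 1590845607000L, 128, "39"),
      Reading("13820323689", 1590845617000L, 131, "40"),
      Reading("13820323689", 1590845627000L, 130, "41"))
    println(stats(window))
  }
}
```

Running the statistics on a small hand-built window like this is a quick way to separate "the window logic is wrong" from "the window never fires".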
1 answer
Michael_PK
2020-07-08
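The logic here isn't clear to me. There is data at the point where you print. Your code calls assignAscendingTimestamps, so in theory the stream should carry watermarks; have you configured the job to process by event time? Also, does the data coming out of your map actually trigger the window to fire? You'll need to debug that yourself.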
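To make the two questions above concrete: in Flink versions before 1.12 the default time characteristic is processing time, and event time is enabled with `env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)`. Once event time is on, a window only fires after the watermark passes its end. The following plain-Scala sketch (no Flink dependency) simulates that condition for a 5-minute window sliding every minute. `SlidingWindowSketch`, `windowStarts`, and `fires` are hypothetical names, and the sketch assumes a window offset of 0, an ascending-timestamp watermark that trails the largest seen timestamp by 1 ms, and a trigger that fires once the watermark reaches the window end minus 1 ms, which is my understanding of Flink's default behavior rather than something stated in this thread.

```scala
object SlidingWindowSketch {
  val sizeMs: Long = 5 * 60 * 1000L // window size: 5 minutes
  val slideMs: Long = 60 * 1000L    // slide: 1 minute

  // Start times of every sliding window that contains ts
  // (assumes offset 0 and a non-negative timestamp).
  def windowStarts(ts: Long): List[Long] = {
    val lastStart = ts - (ts % slideMs)
    Iterator.iterate(lastStart)(_ - slideMs).takeWhile(_ > ts - sizeMs).toList
  }

  // Assumption: the watermark is (largest seen timestamp - 1 ms), and a window
  // [start, start + size) fires once the watermark reaches (end - 1 ms).
  def fires(windowStart: Long, maxSeenTs: Long): Boolean = {
    val watermark = maxSeenTs - 1
    watermark >= windowStart + sizeMs - 1
  }

  def main(args: Array[String]): Unit = {
    val ts = 1590845607000L // timestamp from the sample record in the question
    val starts = windowStarts(ts)
    val earliestEnd = starts.min + sizeMs
    println(s"windows containing the record: ${starts.size}")
    println(s"earliest window end: $earliestEnd")
    println(s"fires with only this record: ${fires(starts.min, ts)}")
    println(s"fires once a record at the window end arrives: ${fires(starts.min, earliestEnd)}")
  }
}
```

For the sample timestamp 1590845607000 this gives five windows, the earliest ending at 1590845640000. Under the stated assumptions, nothing is emitted until some record carries an event timestamp at or beyond that value, so a slow or stalled input can look like a job that "does not run".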