timeWindow 使用问题

来源:5-9 Transformation函数map和filter之Java实现

慕粉2110073833

2020-07-08

pk老师您好:
我在实践中遇到个问题请帮忙分析下,是我哪里有问题?
1、接收kafka数据
2、提取MySQL中的一些配置信息,整合map
(13820323689,1590845607000,128,39,450.0,0.9932432,-29.121622)
到这个环节可以正常工作
3、创建一个滑动窗口对之前的数据进行统计,加入这部分代码就不运行了,也没有提示错误
图片描述

   // 1、接收kafka数据
    val data = env.addSource(consumer)
    // 数据清洗过滤
    val logData = data.map(x => {
      val splits = x.split("	")
      val deviceNo = splits(0)
      val timeStr = splits(1)
      var time = 0l
      try {
        val sourceFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        time = sourceFormat.parse(timeStr).getTime
      } catch {
        case e: Exception => {
          logger.error(s"time parse err : $timeStr", e.getMessage)
        }
      }
      val oil = splits(2).toInt
      val id = splits(3)
      (deviceNo, time, oil, id)
    }).filter(_._3 > 10)
      .map(x => {
        (x._1, x._2, x._3, x._4)
      })
  // 2.连接MySQL做了connect操作,补充MySQL中的配置信息
val mysqlData = env.addSource(new MySQLSource("select device_no,tank_vol,k,b from device_config"))
    val connectedStream = logData.connect(mysqlData)
    val coMapDataStream = connectedStream.flatMap(new CoFlatMapFunction[
      (String, Long, Int, String),
      mutable.HashMap[String, (Float, Float, Float)],
      (String, Long, Int, String, Float, Float, Float)] {

      var deviceConfig = mutable.HashMap[String, (Float, Float, Float)]()

      // oilData
      override def flatMap1(value: (String, Long, Int, String),
                            out: Collector[(String, Long, Int, String, Float, Float, Float)]): Unit = {
        val config = deviceConfig.getOrElse(value._1, (0.0f, 0.0f, 0.0f))
        out.collect((value._1, value._2, value._3, value._4, config._1, config._2, config._3))
      }

      // MySQL
      override def flatMap2(value: mutable.HashMap[String, (Float, Float, Float)],
                            out: Collector[(String, Long, Int, String, Float, Float, Float)]): Unit = {
        deviceConfig = value
      }
    })
    // 整理后的数据,此处打印
// // (13820323689,1590845607000,128,39,450.0,0.9932432,-29.121622)

  val resultData = coMapDataStream
       .assignAscendingTimestamps(_._2)
          .keyBy(0)
          .timeWindow(Time.seconds(60 * 5), Time.seconds(60))
          .apply(new WindowFunction[(String, Long, Int, String, Float, Float, Float), ArrayBuffer[(String, String, Float)], Tuple, TimeWindow] {
            override def apply(key: Tuple, window: TimeWindow, input: Iterable[(String, Long, Int, String, Float, Float, Float)], out: Collector[ArrayBuffer[(String, String, Float)]]): Unit = {
              val deviceNo = key.getField(0).toString
              val max = input.iterator.maxBy(_._3)
              val min = input.iterator.minBy(_._3)
              val sumOil = input.iterator.foldLeft(0)(_ + _._3)
              val avg = sumOil / input.iterator.size
              // 定义准备收集的数据
              val dataArray = ArrayBuffer[(String, String, Float)]()
              var addOil = false
              // 如果偏差值在某大于某一范围初步判定为加油
              if ((max._3 - min._3) > 35) {
                addOil = true
                System.out.println("========识别加油点=========" + addOil + "============" + max + "====" + min + "===")
              } else {
                System.out.print(".")
              }

              val iterator = input.iterator
              // 记录时间 求平均时提取一个最小的时间,滑动窗口 每次往后滑动一两个数据
              val times = ArrayBuffer[Long]()
              var str = new mutable.StringBuilder()
              var k = 0.0f
              var b = 0.0f
              while (iterator.hasNext) {
                val next = iterator.next()
                val time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(next._2))
                // 判断是否加油,有加油最大最小值保留 无加油启用
                if (addOil) {

                  dataArray.append((next._1, time, next._3 * next._6 + next._7))
                } else {
                  if (next._4 != min._4 || next._4 != max._4) {
                    dataArray.append((next._1, time, next._3 * next._6 + next._7))
                  }
                }
                times.append(next._2)
                str.append(time + "," + next._3 + ";")
                k = next._6
                b = next._7
              }
              val time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(times.min))
              // 没加油只返回一个平均值
              if (!addOil) {
                dataArray.clear()
                dataArray.append((deviceNo, time, avg * k + b))
              }
              //          println(str.toString())
              // 转换输出数据
              /**
               * 第一个参数:设备号
               * 第二个参数:时间
               * 第三个参数:油位传感器采集值
               */
              out.collect(dataArray)
            }
          })
        resultData.print().setParallelism(1)
写回答

1回答

Michael_PK

2020-07-08

这个逻辑不清楚。  你打印出来那个地方有数据,我看你代码有这个assignAscendingTimestamps,理论上应该带水印的,那么你是否设置了根据eventtime进行处理了呢? 然后你map后出来的数据是否会触发window的执行呢?这个需要你debug去调试下了

1
2
Michael_PK
回复
慕粉2110073833
功能实现可以采用不同的方式,你这个方式不行也可以换其他方式实现。以现在这方式,你还是得断点debug进去,明确到到底是哪里的问题导致没执行。看代码很难看出啥,一定要借助于debug
2020-07-08
共2条回复

新一代大数据计算引擎 Flink从入门到实战

入行或转型大数据新姿势,多语言系统化讲解,极速入门Flink

969 学习 · 296 问题

查看课程