Scala 自定义 Sink

来源:5-14 自定义Sink之功能测试

chang_bryant

2020-02-08

对课程中的需求做了简单修改:

  1. 通过自定义 Source 函数生成“学生姓名,年龄”格式的字符串(如 Student-0,20)
  2. 处理时只存储年龄在 18–25 岁(含边界)的数据,其余作为脏数据过滤掉

自定义 Source

/**
 * Demo source that emits one "Student-<n>,<age>" string per second until cancelled.
 *
 * Ages come from `ageGenerator.nextInt(25)`, which yields 0 to 24 inclusive.
 * As a result, a downstream 18-25 filter never actually receives age 25.
 */
class ScalaStudentSource extends SourceFunction[String] {

  /** Sequence number appended to the student name; incremented after each emitted record. */
  var cur = 0
  /** Fixed name prefix for generated students. */
  val studentStr = "Student"
  /**
   * Loop flag for `run()`.
   * Per the SourceFunction contract, Flink calls `cancel()` from a different thread than `run()`.
   * Without @volatile the loop may never observe the write and the source would not stop.
   */
  @volatile var isRunning = true
  /** Last record emitted (kept as a field for compatibility; only written inside `run()`). */
  var student: String = _
  /** Random source for the age part of each record. */
  val ageGenerator = new Random()

  /** Emits one record per second until `cancel()` clears `isRunning`. */
  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    while (isRunning) {
      student = s"$studentStr-$cur,${ageGenerator.nextInt(25)}"
      ctx.collect(student)
      cur += 1
      // Throttle generation so the demo output is readable.
      Thread.sleep(TimeUnit.SECONDS.toMillis(1))
    }
  }

  /** Requests the `run()` loop to exit after its current iteration. */
  override def cancel(): Unit = {
    isRunning = false
  }
}

自定义Sink

/**
 * Sink that inserts each incoming Student as a row via
 * `insert into student(name,age) values (?,?)`.
 *
 * Opens one connection and one prepared statement per sink instance in `open()`.
 * Releases both in `close()`.
 */
class ScalaSinkToMysql extends RichSinkFunction[Student] {
  var conn: sql.Connection = null
  var pstmt: PreparedStatement = null

  /** Acquires the connection from ScalaDBUtils and prepares the insert statement once. */
  override def open(parameters: Configuration): Unit = {
    conn = ScalaDBUtils.getConnection()
    // Named insertSql so it does not shadow the `sql` package used in the field type above.
    val insertSql = "insert into student(name,age) values (?,?)"
    pstmt = conn.prepareStatement(insertSql)
  }

  /**
   * Releases the statement, then the connection.
   *
   * Nested finally blocks ensure the connection is still released if an earlier step throws.
   * Null checks cover the case where `open()` failed part-way: Flink still calls `close()` then.
   */
  override def close(): Unit = {
    try {
      super.close()
    } finally {
      try {
        if (pstmt != null) ScalaDBUtils.closeStat(pstmt)
      } finally {
        pstmt = null
        if (conn != null) ScalaDBUtils.closeConn(conn)
        conn = null
      }
    }
  }

  /** Binds name and age of one record and executes the prepared insert. */
  override def invoke(value: Student): Unit = {
    pstmt.setString(1, value.getName)
    pstmt.setInt(2, value.getAge)
    println(s"save record $value")
    pstmt.executeUpdate()
  }
}

主函数

/**
 * Job entry point.
 *
 * The pipeline:
 *  - reads generated "name,age" strings from ScalaStudentSource;
 *  - turns them into Student beans;
 *  - keeps only ages 18 to 25 inclusive;
 *  - hands the survivors to ScalaSinkToMysql running as a single instance.
 */
object ScalaCustomSinkApp {
  def main(args: Array[String]): Unit = {
    val env = StreamUtils.getStreamEnv
    val lines = env.addSource(new ScalaStudentSource)

    // Parse "name,age" into a Student bean, logging every incoming line.
    val students = lines.map { line =>
      println(s"received record $line")
      val fields = line.split(",")
      val student: Student = new Student
      student.setName(fields(0))
      student.setAge(fields(1).toInt)
      student
    }

    // Keep ages 18..25 inclusive; anything else is treated as dirty data and dropped.
    val validStudents = students.filter { student =>
      val age = student.getAge
      val inRange = age >= 18 && age <= 25
      if (!inRange) {
        println("年龄范围不符[18-25],过滤")
      }
      inRange
    }

    validStudents.addSink(new ScalaSinkToMysql).setParallelism(1)
    env.execute("ScalaCustomSinkApp")
  }
}
写回答

1回答

Michael_PK

2020-02-08

思路不错,扩展下挺好

0
0

新一代大数据计算引擎 Flink从入门到实战

入行或转型大数据新姿势,多语言系统化讲解,极速入门Flink

969 学习 · 296 问题

查看课程