作业：Scala 实现 MySQL 自定义 Sink

来源：5-14 自定义 Sink 之功能测试

慕移动6222658

2020-03-12

Student类

/**
  * One student record, built from a single `id,name,age` line of socket input.
  *
  * @param id   student id
  * @param name student name
  * @param age  student age
  */
case class Student(
    id: Int,
    name: String,
    age: Int
)

自定义SinkToMySQL类

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}

/**
  * Flink sink that upserts [[Student]] records into the MySQL table `student`.
  *
  * Flink creates one instance of this function per parallel sink subtask and calls [[open]] on
  * each one. Each instance therefore holds its own JDBC connection and prepared statement, which
  * are released in [[close]].
  *
  * @param url      JDBC URL of the target database
  * @param user     database user name
  * @param password database password
  *
  * The defaults reproduce the original hard-coded local development settings; pass real
  * credentials explicitly outside of local testing.
  */
class SinkToMySQL(
    url: String = "jdbc:mysql://localhost:3306/imooc?useSSL=false",
    user: String = "root",
    password: String = "root"
) extends RichSinkFunction[Student] {

  // JDBC handles are created in open() on the task side. @transient keeps them out of the
  // serialized function object (java.sql.Connection is not Serializable).
  @transient var connect: Connection = null
  @transient var pstm: PreparedStatement = null

  /**
    * Loads the MySQL JDBC driver, opens a connection, stores it in `connect` and returns it.
    *
    * @return the newly opened connection
    */
  def getConnection: Connection = {
    // Explicit driver loading. With Connector/J 8+ the preferred class name is
    // com.mysql.cj.jdbc.Driver; the legacy name below is deprecated there but still accepted.
    Class.forName("com.mysql.jdbc.Driver")
    connect = DriverManager.getConnection(url, user, password)
    connect
  }

  /**
    * Opens the JDBC connection and prepares the upsert statement.
    *
    * @param parameters Flink configuration (not used here)
    */
  override def open(parameters: Configuration): Unit = {
    val conn = getConnection
    // Insert a new row, or overwrite name/age when the row collides on a primary/unique key
    // (presumably `id`; the table definition is not shown here).
    val sql = "insert into student(id,name,age) values (?,?,?) ON DUPLICATE KEY UPDATE name=?,age=?"

    try {
      pstm = conn.prepareStatement(sql)
    } catch {
      case e: java.sql.SQLException =>
        // Do not leak the connection we just opened if the statement cannot be prepared.
        try conn.close()
        catch { case closeError: java.sql.SQLException => e.addSuppressed(closeError) }
        connect = null
        throw e
    }
    println("open~~~~~~~")
  }

  /**
    * Writes one record. Called once per incoming element.
    *
    * @param value   the student to upsert
    * @param context Flink sink context (not used here)
    */
  override def invoke(value: Student, context: SinkFunction.Context[_]): Unit = {
    // Parameters 1-3 feed the INSERT, parameters 4-5 feed the ON DUPLICATE KEY UPDATE clause.
    pstm.setInt(1, value.id)
    pstm.setString(2, value.name)
    pstm.setInt(3, value.age)
    pstm.setString(4, value.name)
    pstm.setInt(5, value.age)

    pstm.executeUpdate()
    println("invoke~~~~~~~~~~~")
  }

  /**
    * Releases the prepared statement and the connection.
    *
    * The connection is closed even if closing the statement throws. Both fields are reset to
    * null so a repeated call is a no-op.
    */
  override def close(): Unit = {
    try {
      if (pstm != null) pstm.close()
    } finally {
      pstm = null
      try {
        if (connect != null) connect.close()
      } finally {
        connect = null
      }
    }
  }
}

执行类CustomSinkToMySQL

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._

object CustomSinkToMySQL {

  import scala.util.Try

  /**
    * Parses one socket line of the form `id,name,age` into a [[Student]].
    *
    * Fields are trimmed, and any fields after the third are ignored.
    *
    * @param line raw text line
    * @return the parsed student, or `None` when the line has fewer than three fields or when
    *         `id`/`age` is not an integer
    */
  def parseStudent(line: String): Option[Student] =
    line.split(",").map(_.trim) match {
      case Array(id, name, age, _*) =>
        for {
          studentId  <- Try(id.toInt).toOption
          studentAge <- Try(age.toInt).toOption
        } yield Student(studentId, name, studentAge)
      case _ => None
    }

  /**
    * Reads `id,name,age` lines from a text socket and upserts them into MySQL via [[SinkToMySQL]].
    *
    * @param args optional socket host (default `localhost`) and port (default `9999`)
    */
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val host = args.lift(0).getOrElse("localhost")
    val port = args.lift(1).map(_.toInt).getOrElse(9999)
    val data = env.socketTextStream(host, port)

    // An exception thrown here would fail the whole streaming job, so malformed lines
    // (empty, too few fields, non-numeric id/age) are reported and dropped instead.
    val ds: DataStream[Student] = data.flatMap { line =>
      val parsed = parseStudent(line)
      if (parsed.isEmpty) System.err.println(s"Skipping malformed line: $line")
      parsed.toList
    }

    // setParallelism applies to the operator it is chained onto. Placed after addSink, it sets
    // the sink's parallelism, and open() runs once per parallel sink instance. Without it the
    // sink uses the environment's default parallelism (for a local run, presumably the number of
    // logical processors reported by the JVM; verify against the Flink version in use).
    ds.addSink(new SinkToMySQL)
    //.setParallelism(1)
    env.execute("CustomSinkToMySQL")
  }
}

交作业啦，请老师过目。open 方法打印了多次，应该是 CPU 线程数导致的吧？老师上课时应该是口误了，说成 CPU 是 8 核所以打印 8 次；但我的电脑是 8 核 16 线程，打印了 16 次。

还有一个问题：为什么只有把并行度设置在最后面（addSink 之后），才会影响 open 方法的调用次数呢？希望老师有时间帮忙解答一下，非常感谢老师！

写回答

1回答

Michael_PK

2020-03-12

如果后面设置了并行度，就以后面设置的为准。我的机器是 8 核，所以打印了 8 次。你的 8 核 16 线程是从什么地方看出来的？这个我真没注意到。

0
3
慕移动6222658
回复
Michael_PK
好的，感谢老师！
2020-03-12
共3条回复

新一代大数据计算引擎 Flink从入门到实战

入行或转型大数据新姿势,多语言系统化讲解,极速入门Flink

969 学习 · 296 问题

查看课程