作业:Scala实现MySQL自定义Sink
来源:5-14 自定义Sink之功能测试
慕移动6222658
2020-03-12
Student类
case class Student(id: Int, name: String, age: Int)
自定义SinkToMySQL类
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
class SinkToMySQL extends RichSinkFunction[Student] {
var connect: Connection = null
var pstm: PreparedStatement = null
def getConnection = {
Class.forName("com.mysql.jdbc.Driver")
val url = "jdbc:mysql://localhost:3306/imooc?useSSL=false"
connect = DriverManager.getConnection(url, "root", "root")
connect
}
/**
* 在open 方法中创建连接
*
* @param parameters
*/
override def open(parameters: Configuration): Unit = {
val conn = getConnection
val sql = "insert into student(id,name,age) values (?,?,?) ON DUPLICATE KEY UPDATE name=?,age=?"
pstm = conn.prepareStatement(sql)
println("open~~~~~~~")
}
/**
* 每条记录插入时调用一次
*
* @param value
* @param context
*/
override def invoke(value: Student, context: SinkFunction.Context[_]): Unit = {
pstm.setInt(1, value.id)
pstm.setString(2, value.name)
pstm.setInt(3, value.age)
pstm.setString(4, value.name)
pstm.setInt(5, value.age)
pstm.executeUpdate()
println("invoke~~~~~~~~~~~")
}
/**
* 关闭资源
*/
override def close(): Unit = {
if (pstm != null) {
pstm.close()
}
if (connect != null) {
connect.close()
}
}
}
执行类CustomSinkToMySQL
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
object CustomSinkToMySQL {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val data = env.socketTextStream("localhost",9999)
val ds: DataStream[Student] = data.map(x => {
val strings = x.split(",")
Student(strings(0).toInt, strings(1), strings(2).toInt)
})
ds.addSink(new SinkToMySQL)
//.setParallelism(1)
env.execute("CustomSinkToMySQL")
}
}
交作业啦,请老师过目,open 方法打印多次,应该是cpu线程数导致的吧,老师上课时应该是口误了,说成cpu核数是8核才打印8次,但是我电脑也是8核16线程的,打印了16次
还有就是为什么设置并行度在最后面,才会影响到open方法的调用次数呢?还希望老师有时间帮忙解答一下,非常感谢老师!
写回答
1回答
-
设置的并行度后面有就以后面的为准。我的是8core是八次,你的8core16线程是什么地方能看出来?这个我真没注意到
032020-03-12
相似问题