Scala 实现 Custom Sink to MySQL
来源:5-14 自定义Sink之功能测试
慕标4491493
2020-02-03
作业运行成功。请老师指正!
package com.imooc.flink.course05
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
/**
- Scala实现CustomSinkToMySQLApp
*/
object CustomSinkToMySQLApp {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val source = env.socketTextStream("localhost", 7777)
val studentStream = source.map(new MapFunction[String, Student] {
override def map(value: String): Student = {
val splits = value.split(",")
val stu = new Student
stu.setId(splits(0).toInt)
stu.setName(splits(1))
stu.setAge(splits(2).toInt)
stu
}
})
studentStream.addSink(new SinkToMySQL)
env.execute("JavaCustomSinkToMySQL")
}
}
写回答
1回答
-
Michael_PK
2020-02-03
sinktomysql代码看看
012020-02-04
相似问题
dbcp2 全局 连接池
回答 3
自定义sink在哪里
回答 1