scala 自定义sink
来源:5-14 自定义Sink之功能测试
chang_bryant
2020-02-08
简单修改了下需求
- 通过自定义source函数生成"学生姓名,年龄"格式的字符串
- 处理时只存储18–25岁的数据,其余作为脏数据进行过滤
自定义source
/**
 * Flink source that emits one synthetic student record per second.
 *
 * Each record is a string of the form `Student-<n>,<age>`, where `<n>` is a
 * counter starting at 0 and `<age>` comes from `ageGenerator.nextInt(25)`,
 * i.e. an integer from 0 to 24 inclusive.
 */
class ScalaStudentSource extends SourceFunction[String] {
  /** Sequence number appended to the name of the next emitted student. */
  var cur = 0
  /** Fixed prefix used for every generated student name. */
  val studentStr = "Student"
  // cancel() is called by Flink from a thread other than the one looping in
  // run(); @volatile guarantees the loop observes the flag being cleared.
  @volatile var isRunning = true
  /** The most recently generated record. */
  var student: String = _
  val ageGenerator = new Random()

  /**
   * Emits a new record through `ctx` roughly once per second until
   * [[cancel]] clears the running flag.
   *
   * @param ctx context used to hand records to the downstream operators
   */
  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    while (isRunning) {
      student = s"$studentStr-$cur,${ageGenerator.nextInt(25)}"
      ctx.collect(student)
      cur += 1
      // Throttle emission to one record per second.
      Thread.sleep(TimeUnit.SECONDS.toMillis(1))
    }
  }

  /** Requests the emit loop in [[run]] to stop after its current iteration. */
  override def cancel(): Unit = {
    isRunning = false
  }
}
自定义Sink
/**
 * Flink sink that inserts every incoming [[Student]] into the `student` table
 * through a JDBC connection obtained from `ScalaDBUtils`.
 *
 * The connection and the prepared insert statement are created once in
 * [[open]] and reused for every record; both are released in [[close]].
 */
class ScalaSinkToMysql extends RichSinkFunction[Student] {
  var conn: sql.Connection = null
  var pstmt: PreparedStatement = null

  /**
   * Acquires the connection and prepares the parameterized insert statement.
   *
   * @param parameters configuration passed in by Flink (not read here)
   */
  override def open(parameters: Configuration): Unit = {
    conn = ScalaDBUtils.getConnection()
    val insertSql = "insert into student(name,age) values (?,?)"
    pstmt = conn.prepareStatement(insertSql)
  }

  /**
   * Releases the statement and then the connection.
   *
   * The steps are nested in `try`/`finally` so that a failure in an earlier
   * step cannot prevent the later resources from being released.
   */
  override def close(): Unit = {
    try super.close()
    finally {
      try ScalaDBUtils.closeStat(pstmt)
      finally ScalaDBUtils.closeConn(conn)
    }
  }

  /**
   * Binds the student's name and age to the prepared statement and executes
   * the insert.
   *
   * @param value the record to persist
   */
  override def invoke(value: Student): Unit = {
    pstmt.setString(1, value.getName)
    pstmt.setInt(2, value.getAge)
    println(s"save record $value")
    pstmt.executeUpdate()
  }
}
主函数
/**
 * Job entry point: reads "name,age" lines from [[ScalaStudentSource]], parses
 * them into [[Student]] objects, drops those whose age is outside 18..25
 * (inclusive) and writes the rest through [[ScalaSinkToMysql]].
 */
object ScalaCustomSinkApp {
  def main(args: Array[String]): Unit = {
    val env = StreamUtils.getStreamEnv
    val rawLines = env.addSource(new ScalaStudentSource)

    // Parse each comma-separated line into a Student (field 0 = name, field 1 = age).
    val parsed = rawLines.map { line =>
      println(s"received record $line")
      val fields = line.split(",")
      val student: Student = new Student
      student.setName(fields(0))
      student.setAge(fields(1).toInt)
      student
    }

    // Keep only students aged 18 to 25 inclusive; log every rejected record.
    val studentStream = parsed.filter { candidate =>
      val inRange = candidate.getAge >= 18 && candidate.getAge <= 25
      if (!inRange) println("年龄范围不符[18-25],过滤")
      inRange
    }

    // The sink runs with parallelism 1.
    studentStream.addSink(new ScalaSinkToMysql).setParallelism(1)
    env.execute("ScalaCustomSinkApp")
  }
}
写回答
1回答
-
Michael_PK
2020-02-08
思路不错,扩展下挺好
00
相似问题