Scala 实现 Custom Sink to MySQL

来源:5-14 自定义Sink之功能测试

慕标4491493

2020-02-03

作业运行成功。请老师指正!

package com.imooc.flink.course05

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
/**

  • Scala实现CustomSinkToMySQLApp
    */
    object CustomSinkToMySQLApp {

def main(args: Array[String]): Unit = {

val env = StreamExecutionEnvironment.getExecutionEnvironment

val source = env.socketTextStream("localhost", 7777)

val studentStream = source.map(new MapFunction[String, Student] {
  override def map(value: String): Student = {
    val splits = value.split(",")

    val stu = new Student

    stu.setId(splits(0).toInt)
    stu.setName(splits(1))
    stu.setAge(splits(2).toInt)

    stu

  }
})

studentStream.addSink(new SinkToMySQL)

env.execute("JavaCustomSinkToMySQL")

}

}

写回答

1回答

Michael_PK

2020-02-03

sinktomysql代码看看

0
1
慕标4491493
sinktomysql是照用的老师的代码
2020-02-04
共1条回复

新一代大数据计算引擎 Flink从入门到实战

入行或转型大数据新姿势,多语言系统化讲解,极速入门Flink

969 学习 · 296 问题

查看课程