When foreach is called twice to save to two Redis keys, only the second save succeeds

Source: 9-9 Tuning and Assignments

qq_梦也_1

2020-10-29

Hi PK, I call foreach twice to save the offsets and the aggregated result table to Redis respectively, but the first foreach does not write anything to Redis, and there is no error at either compile time or runtime. Am I using the method incorrectly? Or does foreach simply not support saving two tables in one application, so that it has to be done with foreachBatch?
The code is as follows:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.streaming.OutputMode
import redis.clients.jedis.Jedis
// RedisUtils, DateUtils and IPUtils are the project's own utility classes (not shown here)

object SSSOffsetsApp {

  def main(args: Array[String]): Unit = {
    Logger.getLogger(this.getClass.getSimpleName).setLevel(Level.ERROR)
    val spark = SparkSession.builder()
      .master("local[4]")
      .appName(this.getClass.getName)
      .config("spark.sql.shuffle.partitions", "10")
      .getOrCreate()

    val topic = "access-topic-prod"
    val groupid = ""
    val startingOffsets: String = RedisUtils.getJsonOffsets(topic, groupid)

    import spark.implicits._
    val frame: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "MacBookAir.local:9092,MacBookAir.local:9093,MacBookAir.local:9094")
      .option("subscribe", "access-topic-prod")
      .option("startingOffsets", startingOffsets)
      .load()

    storeOffsets(frame, spark)
    save2DB(frame, spark)
  }

  /**
   * Save the offsets
   * @param dataFrame
   * @param spark
   */
  def storeOffsets(dataFrame: DataFrame, spark: SparkSession): Unit = {
    import spark.implicits._
    dataFrame.selectExpr("topic", "partition", "offset") // comment this line out to see all columns including the offsets, then set a sensible startingOffsets value based on them
      .as[(String, Int, Long)]
      .map(x => {
        val topic = x._1
        val partition = x._2
        val offset = x._3
        // topic, partition, offset
        (topic, partition, offset)
      })
      .toDF("topic", "partition", "offset")
      .writeStream
      // sink to Redis
      .outputMode(OutputMode.Update())
      .foreach(new RedisOffsetsForeachWriter())
      .start()
      .awaitTermination()
  }

  /**
   * Save the aggregated results
   * @param dataFrame
   * @param spark
   */
  def save2DB(dataFrame: DataFrame, spark: SparkSession): Unit = {
    import spark.implicits._
    dataFrame.selectExpr("cast(value as String)")
      .as[String]
      .map(x => {
        val splits = x.split("\t")
        val time = splits(0)
        val ip = splits(2)
        // event time, day, province
        (DateUtils.parseToTimestamp(time), DateUtils.parseToDay(time), IPUtils.parseIP(ip))
      })
      .toDF("ts", "day", "province")
      .withWatermark("ts", "10 minutes")
      .groupBy("day", "province")
      .count()
      .writeStream
      // sink to Redis
      .outputMode(OutputMode.Update())
      .foreach(new RedisDBForeachWriter())
      .start()
      .awaitTermination()
  }
}

class RedisOffsetsForeachWriter extends ForeachWriter[Row] {
  var client: Jedis = _

  override def open(partitionId: Long, epochId: Long): Boolean = {
    println(s"open connection: partitionId=$partitionId, epochId=$epochId")
    client = RedisUtils.getJedisClient()
    client != null
  }

  override def process(value: Row): Unit = {
    // read the offsets from the Row and save them to Redis
    val topic = value.getAs[String]("topic")
    val partition = value.getAs[Int]("partition")
    val offset = value.getAs[Long]("offset")
    client.hset(topic, partition + "", offset + "")
    println(s"topic:$topic, partition:$partition, offset:$offset")
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (null != client) {
      RedisUtils.returnResource(client)
    }
  }
}

class RedisDBForeachWriter extends ForeachWriter[Row] {
  var client: Jedis = _

  override def open(partitionId: Long, epochId: Long): Boolean = {
    client = RedisUtils.getJedisClient()
    client != null
  }

  override def process(value: Row): Unit = {
    val day = value.getString(0)
    val province = value.getString(1)
    val cnts = value.getLong(2)
    client.hset("day-province-cnts-" + day, province, cnts + "")
    println(s"day:$day, province:$province, cnts:$cnts")
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (null != client) {
      RedisUtils.returnResource(client)
    }
  }
}
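As background on running two sinks from one application: Structured Streaming does allow several foreach queries in the same job. The usual pattern is to call start() on each query without blocking and to wait only once at the end, for example with spark.streams.awaitAnyTermination(), because awaitTermination() inside a helper method blocks the driver before the next query can be started. Below is a minimal, self-contained sketch of that pattern, using the built-in rate source and console sink as stand-ins for the Kafka source and Redis writers above; it is not the course code.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQuery

object TwoQueriesSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[4]")
      .appName("TwoQueriesSketch")
      .getOrCreate()

    // the built-in rate source stands in for the Kafka source in the question
    val frame = spark.readStream.format("rate").option("rowsPerSecond", "5").load()

    // first query (would be the offsets sink); start() returns immediately
    val offsetsQuery: StreamingQuery = frame.writeStream
      .queryName("offsets-sink")
      .format("console")
      .start()

    // second query (would be the aggregated-results sink)
    val resultsQuery: StreamingQuery = frame.writeStream
      .queryName("results-sink")
      .format("console")
      .start()

    // block the driver once, after BOTH queries are running
    spark.streams.awaitAnyTermination()
  }
}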


1 Answer

Michael_PK

2020-10-29

What error does the first one report? Redis overwrites, so wouldn't the second write overwrite the first one?
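For context on the overwrite point: within a Redis hash, HSET on an existing key and field simply replaces the stored value, so two writers hitting the same key and field would clobber each other. A minimal Jedis sketch (the local host/port and values are illustrative, not from the course code):

import redis.clients.jedis.Jedis

object HsetOverwriteSketch {
  def main(args: Array[String]): Unit = {
    val jedis = new Jedis("localhost", 6379)        // placeholder connection
    jedis.hset("access-topic-prod", "0", "100")     // first write
    jedis.hset("access-topic-prod", "0", "200")     // second write replaces the first
    println(jedis.hget("access-topic-prod", "0"))   // prints 200
    jedis.close()
  }
}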

qq_梦也_1 replied to Michael_PK:
Looking at the official docs, they stress that foreachBatch can sink to multiple places, so I mistakenly assumed foreach couldn't. I really need to verify these things hands-on.
2020-10-29
8 replies in total
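To illustrate the foreachBatch point from the docs mentioned in the reply: a single query can cache each micro-batch and then write it to several locations with the ordinary batch writer. A minimal sketch, again using the rate source and local parquet paths as placeholders; the function is declared as an explicitly typed value, which on some Spark/Scala combinations also avoids the overload ambiguity reported for foreachBatch lambdas.

import org.apache.spark.sql.{DataFrame, SparkSession}

object ForeachBatchMultiSinkSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("ForeachBatchMultiSinkSketch")
      .getOrCreate()

    val stream = spark.readStream.format("rate").option("rowsPerSecond", "5").load()

    // one function that writes the same micro-batch to two locations
    val writeBoth: (DataFrame, Long) => Unit = (batchDF, batchId) => {
      batchDF.persist()                                   // reuse the batch for both writes
      batchDF.write.mode("append").parquet("/tmp/sink-a") // first location
      batchDF.write.mode("append").parquet("/tmp/sink-b") // second location
      batchDF.unpersist()
    }

    stream.writeStream
      .foreachBatch(writeBoth)
      .option("checkpointLocation", "/tmp/foreachbatch-ckpt")
      .start()
      .awaitTermination()
  }
}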

Course: Spark 3 Real-Time Processing: Streaming + Structured Streaming in Practice
